Skip to content
redlion99 edited this page Mar 29, 2020 · 4 revisions

通过Maven方式引入

        <dependency>
            <groupId>com.github.pidata</groupId>
            <artifactId>pipin-web</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.pidata</groupId>
            <artifactId>pipin-scheduler</artifactId>
            <version>${project.version}</version>
        </dependency>

启动PiPin服务

        //启动管理服务器,用于管理集成项目,任务等。端口为8989
        new ManagementServer().start();

        //启动PiPin服务器,用于接收外部数据。端口为8080
        new PipinServer().start();

        //启动Job调度器,调度器运行与单独线程。
        Scheduler scheduler = new Scheduler();
        scheduler.scheduleAll();

详见 实例项目

环境依赖:

运行pipin-core 需要mongodb

运行pipin-scheduler 需要zookeeper

可以根据需要自行选择

以推的方式收取数据

启动PipinServer后,可以通过对应项目的endpoint接口推送上传数据

URL: http://localhost:8080/projects/[projectId]/endpoints 支持CSV(使用参数csv)和Json(使用参数json)方式上传文件

关于Converter

Converter是用来做格式转换的类,可以实现io.pipin.core.ext.Converter后,设置在项目里。

public class AsIsConverter implements Converter {
    private String entity;
    private String[] fields;

    public AsIsConverter(String entity, String[] fields) {
        this.entity = entity;
        this.fields = fields;
    }


    public Map<String, Map<String, Object>> convert(Map<String, Object> doc) {
        Map<String, Map<String, Object>> result = new HashMap<String, Map<String, Object>>();
        result.put(entity, doc);
        return result;
    }
}

默认的Converter为FlatAndFilterConverter, FlatAndFilterConverter会把输入的JSON格式的树状数据转换成适合关系型数据库存储的格式,并过滤掉不需要的字段。

关于去重和Identifier

PiPin通过为指定的字段计算HASH的方式来去重。在项目的MergeSettings设置中需要为entity指定HASH计算的字段名数组。

{
   "mergeSettings":{"keyMap":{
        "[entityName]":["_id"]
       }
   }
}

关于EntitySink

EntitySink用于将最终结果写入目标数据库,默认的实现是MongoEntitySink

MongoEntitySink会以entity名做为collection的名字,写入Mongo DB中。

以拉的方式收取数据

如果在项目中设置pollSettings,会定期拉取外部数据。

{
  "pollSettings": {
    "startUri": "https://www.teambitionapis.com/tbs/core/v2/tasks:query",
    "pageParameter": "page",
    "pageStartFrom": 1,
    "method": "POST",
    "importer": "",
    "traversalClass": "io.pipin.example.CustomPageableTraversal",
    "extraParams": {
      "tenantId": "111",
      "projectId": "222"
    }
  }
}

关于Traversal

Traversal提供了数据拉取的实现。

默认的Traversal为 SimpleTraversal。

SimpleTraversal可以从Spring风格的Restful接口中拉取数据。

自定义的Travesal可能需要实现以下的方法。

public class CustomPageableTraversal extends SimpleTraversal {

    public CustomPageableTraversal(String uri, String pageParameter, int pageStartFrom, PollSettings pollSettings, ActorSystem actorSystem, Logger log) {
        super(uri, pageParameter, pageStartFrom, pollSettings, actorSystem, log);
    }

    /***
     * 组装POST请求中的BODY
     * @return
     */
    @Override
    public String getBody() 

    /***
     * 额外的参数,比如分页参数
     * @return
     */
    @Override
    public Map<String, String> extraParamsMap() 

    /***
     * 指定HTTP headers
     * @return
     */
    @Override
    public String[][] headers() 

    /***
     * 生成请求中的所需要的token
     * @return
     */
    @Override
    public TokenAuthorizator getTokenAuthorizator()

    /***
     * 结果【列表】所在的字段
     * @return
     */
    @Override
    public String getContentField() 

    /***
     * 判断是否为最后一页的依据
     * @return
     */
    @Override
    public boolean endPage(Document doc)
    
    /***
     * 进入下一页前的回调
     * @return
     */
    @Override
    public void onPageNext(Document doc) 
}

关于JobTrigger

jobTrigger用于指定Scheduler的触发机制,目前支持CRON形式触发。

{
  "jobTrigger": {
    "key": "abc",
    "cron": "0/15 * * * * ?",
    "priority": 0,
    "misfireInstruction": 0
  }
}