
DataX httpreader Plugin Introduction

zhzava edited this page Apr 24, 2020 · 9 revisions

httpreader Plugin Documentation


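1 Quick Introduction

httpreader is a DataX plugin that reads data over HTTP (Webservice is also supported). Under the hood, httpreader uses Apache's HTTP client (CXF for Webservice) to connect to the remote endpoint and fetches the result according to the configured parameters.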

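2 Implementation Principle

In short, httpreader calls the remote endpoint through an HTTP client, parses the response according to the user's configuration, assembles the result into an abstract data set using DataX's own data types, and passes it to the downstream Writer.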
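The flow described above can be pictured with a minimal Python sketch. It is not the plugin's code (the plugin itself is a Java DataX reader): `read_records`, `run_job`, and the list-of-strings record shape are hypothetical stand-ins for DataX's record and Writer abstractions, and the HTTP call is replaced by a response body that has already been received.

```python
import json
from typing import Callable, Iterator, List

Record = List[str]  # hypothetical stand-in for a DataX record


def read_records(body: str, columns: List[str]) -> Iterator[Record]:
    """Parse a JSON array response and yield one record per element."""
    for item in json.loads(body):
        # Only the configured columns are kept, in the configured order.
        yield [str(item.get(name, "")) for name in columns]


def run_job(body: str, columns: List[str], write: Callable[[Record], None]) -> int:
    """Hand every record to the downstream writer and return the record count."""
    count = 0
    for record in read_records(body, columns):
        write(record)
        count += 1
    return count


# Usage: the "writer" here simply collects records into a list.
rows: List[Record] = []
body = '[{"id": 1, "name": "a", "extra": true}, {"id": 2, "name": "b"}]'
total = run_job(body, ["id", "name"], rows.append)
```

The reader only produces records and the writer only consumes them, which is why the same reader configuration can feed any DataX writer.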

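3 Features

3.1 Configuration Examples

  • A job that reads a paged HTTP endpoint (all pages are traversed automatically) and syncs the data to MySQL (the MySQL table must be created manually beforehand):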
{
    "job": {
        "setting": {
            "speed": {
            // Transfer speed limit in bytes/s; DataX tries to reach this rate without exceeding it.
                "byte":10485760
            },  
            // Error limits
            "errorLimit": {
                // Maximum number of failed records; the job fails once this is exceeded.
                "record": 0,
                // Maximum ratio of failed records: 1.0 means 100%, 0.02 means 2%
                "percentage": 0.02
            }   
        },  
        "content": [ { 
                "reader": {
                  // Use the httpreader plugin
                  "name": "httpreader",
                  "parameter": {
                    // URL of the HTTP endpoint to call
                    "httpUrl": "http://192.168.0.201:8121/keyPoint/queryByPage",
                    // Request method; get and post are currently supported
                    "requestType": "get",
                    // Request parameters (both get and post take them as a key/value collection)
                    "requestParams": {
                      "keyPointTypes": "8"
                    },
                    // Fields to extract from the response; list only the fields (property names) to be persisted. Only the fields named in column are parsed from the HTTP response.
                    "column": [
                      "pointCode",
                      "keyPointName",
                      "createTime"
                    ],
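                    // Key of the data object in the response that should be parsed (persisted). For a JSON result such as the sample given under resultChannel in 3.2, set this to the path of the records in the result (use '->' between levels)
                    "resultChannel": "msg->records",
                    // Response format (xml support is planned)
                    "resultType": "json",
                    // Set to true for a paged endpoint: the reader then uses the paging settings below to fetch every page, so one job run syncs all the data. Defaults to false, in which case the paging settings below can be omitted
                    "isPageable": "true",
                    // Key of the object that holds the paging information in the response; see resultChannel
                    "resultPageChannel": "msg",
                    // Names of the paging request parameters: which page to fetch and how many records per page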
                    "pageRequestConfig": {
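                      // Page number
                      "pageNumField": "pageNo",
                      // Page size
                      "pageSizeField": "pageSize"
                    },
                    // Names of the paging fields in the response. Some paged endpoints name these differently from the request parameters, so they are configured separately (required even when the names match)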
                    "pageResultConfig": {
                      "currentPageField": "current",
                      "pageSizeField": "size",
                      "pagesNumField": "pages",
                      "totalField": "total",
                      "recordsField": "records"
                    }
                  }  
                },
                "writer": {
                  // See https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
                  "name": "mysqlwriter",
                  "parameter": {
                    // For incremental updates, the table must have a primary key and the reader must supply the matching key field
                    "writeMode": "update",
                    "username": "root",
                    "password": "123456",
                    "column": [
                      "data_Id",
                      "POINT_CODE",
                      "KEY_POINT_NAME",
                      "CREATE_TIME"
                    ],
                    "connection": [
                      {
                        "table": [
                          "T_INFO_KEY_POINT"
                        ],
                        "jdbcUrl": "jdbc:mysql://192.168.0.104:3306/Hive?characterEncoding=utf8"
                      }
                    ]
                  }
                }
              }   
            ]   
      }   
}
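  • A job that reads an HTTP endpoint that requires a token in the request header and syncs the data to MySQL (the MySQL table must be created manually beforehand):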
{
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "tokenHttpConfig": {
              "httpUrl": "http://wj.iot.zjagr.com:9999/accounts/tokens",
              "requestType": "post",
              "requestParams": {
                "username": "lz",
                "password": "123456"
              },
              "redisKey": "zhnyToken",
              "tokenColumn": "token",
              "tokenValuePrefix": "Bearer "
            },
            "httpUrl": "http://wj.iot.zjagr.com:9999/devices",
            "requestType": "get",
            "requestParams": {
              "type_id": "1"
            },
            "column": [
              "id",
              "type_id",
              "account_id",
              "area_id",
              "name",
              "device_sn",
              "position.lat",
              "position.lng",
              "created_at",
              "updated_at",
              "type.name",
              "area.name"
            ],
            "resultChannel": "items",
            "resultType": "json"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "update",
            "username": "root",
            "password": "123456",
            "column": [
              "ID",
              "TYPE_ID",
              "ACCOUNT_ID",
              "AREA_ID",
              "NAME",
              "DEVICE_SN",
              "LAT",
              "LNG",
              "CREATED_AT",
              "UPDATED_AT",
              "TYPE_NAME",
              "AREA_NAME"
            ],
            "connection": [
              {
                "table": [
                  "T_ZHNY_DEVICE"
                ],
                "jdbcUrl": "jdbc:mysql://192.168.0.105:3306/citybrain?characterEncoding=utf8"
              }
            ]
          }
        }
      }
    ]
  }
}
  • A job that reads a webservice endpoint and syncs the data to MySQL (the MySQL table must be created manually beforehand):
{
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "httpUrl": "http://5g_museum.smart-wo.com:8090/webservice/pubWebservice?wsdl",
            "requestType": "webservice",
            "requestParams": [
              {
                "arg3": 1
              },
              {
                "arg4": 10
              }
            ],
            "wsMethod": "cpjbxxList",
            "wsTargetNamespace": "http://control.webservice.crrs.com/",
            "wsServiceName": "ISubWebserviceService",
            "column": [
              "id",
              "bzxlname",
              "wwmc",
              "bzdlname",
              "dqztname",
              "constant()",
              "decode(bzxlname,瓷器,1,玉石器、宝石,2,3)"
            ],
            "columnCaseSensitivity": false,
            "resultChannel": "resultMap->entry->value",
            "resultType": "json",
            "isPageable": "true",
            "resultPageChannel": "resultMap->entry->value",
            "pageRequestConfig": {
              "pageNumField": "arg3",
              "pageSizeField": "arg4"
            }
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "update",
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "bzxl_name",
              "wwmc",
              "bzdl_name",
              "dqzt_name",
              "type_code",
              "bzxl_code"
            ],
            "connection": [
              {
                "table": [
                  "T_INFO_CP"
                ],
                "jdbcUrl": "jdbc:mysql://192.168.0.104:3306/Hive?characterEncoding=utf8"
              }
            ]
          }
        }
      }
    ]
  }
}
  • A pair of jobs in which the first HTTP request saves some columns to Redis and the second HTTP request uses them as parameters; the two configurations follow:
{
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "tokenHttpConfig": {
              "redisKey": "accessToken",
              "httpUrl": "http://106.54.37.109:3000/api/auth/gettoken",
              "requestType": "post",
              "requestParams": {
                "appKey": "e54242e58a07072572a25f6c984f8501",
                "appSecret": "5c06cb79a4a376877999ce45413b25ee"
              },
              "tokenColumn": "data->accessToken",
              "tokenKey": "accessToken",
              "inHeader": false
            },
            "httpUrl": "http://106.54.37.109:3000/api/live/video/list",
            "requestType": "post",
            "requestParams": {
              "pageIndex": 1,
              "pageSize": 10
            },
            "column": [
              "deviceSerial",
              "channelNo",
              "deviceName",
              "liveAddress",
              "hdAddress",
              "rtmp",
              "rtmpHd",
              "flvAddress",
              "hdFlvAddress",
              "status",
              "exception",
              "beginTime",
              "endTime"
            ],
            "resultChannel": "data",
            "resultType": "json",
            "isPageable": "true",
            "resultPageChannel": "",
            "pageRequestConfig": {
              "pageNumField": "pageIndex",
              "pageSizeField": "pageSize"
            },
            "pageResultConfig": {
              "totalField": "total",
              "recordsField": "data"
            },
            "redisParamsConfig": {
              "isEnable": true,
              "column": [
                "deviceSerial",
                "channelNo"
              ],
              "paramsKey": "liveVideoSerialAndNo",
              "ttlTime": -1
            }
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "update",
            "username": "root",
            "password": "123456",
            "column": [
              "DEVICE_SERIAL",
              "CHANNEL_NO",
              "DEVICE_NAME",
              "LIVE_ADDRESS",
              "HD_ADDRESS",
              "RTMP",
              "RTMP_HD",
              "FLV_ADDRESS",
              "HD_FLV_ADDRESS",
              "STATUS",
              "EXCEPTION",
              "BEGIN_TIME",
              "END_TIME"
            ],
            "connection": [
              {
                "table": [
                  "t_data_live_video"
                ],
                "jdbcUrl": "jdbc:mysql://192.168.0.105:3306/citybrain?characterEncoding=utf8"
              }
            ]
          }
        }
      }
    ]
  }
}
{
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "tokenHttpConfig": {
              "redisKey": "accessToken",
              "httpUrl": "http://106.54.37.109:3000/api/auth/gettoken",
              "requestType": "post",
              "requestParams": {
                "appKey": "e54242e58a07072572a25f6c984f8501",
                "appSecret": "5c06cb79a4a376877999ce45413b25ee"
              },
              "tokenColumn": "data->accessToken",
              "tokenKey": "accessToken",
              "inHeader": false
            },
            "httpUrl": "http://106.54.37.109:3000/api/device/getstatus",
            "requestType": "post",
            "requestParams": {
              "deviceSerial": "redis(liveVideoSerialAndNo,deviceSerial)",
              "channel": "redis(liveVideoSerialAndNo,channelNo)"
            },
            "column": [
              "deviceSerial",
              "channel",
              "privacyStatus",
              "pirStatus",
              "alarmSoundMode"
            ],
            "resultChannel": "data",
            "resultType": "json"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "update",
            "username": "root",
            "password": "123456",
            "column": [
              "DEVICE_SERIAL",
              "CHANNEL",
              "PRIVACY_STATUS",
              "PIR_STATUS",
              "ALARM_SOUND_MODE"
            ],
            "connection": [
              {
                "table": [
                  "t_data_device_state"
                ],
                "jdbcUrl": "jdbc:mysql://192.168.0.105:3306/citybrain?characterEncoding=utf8"
              }
            ]
          }
        }
      }
    ]
  }
}

3.2 Parameter Description

  • httpUrl

    • Description: URL of the HTTP endpoint; preferably without query parameters.

    • Required: yes

    • Default: none

  • requestType

    • Description: Request method (for webservice, httpUrl must be the address of the corresponding WSDL).

    • Required: yes

    • Default: none

    • Example: get|post|webservice

  • requestParams

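    • Description: Request parameters. For get|post, configure a JSON object ({key:value}); for webservice, configure a JSON array ([{key1:value1},{key2:value2},{key3-1:value3-1,key3-2:value3-2}]), because ws uses the SOAP protocol and its parameters must be given as an array. The redis() expression is supported for reading a parameter from Redis, in the form redis(keyInRedis)|redis(keyInRedis,columnInRedis)

    • Required: no

    • Default: none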
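To make the two redis() forms concrete, here is a small Python sketch of how such an expression could be recognised and resolved. It assumes a plain dict in place of a Redis connection and a single cached row per key; `parse_redis_expr`, `resolve_params`, and the value `D001` are hypothetical, and the plugin's real parsing rules may differ.

```python
import re
from typing import Any, Dict, Optional, Tuple

# redis(key) or redis(key,column); names may not contain commas, parentheses or spaces.
_REDIS_EXPR = re.compile(r"^redis\(\s*([^,()\s]+)\s*(?:,\s*([^,()\s]+)\s*)?\)$")


def parse_redis_expr(value: Any) -> Optional[Tuple[str, Optional[str]]]:
    """Return (key, column) for a redis() expression, or None for a literal value."""
    if not isinstance(value, str):
        return None
    match = _REDIS_EXPR.match(value.strip())
    if match is None:
        return None
    return match.group(1), match.group(2)


def resolve_params(params: Dict[str, Any], cache: Dict[str, Any]) -> Dict[str, Any]:
    """Replace redis() expressions using `cache`, a dict standing in for Redis."""
    resolved: Dict[str, Any] = {}
    for name, value in params.items():
        parsed = parse_redis_expr(value)
        if parsed is None:
            resolved[name] = value  # literal parameter, passed through unchanged
            continue
        key, column = parsed
        cached = cache[key]
        resolved[name] = cached if column is None else cached[column]
    return resolved


# Usage with the parameter names from the device-status example in 3.1.
cache = {"liveVideoSerialAndNo": {"deviceSerial": "D001", "channelNo": "1"}}
params = {
    "deviceSerial": "redis(liveVideoSerialAndNo,deviceSerial)",
    "channel": "redis(liveVideoSerialAndNo,channelNo)",
}
resolved = resolve_params(params, cache)
```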

  • column

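    • Description: Properties to extract from the response, given as a JSON array of field names. Because the fields of an HTTP response have no guaranteed order, the [*] wildcard is not supported. Nested fields can be addressed as columnParentKey.columnChildKey. Two expressions are supported: constant(value) and decode(column1,condition1,val1,condition2,val2...,default val)

    • Required: yes

    • Default: none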
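The three kinds of column entry (a dotted path, constant(), and decode()) can be illustrated with a short Python sketch. It assumes decode() compares the column's string value with each condition in turn and falls back to the trailing default, and that arguments are split on ASCII commas; `lookup` and `evaluate_column` are hypothetical names, not the plugin's API.

```python
from typing import Any, Dict


def lookup(record: Dict[str, Any], path: str) -> Any:
    """Follow a parent.child path through nested dicts; return None if missing."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def evaluate_column(expr: str, record: Dict[str, Any]) -> Any:
    """Evaluate one column entry: constant(...), decode(...), or a field path."""
    if expr.startswith("constant(") and expr.endswith(")"):
        return expr[len("constant("):-1]
    if expr.startswith("decode(") and expr.endswith(")"):
        args = [a.strip() for a in expr[len("decode("):-1].split(",")]
        actual = str(lookup(record, args[0]))
        pairs = args[1:]
        # Walk (condition, value) pairs; a trailing unpaired item is the default.
        while len(pairs) >= 2:
            if actual == pairs[0]:
                return pairs[1]
            pairs = pairs[2:]
        return pairs[0] if pairs else None
    return lookup(record, expr)


# Usage with the decode() expression from the webservice example in 3.1.
record = {"bzxlname": "瓷器", "position": {"lat": 30.1}}
code = evaluate_column("decode(bzxlname,瓷器,1,玉石器、宝石,2,3)", record)
lat = evaluate_column("position.lat", record)
```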

  • resultType

    • Description: Response format (xml support is planned).

    • Required: yes

    • Default: none

  • resultChannel

    • Description: Key of the data object in the response that should be parsed (persisted). Taking the JSON sample result below as an example, set this to the path of the records in the result so that they are persisted (use '->' between levels).

    • Required: yes

    • Default: none

    • Sample result: {"code":200,"type":"success","msg":{"offset":0,"total":23,"size":20,"pages":2,"current":1,"searchCount":true,"openSort":true,"ascs":null,"descs":null,"orderByField":null,"records":[{"dataId":303,"pointCode":"440400200000007017","keyPointName":"斗门收费站","x":null,"y":null,"createTime":"2019-09-25 18:00:00","createUser":"zhz","updateTime":"2019-09-25 18:00:00","remarks":"","keyPointType":"8"}],"condition":null,"asc":true},"timestamp":1587095207330}
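As an illustration of the '->' notation, the Python sketch below walks a channel path through an abridged copy of the sample result. `resolve_channel` is a hypothetical helper, and treating an empty channel as "the whole result" is an assumption suggested by the example in 3.1 that sets resultPageChannel to an empty string.

```python
import json
from typing import Any


def resolve_channel(result: Any, channel: str) -> Any:
    """Walk an 'a->b->c' path through nested JSON objects."""
    current = result
    for key in channel.split("->"):
        if key == "":
            continue  # assumption: an empty channel leaves the result as-is
        current = current[key]
    return current


# Abridged version of the sample result shown above.
sample = json.loads(
    '{"code":200,"type":"success","msg":{"total":23,"size":20,"pages":2,'
    '"current":1,"records":[{"dataId":303,"pointCode":"440400200000007017",'
    '"keyPointName":"斗门收费站","createTime":"2019-09-25 18:00:00"}]},'
    '"timestamp":1587095207330}'
)
records = resolve_channel(sample, "msg->records")  # the list of rows to persist
page_info = resolve_channel(sample, "msg")         # the object with paging fields
```

With resultChannel set to msg->records the reader sees only the list of row objects, while msg (the value used for resultPageChannel in 3.1) exposes total, size, pages, and current.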

  • requestHeader

    • Description: Request headers.

    • Required: no

    • Default: none

  • tokenHttpConfig

    • Description: If requests to ${httpUrl} must carry a token, configure here how the token is obtained (over HTTP).

    • Required: no

    • Default: none

      • redisKey

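        • Description: Redis key under which the token is cached.

        • Required: no

        • Default: none

      • redisTimeOut

        • Description: Expiration time of the cached token key, in seconds.

        • Required: no

        • Default: 2 days

      • httpUrl

        • Description: URL of the HTTP request that obtains the token.

        • Required: no

        • Default: none

      • requestParams

        • Description: Parameters of the HTTP request that obtains the token.

        • Required: no

        • Default: none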

      • tokenColumn

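        • Description: Field in the token response that holds the token; use "->" between levels, see resultChannel.

        • Required: no

        • Default: none

      • inHeader

        • Description: How the token is passed once obtained; when true, it is placed in the request header.

        • Required: no

        • Default: true

      • tokenKey

        • Description: Name of the request header that carries the token.

        • Required: no

        • Default: Authorization

      • tokenValuePrefix

        • Description: Prefix prepended to the token value in the request header.

        • Required: no

        • Default: none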
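How inHeader, tokenKey, and tokenValuePrefix combine can be sketched in Python as follows. `apply_token` is a hypothetical helper; the defaults mirror the ones documented above, while sending the token as an ordinary request parameter when inHeader is false (and still applying the prefix in that case) is an assumption inferred from the example in 3.1, not confirmed behaviour.

```python
from typing import Any, Dict, Tuple


def apply_token(
    token: str,
    token_config: Dict[str, Any],
    headers: Dict[str, str],
    params: Dict[str, Any],
) -> Tuple[Dict[str, str], Dict[str, Any]]:
    """Return copies of headers and params with the token attached."""
    key = token_config.get("tokenKey", "Authorization")
    value = token_config.get("tokenValuePrefix", "") + token
    headers, params = dict(headers), dict(params)
    if token_config.get("inHeader", True):
        headers[key] = value
    else:
        # Assumption: with inHeader=false the token travels as a request parameter.
        params[key] = value
    return headers, params


# Usage: header style (second example in 3.1) and parameter style (fourth example).
h1, p1 = apply_token("abc", {"tokenValuePrefix": "Bearer "}, {}, {"type_id": "1"})
h2, p2 = apply_token("abc", {"tokenKey": "accessToken", "inHeader": False}, {}, {})
```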

  • columnCaseSensitivity

    • Description: Whether the field names in ${column} are matched case-sensitively.

    • Required: no

    • Default: true

  • wsMethod

    • Description: webservice only: name of the method to call (<wsdl:operation name="{wsMethod}">)

    • Required: no (required for webservice requests)

    • Default: none

  • wsTargetNamespace

    • Description: webservice only: namespace of the WSDL (targetNamespace)

    • Required: no (required for webservice requests)

    • Default: none

  • wsServiceName

    • Description: webservice only: name of the service in the WSDL (<wsdl:service name="{wsServiceName}">)

    • Required: no (required for webservice requests)

    • Default: none

  • isPageable

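    • Description: Set to true for a paged endpoint: the reader then uses the paging settings to fetch every page, so one job run syncs all the data of the endpoint. When false, the paging parameters that follow can be omitted.

    • Required: no

    • Default: false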

  • resultPageChannel

    • Description: Key of the object that holds the paging information in the response; see resultChannel.

    • Required: yes when isPageable is true

    • Default: none

  • pageRequestConfig

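    • Description: Names of the paging request parameters, that is, the parameter that selects the page and the parameter that sets the number of records per page.

    • Required: yes when isPageable is true

    • Default: none

      • pageNumField

        • Description: Page-number field name: the parameter that selects which page to fetch.

        • Required: yes when isPageable is true

        • Default: none

      • pageSizeField

        • Description: Page-size field name: the parameter that sets the number of records per page.

        • Required: yes when isPageable is true

        • Default: none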

  • pageResultConfig

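    • Description: Names of the paging fields in the response. Some paged endpoints name these fields differently from the request parameters, so they are configured separately (required even when the names match).

    • Required: yes when isPageable is true

    • Default: none

      • currentPageField

        • Description: Current-page field name: the property that reports which page this is.

        • Required: yes when isPageable is true

        • Default: none

      • pageSizeField

        • Description: Page-size field name: the property that reports the number of records per page.

        • Required: yes when isPageable is true

        • Default: none

      • pagesNumField

        • Description: Page-count field name: the property that reports how many pages there are.

        • Required: yes when isPageable is true

        • Default: none

      • totalField

        • Description: Total-count field name: the property that reports the total number of records.

        • Required: yes when isPageable is true

        • Default: none

      • recordsField

        • Description: Records field name: the property that holds the actual rows of the page.

        • Required: yes when isPageable is true

        • Default: none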
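The interplay of pageRequestConfig and pageResultConfig can be sketched as a simple loop in Python. This is an illustration under assumptions, not the plugin's algorithm: `fetch_all_pages`, `fetch`, `fake_fetch`, and the `page_size` argument are hypothetical, `fetch` is taken to return the object already narrowed to resultPageChannel, and the loop stops once the requested page number reaches the value of pagesNumField.

```python
from typing import Any, Callable, Dict, List


def fetch_all_pages(
    fetch: Callable[[Dict[str, Any]], Dict[str, Any]],
    base_params: Dict[str, Any],
    request_cfg: Dict[str, str],
    result_cfg: Dict[str, str],
    page_size: int = 20,
) -> List[Any]:
    """Request page 1, 2, ... until the reported page count is reached."""
    records: List[Any] = []
    page = 1
    while True:
        params = dict(base_params)
        params[request_cfg["pageNumField"]] = page
        params[request_cfg["pageSizeField"]] = page_size
        page_info = fetch(params)  # object found at resultPageChannel
        records.extend(page_info[result_cfg["recordsField"]])
        if page >= int(page_info[result_cfg["pagesNumField"]]):
            return records
        page += 1


def fake_fetch(params: Dict[str, Any]) -> Dict[str, Any]:
    """In-memory stand-in for the paged endpoint, serving five rows."""
    data = list(range(5))
    size, number = params["pageSize"], params["pageNo"]
    pages = (len(data) + size - 1) // size
    rows = data[(number - 1) * size: number * size]
    return {"current": number, "size": size, "pages": pages,
            "total": len(data), "records": rows}


request_cfg = {"pageNumField": "pageNo", "pageSizeField": "pageSize"}
result_cfg = {"currentPageField": "current", "pageSizeField": "size",
              "pagesNumField": "pages", "totalField": "total",
              "recordsField": "records"}
all_rows = fetch_all_pages(fake_fetch, {"keyPointTypes": "8"},
                           request_cfg, result_cfg, page_size=2)
```

Keeping the request-side and response-side names in separate maps is what allows an endpoint to accept pageNo but report current, as in the first example in 3.1.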

  • redisParamsConfig

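    • Description: Caches the request result in Redis.

    • Required: no

    • Default: none

      • isEnable

        • Description: Whether caching is enabled.

        • Required: no

        • Default: false

      • column

        • Description: Which columns of the response to save to Redis.

        • Required: yes

        • Default: none

      • paramsKey

        • Description: Redis key under which the selected columns are stored; a later job refers to it as keyInRedis in a redis() expression.

        • Required: yes

        • Default: none

      • ttlTime

        • Description: Cache expiration time.

        • Required: no

        • Default: -1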

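3.3 Type Conversion

All values are ultimately converted to strings.

4 Performance Report

5 Constraints

Because the plugin is built on the DataX architecture, a single Reader supports only one output format.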

6 FAQ