Add Kafka source & sink #179

Closed
listonejava opened this issue Dec 29, 2022 · 6 comments
Labels
3.7.1 enhancement New feature or request
Milestone

Comments

@listonejava

The TIS open-source platform does not yet implement a Kafka connector. Please add one. Thanks!

@baisui1981 baisui1981 added the enhancement New feature or request label Feb 28, 2023
@baisui1981
Member

baisui1981 commented Mar 2, 2023

Looking at Airbyte's Kafka source configuration, the main items are:

  1. Basic Kafka source information, such as groupId
  2. Communication protocol: PLAINTEXT, SASL_PLAINTEXT, or SASL_SSL
  3. Subscription Method
  4. Message format: currently JSON and Avro are supported

See the full configuration at https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json
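The settings listed above could be modeled roughly as follows. This is a hypothetical sketch, not TIS or Airbyte code: the class and enum names are illustrative, bootstrapServers is an assumed extra field, and a real SASL setup would also need keys such as sasl.mechanism and sasl.jaas.config, which are omitted. Only the connection-level settings map onto standard Kafka consumer keys (bootstrap.servers, group.id, security.protocol); the subscription method and message format instead steer how the consumer is driven and how values are decoded.

```java
import java.util.Properties;

// Hypothetical model of the source settings listed above; the class, enum and
// field names are illustrative and are not TIS or Airbyte APIs.
class KafkaSourceConfig {
    // The three protocol options; the names match Kafka's security.protocol values.
    enum Protocol { PLAINTEXT, SASL_PLAINTEXT, SASL_SSL }
    // Decides whether the consumer calls subscribe(Pattern) or assign(partitions).
    enum Subscription { SUBSCRIBE_PATTERN, ASSIGN_PARTITIONS }
    // Decides how record values are decoded; it is not a Kafka client property.
    enum MessageFormat { JSON, AVRO }

    final String bootstrapServers; // assumed; the list above only names groupId
    final String groupId;
    final Protocol protocol;
    final Subscription subscription;
    final MessageFormat format;

    KafkaSourceConfig(String bootstrapServers, String groupId, Protocol protocol,
                      Subscription subscription, MessageFormat format) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
        this.protocol = protocol;
        this.subscription = subscription;
        this.format = format;
    }

    // Only the connection-level settings map onto standard Kafka consumer keys.
    Properties toConsumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("security.protocol", protocol.name());
        return props;
    }

    public static void main(String[] args) {
        KafkaSourceConfig cfg = new KafkaSourceConfig("localhost:9092", "tis-reader",
            Protocol.SASL_SSL, Subscription.SUBSCRIBE_PATTERN, MessageFormat.JSON);
        System.out.println(cfg.toConsumerProperties());
    }
}
```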

Based on spec.json, the corresponding TIS plugin scaffolding code can be generated; see plugins/tis-datax/tis-datax-kafka-plugin/src/main/java/com/qlangtech/tis/plugins/datax/kafka/reader/Conver2TISProps.java
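To illustrate the scaffolding idea only, here is a hypothetical sketch that turns spec.json-style properties (name mapped to JSON-schema type) into Java field declarations. It is not the logic of Conver2TISProps.java; the class name, the type mapping, and the sample property names are assumptions, and a real generator would parse spec.json with a JSON library rather than take a prepared map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical scaffolding sketch: each spec.json property (name -> JSON-schema
// type) becomes a Java field. Not the actual Conver2TISProps logic.
class SpecScaffold {
    static String javaType(String jsonType) {
        switch (jsonType) {
            case "integer": return "Integer";
            case "boolean": return "Boolean";
            case "number":  return "Double";
            case "object":  return "Map<String, Object>";
            default:        return "String"; // "string" and anything unrecognized
        }
    }

    // snake_case property name -> camelCase field name
    static String fieldName(String prop) {
        StringBuilder sb = new StringBuilder();
        boolean upper = false;
        for (char c : prop.toCharArray()) {
            if (c == '_') { upper = true; continue; }
            sb.append(upper ? Character.toUpperCase(c) : c);
            upper = false;
        }
        return sb.toString();
    }

    // Emit one field declaration per property, preserving the map's order.
    static String generate(Map<String, String> props) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            out.append("public ").append(javaType(e.getValue()))
               .append(' ').append(fieldName(e.getKey())).append(";\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("group_id", "string");         // hypothetical sample property
        props.put("max_poll_records", "integer"); // hypothetical sample property
        System.out.print(generate(props));
    }
}
```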

[Screenshot 2023-03-02 10:11:03 AM]

The Kafka sink configuration is largely the same as the source's. Configuration list: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
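Since the sink mostly mirrors the source, one way to express that is to build the sink's producer properties on top of the same connection settings. A minimal sketch, assuming the shared part is bootstrap servers plus security protocol; the class name and the chosen values for acks and compression.type (both standard Kafka producer keys) are illustrative, not taken from the Airbyte spec.

```java
import java.util.Properties;

// Hypothetical sketch: the sink shares the source's connection settings and
// layers producer-only keys on top. Names and values are illustrative.
class KafkaSinkConfig {
    // Connection settings configured the same way as on the source side.
    static Properties connection(String bootstrapServers, String securityProtocol) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("security.protocol", securityProtocol);
        return p;
    }

    // Producer-side settings; acks and compression.type are standard producer keys.
    static Properties producer(String bootstrapServers, String securityProtocol) {
        Properties p = connection(bootstrapServers, securityProtocol);
        p.setProperty("acks", "all");             // wait for all in-sync replicas
        p.setProperty("compression.type", "lz4"); // illustrative choice
        return p;
    }

    public static void main(String[] args) {
        System.out.println(producer("localhost:9092", "PLAINTEXT"));
    }
}
```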

[Screenshot 2023-03-02 10:26:53 AM]

@baisui1981 baisui1981 changed the title from "Add Kafka source sink" to "Add Kafka source & sink" Mar 16, 2023
@baisui1981
Member

baisui1981 commented Mar 20, 2023

Sample message sent to Kafka (the event field is one of "insert", "update", or "delete"):

{
  "id": "76d6d59f-5d08-41e8-a460-4ec3e03ad4d1",
  "tableName": "orderinfo",
  "occure_time": "1679274580528",
  "event": "insert",
  "data": {
    "order_id": "aabbbdddd",
    "product_name": "aabbcc"
  }
}
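A producer-side helper that renders this message shape might look like the sketch below. The class and method names are hypothetical, values in data are treated as strings for simplicity, and a real implementation would more likely use a JSON library such as Jackson than hand-written escaping.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch that renders the proposed message format as JSON.
class ChangeMessage {
    enum Event { INSERT, UPDATE, DELETE }

    // Minimal JSON string escaping (quotes, backslashes, control characters).
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    static String toJson(String id, String table, long occurTime, Event event,
                         Map<String, String> data) {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":").append(quote(id));
        sb.append(",\"tableName\":").append(quote(table));
        // occure_time is epoch millis encoded as a string, as in the sample
        sb.append(",\"occure_time\":").append(quote(Long.toString(occurTime)));
        sb.append(",\"event\":").append(quote(event.name().toLowerCase(Locale.ROOT)));
        sb.append(",\"data\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : data.entrySet()) {
            if (!first) sb.append(',');
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("order_id", "aabbbdddd");
        data.put("product_name", "aabbcc");
        System.out.println(toJson(UUID.randomUUID().toString(), "orderinfo",
            System.currentTimeMillis(), Event.INSERT, data));
    }
}
```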

See the Debezium format for reference: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/debezium/

@baisui1981
Member

baisui1981 commented Mar 29, 2023

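A prototype has been implemented. Its shortcoming is that update messages sent to Kafka currently contain only the row's after-state, so consumers cannot compare it with the before-state to tell whether the data actually changed.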

For reference, see: https://github.com/qlangtech/flink/blob/f850f556f7f26f99636058481e57a251a9b654fb/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

It includes two format implementations, Debezium JSON and Canal JSON:

  1. org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
  2. org.apache.flink.formats.json.canal.CanalJsonFormatFactory
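The before/after shortcoming described above can be made concrete: with both images a consumer can list exactly which columns changed, while with only the after-image there is nothing to compare against. A minimal sketch with a hypothetical class name, assuming rows are represented as column-to-value maps:

```java
import java.util.*;

// Hypothetical sketch of why the before-image matters for update messages.
class UpdateDiff {
    // Returns the changed column names, or empty when no before-image was sent
    // (the current prototype's situation: only "after" is available).
    static Optional<Set<String>> changedColumns(Map<String, Object> before,
                                                Map<String, Object> after) {
        if (before == null) {
            return Optional.empty();
        }
        Set<String> columns = new TreeSet<>(before.keySet());
        columns.addAll(after.keySet());
        Set<String> changed = new TreeSet<>();
        for (String c : columns) {
            if (!Objects.equals(before.get(c), after.get(c))) {
                changed.add(c);
            }
        }
        return Optional.of(changed);
    }

    public static void main(String[] args) {
        Map<String, Object> before = new HashMap<>();
        before.put("name", "test01");
        before.put("num", 1);
        Map<String, Object> after = new HashMap<>(before);
        after.put("name", "test02");
        System.out.println(changedColumns(before, after)); // Optional[[name]]
        System.out.println(changedColumns(null, after));   // Optional.empty
    }
}
```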

@baisui1981 baisui1981 added this to the v3.7.1 milestone Apr 13, 2023
@baisui1981
Member

baisui1981 commented Apr 16, 2023

Tested and working:

Using the debezium-json format:

{
	"before": null,
	"after": {
		"instance_id": "000102514520a464014",
		"order_id": "10000015361171282085747c1f92005a",
		"batch_msg": "",
		"type": 0,
		"ext": "{\"hitPrice\":0,\"isWait\":0,\"optionalType\":0}",
		"waitinginstance_id": "",
		"kind": 3,
		"parent_id": "",
		"pricemode": 1,
		"name": "test02",
		"makename": "",
		"taste": "",
		"spec_detail_name": "",
		"num": 1,
		"account_num": 1,
		"unit": "?",
		"account_unit": "",
		"price": 1,
		"member_price": 1,
		"fee": 1,
		"ratio": 100,
		"ratio_fee": 1,
		"ratio_cause": "",
		"status": 2,
		"kindmenu_id": "8000001564d710600164f9cd0c510172",
		"kindmenu_name": "??",
		"menu_id": "0",
		"memo": "",
		"is_ratio": 0,
		"entity_id": "80000015",
		"is_valid": 1,
		"create_time": 1535257651481,
		"op_time": 1535257670907,
		"last_ver": 3,
		"load_time": 1536117832,
		"modify_time": 1678704159,
		"draw_status": 0,
		"bookmenu_id": "",
		"make_id": "",
		"make_price": 0,
		"prodplan_id": "",
		"is_wait": 0,
		"specdetail_id": "",
		"specdetail_price": 0,
		"makeprice_mode": 1,
		"original_price": "1.0",
		"is_buynumber_changed": 1,
		"ratio_operator_id": "",
		"child_id": "",
		"kind_bookmenu_id": "",
		"specprice_mode": 2,
		"worker_id": "05eb689d72304a8d9064b20f58953d91",
		"is_backauth": 1,
		"service_fee_mode": 0,
		"service_fee": "0.0",
		"orign_id": "",
		"addition_price": 0,
		"has_addition": 0,
		"seat_id": ""
	},
	"op": "c",
	"source": {
		"table": "instancedetail"
	},
	"ts_ms": 1681615789046
}
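In the debezium-json message above, "op": "c" together with "before": null marks an insert. The sketch below shows how a consumer might interpret the op codes Debezium uses (c = create, r = snapshot read, u = update, d = delete). The row-kind strings mirror Flink's RowKind short strings (+I, -U, +U, -D); the class itself is hypothetical and does not depend on Flink.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch mapping the Debezium "op" field to Flink-style row kinds.
class DebeziumOp {
    static List<String> rowKinds(String op) {
        switch (op) {
            case "c": // create: "before" is null, "after" holds the new row
            case "r": // read: snapshot row, treated like an insert
                return Arrays.asList("+I");
            case "u": // update: retract the "before" row, then emit the "after" row
                return Arrays.asList("-U", "+U");
            case "d": // delete: "before" holds the removed row, "after" is null
                return Arrays.asList("-D");
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        System.out.println(rowKinds("c")); // [+I]
        System.out.println(rowKinds("u")); // [-U, +U]
    }
}
```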

Sample message transmitted with canal-json:

{
	"data": [{
		"instance_id": "000102514520a464010",
		"order_id": "10000015361171282085747c1f92005a",
		"batch_msg": "",
		"type": 0,
		"ext": "{\"hitPrice\":0,\"isWait\":0,\"optionalType\":0}",
		"waitinginstance_id": "",
		"kind": 3,
		"parent_id": "",
		"pricemode": 1,
		"name": "test02",
		"makename": "",
		"taste": "",
		"spec_detail_name": "",
		"num": 1,
		"account_num": 1,
		"unit": "?",
		"account_unit": "",
		"price": 1,
		"member_price": 1,
		"fee": 1,
		"ratio": 100,
		"ratio_fee": 1,
		"ratio_cause": "",
		"status": 2,
		"kindmenu_id": "8000001564d710600164f9cd0c510172",
		"kindmenu_name": "美丽",
		"menu_id": "0",
		"memo": "",
		"is_ratio": 0,
		"entity_id": "80000015",
		"is_valid": 1,
		"create_time": 1535257651481,
		"op_time": 1535257670907,
		"last_ver": 3,
		"load_time": 1536117832,
		"modify_time": 1678704159,
		"draw_status": 0,
		"bookmenu_id": "",
		"make_id": "",
		"make_price": 0,
		"prodplan_id": "",
		"is_wait": 0,
		"specdetail_id": "",
		"specdetail_price": 0,
		"makeprice_mode": 1,
		"original_price": "1.0",
		"is_buynumber_changed": 1,
		"ratio_operator_id": "",
		"child_id": "",
		"kind_bookmenu_id": "",
		"specprice_mode": 2,
		"worker_id": "05eb689d72304a8d9064b20f58953d91",
		"is_backauth": 1,
		"service_fee_mode": 0,
		"service_fee": "0.0",
		"orign_id": "",
		"addition_price": 0,
		"has_addition": 0,
		"seat_id": ""
	}],
	"type": "INSERT",
	"table": "instancedetail",
	"ts": 1681616142062
}
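Unlike debezium-json, canal-json batches rows in the "data" array. The sample above is an INSERT, so it has no "old" field; for UPDATE messages canal-json also carries an "old" array, in the same order as "data", that holds only the previous values of the changed columns. A before-image can therefore be rebuilt by overlaying "old" onto each row. A minimal sketch with a hypothetical class name, assuming rows are column-to-value maps:

```java
import java.util.*;

// Hypothetical sketch: rebuild canal-json before-images for an UPDATE message.
class CanalBeforeImage {
    static List<Map<String, Object>> beforeImages(List<Map<String, Object>> data,
                                                  List<Map<String, Object>> old) {
        if (data.size() != old.size()) {
            throw new IllegalArgumentException("\"data\" and \"old\" must have the same length");
        }
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            // Copy the after-state, then restore the pre-update values of changed columns.
            Map<String, Object> before = new LinkedHashMap<>(data.get(i));
            before.putAll(old.get(i));
            result.add(before);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("instance_id", "000102514520a464010");
        row.put("name", "test02");
        Map<String, Object> oldValues = new LinkedHashMap<>();
        oldValues.put("name", "test01"); // hypothetical earlier value
        System.out.println(beforeImages(Collections.singletonList(row),
                                        Collections.singletonList(oldValues)));
    }
}
```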
