-
Notifications
You must be signed in to change notification settings - Fork 40
Description
I am trying to migrate my data from Cassandra to Elasticsearch. My Cassandra table has two columns:
technify_id, which is a text (string) column, and products, which is a frozen user-defined type (frozen<product>). I created the table like this:
CREATE TABLE technify.product12 (
technify_id text PRIMARY KEY,
products frozen<product>
)
The definition of the product type used by the frozen column is below:
CREATE TYPE technify.product (
additional_images list<frozen<additionalimages>>,
brand text,
brand_id text,
description text,
entity text,
height text,
id_uuid text,
image text,
length text,
manufacturer_id text,
meta_description text,
meta_keyword text,
meta_title text,
minimum text,
model text,
name text,
options list<frozen<options>>,
price text,
product_attributes list<frozen<product_attribute>>,
product_categories list<text>,
product_specials list<frozen<product_special>>,
product_url text,
quantity text,
shipping text,
sku text,
sort_order text,
status text,
store_date_added text,
store_date_available text,
store_date_modified text,
store_id text,
store_product_id text,
tag text,
tax_class_id text,
viewed text,
weight text,
width text
)
I created a Cassandra source connector with the following configuration, running in distributed mode, and it is working fine:
{
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
"connect.cassandra.key.space": "technify",
"tasks.max": "2",
"topics": "yousuf_replicate",
"connect.cassandra.kcql": "UPSERT INTO yousuf_replicate SELECT products,technify_id FROM product12 pk technify_id",
"connect.cassandra.password": "5Rivers..",
"connect.cassandra.username": "farhan",
"key.converter.schemas.enable": "true",
"connect.cassandra.contact.points": "10.0.0.242,10.0.0.243",
"connect.cassandra.port": "9042",
"name": "CassandraYousufReplicateSourceConnector",
"value.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
And this is the data I am getting in my topic:
{
"topic": "yousuf_replicate",
"key": {
"schema": null,
"payload": null
},
"value": {
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "products"
},
{
"type": "string",
"optional": true,
"field": "technify_id"
}
],
"optional": false,
"name": "technify.product12"
},
"payload": {
"products": "{additional_images:[{image:'asdasd',product_image_id:'asdasd',store_product_id:'asdasdas'}],brand:'asdasdsad',brand_id:'asdasd',description:'asdasdasddas',entity:'asdasd',height:'asdasdas',id_uuid:'asdasd',image:'asdasd',length:'asdasdasd',manufacturer_id:'asdasd',meta_description:'asdasd',meta_keyword:'asdasd',meta_title:'asdasdasd',minimum:'asdasd',model:'asdasd',name:'ASDas',options:[{product_option_id:'asdasdas',product_option_value:{{name:'asdasd',quantity:'asdasd',option_value_id:'asdasd',price:'asdas',product_option_value_id:'asdads',price_prefix:'asddas',weight:'asdasdas',weight_prefix:'asdasd'}},name:'asdasd',option_id:'asdasd',value:'asdasd',required:'aasdasd'}],price:'asdasda',product_attributes:[{attribute_id:'asdasd',text:'asdads'}],product_categories:['abc','abs'],product_specials:[{date_start:'asdasd',price:'asdasd',product_special_id:'aadsadsas',date_end:'asdasdx'}],product_url:'asdasd',quantity:'asdasda',shipping:'asdasd',sku:'asdasd',sort_order:'asdasd',status:'asasdasd',store_date_added:'asdasdasd',store_date_available:'dadsaasd',store_date_modified:'asdasd',store_id:'12ads',store_product_id:'asdasdqw',tag:'asdasda',tax_class_id:'asdasdas',viewed:'asdasd',weight:'aas',width:'asdasd'}",
"technify_id": "asdada34"
}
},
"partition": 0,
"offset": 0
},
Note that the 'products' field arrives as a plain string, but I was expecting it as JSON or a struct.
After that, I created an Elasticsearch sink connector because I want to keep the data in Elasticsearch as well. I built that connector by following @simplesteph's tutorial, and he suggested that I open an issue here.
The following is the configuration of the Elasticsearch sink connector:
{
"connector.class": "com.datamountaineer.streamreactor.connect.elastic.ElasticSinkConnector",
"type.name": "kafka-connect",
"topics": "yousuf_replicate",
"tasks.max": "2",
"connect.elastic.url.prefix": "elasticsearch",
"connect.elastic.cluster.name": "elasticsearch",
"key.ignore": "true",
"key.converter.schemas.enable": "true",
"connect.elastic.sink.kcql": "UPSERT INTO yousuf_test SELECT * from yousuf_replicate PK technify_id",
"connect.elastic.url": "elasticsearch://localhost:9300",
"value.converter.schemas.enable": "true",
"name": "ElasticYousufSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
And this is what I am getting in Elasticsearch:
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 1,
"hits": [{
"_index": "yousuf_test",
"_type": "yousuf_test",
"_id": "asdada34",
"_score": 1,
"_source": {
"products": "{additional_images:[{image:'asdasd',product_image_id:'asdasd',store_product_id:'asdasdas'}],brand:'asdasdsad',brand_id:'asdasd',description:'asdasdasddas',entity:'asdasd',height:'asdasdas',id_uuid:'asdasd',image:'asdasd',length:'asdasdasd',manufacturer_id:'asdasd',meta_description:'asdasd',meta_keyword:'asdasd',meta_title:'asdasdasd',minimum:'asdasd',model:'asdasd',name:'ASDas',options:[{product_option_id:'asdasdas',product_option_value:{{name:'asdasd',quantity:'asdasd',option_value_id:'asdasd',price:'asdas',product_option_value_id:'asdads',price_prefix:'asddas',weight:'asdasdas',weight_prefix:'asdasd'}},name:'asdasd',option_id:'asdasd',value:'asdasd',required:'aasdasd'}],price:'asdasda',product_attributes:[{attribute_id:'asdasd',text:'asdads'}],product_categories:['abc','abs'],product_specials:[{date_start:'asdasd',price:'asdasd',product_special_id:'aadsadsas',date_end:'asdasdx'}],product_url:'asdasd',quantity:'asdasda',shipping:'asdasd',sku:'asdasd',sort_order:'asdasd',status:'asasdasd',store_date_added:'asdasdasd',store_date_available:'dadsaasd',store_date_modified:'asdasd',store_id:'12ads',store_product_id:'asdasdqw',tag:'asdasda',tax_class_id:'asdasdas',viewed:'asdasd',weight:'aas',width:'asdasd'}",
"technify_id": "asdada34"
}
}]
}
}
What I expected is that the products field would be stored as JSON in Elasticsearch.
Please guide me on what I have to do to get the data as JSON in Elasticsearch.