
[DRAFT] Support for frozen collections and UDTs #12

Draft: wants to merge 5 commits into base: master

Conversation

@avelanarius (Member) commented Sep 9, 2021

Add support for including frozen collections in generated Kafka changes:

  • frozen MAP
  • frozen LIST
  • frozen SET
  • frozen UDTs
  • tuples (at the moment, all tuples are frozen in Scylla)

Refs #9.

This is just a draft PR. Things left to do:

  • Finalize the representation of collections. For example, even though the standard SchemaBuilder.map is used, maps are represented as arrays when the JSON converter class is used. Similarly, there may be a neater way to represent tuples (instead of a struct with tuple_member_ fields). Finally, research is needed into whether this format works well with most sink connectors.
  • Additional testing. Only the JSON converter was tested; it is very likely that Avro does not work at the moment.
  • Documentation (fix README, etc.).

Piotr Grabowski added 5 commits September 9, 2021 18:35

  • Add support for including frozen lists in generated changes. Made necessary changes to support nested data types.
  • Add support for including frozen sets in generated changes.
  • Add support for including frozen maps in generated changes.
  • Add support for including tuples in generated changes. For a tuple, a Kafka Connect struct is created with "tuple_member_*" fields for each member of the tuple (as the members can have different data types).
  • Add support for including frozen UDTs in generated changes.
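The "tuple_member_*" naming scheme described in the commits can be sketched in plain Java. This is an illustrative helper only (the tupleMemberFieldNames method is hypothetical and not part of the connector); it shows just the field-naming convention for the struct that represents an N-element tuple, without the Kafka Connect dependency:

```java
import java.util.ArrayList;
import java.util.List;

public class TupleFieldNames {
    // Hypothetical helper: produces the "tuple_member_*" field names for
    // the struct that represents an N-element tuple. Each member gets its
    // own field because tuple members can have different data types.
    static List<String> tupleMemberFieldNames(int tupleSize) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < tupleSize; i++) {
            names.add("tuple_member_" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // A 3-element tuple maps to a struct with these fields:
        System.out.println(tupleMemberFieldNames(3));
        // prints [tuple_member_0, tuple_member_1, tuple_member_2]
    }
}
```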
@hartmut-co-uk left a comment

With the suggested changes applied (see comments), I've successfully tested the connector with a list-of-UDT field:

comments FROZEN<list<FROZEN<comment_type>>>,

I didn't test with Avro (yet), but I did test with:

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

I'll try to set up a more comprehensive example covering all new types and combinations.

case TUPLE:
case LIST: {
    Schema innerSchema = computeColumnSchema(type.getTypeArguments().get(0));
    return SchemaBuilder.array(innerSchema);

Suggested change:

-    return SchemaBuilder.array(innerSchema);
+    return SchemaBuilder.array(innerSchema).optional().build();

case MAP: {
    Schema keySchema = computeColumnSchema(type.getTypeArguments().get(0));
    Schema valueSchema = computeColumnSchema(type.getTypeArguments().get(1));
    return SchemaBuilder.map(keySchema, valueSchema);

Suggested change:

-    return SchemaBuilder.map(keySchema, valueSchema);
+    return SchemaBuilder.map(keySchema, valueSchema).optional().build();

@hartmut-co-uk

Does the kafka-connect-scylladb (sink) support collections and UDTs?
I'll try to find time to experiment and see if I can get a full pipeline working:

scylla table -> cdc -> connect source -> kafka topic -> connect sink -> scylla table2

@hartmut-co-uk

Agreed, TUPLE/MAP types are not straightforward, since the common data standards (JSON, Avro) do not support non-string map keys (Protobuf maps do allow defining the key type), and none of JSON/Avro/Protobuf has native support for tuples.

With this in mind, I think that, as mentioned (and implemented), using SchemaBuilder.map is the most appropriate way to support maps.
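For context on why maps "are represented as arrays" with the JSON converter: a JSON object can only have string keys, so a map whose key schema is non-string is typically emitted as an array of [key, value] pairs instead. The sketch below is illustrative plain Java (not the actual converter code) showing that shape:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapAsArray {
    // Illustrative sketch only: a map whose keys are not strings cannot
    // become a JSON object, so it is represented as an array of
    // [key, value] pairs.
    static List<List<Object>> toPairArray(Map<?, ?> map) {
        List<List<Object>> pairs = new ArrayList<>();
        for (Map.Entry<?, ?> e : map.entrySet()) {
            pairs.add(List.of(e.getKey(), e.getValue()));
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<Integer, String> m = new LinkedHashMap<>();
        m.put(1, "a");
        m.put(2, "b");
        System.out.println(toPairArray(m)); // prints [[1, a], [2, b]]
    }
}
```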

@hartmut-co-uk

I'll also try with Avro Converter and report back...

@hartmut-co-uk

I've successfully tested a setup with the Avro converter and a Scylla table of various types (using a custom-packaged Scylla connector with the proposed changes above applied).

Findings

  • FROZEN collections all work fine (list, set, map).
  • Maybe collection/UDT types should be validated to be FROZEN?
    (~ScyllaSchema.computeColumnSchema -> type.isFrozen())
  • UDTs fundamentally work, but the struct schema currently ends up named io.confluent.connect.avro.ConnectDefault, which will clash as soon as more than a single UDT exists in the cluster.
    -> the struct schema needs to be named for the UDT
  • Tuples work e.g. with schemaless JSON, but not with Avro:
    • The struct needs to be named.
    • A separate struct schema/name needs to be created for each table.column, so I think the struct schema name (Avro 'record') also needs to include the column name (namespaced as per the connector config logicalName+keyspace+table).
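The fully-qualified struct name suggested in the last finding could be assembled as in the sketch below. The layout logicalName.keyspace.table.column is an assumption mirroring the suggestion above, not the connector's actual convention:

```java
public class StructName {
    // Hypothetical naming scheme for per-column struct schemas, following
    // the suggestion above: logicalName.keyspace.table.column. The exact
    // layout is an assumption, not an established connector convention.
    static String structSchemaName(String logicalName, String keyspace,
                                   String table, String column) {
        return logicalName + "." + keyspace + "." + table + "." + column;
    }

    public static void main(String[] args) {
        // Using the names from the example config and table below:
        System.out.println(structSchemaName("avro15", "poc", "tbl2", "xudt"));
        // prints avro15.poc.tbl2.xudt
    }
}
```

Naming each column's struct uniquely avoids the io.confluent.connect.avro.ConnectDefault clash, since every Avro record then registers under its own subject.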

Example

Table/setup tested with (excluding the tuple type, which caused a connector failure with AvroConverter):

CREATE TYPE xudt (
    c1 text,
    c2 text
);

CREATE TABLE tbl2 (
    pk1        bigint,
    pk2        bigint,
    ck1        bigint,
    ck2        bigint,
    xascii     ascii,
    xbigint    bigint,
    xblob      blob,
    xboolean   boolean,
    xdecimal   decimal,
    xdouble    double,
    xfloat     float,
    xint       int,
    xtext      text,
    xtimestamp timestamp,
    xuuid      uuid,
    xvarchar   varchar,
    xvarint    varint,
    xtimeuuid  timeuuid,
    xinet      inet,
    xdate      date,
    xtime      time,
    xsmallint  smallint,
    xtinyint   tinyint,
    xduration  duration,
    xudt       FROZEN<xudt>,
    l1         FROZEN<list<bigint>>,
    l2         FROZEN<list<FROZEN<xudt>>>,
    s1         FROZEN<set<bigint>>,
    m1         FROZEN<map<text, text>>,
    PRIMARY KEY ((pk1, pk2), ck1, ck2)
) WITH CDC = { 'enabled': true, 'preimage': false, 'postimage': false, 'ttl': 3600 };

INSERT INTO tbl2 (pk1, pk2, ck1, ck2, xascii, xbigint, xblob, xboolean, xdecimal, xdouble, xfloat, xint, xtext, xtimestamp, xuuid, xvarchar, xvarint, xtimeuuid, xinet, xdate, xtime, xsmallint, xtinyint, xduration, xudt, l1, l2, s1, m1)
VALUES (1, 2, 3, 4, 'xascii', 5, intAsBlob(6), true, 7, 8.9, 10.11, 12, 'text', '2011-02-03 04:05:01.321+0000', 123e4567-e89b-12d3-a456-426655440000, 'varchar', 13, 123e4567-e89b-12d3-a456-426655440000, '127.0.0.1', '2021-12-18', '08:12:54.123456789', 14, 1, 12h30m, {c1: 'c1', c2: 'c2'}, [3,2,1], [{c1: 'c1.1', c2: 'c2.1'}, {c1: 'c1.2', c2: 'c2.2'}], {2,3,1}, {'fruit': 'Apple', 'band': 'Beatles'});

CDC table (screenshot omitted)

Connector config

{
  "name": "avro-15",
  "config": {
    "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
    "scylla.name": "avro15",
    "scylla.cluster.ip.addresses": "poc1-scylla-1:9042,poc1-scylla-2:9042,poc1-scylla-3:9042",
    "scylla.table.names": "poc.tbl2",
    "scylla.query.time.window.size": "60000",
    "scylla.confidence.window.size": "10000",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://poc1-redpanda-1:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://poc1-redpanda-1:8081",
    "auto.create.topics.enable": "true",
    "heartbeat.interval.ms": "30000",
    "offset.flush.timeout.ms": "10000",
    "offset.flush.interval.ms": "10000",
    "poll.interval.ms": "1000",
    "tombstones.on.delete": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "3"
  }
}

Deserialized Avro sample output, viewed in Kafdrop (screenshot omitted)

Excerpt of the resulting Avro schema (screenshot omitted)

@hartmut-co-uk

Please advise on how to proceed.
Would a contribution (an updated PR) be welcome, or are there other plans for this?

@Lorak-mmk (Contributor)

Hi,
Thank you for all your testing and findings, it's really helpful.
We want to finish implementing frozen collections, and then start work on non-frozen collections. If you'd like to create an updated PR, that would be most welcome.

* maybe collection/udt types should be 'validated' to be FROZEN?
  (_~ScyllaSchema.computeColumnSchema_ -> `type.isFrozen()`)

Correct me if I'm wrong, but I think they are validated, although in a non-straightforward way, which is explained in the comment. However, since the PR mentioned in the comment () is now merged, we can probably switch to isFrozen()?

@hartmut-co-uk

Note: scylladb/scylla-cdc-java#60 has been merged, but a new version of the library hasn't been released yet.

    return tupleSchema.optional().build();
}
case UDT: {
    SchemaBuilder udtSchema = SchemaBuilder.struct();

Suggested change:

-    SchemaBuilder udtSchema = SchemaBuilder.struct();
+    SchemaBuilder udtSchema = SchemaBuilder.struct().name(type.getUdtType().getName());

@hansh0801

track
