Support for collections #21

Open · wants to merge 10 commits into master
README.md (21 additions, 1 deletion)

@@ -26,7 +26,6 @@ The connector has the following limitations:
- Only row-level operations are produced (`INSERT`, `UPDATE`, `DELETE`):
  - Partition deletes - those changes are ignored
  - Row range deletes - those changes are ignored
- No support for preimage and postimage - changes only contain those columns that were modified, not the entire row before/after the change. More information [here](#cell-representation)

## Connector installation
@@ -192,6 +191,27 @@ If the operation did not modify the `v` column, the data event will contain the

See the `UPDATE` example for the full data change event's value.

#### Collections
The connector supports both frozen and non-frozen collections.
Frozen collections are represented as follows (these values are stored in the "Cell" struct mentioned above; a sketch follows this list):
- `List` and `Set` of type T are represented as `Schema.array(T)`. In the JSON format, this is also an array.
- `Map` with key type K and value type V is represented as `Schema.map(K, V)`. In JSON, this is an array (not an object!) of 2-element arrays (the first element is the key, the second is the value).
- `UDT` is represented as a struct. In JSON, this is an object.
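
For illustration, here is a minimal standalone sketch (not connector code; it assumes a hypothetical table with a `frozen<list<int>>`, a `frozen<map<text, int>>`, and a frozen UDT with fields `a int, b text`) of the Connect schemas these rules describe:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class FrozenCollectionSchemasSketch {
    public static void main(String[] args) {
        // frozen<list<int>> / frozen<set<int>> -> Schema.array(T); JSON: an array
        Schema frozenList = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build();
        // frozen<map<text, int>> -> Schema.map(K, V); JSON: an array of [key, value] pairs
        Schema frozenMap = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).optional().build();
        // frozen UDT -> a struct; JSON: an object
        Schema frozenUdt = SchemaBuilder.struct()
                .field("a", Schema.OPTIONAL_INT32_SCHEMA)
                .field("b", Schema.OPTIONAL_STRING_SCHEMA)
                .optional()
                .build();
        System.out.println(frozenList.type() + " " + frozenMap.type() + " " + frozenUdt.type());
        // prints: ARRAY MAP STRUCT
    }
}
```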

Non-frozen collections are a bit more complicated. The `scylla.collections.mode` configuration option defines which representation is used. Currently, only `delta` mode is supported; in the future, more modes (e.g. preimage/postimage) may be added.
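
As a usage sketch, setting the option when building connector properties programmatically might look like the following. Only `scylla.collections.mode` is defined by this change; the `connector.class` value is an assumed entry-point class name, not confirmed by this section:

```java
import java.util.HashMap;
import java.util.Map;

public class CollectionsModeConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Assumption: the connector's entry-point class; verify against your deployment.
        props.put("connector.class", "com.scylladb.cdc.debezium.connector.ScyllaConnector");
        props.put("scylla.collections.mode", "delta"); // currently the only supported mode
    }
}
```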

##### Non-frozen collections: delta mode
Each non-frozen collection column is represented as a struct with two fields, `mode` and `elements`. This struct is stored in the "Cell" described previously.
`mode` can be:
- `MODIFY` - elements were added or deleted.
- `OVERWRITE` - the whole content of the collection was removed and new elements were added. If no elements were added (meaning the collection was simply removed), this mode is not used - instead, the whole struct (stored in the `value` field of the "Cell" struct, as mentioned previously) will be null.

The type of the `elements` field depends on the collection type (a sketch of the `Set` case follows this list):
- For a `Set` of type T, it is `Schema.map(T, Schema.BOOLEAN_SCHEMA)`. The boolean value signals whether the element was added (true) or removed (false) from the set.
- For a `List` of type T, it is `Schema.map(Schema.STRING_SCHEMA, T)` - the key of this map is a timeuuid, as described in https://docs.scylladb.com/using-scylla/cdc/cdc-advanced-types/#lists. Removed elements are marked by a null value.
- For a `Map` with key K and value V, it is `Schema.map(K, V)` (the same as for a frozen collection). Removed elements are marked by a null value.
- For a `UDT`, it is a struct representing this UDT, but built a bit differently than for a frozen UDT: each field of this struct is a "Cell" (a struct with a single field, `value`). The "Cell" is used the same way as with columns - null means the field wasn't changed, a "Cell" with a null value means the field was removed, and a "Cell" with a non-null value means the field was overwritten.
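
To make the delta format concrete, here is a minimal standalone sketch (assumptions: the field names `mode` and `elements` described above; the real schemas are built by the connector) of the struct emitted for a non-frozen `set<text>` after an update that adds `'x'` and removes `'y'`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SetDeltaSketch {
    public static void main(String[] args) {
        // For a set<text>, `elements` is Schema.map(T, BOOLEAN) -> map(STRING, BOOLEAN).
        Schema elementsSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA).optional().build();
        Schema deltaSchema = SchemaBuilder.struct()
                .field("mode", Schema.STRING_SCHEMA)
                .field("elements", elementsSchema)
                .optional()
                .build();

        Map<String, Boolean> elements = new LinkedHashMap<>();
        elements.put("x", true);   // 'x' was added to the set
        elements.put("y", false);  // 'y' was removed from the set

        Struct delta = new Struct(deltaSchema)
                .put("mode", "MODIFY")     // elements added/removed; OVERWRITE would mean the collection was replaced
                .put("elements", elements);
        System.out.println(delta);
    }
}
```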

#### ScyllaExtractNewState transformer
The connector provides a single message transformation (SMT), `ScyllaExtractNewState` (class: `com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewState`).
This SMT works exactly like `io.debezium.transforms.ExtractNewRecordState` (in fact, it is invoked underneath), but additionally flattens the structure by extracting values from the aforementioned single-field structures.
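
A minimal wiring sketch (the `transforms.*` keys are standard Kafka Connect configuration; only the class name comes from this connector):

```java
import java.util.HashMap;
import java.util.Map;

public class ScyllaSmtConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("transforms", "extractNewState");
        props.put("transforms.extractNewState.type",
                "com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewState");
        System.out.println(props);
    }
}
```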
CollectionOperation.java (new file)

@@ -0,0 +1,6 @@
package com.scylladb.cdc.debezium.connector;

public enum CollectionOperation {
MODIFY, // Add or remove elements
OVERWRITE, // Overwrite entire collection
}
CollectionsMode.java (new file)

@@ -0,0 +1,5 @@
package com.scylladb.cdc.debezium.connector;

public enum CollectionsMode {
DELTA,
}
ScyllaChangeRecordEmitter.java

@@ -4,16 +4,18 @@
import com.scylladb.cdc.model.worker.ChangeSchema;
import com.scylladb.cdc.model.worker.cql.Cell;
import com.scylladb.cdc.model.worker.cql.CqlDate;
import com.scylladb.cdc.model.worker.cql.CqlDuration;
import com.scylladb.cdc.model.worker.cql.Field;
import io.debezium.data.Envelope;
import io.debezium.pipeline.AbstractChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.util.Clock;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.*;
import java.util.stream.Collectors;

public class ScyllaChangeRecordEmitter extends AbstractChangeRecordEmitter<ScyllaCollectionSchema> {

@@ -96,53 +98,162 @@ protected void emitDeleteRecord(Receiver receiver, ScyllaCollectionSchema scylla

private void fillStructWithChange(ScyllaCollectionSchema schema, Struct keyStruct, Struct valueStruct, RawChange change) {
for (ChangeSchema.ColumnDefinition cdef : change.getSchema().getNonCdcColumnDefinitions()) {
if (!ScyllaSchema.isSupportedColumnSchema(change.getSchema(), cdef)) continue;

if (cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.PARTITION_KEY || cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.CLUSTERING_KEY) {
Object value = translateFieldToKafka(change.getCell(cdef.getColumnName()), schema.keySchema().field(cdef.getColumnName()).schema());
valueStruct.put(cdef.getColumnName(), value);
keyStruct.put(cdef.getColumnName(), value);
continue;
}

Schema cellSchema = schema.cellSchema(cdef.getColumnName());
if (ScyllaSchema.isNonFrozenCollection(change.getSchema(), cdef)) {
Struct value = translateNonFrozenCollectionToKafka(valueStruct, change, cellSchema, cdef);
valueStruct.put(cdef.getColumnName(), value);
continue;
}

Schema innerSchema = cellSchema.field(ScyllaSchema.CELL_VALUE).schema();
Object value = translateFieldToKafka(change.getCell(cdef.getColumnName()), innerSchema);
Boolean isDeleted = this.change.getCell("cdc$deleted_" + cdef.getColumnName()).getBoolean();

if (value != null || (isDeleted != null && isDeleted)) {
Struct cell = new Struct(cellSchema);
cell.put(ScyllaSchema.CELL_VALUE, value);
valueStruct.put(cdef.getColumnName(), cell);
}
}
}

private Struct translateNonFrozenCollectionToKafka(Struct valueStruct, RawChange change, Schema cellSchema, ChangeSchema.ColumnDefinition cdef) {
Schema innerSchema = cellSchema.field(ScyllaSchema.CELL_VALUE).schema();
Struct cell = new Struct(cellSchema);
Struct value = new Struct(innerSchema);

Cell elementsCell = change.getCell(cdef.getColumnName());
Cell deletedElementsCell = change.getCell("cdc$deleted_elements_" + cdef.getColumnName());
boolean isDeleted = Boolean.TRUE.equals(change.getCell("cdc$deleted_" + cdef.getColumnName()).getBoolean());

Object elements;
boolean hasModified = false;
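// Build the per-type delta; "cdc$deleted_elements_<column>" carries removed set elements, list/map keys, or UDT field indices.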
switch (elementsCell.getDataType().getCqlType()) {
case SET: {
Schema elementsSchema = innerSchema.field(ScyllaSchema.ELEMENTS_VALUE).schema();
Schema scyllaElementsSchema = SchemaBuilder.array(elementsSchema.keySchema()).build();
List<Object> addedElements = (List<Object>) translateFieldToKafka(elementsCell, scyllaElementsSchema);
List<Object> deletedElements = (List<Object>) translateFieldToKafka(deletedElementsCell, scyllaElementsSchema);
Map<Object, Boolean> delta = new HashMap<>();
if (addedElements != null) {
addedElements.forEach(element -> delta.put(element, true));
}
if (deletedElements != null) {
deletedElements.forEach(element -> delta.put(element, false));
}

hasModified = !delta.isEmpty();
elements = delta;
break;
}
case LIST:
case MAP: {
Schema elementsSchema = innerSchema.field(ScyllaSchema.ELEMENTS_VALUE).schema();
Schema deletedElementsScyllaSchema = SchemaBuilder.array(elementsSchema.keySchema()).optional().build();
Map<Object, Object> addedElements = (Map<Object, Object>) ObjectUtils.defaultIfNull(translateFieldToKafka(elementsCell, elementsSchema), new HashMap<>());
List<Object> deletedKeys = (List<Object>)translateFieldToKafka(deletedElementsCell, deletedElementsScyllaSchema);
if (deletedKeys != null) {
deletedKeys.forEach((key) -> {
addedElements.put(key, null);
});
}

hasModified = !addedElements.isEmpty();
elements = addedElements;
break;
}
case UDT: {
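// For UDTs, "cdc$deleted_elements_<column>" holds the 0-based indices (smallint) of removed fields.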
List<Short> deletedKeysList = (List<Short>) translateFieldToKafka(deletedElementsCell, SchemaBuilder.array(Schema.INT16_SCHEMA).optional().build());
Set<Short> deletedKeys;
if (deletedKeysList == null) {
deletedKeys = new HashSet<>();
} else {
deletedKeys = new HashSet<>(deletedKeysList);
}

Map<String, Field> elementsMap = elementsCell.getUDT();
assert elementsMap instanceof LinkedHashMap;

Schema udtSchema = innerSchema.field(ScyllaSchema.ELEMENTS_VALUE).schema();
Struct udtStruct = new Struct(udtSchema);
short index = -1;
for (Map.Entry<String, Field> element : elementsMap.entrySet()) {
index++;
if ((!element.getValue().isNull()) || deletedKeys.contains(index)) {
hasModified = true;
Schema fieldCellSchema = udtSchema.field(element.getKey()).schema();
Struct fieldCell = new Struct(fieldCellSchema);
if (element.getValue().isNull()) {
fieldCell.put(ScyllaSchema.CELL_VALUE, null);
} else {
fieldCell.put(ScyllaSchema.CELL_VALUE, translateFieldToKafka(element.getValue(), fieldCellSchema.field(ScyllaSchema.CELL_VALUE).schema()));
}
udtStruct.put(element.getKey(), fieldCell);
}
}

elements = udtStruct;
break;
}
default:
throw new RuntimeException("Unreachable");
}

if (!hasModified) {
if (isDeleted) {
cell.put(ScyllaSchema.CELL_VALUE, null);
return cell;
} else {
return null;
}
}

CollectionOperation mode = isDeleted ? CollectionOperation.OVERWRITE : CollectionOperation.MODIFY;
value.put(ScyllaSchema.MODE_VALUE, mode.toString());
value.put(ScyllaSchema.ELEMENTS_VALUE, elements);

cell.put(ScyllaSchema.CELL_VALUE, value);
return cell;
}

private Object translateFieldToKafka(Field field, Schema resultSchema) {
ChangeSchema.DataType dataType = field.getDataType();

if (field.getAsObject() == null) {
return null;
}

if (dataType.getCqlType() == ChangeSchema.CqlType.DECIMAL) {
return field.getDecimal().toString();
}

if (dataType.getCqlType() == ChangeSchema.CqlType.UUID) {
return field.getUUID().toString();
}

if (dataType.getCqlType() == ChangeSchema.CqlType.TIMEUUID) {
return field.getUUID().toString();
}

if (dataType.getCqlType() == ChangeSchema.CqlType.VARINT) {
return field.getVarint().toString();
}

if (dataType.getCqlType() == ChangeSchema.CqlType.INET) {
return field.getInet().getHostAddress();
}

if (dataType.getCqlType() == ChangeSchema.CqlType.DATE) {
CqlDate cqlDate = field.getDate();
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
calendar.clear();
// Months start from 0 in Calendar:
@@ -151,9 +262,51 @@ private Object translateCellToKafka(Cell cell) {
}

if (dataType.getCqlType() == ChangeSchema.CqlType.DURATION) {
return field.getDuration().toString();
}

if (dataType.getCqlType() == ChangeSchema.CqlType.LIST) {
Schema innerSchema = resultSchema.valueSchema();
return field.getList().stream().map((element) -> this.translateFieldToKafka(element, innerSchema)).collect(Collectors.toList());
}

if (dataType.getCqlType() == ChangeSchema.CqlType.SET) {
Schema innerSchema = resultSchema.valueSchema();
return field.getSet().stream().map((element) -> this.translateFieldToKafka(element, innerSchema)).collect(Collectors.toList());
}

if (dataType.getCqlType() == ChangeSchema.CqlType.MAP) {
Map<Field, Field> map = field.getMap();
Map<Object, Object> kafkaMap = new LinkedHashMap<>();
Schema keySchema = resultSchema.keySchema();
Schema valueSchema = resultSchema.valueSchema();
map.forEach((key, value) -> {
Object kafkaKey = translateFieldToKafka(key, keySchema);
Object kafkaValue = translateFieldToKafka(value, valueSchema);
kafkaMap.put(kafkaKey, kafkaValue);
});
return kafkaMap;
}

if (dataType.getCqlType() == ChangeSchema.CqlType.TUPLE) {
List<org.apache.kafka.connect.data.Field> fieldSchemas = resultSchema.fields();
Struct tupleStruct = new Struct(resultSchema);
List<Field> tuple = field.getTuple();
for (int i = 0; i < tuple.size(); i++) {
tupleStruct.put("tuple_member_" + i, translateFieldToKafka(tuple.get(i), fieldSchemas.get(i).schema()));
}
}
return tupleStruct;
}

if (dataType.getCqlType() == ChangeSchema.CqlType.UDT) {
Struct udtStruct = new Struct(resultSchema);
Map<String, Field> udt = field.getUDT();
udt.forEach((name, value) -> {
udtStruct.put(name, translateFieldToKafka(value, resultSchema.field(name).schema()));
});
return udtStruct;
}

return field.getAsObject();
}
}
ScyllaConnectorConfig.java

@@ -106,6 +106,16 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
"the connection to Scylla to prioritize sending requests to " +
"the nodes in the local datacenter. If not set, no particular datacenter will be prioritized.");

public static final CollectionsMode DEFAULT_COLLECTIONS_MODE = CollectionsMode.DELTA;
public static final Field COLLECTIONS_MODE = Field.create("scylla.collections.mode")
.withDisplayName("Collections format")
.withEnum(CollectionsMode.class, DEFAULT_COLLECTIONS_MODE)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("How to represent non-frozen collections. Currently, only 'delta' mode is supported - in the future " +
"support for more modes may be added. 'Delta' mode: change in collection is represented as a struct " +
"with 2 fields, 'mode' and 'elements'. 'mode' describes what type of change happened (modifying collection, overwriting collection), " +
"'elements' contains added/removed elements.");
/*
* Scylla CDC Source Connector relies on heartbeats to move the offset,
* because the offset determines if the generation ended, therefore HEARTBEAT_INTERVAL
@@ -122,7 +132,7 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
private static final ConfigDefinition CONFIG_DEFINITION =
CommonConnectorConfig.CONFIG_DEFINITION.edit()
.name("Scylla")
.type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, LOCAL_DC_NAME, COLLECTIONS_MODE)
.connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE)
.events(TABLE_NAMES)
.excluding(Heartbeat.HEARTBEAT_INTERVAL).events(CUSTOM_HEARTBEAT_INTERVAL)
@@ -192,6 +202,15 @@ public String getLocalDCName() {
return config.getString(ScyllaConnectorConfig.LOCAL_DC_NAME);
}

public CollectionsMode getCollectionsMode() {
String collectionsModeValue = config.getString(ScyllaConnectorConfig.COLLECTIONS_MODE);
try {
return CollectionsMode.valueOf(collectionsModeValue.toUpperCase());
} catch (IllegalArgumentException ex) {
return DEFAULT_COLLECTIONS_MODE;
}
}

@Override
public String getContextName() {
return "Scylla";