
enhance documentation #61

Merged — 2 commits, Nov 30, 2021

Conversation

@chulucninh09 (Contributor)

No description provided.

@ismailsimsek (Member) left a comment

Thank you very much @chulucninh09, it looks great. Added a few minor questions/comments.

docs/CAVEATS.md
This connector is only packaged with support for the `hadoop` catalog.

## No automatic schema evolution
Currently, there is no handler to detect schema changes and automatically evolve the schema. Schema change events can cause the connector to throw an error. To work around this, turn off schema change events in the `source` settings.
@ismailsimsek (Member)

Have you had a chance to test it? I think schema change events are not processed and should not cause any error. Schema change events are saved to a file if it is configured with `debezium.source.database.history` and `debezium.source.database.history.file.filename`.
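As a sketch, the two history settings mentioned above might look like this in `application.properties` (the history class and file path here are illustrative assumptions, not values taken from this PR):

```properties
# Persist schema change events to a local history file (illustrative values).
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/dbhistory.dat
```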

I believe we could remove this section:

> Schema change events can make the connector throw error. To workaround this, turn off schema change event in source setting.

and link to this page for more details about the current schema change behavior:
[Schema Change Behaviour](https://github.com/memiiso/debezium-server-iceberg/blob/master/docs/DOCS.md#schema-change-behaviour)

@chulucninh09 (Contributor, Author)

This is what I found when using it with SQL Server; see the following log:

2021-11-28 14:38:51,416 INFO  [io.deb.ser.ice.tab.IcebergTableOperatorUpsert] (pool-7-thread-1) Committed 2047 events to table! s3a://test-iceberg/iceberg_warehouse8/debeziumevents/debeziumcdc_tutorial_dbo_person
2021-11-28 14:38:51,538 WARN  [io.deb.ser.ice.IcebergChangeConsumer] (pool-7-thread-1) Table not found: debeziumevents.debeziumcdc_tutorial
2021-11-28 14:38:51,539 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) {"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"int64","optional":true,"field":"event_serial_no"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"type":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scale"},{"type":"int32","optional":false,"field":"position"},{"type":"bo
olean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"}],"optional":false,"name":"io.debezium.connector.schema.Column"},"optional":false,"field":"columns"}],"optional":false,"name":"io.debezium.connector.schema.Table","field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change"},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.sqlserver.SchemaChangeValue"}
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Converting Schema of: ::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [1] .source::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Converting Schema of: source::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [2] source.version::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [3] source.connector::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [4] source.name::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [5] source.ts_ms::int64
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [6] source.snapshot::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [7] source.db::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [8] source.sequence::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [9] source.schema::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [10] source.table::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [11] source.change_lsn::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [12] source.commit_lsn::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [13] source.event_serial_no::int64
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [14] .databaseName::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [15] .schemaName::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [16] .ddl::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [17] .tableChanges::array
2021-11-28 14:38:51,541 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Stopping down connector
2021-11-28 14:40:21,542 WARN  [io.deb.pip.ChangeEventSourceCoordinator] (pool-7-thread-1) Coordinator didn't stop in the expected time, shutting down executor now
2021-11-28 14:40:24,324 WARN  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Snapshot was interrupted before completion
2021-11-28 14:40:24,325 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Snapshot - Final stage
2021-11-28 14:40:24,325 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Removing locking timeout
2021-11-28 14:40:24,327 WARN  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Change event source executor was interrupted: java.lang.InterruptedException
        at java.base/java.lang.Object.wait(Native Method)
        at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
        at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
        at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:446)
        at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:176)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:89)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:49)
        at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:165)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:386)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:315)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:135)
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2021-11-28 14:40:24,328 INFO  [io.deb.pip.met.StreamingChangeEventSourceMetrics] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Connected metrics set to 'false'
2021-11-28 14:40:24,329 INFO  [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
2021-11-28 14:40:24,330 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-7-thread-1) Stopped FileOffsetBackingStore
2021-11-28 14:40:24,331 INFO  [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: 'tableChanges' has Array type, Array type not supported!', error = '{}': java.lang.RuntimeException: 'tableChanges' has Array type, Array type not supported!
        at io.debezium.server.iceberg.IcebergUtil.getIcebergSchema(IcebergUtil.java:74)
        at io.debezium.server.iceberg.IcebergUtil.getIcebergSchema(IcebergUtil.java:35)
        at io.debezium.server.iceberg.IcebergUtil.getIcebergFieldsFromEventSchema(IcebergUtil.java:199)
        at io.debezium.server.iceberg.IcebergChangeConsumer.createIcebergTable(IcebergChangeConsumer.java:199)
        at io.debezium.server.iceberg.IcebergChangeConsumer.lambda$handleBatch$2(IcebergChangeConsumer.java:159)
        at java.base/java.util.Optional.orElseGet(Optional.java:369)
        at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:159)
        at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:821)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
        at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:145)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2021-11-28 14:40:24,348 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2021-11-28 14:40:24,348 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2021-11-28 14:40:24,367 INFO  [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.035s

This is the config I used when I didn't turn off schema change capture for SQL Server:

```properties
# SQL Server source
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=52.221.232.4
debezium.source.database.port=1433
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.dbname=dms_sample
debezium.source.database.server.name=tutorial
debezium.source.table.include.list=dbo.person
# mandatory for the SQL Server source; avoids errors during snapshot and schema change
#debezium.source.include.schema.changes=false
```

@ismailsimsek (Member)

It seems the consumer is trying to create the table `debeziumevents.debeziumcdc_tutorial` to store schema changes, but it is failing because the field `tableChanges` has an Array type. The Iceberg consumer does not currently support the Array data type, which is why the exception is thrown.

documentation

Not sure what the best way to explain it is; maybe something like:

> The schema change topic contains the unsupported Array data type; it is recommended to disable it.
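A minimal sketch of that recommendation as a `source` setting (the property name follows the Debezium connector docs):

```properties
# Disable the schema change topic; its 'tableChanges' field has an Array
# type that the Iceberg consumer cannot handle.
debezium.source.include.schema.changes=false
```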

@ismailsimsek (Member)

It seems that even with Array data type support it is not possible to save the schema change topic; `tableChanges` appears to have a special type. It makes sense to recommend disabling it for all connectors.

It fails with:

```
Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` from Array value (token `JsonToken.START_ARRAY`)
```

@chulucninh09 (Contributor, Author)

Yes, I also checked the Iceberg documentation when I found that error, and decided to avoid it by turning off schema change events.

docs/CAVEATS.md
By default, the Debezium connector publishes snapshots of all tables in the database, which leads to unnecessary Iceberg tables. Unless you want to replicate every table from the database into Iceberg, set `debezium.source.table.include.list` to the specific tables you want to replicate. This way you avoid replicating tables you don't actually need.
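For illustration, such a filter might look like this (`dbo.person` appears in the config above; `dbo.address` is an assumed example):

```properties
# Replicate only the listed tables instead of snapshotting every table.
debezium.source.table.include.list=dbo.person,dbo.address
```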

## AWS S3 credentials
You should inject the environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` to write to S3, or set up a proper `HADOOP_HOME` env and add the s3a configuration to `core-site.xml`; more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
@ismailsimsek (Member)

I believe it is possible to use `application.properties` to configure AWS S3. All settings starting with `debezium.sink.iceberg.<my.config>` are passed to Iceberg and used from there.

Example from a unit test:
https://github.com/memiiso/debezium-server-iceberg/blob/master/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java#L131

@chulucninh09 (Contributor, Author)

Yup, I saw that too. By default, I suggest turning off `debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain` to provide an easier way to inject AWS credentials.


## AWS S3 credentials
You can set up AWS credentials in the following ways:
- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties`
@ismailsimsek (Member)

Could we highlight that it is possible to pass any Iceberg configuration using the `debezium.sink.iceberg.<my.iceberg.config>=xyz` format? It could be useful to know that there are many Iceberg configs and that they can all be set via `application.properties`.
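A sketch of that pass-through format (the access/secret key properties appear in the quoted docs above; the endpoint and path-style keys are illustrative s3a settings, and any Iceberg configuration can be supplied the same way):

```properties
# Everything after the debezium.sink.iceberg. prefix is handed to Iceberg as-is.
debezium.sink.iceberg.fs.s3a.access.key=MY_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=MY_SECRET_KEY
debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000
debezium.sink.iceberg.fs.s3a.path.style.access=true
```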


@ismailsimsek merged commit d80c235 into memiiso:master on Nov 30, 2021
@ismailsimsek (Member)
Merged, thank you @chulucninh09.
