Missing Configuration Variable: cluster.key.deserializer #28

Closed
alexk2000 opened this issue Mar 8, 2020 · 8 comments
alexk2000 commented Mar 8, 2020

Hello,
could you please help with this?
I use Kafka 2.4.0 (http://apache.volia.net/kafka/2.4.0/kafka_2.12-2.4.0.tgz), with connectors running in standalone mode.
The Backup Sink Connector works fine for me and the backups are saved in the /backup dir.
For the restore I use two configs:

  1. connect-standalone-restore.properties
bootstrap.servers=kafka-test-2:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
rest.port=8084
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  2. connect-backup-source.properties
name=backup-source
connector.class=de.azapps.kafkabackup.source.BackupSourceConnector
tasks.max=1
topics=test5
cluster.key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
cluster.value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
source.dir=/backup
batch.size=500
cluster.bootstrap.servers=kafka-test-2:9092

Run the restore:

./bin/connect-standalone.sh restore-config/connect-standalone-restore.properties restore-config/connect-backup-source.properties

Error:

[2020-03-08 09:18:50,991] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-03-08 09:18:50,991] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-03-08 09:18:50,991] INFO Kafka startTimeMs: 1583659130991 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-03-08 09:18:50,996] INFO Created connector backup-source (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-03-08 09:18:50,996] INFO BackupSourceConfig values:
        batch.size = 500
        source.dir = /backup
        topics = test5
 (de.azapps.kafkabackup.source.BackupSourceConfig:347)
[2020-03-08 09:18:50,996] INFO WorkerSourceTask{id=backup-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-03-08 09:18:50,996] INFO WorkerSourceTask{id=backup-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-03-08 09:18:50,997] ERROR WorkerSourceTask{id=backup-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.RuntimeException: Missing Configuration Variable: cluster.key.deserializer
        at de.azapps.kafkabackup.source.BackupSourceConfig.<init>(BackupSourceConfig.java:41)
        at de.azapps.kafkabackup.source.BackupSourceTask.start(BackupSourceTask.java:41)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-03-08 09:18:50,998] ERROR WorkerSourceTask{id=backup-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-03-08 09:18:50,998] INFO Stopped BackupSourceTask (de.azapps.kafkabackup.source.BackupSourceTask:159)
[2020-03-08 09:18:50,998] INFO [Producer clientId=connector-producer-backup-source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
LBmaster001 (Contributor) commented:
Add

cluster.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
cluster.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

to your connect-backup-source.properties, and

cluster.key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
cluster.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

to your connect-backup-sink.properties.
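For reference, these two classes are Kafka's stock pass-through (de)serializers for raw byte[] payloads. A minimal consumer sketch using them (bootstrap host and topic reused from the configs in this thread; the group id is hypothetical):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RawBytesConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-test-2:9092");  // host from the configs above
        props.put("group.id", "raw-bytes-check");             // hypothetical group id
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // Keys and values come back as byte[], untouched by any format conversion.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test5"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> r : records)
                System.out.printf("offset=%d, %d value bytes%n",
                        r.offset(), r.value() == null ? 0 : r.value().length);
        }
    }
}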

alexk2000 (Author) commented Mar 9, 2020

After adding those properties, the restore now fails with a different error:

[2020-03-09 12:07:42,273] INFO WorkerSourceTask{id=backup-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-03-09 12:07:42,273] INFO WorkerSourceTask{id=backup-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-03-09 12:07:42,274] ERROR WorkerSourceTask{id=backup-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:287)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
        at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:600)
        at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:581)
        at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:335)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:287)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
[2020-03-09 12:07:42,275] ERROR WorkerSourceTask{id=backup-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-03-09 12:07:42,276] INFO Stopped BackupSourceTask (de.azapps.kafkabackup.source.BackupSourceTask:159)
[2020-03-09 12:07:42,276] INFO [Producer clientId=connector-producer-backup-source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
[2020-03-09 12:07:52,192] INFO WorkerSourceTask{id=backup-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-03-09 12:07:52,192] INFO WorkerSourceTask{id=backup-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-03-09 12:08:02,192] INFO WorkerSourceTask{id=backup-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)

itadventurer (Owner) commented Mar 9, 2020

Hi,
you need to add the following properties to the connect-backup-source.properties file:

key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter

Kafka Backup only supports byte arrays ;)

[edit]: Fixed key/value.converter classes
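For background, the DataException in the log above is JsonConverter refusing a null value under a schema that marks the value as required; it presumably kicked in because the connector config only set cluster.key.converter/cluster.value.converter, so the worker-level JsonConverter defaults from connect-standalone-restore.properties (with schemas.enable=true) still applied. A minimal standalone sketch that triggers the same message (topic name reused from this thread):

import java.util.Collections;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonConverterNullRepro {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
        // Throws DataException: "Conversion error: null value for field that is
        // required and has no default value" -- the same failure as in the log,
        // e.g. for a backed-up record with a null value under a required schema.
        converter.fromConnectData("test5", Schema.BYTES_SCHEMA, null);
    }
}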

alexk2000 (Author) commented Mar 9, 2020

Sorry, I am a little bit confused.
According to your documentation (https://github.com/itadventurer/kafka-backup/blob/master/docs/Usage.md):
Configure a Backup Source Connector (e.g. create a file connect-backup-source.properties):

…
key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
...

My connect-backup-source.properties:

name=backup-source
connector.class=de.azapps.kafkabackup.source.BackupSourceConnector
tasks.max=1
topics=test3
cluster.key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
cluster.value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
source.dir=/backup
batch.size=500
cluster.bootstrap.servers=kafka-test-2:9092
cluster.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
cluster.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

So what should key.converter/value.converter be?

itadventurer (Owner) commented:
Hi,
sorry for the confusion.

You need to use the de.azapps.kafkabackup.common.AlreadyBytesConverter as the key.converter and value.converter (not cluster.key.converter).

So your connect-backup-source.properties file should be:

name=backup-source
connector.class=de.azapps.kafkabackup.source.BackupSourceConnector
tasks.max=1
topics=test3
key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
source.dir=/backup
batch.size=500
cluster.bootstrap.servers=kafka-test-2:9092
cluster.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
cluster.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

PS: Could you please use Markdown syntax for your issues? ;)

Hope this helps :)
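For context, a converter like AlreadyBytesConverter presumably just hands raw bytes through Connect unchanged. A hypothetical sketch of such a pass-through Converter (illustrative only, not kafka-backup's actual source):

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical pass-through converter; illustrates the idea, not the project's code.
public class PassThroughBytesConverter implements Converter {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure: bytes in, bytes out
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value != null && !(value instanceof byte[]))
            throw new DataException("Only byte[] is supported, got: " + value.getClass());
        return (byte[]) value;  // hand the stored bytes to the producer unchanged
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }
}

Because no format conversion is involved, restored records can be byte-for-byte identical to what was backed up.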

alexk2000 (Author) commented Mar 9, 2020

Thanks, that helped: no more Java errors, but no data was restored, and the topic was not even created (topic name: test2):

[2020-03-09 17:09:29,450] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-03-09 17:09:29,450] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-03-09 17:09:29,450] INFO Kafka startTimeMs: 1583773769450 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-03-09 17:09:29,456] INFO Created connector backup-source (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-03-09 17:09:29,456] INFO BackupSourceConfig values:
        batch.size = 500
        source.dir = /backup
        topics = test2
 (de.azapps.kafkabackup.source.BackupSourceConfig:347)
[2020-03-09 17:09:29,489] INFO Registered topic test2 partition Optional[0] (de.azapps.kafkabackup.source.BackupSourceTask:90)
[2020-03-09 17:09:29,534] INFO WorkerSourceTask{id=backup-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2020-03-09 17:09:29,534] INFO All records read. Restore was successful (de.azapps.kafkabackup.source.BackupSourceTask:119)
[2020-03-09 17:09:29,545] INFO [Producer clientId=connector-producer-backup-source-0] Cluster ID: zz7F_G2tRuWu_gwxMBiGNA (org.apache.kafka.clients.Metadata:261)
[2020-03-09 17:09:34,535] INFO All records read. Restore was successful (de.azapps.kafkabackup.source.BackupSourceTask:119)
[2020-03-09 17:09:39,456] INFO WorkerSourceTask{id=backup-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-03-09 17:09:39,456] INFO WorkerSourceTask{id=backup-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-03-09 17:09:39,535] INFO All records read. Restore was successful (de.azapps.kafkabackup.source.BackupSourceTask:119)

itadventurer (Owner) commented:
Things to test:

  • Do you have any data actually backed up? (A quick check is sketched below.)
  • Try deleting the /tmp/connect.offsets file (as defined in your connect-standalone-restore.properties) to reset your offsets.
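For the first point, a quick one-off check (a hypothetical helper, not part of kafka-backup) that lists whatever the sink connector wrote under source.dir; the path is taken from the configs in this thread:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class ListBackupFiles {
    public static void main(String[] args) throws IOException {
        Path backupDir = Paths.get("/backup");  // source.dir from the connector config
        try (Stream<Path> files = Files.walk(backupDir)) {
            files.filter(Files::isRegularFile).forEach(f -> {
                try {
                    System.out.println(f + "  " + Files.size(f) + " bytes");
                } catch (IOException e) {
                    System.out.println(f + "  (unreadable: " + e.getMessage() + ")");
                }
            });
        }
    }
}

If this prints nothing, the backup directory is empty and there is nothing to restore.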

alexk2000 (Author) commented:
Thank you very much, removing /tmp/connect.offsets helped; the data was restored successfully.
The restore of consumer groups/offsets is awesome. Thank you for this tool.
