Unable to write JSON schemaless events #185

Open
mroiter-larus opened this issue Jun 16, 2021 · 3 comments

Comments
@mroiter-larus

Hi @jcustenborder,

I’m having some trouble trying to use SMT functions with SpoolDirSchemaLessJsonSourceConnector.
I would like to simply ingest some schemaless JSON events from a file into a topic, applying the ReplaceField SMT function. Here is the connector configuration:

name=testReplaceField
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector
tasks.max=1
topic=testReplaceField
input.path=/tmp/data
input.file.pattern=test-replaceField-no-schema.json
error.path=/tmp/data/error
finished.path=/tmp/data/finished
halt.on.error=false
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=foo:c1,bar:c2

My sample source file is populated as follows:

{"foo": 1, "bar": "test1"}
{"foo": 2, "bar": "test2"}
{"foo": 3, "bar": "test3"}

Although I disabled schemas for both the key and the value, it seems the SMT is still interpreting my JSON events as if they had a schema. I got the following exception in the connector logs:

[2021-06-16 17:23:36,933] ERROR WorkerSourceTask{id=testReplaceField-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field replacement], found: java.lang.String
	at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
	at org.apache.kafka.connect.transforms.ReplaceField.applyWithSchema(ReplaceField.java:167)
	at org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:146)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 11 more

As you can see in the stack trace, it keeps going into the applyWithSchema method, which obviously fails!
As suggested here, I already tried using StringConverter instead of JsonConverter, but with no luck. Same error.

Am I doing something wrong?

Thanks in advance!

Regards,

Mauro

@jcustenborder (Owner)

Hi @mroiter-larus,

Kafka Connect is a little weird. It has its own type system that is independent of how the data is serialized. For example, a Struct is basically a row-like structure: in JSON this would be an object like your examples, and in Avro it's a Record.

What is happening is that this connector uses Jackson to stream the file, breaking at each object boundary, and hands each object off as a string. If you were using only the string converter you would be done. In your example, though, you're running a transformation, so the pipeline looks like this: SpoolDirSchemaLessJsonSourceConnector -> RenameField -> Converter. It's breaking at the RenameField part, because that SMT doesn't support Strings.

You have a couple of options. You could use something like FromJson to parse the data against a JSON schema, then run your rename field. Another option would be to convert it afterwards with something like KSQL.

https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/examples/FromJson.inline.html
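For reference, here is a minimal sketch of that first option, chaining FromJson in front of the rename. The property names follow the linked example page, so treat them as assumptions to verify against your version of the kafka-connect-json-schema plugin; the inline schema is a hypothetical one matching the sample file above:

# FromJson parses the string payload into a Struct so ReplaceField can operate on it
transforms=FromJson,RenameField
transforms.FromJson.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
transforms.FromJson.json.schema.location=Inline
# hypothetical schema describing {"foo": <number>, "bar": <string>}
transforms.FromJson.json.schema.inline={"type":"object","properties":{"foo":{"type":"number"},"bar":{"type":"string"}}}
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=foo:c1,bar:c2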

@mroiter-larus (Author)

Hi @jcustenborder,

Thanks a lot for your answer!

I tried the first approach you suggested (using FromJson). It actually works, but in doing so the JSON events, which were initially schemaless, are treated as if they had a schema (and I think this is exactly what the FromJson transformation is expected to do). I mean, the RenameField step still runs the applyWithSchema method, and I would like to prevent this.

Is there a way to run the ReplaceField step so that the executed method is applySchemaless?

Thanks a lot!

@jcustenborder (Owner)

Off the top of my head I'm not sure. I don't know how much I would worry about whether it is schemaless or with a schema. The converter could be the Apache Kafka JsonConverter, which would remove the schema, or you could use the JsonSchemaConverter I wrote, which attaches the schema as a header; it's in the same project. Either way, that would give you JSON in your topic.

https://github.com/jcustenborder/kafka-connect-json-schema/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java
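As a sketch, either converter choice looks like this in the worker or connector config (option 1 ships with Kafka; the class name in option 2 is taken from the linked source file, and any further settings it needs are an assumption to check in that project):

# Option 1: plain JSON in the topic, no embedded schema (ships with Kafka)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Option 2: JSON in the topic with the schema attached as a record header
# (class from the kafka-connect-json-schema project linked above)
value.converter=com.github.jcustenborder.kafka.connect.json.JsonSchemaConverter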
