Impossible to sync two Kafka topics to the same Cassandra table #284

Closed
mgergalov opened this issue Sep 26, 2017 · 2 comments

Comments

@mgergalov

mgergalov commented Sep 26, 2017

It's impossible to sync two Kafka topics (trades-a, trades-b) into one Cassandra table (trades).
If I sync them separately (for example, trades-a to trades), it works perfectly.

These are the properties for the separate attempt:

connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
connect.cassandra.key.space=markets
topics=TRADES-a
tasks.max=1
connect.cassandra.kcql=INSERT INTO trade SELECT * FROM TRADES-a;
connect.cassandra.fetch.size=5000
connect.cassandra.max.retries=20
connect.progress.enabled=false
connect.cassandra.error.policy=throw
connect.cassandra.contact.points=docker.for.mac.localhost
connect.cassandra.port=9042
connect.cassandra.threadpool.size=10
name=CassandraSinkConnector
connect.cassandra.retry.interval=30000
connect.progress.enabled=true
connect.cassandra.consistency.level=LOCAL_ONE
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false

As I said, it works fine.
And if I delete this connector and create a TRADES-b to trade connector, that works fine too!

But if I try to create this:

connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
connect.cassandra.key.space=markets
topics=TRADES-a,TRADES-b
tasks.max=1
connect.cassandra.kcql=INSERT INTO trade SELECT * FROM TRADES-a;INSERT INTO trade SELECT * FROM TRADES-b;
connect.cassandra.fetch.size=5000
connect.cassandra.max.retries=20
connect.progress.enabled=false
connect.cassandra.error.policy=throw
connect.cassandra.contact.points=docker.for.mac.localhost
connect.cassandra.port=9042
connect.cassandra.threadpool.size=10
name=CassandraSinkConnector
connect.cassandra.retry.interval=30000
connect.progress.enabled=true
connect.cassandra.consistency.level=LOCAL_ONE
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false

It doesn't work.

TRACE:

org.apache.kafka.connect.errors.ConnectException: No tables found in Cassandra for topics trades,trades
	at com.datamountaineer.streamreactor.connect.cassandra.utils.CassandraUtils$.checkCassandraTables(CassandraUtils.scala:61)
	at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.<init>(CassandraJsonWriter.scala:54)
	at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$.apply(CassandraWriter.scala:43)
	at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.start(CassandraSinkTask.scala:58)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Strange log.

I repeat: the error occurs only when I insert into the same table from different topics.
If I insert into different tables from different topics, it works well.
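Both KCQL statements in the failing configuration target the same table, `trade`. As a rough illustration, assuming only the simple `INSERT INTO <table> SELECT * FROM <topic>` form used here, the Java sketch below splits the `connect.cassandra.kcql` value on `;` and collects the target tables into a list. `KcqlRoutes`, `parseRoutes` and `targets` are hypothetical helpers, not the connector's real KCQL parser; the point is only that a list of targets keeps the duplicate `trade` entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, NOT the connector's KCQL parser: it only understands
// the "INSERT INTO <table> SELECT * FROM <topic>" form used in this issue.
final class KcqlRoutes {

    // One KCQL statement maps one Kafka topic to one Cassandra table.
    static final class Route {
        final String topic;
        final String table;

        Route(String topic, String table) {
            this.topic = topic;
            this.table = table;
        }
    }

    private static final Pattern STATEMENT = Pattern.compile(
            "INSERT\\s+INTO\\s+(\\S+)\\s+SELECT\\s+\\*\\s+FROM\\s+(\\S+)",
            Pattern.CASE_INSENSITIVE);

    // Splits the connect.cassandra.kcql value on ';' and parses each statement.
    static List<Route> parseRoutes(String kcql) {
        List<Route> routes = new ArrayList<>();
        for (String statement : kcql.split(";")) {
            String trimmed = statement.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank pieces, e.g. whitespace after the last ';'
            }
            Matcher m = STATEMENT.matcher(trimmed);
            if (!m.matches()) {
                throw new IllegalArgumentException("Unsupported statement: " + trimmed);
            }
            routes.add(new Route(m.group(2), m.group(1)));
        }
        return routes;
    }

    // Collecting targets into a List keeps duplicates: two topics, one table.
    static List<String> targets(List<Route> routes) {
        List<String> out = new ArrayList<>();
        for (Route r : routes) {
            out.add(r.table);
        }
        return out;
    }

    public static void main(String[] args) {
        String kcql = "INSERT INTO trade SELECT * FROM TRADES-a;"
                + "INSERT INTO trade SELECT * FROM TRADES-b;";
        System.out.println(targets(parseRoutes(kcql))); // prints [trade, trade]
    }
}
```

With separate target tables per topic, the same list would contain no duplicates.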

@andrewstevenson
Contributor

@mgergalov Give us 10 minutes; I see the issue.

andrewstevenson pushed a commit that referenced this issue Sep 26, 2017
A diff at startup was taken from the existing cassandra tables vs the kcql targets using a list instead of set.

fixes #284
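The commit message says the startup check diffed the existing Cassandra tables against the KCQL targets using a list instead of a set. The Java sketch below illustrates that class of bug, assuming the check looks for KCQL target tables missing from the keyspace. `TableCheck` and its methods are hypothetical and are not the connector's actual `CassandraUtils.checkCassandraTables` code. A list difference with multiset semantics (as Scala's `Seq.diff` has) lets each existing table cancel only one matching target, so with two statements writing to `trade`, one `trade` is left over and reported as missing; a set difference collapses the duplicate first.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the bug class described in the commit message,
// NOT the connector's actual CassandraUtils code.
final class TableCheck {

    // List-based diff with multiset semantics (like Scala's Seq.diff):
    // each existing table cancels only ONE matching occurrence in targets.
    static List<String> missingByListDiff(List<String> targets, List<String> existing) {
        List<String> remaining = new ArrayList<>(existing);
        List<String> missing = new ArrayList<>();
        for (String t : targets) {
            // remove(Object) drops a single occurrence and returns false if none is left
            if (!remaining.remove(t)) {
                missing.add(t);
            }
        }
        return missing;
    }

    // Set-based diff: duplicate targets collapse, so an existing table is never reported.
    static Set<String> missingBySetDiff(List<String> targets, Set<String> existing) {
        Set<String> missing = new LinkedHashSet<>(targets);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        List<String> targets = List.of("trade", "trade"); // two KCQL statements, one table
        List<String> existing = List.of("trade");          // tables found in the keyspace
        System.out.println(missingByListDiff(targets, existing));            // [trade], a spurious "missing" table
        System.out.println(missingBySetDiff(targets, Set.copyOf(existing))); // []
    }
}
```

This also fits the reporter's observation that distinct tables per topic work: with no duplicate targets, both versions return an empty result.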
@andrewstevenson
Contributor

@mgergalov Please try the latest from master and reopen if required

cd kafka-connect-cassandra
gradle fatJar

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants