Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't get Redis GeoAdd working #990

Closed
sebastienblanc opened this issue Sep 27, 2023 · 2 comments
Closed

Can't get Redis GeoAdd working #990

sebastienblanc opened this issue Sep 27, 2023 · 2 comments

Comments

@sebastienblanc
Copy link

What version of the Stream Reactor are you reporting this issue for?

4.2.0

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

yes

What is the expected behaviour?

I'm trying to insert this kafka record :

{"latitude":60.203144,"longitude":24.966624,"oper":"22","veh":"1190"}

into Redis with the kcql : INSERT INTO buses: SELECT veh from geo_test PK oper STOREAS GEOADD

What was observed?

An exception is throw right after startup of the connector

What is your connector properties configuration ?

{
    "connect.redis.kcql": "INSERT INTO buses: SELECT veh from geo_test PK oper STOREAS GeoAdd",
    "name": "redis-embeddings-devrel-sebastien.aivencloud.com",
    "connect.redis.host": "##",
    "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
    "connect.redis.port": "22234",
    "connect.redis.password": "##",
    "errors.log.enable": "true",
    "connect.redis.ssl.enabled": "true",
    "connect.redis.error.policy": "NOOP",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.include.messages": "true",
    "connect.redis.retry.interval": "6000",
    "topics": "geo_test",
    "connect.redis.max.retries": "20",
    "value.converter.schemas.enable": "false"
}

Please provide full log files (redact and sensitive information)

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: requirement failed: ErrorTracker is not set call. Initialize.
	at scala.Predef$.require(Predef.scala:337)
	at com.datamountaineer.streamreactor.common.errors.ErrorHandler.handleTry(ErrorHandler.scala:48)
	at com.datamountaineer.streamreactor.common.errors.ErrorHandler.handleTry$(ErrorHandler.scala:47)
	at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisWriter.handleTry(RedisWriter.scala:31)
	at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.$anonfun$insert$1(RedisGeoAdd.scala:117)
	at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.$anonfun$insert$1$adapted(RedisGeoAdd.scala:54)
	at scala.collection.immutable.HashMap.foreach(HashMap.scala:1076)
	at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.insert(RedisGeoAdd.scala:54)
	at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.write(RedisGeoAdd.scala:49)
	at com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkTask.$anonfun$put$2(RedisSinkTask.scala:239)
	at com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkTask.$anonfun$put$2$adapted(RedisSinkTask.scala:239)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkTask.put(RedisSinkTask.scala:239)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
	... 11 more
@Ugbot
Copy link

Ugbot commented Sep 27, 2023

Also seen the same behavoir

@stheppi
Copy link
Collaborator

stheppi commented Apr 4, 2024

This has been fixed.Please use the latest 6.3.0 release.

@stheppi stheppi closed this as completed Apr 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants