-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Starting new code for Producer #38
Conversation
@@ -1,8 +1,5 @@ | |||
package kafka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org.neo4j.streams.kafka
log.debug("Event with txId ${event.meta.txId} sent successfully") | ||
} catch (e: ProducerFencedException) { | ||
log.error("Error:", e) | ||
producer.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if we call producer.close() on the next event?
Will we recover? And how?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps just abort the tx
producer.commitTransaction() | ||
log.debug("Event with txId ${event.meta.txId} sent successfully") | ||
} catch (e: ProducerFencedException) { | ||
log.error("Error:", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better error messages not just "Error"
log.error("Error:", e) | ||
producer.close() | ||
} catch (e: AuthorizationException) { | ||
log.error("Error:", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Auth exception should have happend on initial connect or?
try { | ||
log.debug("Trying to send the event with txId ${event.meta.txId} to kafka") | ||
producer.beginTransaction() | ||
events.forEach { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should test this with larger amounts of data, e.g. creating 1M nodes and relationships in one tx
UNWIND range(1,100000) as id create (n:Node {id:id, name:"name "+id})-[:REL {since:id}]->(n);
Needs about 3.5G heap for the transaction.
return object : LifecycleAdapter() { | ||
|
||
override fun start() { | ||
db.registerTransactionEventHandler(StreamsTransactionEventHandler(StreamsDataEventRouter())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
store the handler in a var and on stop remove it from the listener
import org.apache.kafka.common.internals.Topic | ||
import streams.events.* | ||
|
||
object RoutingConfigurationConstants { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
passed in from the outside e.g. in the handler for kafka
@@ -25,6 +22,7 @@ class KafkaExtensionFactory : KernelExtensionFactory<KafkaExtensionFactory.Depen | |||
|
|||
return object : LifecycleAdapter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the old code
Moved RoutingConfigurationConstants into KafkaConfiguration
"type": "relationship", | ||
"name": "KNOWS", | ||
"label": "KNOWS", | ||
"before": { | ||
"properties": { | ||
"since": "2018-04-05T12:34:00[Europe/Berlin]" | ||
}, | ||
"startNode": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed to change "start" and "end" in these files :)
In order to be more testable and maintainable