Kafka Improvements #3189

Closed
wants to merge 8 commits

Conversation

simplesteph

@simplesteph simplesteph commented Apr 11, 2017

Upgraded Kafka dependencies to 0.10.2
Added dynamic configuration of the producer for the Scala collector, which enables security configs to be passed (the motivation for this PR)
Fixed the configuration as well

this fixes #2973
this fixes #2974
this supersedes #3177
this fixes #3173
this fixes #2994

Fixed Kafka configuration for the Producer as well.
Added tests
Fully ready to enable security

This breaks the current recommended Kafka configuration, but since it was declared to be in a "beta" state, I believe this is fine. The new config will be much more intuitive to people familiar with Kafka.
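
In essence, the dynamic configuration forwards arbitrary key/value pairs from the collector config into the producer's Properties, applied on top of the defaults. A minimal sketch of the idea (illustrative names, not the exact code in the diff):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

def createProducer(brokers: String, overrides: Map[String, String]): KafkaProducer[String, Array[Byte]] = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  // extra keys from the collector config (e.g. security.protocol,
  // sasl.jaas.config) are applied last so they can override the defaults
  overrides.foreach { case (k, v) => props.put(k, v) }
  new KafkaProducer[String, Array[Byte]](props)
}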

@simplesteph simplesteph changed the title from [WIP] Kafka Improvements to Kafka Improvements Apr 11, 2017
@simplesteph
Author

Alright, done. I left a few goodies in there, including a revamp of the sample config using the true semantics of Typesafe Config (instead of {{ }}, which definitely won't work).

Let me know what you think in your review; I'd love to see this merged for the next release so I don't have to maintain my own fork of Snowplow in the meantime.

The whole purpose of this was to fix a few issues and enable security configurations for Kafka.
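
For reference, Typesafe Config resolves substitutions with the ${} / ${?} syntax, whereas a {{placeholder}} is just inert text. A small sketch of the idiom the revamped sample config relies on (key and variable names illustrative):

kafka {
  brokers = "localhost:9092"
  # per HOCON rules, this overrides the line above only if the
  # COLLECTOR_KAFKA_BROKERS environment variable is set
  brokers = ${?COLLECTOR_KAFKA_BROKERS}
}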

@snowplowcla

@simplesteph has signed the Individual Contributor License Agreement

@simplesteph
Author

Just realized I'm addressing #2994 as well.
The JAAS config can be set via the sasl.jaas.config property as of kafka-clients 0.10.2 (https://kafka.apache.org/documentation/#security_client_dynamicjaas).

The expected JAAS config is:

KafkaClient {
    com.ibm.messagehub.login.MessageHubLoginModule required
    serviceName="kafka"
    username="username"
    password="password";
};
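
With kafka-clients 0.10.2 the same entry can be passed as a single producer property instead of an external JAAS file, which is exactly what the dynamic config enables (credentials are placeholders):

props.put("sasl.jaas.config",
  "com.ibm.messagehub.login.MessageHubLoginModule required " +
  "serviceName=\"kafka\" username=\"username\" password=\"password\";")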

@simplesteph
Author

@alexanderdean @jbeemster have you had a chance to look at this?

@simplesteph
Author

@alexanderdean @jbeemster ping

@simplesteph
Author

@BenFradet feel free to use whatever you like from this for your R92; it should cover a bunch of stuff.

@BenFradet
Contributor

will do 👍

@BenFradet BenFradet added this to the R92 Virunum (Stream refresh) milestone Aug 9, 2017
@BenFradet BenFradet self-assigned this Aug 10, 2017
@BenFradet BenFradet self-requested a review August 10, 2017 09:26
Contributor

@BenFradet BenFradet left a comment


Looks good overall, please keep your changes to a minimum by only fixing the issues described in the tickets you are referencing.

time-limit: {{collectorSinkBufferTimeThreshold}}
byte-limit = 4000000
record-limit = 500
time-limit = 60000
Contributor

could you leave the config as is?

@@ -33,7 +33,7 @@ object Dependencies {
val awsSdk = "1.6.10"
val yodaTime = "2.1"
val yodaConvert = "1.2"
val kafka = "0.10.1.0"
val kafka = "0.10.2.0"
Contributor

this will be handled in a separate ticket

Author

@BenFradet no, not really. If you don't support this upgrade, you can't support security by passing the JAAS conf as a property via producerProps (sasl.jaas.config was added in 0.10.2).

Contributor

What I mean is that there is a ticket dedicated to the upgrade and it's separate from the tickets you're trying to solve (e.g. #3325).

Adding the security feature is a separate concern from updating the library.

merged
}
}
}
Contributor

this will be overhauled in a separate ticket

Author

it's important that this is there if you intend to allow dynamic settings and therefore support security

val props = new Properties()
props.put("acks", "all")
props.put("buffer.memory", byteLimit.toString)
props.put("batch.size", recordLimit.toString)
Contributor

this doesn't translate, same as in the other PR

Author

Ah true, that one's in bytes. byteLimit.toString then, right? And buffer.memory removed?

Contributor

I don't think any current buffer setting applies to batch.size, and it doesn't make sense to have a batch.size equal to buffer.memory.
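
For context, the two settings measure different things: batch.size caps the bytes of a single batch per partition, while buffer.memory caps the total memory the producer uses for buffering. The stock kafka-clients defaults illustrate the scale difference (a sketch, not values this PR proposes):

// per-partition batch cap (kafka-clients default: 16384 bytes)
props.put("batch.size", "16384")
// total producer buffer (kafka-clients default: 33554432 bytes, i.e. 32 MB)
props.put("buffer.memory", "33554432")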

props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer")
Contributor

please keep your changes to a minimum and modify the configuration where it originally was, i.e. here

method = "GET"
}
}
}

Author

Naming a Typesafe config file with a .sample extension is invalid. It messes with the parser and causes other problems when you pass configs as arguments to your programs (especially when using the Docker Java base image). To prevent users from making the same mistake that took me about 4 hours to solve, after debugging Typesafe Config itself, I'd rather rename this to config.sample.conf.

Contributor

Not sure what you mean, but this is the convention that has been adopted throughout all Snowplow projects, so we might as well keep things consistent.

@@ -22,7 +22,7 @@ object BuildSettings {
organization := "com.snowplowanalytics",
version := "0.10.0",
description := "The Snowplow Enrichment process, implemented as an Amazon Kinesis app",
scalaVersion := "2.10.1",
scalaVersion := "2.10.5",
Contributor

this will be handled separately

@@ -35,7 +35,7 @@ object Dependencies {
val slf4j = "1.7.5"
val awsSdk = "1.6.11"
val kinesisClient = "1.6.1"
val kafkaClients = "0.10.1.0"
val kafkaClients = "0.10.2.0"
Contributor

same

props.putAll(map.filterNot(elem => blacklist.contains(elem._1)))
props
}
}
Contributor

please modify the configuration in place
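
As an aside on the snippet above: forwarding a Scala Map into java.util.Properties needs an explicit conversion to compile. A self-contained version might look like this (the map and blacklist contents are illustrative):

import java.util.Properties
import scala.collection.JavaConverters._

val map = Map("acks" -> "all", "key.serializer" -> "ignored") // user-supplied settings
val blacklist = Set("key.serializer", "value.serializer")     // keys the collector manages itself
val props = new Properties()
props.putAll(map.filterNot { case (k, _) => blacklist.contains(k) }.asJava)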

}
}

}
Contributor

I don't think this is necessary either

@simplesteph
Author

Hi @BenFradet, thanks for the feedback. I understand this may be more than the tickets being fixed. Unfortunately I don't use Snowplow anymore, so I won't be editing this PR further. I'm happy to give you control of my branch if you want to push further commits to it, or you can cherry-pick what you need into your own PR.
The idea was:

  1. to add dynamic config (which allows security)
  2. to properly configure the Kafka defaults (minus the batch.size param)
  3. to refactor the config for extensibility

Good luck, hope that helped!

@BenFradet BenFradet removed this from the R92 Virunum (Stream refresh) milestone Aug 11, 2017
@BenFradet BenFradet removed their assignment Aug 11, 2017
@BenFradet
Contributor

@simplesteph works for me 👍, closing.

@BenFradet BenFradet closed this Aug 11, 2017