Skip to content

Commit

Permalink
Allow externalizing AWS credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
lenny committed Nov 17, 2018
1 parent fd5f1fb commit 028a00e
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 45 deletions.
6 changes: 5 additions & 1 deletion config/AwsLambdaSinkConnector.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ connector.class=com.tm.kafka.connect.aws.lambda.AwsLambdaSinkConnector

aws.region=us-west-2
aws.function.name=kafka-aws-lambda-test
aws.credentials.provider.class=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
aws.lambda.payload.converter.class=com.tm.kafka.connect.aws.lambda.converter.JsonPayloadConverter
# aws.lambda.payload.converter.class=com.tm.kafka.connect.aws.lambda.converter.DefaultPayloadConverter
# retry.backoff.ms=5000
# aws.lambda.invoke.async=RequestResponse
# aws.lambda.invoke.async=Event
# aws.lambda.invoke.async=DryRun

# aws.credentials.provider.class=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
aws.credentials.provider.class=com.tm.kafka.connect.aws.lambda.ConfigurationAWSCredentialsProvider
aws.credentials.provider.aws.access.key.id=${file:/root/.aws/credentials:aws_access_key_id}
aws.credentials.provider.aws.secret.access.key=${file:/root/.aws/credentials:aws_secret_access_key}
5 changes: 5 additions & 0 deletions config/connect-json-docker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ offset.storage.file.filename=/tmp/connect.offsets
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
config.providers.file.param.secrets=/root/.aws/credentials
config.reload.action=restart
84 changes: 46 additions & 38 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
version: "2"
services:

zookeeper:
image: confluentinc/cp-zookeeper:3.2.2-1
image: confluentinc/cp-zookeeper:5.0.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
zk_id: "1"

kafka:
hostname: kafka
image: confluentinc/cp-kafka:3.2.2-1
image: confluentinc/cp-kafka:5.0.1
container_name: kafka
links:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./:/data
- ~/.aws:/root/.aws
schema_registry:
hostname: schema_registry
image: confluentinc/cp-schema-registry:3.2.2-1

schema-registry:
hostname: schema-registry
image: confluentinc/cp-schema-registry:5.0.1
container_name: schema-registry
links:
- kafka
- zookeeper
Expand All @@ -29,36 +36,37 @@ services:
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
SCHEMA_REGISTRY_HOST_NAME: schema-registry
connect:
image: confluentinc/cp-kafka-connect
# network_mode: host
hostname: connect
depends_on:
- zookeeper
- kafka
- schema_registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONNECT_PLUGIN_PATH: /usr/share/java
volumes:
- ./:/data
- ~/.aws:/root/.aws

# connect:
# hostname: connect
# image: confluentinc/cp-kafka-connect:5.0.1
# container_name: connect
# depends_on:
# - zookeeper
# - kafka
# - schema-registry
# ports:
# - "8083:8083"
# environment:
# CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
# CONNECT_REST_ADVERTISED_HOST_NAME: connect
# CONNECT_REST_PORT: 8083
# CONNECT_GROUP_ID: compose-connect-group
# CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
# CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
# CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
# CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
# CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
# CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# CONNECT_PLUGIN_PATH: /usr/share/java
# volumes:
# - ./:/data
# - ~/.aws:/root/.aws
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
<version>1.11.447</version>
<version>1.11.452</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.tm.kafka.connect.aws.lambda.converter.JsonPayloadConverter;
import com.tm.kafka.connect.aws.lambda.converter.SinkRecordToPayloadConverter;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -41,6 +42,16 @@ public class AwsLambdaSinkConnectorConfig extends AbstractConfig {
+ "the connector uses 'DefaultAWSCredentialsProviderChain'.";
private static final String CREDENTIALS_PROVIDER_DISPLAY_CONFIG = "AWS Credentials Provider Class";

/**
* The properties that begin with this prefix will be used to configure a class, specified by
* {@code s3.credentials.provider.class} if it implements {@link Configurable}.
*/
public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX =
CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(
0,
CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1
);

static final String FUNCTION_NAME_CONFIG = "aws.function.name";
private static final String FUNCTION_NAME_DOC = "The AWS Lambda function name.";
private static final String FUNCTION_NAME_DISPLAY = "AWS Lambda function Name";
Expand Down Expand Up @@ -165,8 +176,14 @@ public String getAwsRegion() {
@SuppressWarnings("unchecked")
public AWSCredentialsProvider getAwsCredentialsProvider() {
try {
return ((Class<? extends AWSCredentialsProvider>)
AWSCredentialsProvider awsCredentialsProvider = ((Class<? extends AWSCredentialsProvider>)
getClass(CREDENTIALS_PROVIDER_CLASS_CONFIG)).getDeclaredConstructor().newInstance();
if (awsCredentialsProvider instanceof Configurable) {
Map<String, Object> configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX);
configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()));
((Configurable) awsCredentialsProvider).configure(configs);
}
return awsCredentialsProvider;
} catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) {
throw new ConnectException("Invalid class for: " + CREDENTIALS_PROVIDER_CLASS_CONFIG, e);
}
Expand Down Expand Up @@ -218,8 +235,7 @@ public String toString() {
private static class CredentialsProviderValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object provider) {
if (provider != null && provider instanceof Class
&& AWSCredentialsProvider.class.isAssignableFrom((Class<?>) provider)) {
if (provider instanceof Class && AWSCredentialsProvider.class.isAssignableFrom((Class<?>) provider)) {
return;
}
throw new ConfigException(name, provider, "Class must extend: " + AWSCredentialsProvider.class);
Expand Down Expand Up @@ -275,8 +291,7 @@ public boolean visible(String name, Map<String, Object> connectorConfigs) {
private static class PayloadConverterValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object provider) {
if (provider != null && provider instanceof Class
&& SinkRecordToPayloadConverter.class.isAssignableFrom((Class<?>) provider)) {
if (provider instanceof Class && SinkRecordToPayloadConverter.class.isAssignableFrom((Class<?>) provider)) {
return;
}
throw new ConfigException(name, provider, "Class must extend: " + SinkRecordToPayloadConverter.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.tm.kafka.connect.aws.lambda;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.kafka.common.Configurable;

import java.util.Map;

public class ConfigurationAWSCredentialsProvider implements AWSCredentialsProvider, Configurable {

private static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
private static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";

private AWSCredentials awsCredentials;

@Override
public AWSCredentials getCredentials() {
org.apache.kafka.common.config.provider.FileConfigProvider p;
return awsCredentials;
}

@Override
public void refresh() {

}

@Override
public void configure(final Map<String, ?> configs) {
awsCredentials = new AWSCredentials() {
private final String key = (String) configs.get(AWS_ACCESS_KEY_ID_CONFIG);
private final String secret = (String) configs.get(AWS_SECRET_ACCESS_KEY_CONFIG);

@Override
public String getAWSAccessKeyId() {
return key;
}

@Override
public String getAWSSecretKey() {
return secret;
}
};
}
}

0 comments on commit 028a00e

Please sign in to comment.