Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Capability in dagger to consume from ACL enabled kafka clusters #195

Merged
merged 14 commits into from Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,6 +1,8 @@
package io.odpf.dagger.core.source.config;

import com.google.gson.annotations.JsonAdapter;
import io.odpf.dagger.core.source.config.adapter.DaggerSASLMechanismAdaptor;
import io.odpf.dagger.core.source.config.adapter.DaggerSecurityProtocolAdaptor;
import io.odpf.dagger.core.source.config.adapter.FileDateRangeAdaptor;
import io.odpf.dagger.core.source.config.adapter.SourceParquetFilePathsAdapter;
import io.odpf.dagger.core.source.config.models.SourceDetails;
Expand Down Expand Up @@ -66,6 +68,20 @@ public class StreamConfig {
@Getter
private String bootstrapServers;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY)
@Getter
@JsonAdapter(value = DaggerSecurityProtocolAdaptor.class)
private String securityProtocol;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY)
@JsonAdapter(value = DaggerSASLMechanismAdaptor.class)
@Getter
private String saslMechanism;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY)
@Getter
private String saslJaasConfig;

@SerializedName(STREAM_INPUT_STREAM_NAME_KEY)
Meghajit marked this conversation as resolved.
Show resolved Hide resolved
@Getter
private String kafkaName;
Expand Down
@@ -0,0 +1,31 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import io.odpf.dagger.core.utils.Constants;

import java.io.IOException;
import java.util.Arrays;

public class DaggerSASLMechanismAdaptor extends TypeAdapter<String> {
@Override
public void write(JsonWriter jsonWriter, String value) throws IOException {
if (value == null) {
jsonWriter.nullValue();
return;
}
jsonWriter.value(value);
}

@Override
public String read(JsonReader jsonReader) throws IOException {
String saslMechanism = jsonReader.nextString();
if (Arrays.stream(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM).anyMatch(saslMechanism::equals)) {
return saslMechanism;
} else {
throw new InvalidConfigurationException(String.format("Configured wrong SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM supported values are %s", Arrays.toString(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM)));
}
}
}
@@ -0,0 +1,31 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import io.odpf.dagger.core.utils.Constants;

import java.io.IOException;
import java.util.Arrays;

public class DaggerSecurityProtocolAdaptor extends TypeAdapter<String> {
@Override
public void write(JsonWriter jsonWriter, String value) throws IOException {
if (value == null) {
jsonWriter.nullValue();
return;
}
jsonWriter.value(value);
}

@Override
public String read(JsonReader jsonReader) throws IOException {
String securityProtocol = jsonReader.nextString();
if (Arrays.stream(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL).anyMatch(securityProtocol::equals)) {
return securityProtocol;
} else {
throw new InvalidConfigurationException(String.format("Configured wrong SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL supported values are %s", Arrays.toString(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL)));
}
}
}
Expand Up @@ -123,7 +123,9 @@ public class Constants {
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS";

public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG";

public static final String METRIC_TELEMETRY_ENABLE_KEY = "METRIC_TELEMETRY_ENABLE";
public static final boolean METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT = true;
Expand Down Expand Up @@ -172,4 +174,7 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB
// Comma seperated error types
public static final String SINK_ERROR_TYPES_FOR_FAILURE = "SINK_ERROR_TYPES_FOR_FAILURE";
public static final String SINK_ERROR_TYPES_FOR_FAILURE_DEFAULT = "";

public static final String[] SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL = {"SASL_PLAINTEXT", "SASL_SSL"};
public static final String[] SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"};
}
Expand Up @@ -114,6 +114,58 @@ public void shouldParseKafkaProperties() {
assertEquals(properties, streamConfigs[0].getKafkaProps(configuration));
}

@Test
public void shouldParseKafkaPropertiesWithSASLConfig() {
when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
when(configuration.getBoolean(SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY, SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT)).thenReturn(false);
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);

HashMap<String, String> kafkaPropMap = new HashMap<>();
kafkaPropMap.put("group.id", "dummy-consumer-group");
kafkaPropMap.put("bootstrap.servers", "localhost:9092");
kafkaPropMap.put("auto.offset.reset", "latest");
kafkaPropMap.put("auto.commit.enable", "");
kafkaPropMap.put("sasl.mechanism", "SCRAM-SHA-512");
kafkaPropMap.put("security.protocol", "SASL_PLAINTEXT");
kafkaPropMap.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

Properties properties = new Properties();
properties.putAll(kafkaPropMap);

assertEquals(properties, streamConfigs[0].getKafkaProps(configuration));
}

@Test
public void shouldParseMultipleStreamsFromStreamConfigWithSASLConfig() {
when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);

assertEquals(2, streamConfigs.length);

StreamConfig currConfig = streamConfigs[0];
assertEquals("false", currConfig.getAutoCommitEnable());
assertEquals("latest", currConfig.getAutoOffsetReset());
assertEquals("PROTO", currConfig.getDataType());
assertEquals("dummy-consumer-group", currConfig.getConsumerGroupId());
assertEquals("41", currConfig.getEventTimestampFieldIndex());
assertEquals("test-topic", currConfig.getKafkaTopicNames());
assertEquals("data_stream", currConfig.getSchemaTable());
assertEquals("local-kafka-stream", currConfig.getKafkaName());
assertEquals("SCRAM-SHA-512", currConfig.getSaslMechanism());
assertEquals("SASL_PLAINTEXT", currConfig.getSecurityProtocol());
assertEquals("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";", currConfig.getSaslJaasConfig());

StreamConfig currConfigNext = streamConfigs[1];
assertEquals("true", currConfigNext.getAutoCommitEnable());
assertEquals("latest", currConfigNext.getAutoOffsetReset());
assertEquals("JSON", currConfigNext.getDataType());
assertEquals("dummy-consumer-group", currConfigNext.getConsumerGroupId());
assertEquals("41", currConfigNext.getEventTimestampFieldIndex());
assertEquals("test-topic", currConfigNext.getKafkaTopicNames());
assertEquals("data_stream_1", currConfigNext.getSchemaTable());
assertEquals("local-kafka-stream", currConfigNext.getKafkaName());
}

@Test
public void shouldAddAdditionalKafkaConfigToKafkaProperties() {
when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
Expand Down
@@ -0,0 +1,37 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.stream.JsonReader;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class DaggerSASLMechanismAdaptorTest {
@Mock
private JsonReader jsonReader;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SCRAM-SHA-512");
DaggerSASLMechanismAdaptor daggerSASLMechanismAdaptor = new DaggerSASLMechanismAdaptor();
String saslMechanism = daggerSASLMechanismAdaptor.read(jsonReader);
assertEquals("SCRAM-SHA-512", saslMechanism);
}

@Test
public void shouldNotAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SCRAMSHA512");
DaggerSASLMechanismAdaptor daggerSASLMechanismAdaptor = new DaggerSASLMechanismAdaptor();
assertThrows(InvalidConfigurationException.class, () -> daggerSASLMechanismAdaptor.read(jsonReader));
}
}
@@ -0,0 +1,41 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.stream.JsonReader;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class DaggerSecurityProtocolAdaptorTest {

@Mock
private JsonReader jsonReader;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SASL_PLAINTEXT");
DaggerSecurityProtocolAdaptor daggerSecurityProtocolAdaptor = new DaggerSecurityProtocolAdaptor();
String securityProtocol = daggerSecurityProtocolAdaptor.read(jsonReader);
assertEquals("SASL_PLAINTEXT", securityProtocol);
}

@Test
public void shouldNotAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SASLPLAINTEXT");
DaggerSecurityProtocolAdaptor daggerSecurityProtocolAdaptor = new DaggerSecurityProtocolAdaptor();
assertThrows(InvalidConfigurationException.class, () -> daggerSecurityProtocolAdaptor.read(jsonReader));
}
}