feat: add support for AWS glue schema registry (datahub-project#3083)
Dexter Lee authored and Rahul Jain committed Aug 31, 2021
1 parent 5e1fb81 commit 5a0f85f
Showing 17 changed files with 386 additions and 199 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -35,6 +35,7 @@ project.ext.externalDependency = [
'assertJ': 'org.assertj:assertj-core:3.11.1',
'avro_1_7': 'org.apache.avro:avro:1.7.7',
'avroCompiler_1_7': 'org.apache.avro:avro-compiler:1.7.7',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.1',
'cacheApi' : 'javax.cache:cache-api:1.1.0',
'commonsIo': 'commons-io:commons-io:2.4',
'commonsLang': 'commons-lang:commons-lang:2.6',
86 changes: 84 additions & 2 deletions docs/deploy/aws.md
@@ -247,7 +247,7 @@ You can also allow communication via HTTP (without SSL) by using the settings be
indexPrefix: demo
```

Lastly, you need to set the following env variable for **elasticsearchSetupJob**.

```
elasticsearchSetupJob:
@@ -282,4 +282,86 @@ kafka:
url: "http://prerequisites-cp-schema-registry:8081"
```

Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes.

### AWS Glue Schema Registry

You can use the AWS Glue Schema Registry instead of the Kafka schema registry. To do so, first provision an AWS Glue schema
registry under the "Schema Registry" tab of the AWS Glue console page.
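
If you prefer the CLI, the registry can be provisioned with a command along the following lines (the registry name `demo-shared` is illustrative; it matches the IAM policy example further below):

```
aws glue create-registry --registry-name demo-shared --description "Schemas for DataHub"
```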

Once the registry is provisioned, you can change the helm chart values as follows.

```
kafka:
bootstrap:
...
zookeeper:
...
schemaregistry:
type: AWS_GLUE
glue:
region: <<AWS region of registry>>
registry: <<name of registry>>
```

Note that the name of the topic will be used as the schema name in the registry.
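
Under the hood, the factories added in this commit read the registry settings from environment variables, so after the chart renders, the GMS and consumer containers should end up with values along these lines (the exact mapping performed by the chart is assumed here; the region and registry name match the IAM policy example below):

```
SCHEMA_REGISTRY_TYPE=AWS_GLUE
AWS_GLUE_SCHEMA_REGISTRY_REGION=us-west-2
AWS_GLUE_SCHEMA_REGISTRY_NAME=demo-shared
```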

Before you update the pods, you need to give the k8s worker nodes the correct permissions to access the schema registry.

The minimum required permissions look like this:

```
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"glue:GetRegistry",
"glue:ListRegistries",
"glue:CreateSchema",
"glue:UpdateSchema",
"glue:GetSchema",
"glue:ListSchemas",
"glue:RegisterSchemaVersion",
"glue:GetSchemaByDefinition",
"glue:GetSchemaVersion",
"glue:GetSchemaVersionsDiff",
"glue:ListSchemaVersions",
"glue:CheckSchemaVersionValidity",
"glue:PutSchemaVersionMetadata",
"glue:QuerySchemaVersionMetadata"
],
"Resource": [
"arn:aws:glue:*:795586375822:schema/*",
"arn:aws:glue:us-west-2:795586375822:registry/demo-shared"
]
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"glue:GetSchemaVersion"
],
"Resource": [
"*"
]
}
]
}
```

The second statement is required to have "*" as its resource because of an issue in the AWS Glue schema registry library.
Refer to [this issue](https://github.com/awslabs/aws-glue-schema-registry/issues/68) for any updates.

Glue currently doesn't support AWS Signature V4, so we cannot use service accounts to grant access to
the schema registry. The workaround is to attach the above permissions to the EKS worker node's IAM role. Refer
to [this issue](https://github.com/awslabs/aws-glue-schema-registry/issues/69) for any updates.
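
For example, if the policy above is saved as `glue-policy.json`, it can be attached as an inline policy to the worker node role with something like the following (the role name depends on how your EKS cluster was created):

```
aws iam put-role-policy \
  --role-name <<EKS worker node IAM role>> \
  --policy-name glue-schema-registry-access \
  --policy-document file://glue-policy.json
```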

Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes.

Note that you will see the log message "Schema Version Id is null. Trying to register the schema" on every request. This
message is misleading and should be ignored: schemas are cached, so a new version is not registered on every request (i.e.,
there is no performance impact). This has been fixed by [this PR](https://github.com/awslabs/aws-glue-schema-registry/pull/64),
but the fix has not been released yet. We will update the library version once a new release is out.
1 change: 1 addition & 0 deletions gms/factories/build.gradle
@@ -4,6 +4,7 @@ dependencies {
compile project(':metadata-io')
compile project(':gms:impl')
compile project(':metadata-dao-impl:kafka-producer')
compile externalDependency.awsGlueSchemaRegistrySerde
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.gson
@@ -0,0 +1,83 @@
package com.linkedin.gms.factory.kafka;

import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;


@Slf4j
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class})
public class KafkaEventConsumerFactory {

@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServers;

@Value("${SCHEMA_REGISTRY_TYPE:KAFKA}")
private String schemaRegistryType;

@Autowired
@Lazy
@Qualifier("kafkaSchemaRegistry")
private SchemaRegistryConfig kafkaSchemaRegistryConfig;

@Autowired
@Lazy
@Qualifier("awsGlueSchemaRegistry")
private SchemaRegistryConfig awsGlueSchemaRegistryConfig;

@Bean(name = "kafkaEventConsumer")
protected KafkaListenerContainerFactory<?> createInstance(KafkaProperties properties) {

KafkaProperties.Consumer consumerProps = properties.getConsumer();

    // Specify the deserializer for record keys; the value deserializer is set below
    // based on the configured schema registry.
    consumerProps.setKeyDeserializer(StringDeserializer.class);
    // Consumer offsets will be auto-committed every 10 seconds.
    consumerProps.setEnableAutoCommit(true);
    consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));

// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092

SchemaRegistryConfig schemaRegistryConfig;
if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) {
schemaRegistryConfig = kafkaSchemaRegistryConfig;
} else {
schemaRegistryConfig = awsGlueSchemaRegistryConfig;
}

consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer());
Map<String, Object> props = properties.buildConsumerProperties();
props.putAll(schemaRegistryConfig.getProperties());

ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));

log.info("Event-based KafkaListenerContainerFactory built successfully");

return factory;
}
}
@@ -1,44 +1,69 @@
-package com.linkedin.gms.factory.common;
+package com.linkedin.gms.factory.kafka;

-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
 import java.util.Arrays;
 import java.util.Map;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Lazy;


 @Configuration
 @EnableConfigurationProperties(KafkaProperties.class)
+@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class})
 public class KafkaEventProducerFactory {

   @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
   private String kafkaBootstrapServers;

-  @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
-  private String kafkaSchemaRegistryUrl;
+  @Value("${SCHEMA_REGISTRY_TYPE:KAFKA}")
+  private String schemaRegistryType;
+
+  @Autowired
+  @Lazy
+  @Qualifier("kafkaSchemaRegistry")
+  private SchemaRegistryConfig kafkaSchemaRegistryConfig;
+
+  @Autowired
+  @Lazy
+  @Qualifier("awsGlueSchemaRegistry")
+  private SchemaRegistryConfig awsGlueSchemaRegistryConfig;

   @Bean(name = "kafkaEventProducer")
   protected Producer<String, IndexedRecord> createInstance(KafkaProperties properties) {
     KafkaProperties.Producer producerProps = properties.getProducer();

     producerProps.setKeySerializer(StringSerializer.class);
-    producerProps.setValueSerializer(KafkaAvroSerializer.class);

     // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
     if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
       producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
     } // else we rely on KafkaProperties which defaults to localhost:9092

+    SchemaRegistryConfig schemaRegistryConfig;
+    if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) {
+      schemaRegistryConfig = kafkaSchemaRegistryConfig;
+    } else {
+      schemaRegistryConfig = awsGlueSchemaRegistryConfig;
+    }
+
     Map<String, Object> props = properties.buildProducerProperties();
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
-
-    return new KafkaProducer(props);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());
+    props.putAll(schemaRegistryConfig.getProperties());
+
+    return new KafkaProducer<>(props);
   }
 }
@@ -0,0 +1,55 @@
package com.linkedin.gms.factory.kafka;

import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
import java.time.Duration;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;


@Slf4j
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class})
public class SimpleKafkaConsumerFactory {

@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServers;

@Bean(name = "simpleKafkaConsumer")
protected KafkaListenerContainerFactory<?> createInstance(KafkaProperties properties) {

KafkaProperties.Consumer consumerProps = properties.getConsumer();

// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(StringDeserializer.class);
    // Consumer offsets will be auto-committed every 10 seconds.
    consumerProps.setEnableAutoCommit(true);
    consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));

// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092

ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));

log.info("Simple KafkaListenerContainerFactory built successfully");

return factory;
}
}
@@ -0,0 +1,41 @@
package com.linkedin.gms.factory.kafka.schemaregistry;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Slf4j
@Configuration
public class AwsGlueSchemaRegistryFactory {

public static final String TYPE = "AWS_GLUE";

@Value("${AWS_GLUE_SCHEMA_REGISTRY_REGION:us-east-1}")
private String awsRegion;
@Value("${AWS_GLUE_SCHEMA_REGISTRY_NAME:#{null}}")
private Optional<String> registryName;

@Bean(name = "awsGlueSchemaRegistry")
@Nonnull
protected SchemaRegistryConfig getInstance() {
Map<String, Object> props = new HashMap<>();
props.put(AWSSchemaRegistryConstants.AWS_REGION, awsRegion);
props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO");
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
registryName.ifPresent(s -> props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, s));
log.info("Creating AWS Glue registry");
return new SchemaRegistryConfig(GlueSchemaRegistryKafkaSerializer.class, GlueSchemaRegistryKafkaDeserializer.class,
props);
}
}
