diff --git a/docs/README.md b/docs/README.md index ea753403..31a55f8d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -10,6 +10,7 @@ GRPC) * Instrumentation support with statsd * Log Sink * Bigquery Sink +* Redis Sink Depot is a sink connector, which acts as a bridge between data processing systems and real sink. The APIs in this library can be used to push data to various sinks. Common sinks implementations will be added in this repo. diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index c79dc5af..c71246e5 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -5,6 +5,7 @@ ## Sink * [Bigquery](sinks/bigquery.md) +* [Redis](sinks/redis.md) ## Reference diff --git a/docs/reference/configuration/redis.md b/docs/reference/configuration/redis.md new file mode 100644 index 00000000..22b296d2 --- /dev/null +++ b/docs/reference/configuration/redis.md @@ -0,0 +1,79 @@ +# Redis + +A Redis sink in Depot requires the following environment variables to be set along with the generic ones. + +### `SINK_REDIS_URLS` + +REDIS server instance hostname/IP address followed by its port. + +- Example value: `localhost:6379,localhost:6380` + +- Type: `required` + +### `SINK_REDIS_DATA_TYPE` + +To select whether you want to push your data as a `KEYVALUE`, `HASHSET` or as a `LIST` data type. + +- Example value: `Hashset` +- Type: `required` +- Default value: `Hashset` + +### `SINK_REDIS_KEY_TEMPLATE` + +The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key. + +- Example value: `Service\_%%s,1` + + This will take the value with index 1 from proto and create the Redis keys as per the template. + +- Type: `required` + +### `SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING` + +This field decides which data will be stored in the HashSet for each message. 
+- Example value: `{"order_number":"ORDER_NUMBER","event_timestamp":"TIMESTAMP"}` +- Type: `required (For Hashset)` + +### `SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME` + +This field decides what data will be stored in the value part of the key-value pair + +- Example value: `customer_id` + + This will get the value of the field with name `customer_id` in your proto and push that to Redis as the value with the corresponding keyTemplate + +- Type: `required (For KeyValue)` + +### `SINK_REDIS_LIST_DATA_FIELD_NAME` + +This field decides which data will be stored in the List for each message. + +- Example value: `customer_id` + + This will get the value of the field with name `customer_id` in your proto and push that to the Redis list with the corresponding keyTemplate + +- Type: `required (For List)` + +### `SINK_REDIS_TTL_TYPE` + +- Example value: `DURATION` +- Type: `optional` +- Default value: `DISABLE` +- Choice of Redis TTL type. It can be: + - `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\) + - `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired + +### `SINK_REDIS_TTL_VALUE` + +Redis TTL value as a Unix timestamp for the `EXACT_TIME` TTL type, and in seconds for the `DURATION` TTL type. + +- Example value: `100000` +- Type: `optional` +- Default value: `0` + +### `SINK_REDIS_DEPLOYMENT_TYPE` + +The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types. + +- Example value: `Standalone` +- Type: `required` +- Default value: `Standalone` diff --git a/docs/roadmap.md b/docs/roadmap.md index 568d84b2..1eef242e 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -12,7 +12,7 @@ If you have feedback about the roadmap section itself, such as how the issues are presented, let us know through [discussions](https://github.com/odpf/depot/discussions). 
-## depot 0.1.4 +## Depot 0.1.4 ### Feature enhancements diff --git a/docs/sinks/redis.md b/docs/sinks/redis.md new file mode 100644 index 00000000..7b61e479 --- /dev/null +++ b/docs/sinks/redis.md @@ -0,0 +1,14 @@ +# Redis Sink + +### Data Types +Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](../reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List + - `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](../reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping) + - `List`: For each message entry of the format `key : value` is generated and pushed to Redis. The value is fetched for the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](../reference/configuration/redis.md#sink_redis_list_data_field_name) + - `KeyValue`: For each message entry of the format `key : value` is generated and pushed to Redis. The value is fetched for the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](../reference/configuration/redis.md#sink_redis_key_value_data_field_name) + +The `key` is picked up from a field in the message itself. + +Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now. + +### Deployment Types +Redis sink, as of now, supports two different Deployment Types `Standalone` and `Cluster`. This can be configured in the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`. 
diff --git a/src/main/java/io/odpf/depot/config/RedisSinkConfig.java b/src/main/java/io/odpf/depot/config/RedisSinkConfig.java index cdd4731c..dfe0eb57 100644 --- a/src/main/java/io/odpf/depot/config/RedisSinkConfig.java +++ b/src/main/java/io/odpf/depot/config/RedisSinkConfig.java @@ -39,9 +39,6 @@ public interface RedisSinkConfig extends OdpfSinkConfig { @ConverterClass(RedisSinkDeploymentTypeConverter.class) RedisSinkDeploymentType getSinkRedisDeploymentType(); - @Key("SINK_REDIS_LIST_DATA_PROTO_INDEX") - String getSinkRedisListDataProtoIndex(); - @Key("SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME") String getSinkRedisKeyValueDataFieldName(); diff --git a/src/main/java/io/odpf/depot/redis/RedisSinkFactory.java b/src/main/java/io/odpf/depot/redis/RedisSinkFactory.java index d8132ebb..42ffc6e3 100644 --- a/src/main/java/io/odpf/depot/redis/RedisSinkFactory.java +++ b/src/main/java/io/odpf/depot/redis/RedisSinkFactory.java @@ -36,14 +36,26 @@ public RedisSinkFactory(RedisSinkConfig sinkConfig) { public void init() throws IOException { Instrumentation instrumentation = new Instrumentation(statsDReporter, RedisSinkFactory.class); - String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s" - + "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d", + String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.data.type = %s" + + "\n\tredis.deployment.type = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d\n\t", sinkConfig.getSinkRedisUrls(), sinkConfig.getSinkRedisKeyTemplate(), sinkConfig.getSinkRedisDataType().toString(), - sinkConfig.getSinkRedisListDataProtoIndex(), + sinkConfig.getSinkRedisDeploymentType().toString(), sinkConfig.getSinkRedisTtlType().toString(), sinkConfig.getSinkRedisTtlValue()); + switch (sinkConfig.getSinkRedisDataType()) { + case LIST: + redisConfig += "redis.list.data.field.name=" + sinkConfig.getSinkRedisListDataFieldName(); + 
break; + case KEYVALUE: + redisConfig += "redis.keyvalue.data.field.name=" + sinkConfig.getSinkRedisKeyValueDataFieldName(); + break; + case HASHSET: + redisConfig += "redis.hashset.field.to.column.mapping=" + sinkConfig.getSinkRedisHashsetFieldToColumnMapping().toString(); + break; + default: + } instrumentation.logInfo(redisConfig); instrumentation.logInfo("Redis server type = {}", sinkConfig.getSinkRedisDeploymentType()); OdpfMessageParser messageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter);