Merged
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ GRPC)
* Instrumentation support with statsd
* Log Sink
* Bigquery Sink
* Redis Sink

Depot is a sink connector, which acts as a bridge between data processing systems and real sink. The APIs in this
library can be used to push data to various sinks. Common sinks implementations will be added in this repo.
Expand Down
1 change: 1 addition & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

## Sink
* [Bigquery](sinks/bigquery.md)
* [Redis](sinks/redis.md)

## Reference

Expand Down
79 changes: 79 additions & 0 deletions docs/reference/configuration/redis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Redis

A Redis sink in Depot requires the following environment variables to be set, along with the generic ones.

### `SINK_REDIS_URLS`

Redis server instance hostname/IP address followed by its port. Multiple instances can be provided as a comma-separated list.

- Example value: `localhost:6379,localhost:6380`
- Type: `required`
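As a rough illustration, a comma-separated URL list like the example above can be split into host/port pairs. The class and method names below are hypothetical, not depot's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: splitting a SINK_REDIS_URLS value into host/port pairs.
public class RedisUrlsSketch {
    public static List<String[]> parse(String urls) {
        List<String[]> hostPorts = new ArrayList<>();
        for (String url : urls.split(",")) {
            String[] parts = url.trim().split(":");
            hostPorts.add(new String[]{parts[0], parts[1]});
        }
        return hostPorts;
    }

    public static void main(String[] args) {
        for (String[] hp : parse("localhost:6379,localhost:6380")) {
            System.out.println(hp[0] + " -> " + hp[1]);
        }
    }
}
```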

### `SINK_REDIS_DATA_TYPE`

Selects whether the data is pushed as a `KEYVALUE`, `HASHSET`, or `LIST` data type.

- Example value: `Hashset`
- Type: `required`
- Default value: `Hashset`

### `SINK_REDIS_KEY_TEMPLATE`

The template string that acts as the key for each Redis entry. It can be configured as required: either a constant, or a template that extracts a value from each message and uses it in the Redis key.

- Example value: `Service_%%s,1`

This will take the value of the field with index 1 from the proto and generate the Redis key as per the template.

- Type: `required`
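A minimal sketch of how such a template might be rendered, assuming the part before the comma is a format string and the number after it is the proto field index whose value gets substituted. This is an illustration, not depot's actual parsing code:

```java
import java.util.Map;

// Hypothetical key-template rendering: "Service_%s,1" means
// "format 'Service_%s' with the value of proto field index 1".
public class KeyTemplateSketch {
    public static String render(String template, Map<Integer, String> fieldValues) {
        int comma = template.lastIndexOf(',');
        if (comma < 0) {
            return template; // constant key, nothing to substitute
        }
        String format = template.substring(0, comma);
        int fieldIndex = Integer.parseInt(template.substring(comma + 1).trim());
        return String.format(format, fieldValues.get(fieldIndex));
    }

    public static void main(String[] args) {
        System.out.println(render("Service_%s,1", Map.of(1, "42")));
    }
}
```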

### `SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`

This JSON mapping decides which fields of each message will be stored in the HashSet, and under which hash field names.
- Example value: `{"order_number":"ORDER_NUMBER","event_timestamp":"TIMESTAMP"}`
- Type: `required (For Hashset)`
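Conceptually, applying this mapping to a parsed message yields the field/value pairs written into the HashSet. The sketch below is illustrative (not depot's code) and assumes the JSON key is the proto field name and the JSON value is the resulting hash field name:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative: applying {"order_number":"ORDER_NUMBER"} to a message.
public class HashsetMappingSketch {
    public static Map<String, String> toHashFields(Map<String, String> mapping,
                                                   Map<String, String> message) {
        Map<String, String> hashFields = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            // key = proto field name, value = Redis hash field name (assumption)
            hashFields.put(e.getValue(), message.get(e.getKey()));
        }
        return hashFields;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = Map.of("order_number", "ORDER_NUMBER");
        Map<String, String> message = Map.of("order_number", "A-123");
        System.out.println(toHashFields(mapping, message));
    }
}
```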

### `SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`

This field decides what data will be stored in the value part of the key-value pair.

- Example value: `customer_id`

This will take the value of the field named `customer_id` from your proto and push it to Redis as the value, with the key generated from the key template.

- Type: `required (For KeyValue)`

### `SINK_REDIS_LIST_DATA_FIELD_NAME`

This field decides what data will be pushed to the List for each message.

- Example value: `customer_id`

This will take the value of the field named `customer_id` from your proto and push it to the Redis list whose key is generated from the key template.

- Type: `required (For List)`

### `SINK_REDIS_TTL_TYPE`

- Example value: `DURATION`
- Type: `optional`
- Default value: `DISABLE`
- Choice of Redis TTL type. It can be:
  - `DURATION`: time-to-live in seconds, after which the key expires and is removed from Redis
  - `EXACT_TIME`: exact UNIX timestamp after which the key expires
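As a hedged sketch of what these two types mean in Redis terms (the enum and method names below are illustrative, not depot's actual classes): `DURATION` corresponds to the relative `EXPIRE` command, while `EXACT_TIME` corresponds to the absolute `EXPIREAT` command.

```java
// Illustrative mapping of TTL types onto Redis commands.
public class TtlSketch {
    public enum TtlType { DISABLE, DURATION, EXACT_TIME }

    public static String ttlCommand(TtlType type, long value, String key) {
        switch (type) {
            case DURATION:
                return "EXPIRE " + key + " " + value;    // seconds from now
            case EXACT_TIME:
                return "EXPIREAT " + key + " " + value;  // absolute UNIX timestamp
            default:
                return null;                             // DISABLE: no TTL is set
        }
    }

    public static void main(String[] args) {
        System.out.println(ttlCommand(TtlType.DURATION, 100000, "mykey"));
    }
}
```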

### `SINK_REDIS_TTL_VALUE`

Redis TTL value: a UNIX timestamp for the `EXACT_TIME` TTL type, or a duration in seconds for the `DURATION` TTL type.

- Example value: `100000`
- Type: `optional`
- Default value: `0`

### `SINK_REDIS_DEPLOYMENT_TYPE`

The Redis deployment type you are using. At present, `Standalone` and `Cluster` are supported.

- Example value: `Standalone`
- Type: `required`
- Default value: `Standalone`
2 changes: 1 addition & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ If you have feedback about the roadmap section itself,
such as how the issues are presented,
let us know through [discussions](https://github.com/odpf/depot/discussions).

## depot 0.1.4
## Depot 0.1.4

### Feature enhancements

Expand Down
14 changes: 14 additions & 0 deletions docs/sinks/redis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Redis Sink

### Data Types
Redis sink can be created in 3 different modes, based on the value of [`SINK_REDIS_DATA_TYPE`](../reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue, or List.
- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. The field and value are generated based on the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](../reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping).
- `List`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the proto field named in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](../reference/configuration/redis.md#sink_redis_list_data_field_name).
- `KeyValue`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the proto field named in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](../reference/configuration/redis.md#sink_redis_key_value_data_field_name).
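The three modes above roughly correspond to different Redis commands. The sketch below is illustrative only, not depot's code; in particular, whether depot pushes list entries with `LPUSH` or `RPUSH` is an assumption here.

```java
// Illustrative: the Redis command each data type roughly translates to.
public class DataTypeSketch {
    public static String toCommand(String dataType, String key, String field, String value) {
        switch (dataType.toUpperCase()) {
            case "HASHSET":
                return "HSET " + key + " " + field + " " + value;
            case "LIST":
                return "LPUSH " + key + " " + value; // LPUSH vs RPUSH: assumption
            case "KEYVALUE":
                return "SET " + key + " " + value;
            default:
                throw new IllegalArgumentException("Unsupported data type: " + dataType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toCommand("Hashset", "Service_42", "ORDER_NUMBER", "A-123"));
    }
}
```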

The `key` is generated from the [`SINK_REDIS_KEY_TEMPLATE`](../reference/configuration/redis.md#sink_redis_key_template) config, which can pick up a field from the message itself.

Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now.

### Deployment Types
As of now, the Redis sink supports two deployment types, `Standalone` and `Cluster`. This can be configured via the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.
3 changes: 0 additions & 3 deletions src/main/java/io/odpf/depot/config/RedisSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ public interface RedisSinkConfig extends OdpfSinkConfig {
@ConverterClass(RedisSinkDeploymentTypeConverter.class)
RedisSinkDeploymentType getSinkRedisDeploymentType();

@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
String getSinkRedisListDataProtoIndex();

@Key("SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME")
String getSinkRedisKeyValueDataFieldName();

Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/odpf/depot/redis/RedisSinkFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,26 @@ public RedisSinkFactory(RedisSinkConfig sinkConfig) {

public void init() throws IOException {
Instrumentation instrumentation = new Instrumentation(statsDReporter, RedisSinkFactory.class);
String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s"
+ "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d",
String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.data.type = %s"
+ "\n\tredis.deployment.type = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d\n\t",
sinkConfig.getSinkRedisUrls(),
sinkConfig.getSinkRedisKeyTemplate(),
sinkConfig.getSinkRedisDataType().toString(),
sinkConfig.getSinkRedisListDataProtoIndex(),
sinkConfig.getSinkRedisDeploymentType().toString(),
sinkConfig.getSinkRedisTtlType().toString(),
sinkConfig.getSinkRedisTtlValue());
switch (sinkConfig.getSinkRedisDataType()) {
case LIST:
redisConfig += "redis.list.data.field.name=" + sinkConfig.getSinkRedisListDataFieldName();
break;
case KEYVALUE:
redisConfig += "redis.keyvalue.data.field.name=" + sinkConfig.getSinkRedisKeyValueDataFieldName();
break;
case HASHSET:
redisConfig += "redis.hashset.field.to.column.mapping=" + sinkConfig.getSinkRedisHashsetFieldToColumnMapping().toString();
break;
default:
}
instrumentation.logInfo(redisConfig);
instrumentation.logInfo("Redis server type = {}", sinkConfig.getSinkRedisDeploymentType());
OdpfMessageParser messageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter);
Expand Down