Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a709abc
Initial commit: add class diagram
eyeofvinay Jul 26, 2022
c1a6adc
move diagram to docs/sinks
eyeofvinay Jul 26, 2022
ed4fa0e
Add config files
eyeofvinay Jul 27, 2022
244d3ee
Add files for KeyValue mode support
eyeofvinay Jul 30, 2022
1b75c65
Refactor: follow checkstyle
eyeofvinay Jul 31, 2022
e2d5a42
Add and use REDIS_VALUE_BY_NAME property
eyeofvinay Aug 1, 2022
d84d669
Add logic for parsing key templates
eyeofvinay Aug 1, 2022
ab78c09
Use JsonPath for nested Proto and Json field by name support
eyeofvinay Aug 4, 2022
3f9ac42
add config SINK_DEFAULT_DATATYPE_STRING_ENABLE, add getFieldByName() …
eyeofvinay Aug 4, 2022
eb125f5
Fix instrumentation, error handling
eyeofvinay Aug 5, 2022
741a4b9
Add logic to extract nested Proto fields
eyeofvinay Aug 8, 2022
584f76b
Simplify getFieldByName for proto, follow consistent convention for j…
eyeofvinay Aug 8, 2022
141b913
Optimization: cache schema mapping
eyeofvinay Aug 9, 2022
7818173
Ref: Move Json parsing code to statis util method
eyeofvinay Aug 9, 2022
565d8d7
Fix getMapping caching, move common code to RedisParser from RedisKey…
eyeofvinay Aug 10, 2022
7ec92b1
Add List parser module
eyeofvinay Aug 10, 2022
895c14e
Add Hashset parser module
eyeofvinay Aug 10, 2022
63a0d74
Consistent key template formatting
eyeofvinay Aug 11, 2022
9a76054
Add tests for ttl, dataentry, parsers packages
eyeofvinay Aug 15, 2022
932d440
Add test for JsonUtils class
eyeofvinay Aug 15, 2022
f0195f0
Minor refactor, fix tests
eyeofvinay Aug 16, 2022
77b27e4
Add tests for redis/client package
eyeofvinay Aug 16, 2022
ba004da
Add tests for redis/parsers package
eyeofvinay Aug 16, 2022
85c01a5
Fix RedisHashSetParserTest
eyeofvinay Aug 17, 2022
2cf591a
chore: review changes
lavkesh Aug 17, 2022
29a107d
test: add test for parseKeyTemplate(), fix test after review changes
eyeofvinay Aug 17, 2022
dc709bc
fix client tests
eyeofvinay Aug 18, 2022
af1cef9
feat: added response parsing
lavkesh Aug 18, 2022
b42d141
Reduce parseKey code, add test for multiple variables in template, fi…
eyeofvinay Aug 19, 2022
7e7ba40
fix: review comments
lavkesh Aug 19, 2022
eef53df
chore: rearrange packages
lavkesh Aug 19, 2022
3e79770
chore: refactor packages
lavkesh Aug 19, 2022
02c4fb4
fix and add tests, checkstyle
eyeofvinay Aug 21, 2022
ba352b0
chore: fix tests, improve coverage
eyeofvinay Aug 21, 2022
f65af5b
Add test for getErrorsFromResponse() method
eyeofvinay Aug 21, 2022
27df76e
chore: minor change
lavkesh Aug 22, 2022
44b010c
Add RedisParserTest, optimize getSchema in RedisParser
eyeofvinay Aug 22, 2022
e3f12e0
feat: Template abstraction
lavkesh Aug 22, 2022
2ba607e
chore: move entry package inside client
lavkesh Aug 22, 2022
d71cc3a
Fix tests
eyeofvinay Aug 22, 2022
ad65d29
Add RedisSinkTest
eyeofvinay Aug 22, 2022
15c3e01
Add StandaloneClientClientTest
eyeofvinay Aug 23, 2022
0781067
Improve client tests
eyeofvinay Aug 23, 2022
a4e6ec4
Fix Entry tests
eyeofvinay Aug 23, 2022
5518ddb
add TTL response
lavkesh Aug 25, 2022
a756514
Merge branch 'main' into feat-redis-sink
lavkesh Aug 25, 2022
7f0af42
fix: template constructor should validate arguments
lavkesh Aug 25, 2022
3a98bb1
chore: fix checkstyle
lavkesh Aug 25, 2022
bf2d7bf
tests: add test for entries
lavkesh Aug 25, 2022
fbf73ef
tests: add tests for redis clients
lavkesh Aug 25, 2022
92decc2
fix: remove schema from entry classes
lavkesh Aug 26, 2022
d69862c
fix: minor fixes after qa
lavkesh Aug 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation "io.grpc:grpc-all:1.38.0"
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.0.1'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2.1'
implementation 'org.json:json:20220320'

implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.4.0'
testImplementation group: 'junit', name: 'junit', version: '4.13'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/odpf/depot/config/OdpfSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public interface OdpfSinkConfig extends Config {
@DefaultValue("")
String getSinkConnectorSchemaProtoKeyClass();

@Key("SINK_CONNECTOR_SCHEMA_JSON_PARSER_STRING_MODE_ENABLED")
@DefaultValue("true")
boolean getSinkConnectorSchemaJsonParserStringModeEnabled();

@Key("SINK_CONNECTOR_SCHEMA_DATA_TYPE")
@ConverterClass(SinkConnectorSchemaDataTypeConverter.class)
@DefaultValue("PROTOBUF")
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/io/odpf/depot/config/RedisSinkConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.odpf.depot.config;

import io.odpf.depot.config.converter.JsonToPropertiesConverter;
import io.odpf.depot.config.converter.RedisSinkDataTypeConverter;
import io.odpf.depot.config.converter.RedisSinkDeploymentTypeConverter;
import io.odpf.depot.config.converter.RedisSinkTtlTypeConverter;
import io.odpf.depot.redis.enums.RedisSinkDataType;
import io.odpf.depot.redis.enums.RedisSinkDeploymentType;
import io.odpf.depot.redis.enums.RedisSinkTtlType;
import org.aeonbits.owner.Config;

import java.util.Properties;


@Config.DisableFeature(Config.DisableableFeature.PARAMETER_FORMATTING)
public interface RedisSinkConfig extends OdpfSinkConfig {
@Key("SINK_REDIS_URLS")
String getSinkRedisUrls();

@Key("SINK_REDIS_KEY_TEMPLATE")
String getSinkRedisKeyTemplate();

@Key("SINK_REDIS_DATA_TYPE")
@DefaultValue("HASHSET")
@ConverterClass(RedisSinkDataTypeConverter.class)
RedisSinkDataType getSinkRedisDataType();

@Key("SINK_REDIS_TTL_TYPE")
@DefaultValue("DISABLE")
@ConverterClass(RedisSinkTtlTypeConverter.class)
RedisSinkTtlType getSinkRedisTtlType();

@Key("SINK_REDIS_TTL_VALUE")
@DefaultValue("0")
long getSinkRedisTtlValue();

@Key("SINK_REDIS_DEPLOYMENT_TYPE")
@DefaultValue("Standalone")
@ConverterClass(RedisSinkDeploymentTypeConverter.class)
RedisSinkDeploymentType getSinkRedisDeploymentType();

@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
String getSinkRedisListDataProtoIndex();

@Key("SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME")
String getSinkRedisKeyValueDataFieldName();

@Key("SINK_REDIS_LIST_DATA_FIELD_NAME")
String getSinkRedisListDataFieldName();

@Key("SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING")
@ConverterClass(JsonToPropertiesConverter.class)
@DefaultValue("")
Properties getSinkRedisHashsetFieldToColumnMapping();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.odpf.depot.config.converter;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;


public class JsonToPropertiesConverter implements org.aeonbits.owner.Converter<Properties> {
private static final Gson GSON = new Gson();

@Override
public Properties convert(Method method, String input) {
if (Strings.isNullOrEmpty(input)) {
return null;
}
Type type = new TypeToken<Map<String, Object>>() {
}.getType();
Map<String, Object> m = GSON.fromJson(input, type);
Properties properties = getProperties(m);
validate(properties);
return properties;
}

private Properties getProperties(Map<String, Object> inputMap) {
Properties properties = new Properties();
for (String key : inputMap.keySet()) {
Object value = inputMap.get(key);
if (value instanceof String) {
properties.put(key, value);
} else if (value instanceof Map) {
properties.put(key, getProperties((Map) value));
}
}
return properties;
}

private void validate(Properties properties) {
DuplicateFinder duplicateFinder = flattenValues(properties)
.collect(DuplicateFinder::new, DuplicateFinder::accept, DuplicateFinder::combine);
if (duplicateFinder.duplicates.size() > 0) {
throw new IllegalArgumentException("duplicates found in SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING for : " + duplicateFinder.duplicates);
}
}

private Stream<String> flattenValues(Properties properties) {
return properties
.values()
.stream()
.flatMap(v -> {
if (v instanceof String) {
return Stream.of((String) v);
} else if (v instanceof Properties) {
return flattenValues((Properties) v);
} else {
return Stream.empty();
}
});
}

private static class DuplicateFinder implements Consumer<String> {
private final Set<String> processedValues = new HashSet<>();
private final List<String> duplicates = new ArrayList<>();

@Override
public void accept(String o) {
if (processedValues.contains(o)) {
duplicates.add(o);
} else {
processedValues.add(o);
}
}

void combine(DuplicateFinder other) {
other.processedValues
.forEach(v -> {
if (processedValues.contains(v)) {
duplicates.add(v);
} else {
processedValues.add(v);
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.odpf.depot.config.converter;

import io.odpf.depot.redis.enums.RedisSinkDataType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkDataTypeConverter implements Converter<RedisSinkDataType> {
@Override
public RedisSinkDataType convert(Method method, String input) {
return RedisSinkDataType.valueOf(input.toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.odpf.depot.config.converter;

import io.odpf.depot.redis.enums.RedisSinkDeploymentType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkDeploymentTypeConverter implements Converter<RedisSinkDeploymentType> {
@Override
public RedisSinkDeploymentType convert(Method method, String input) {
return RedisSinkDeploymentType.valueOf(input.toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.odpf.depot.config.converter;

import io.odpf.depot.redis.enums.RedisSinkTtlType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkTtlTypeConverter implements Converter<RedisSinkTtlType> {
@Override
public RedisSinkTtlType convert(Method method, String input) {
return RedisSinkTtlType.valueOf(input.toUpperCase());
}
}
5 changes: 3 additions & 2 deletions src/main/java/io/odpf/depot/error/ErrorInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
@Data
public class ErrorInfo {

@EqualsAndHashCode.Exclude private Exception exception;
@EqualsAndHashCode.Exclude
private Exception exception;
private ErrorType errorType;

public String toString() {
return errorType.name();
return String.format("Exception %s, ErrorType: %s", exception != null ? exception.getMessage() : "NULL", errorType.name());
}
}
5 changes: 1 addition & 4 deletions src/main/java/io/odpf/depot/message/OdpfMessageSchema.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.odpf.depot.message;

import java.io.IOException;

public interface OdpfMessageSchema {

Object getSchema() throws IOException;
Object getSchema();
}
2 changes: 2 additions & 0 deletions src/main/java/io/odpf/depot/message/ParsedOdpfMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ public interface ParsedOdpfMessage {
void validate(OdpfSinkConfig config);

Map<String, Object> getMapping(OdpfMessageSchema schema) throws IOException;

Object getFieldByName(String name, OdpfMessageSchema odpfMessageSchema);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.odpf.depot.message.ParsedOdpfMessage;
import io.odpf.depot.metrics.Instrumentation;
import io.odpf.depot.metrics.JsonParserMetrics;
import io.odpf.depot.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;
Expand Down Expand Up @@ -54,21 +55,9 @@ public ParsedOdpfMessage parse(OdpfMessage message, SinkConnectorSchemaMessageMo
throw new EmptyMessageException();
}
Instant instant = Instant.now();
JSONObject jsonObject = new JSONObject(new String(payload));
JSONObject jsonWithStringValues = new JSONObject();
jsonObject.keySet()
.forEach(k -> {
Object value = jsonObject.get(k);
if (value instanceof JSONObject) {
throw new UnsupportedOperationException("nested json structure not supported yet");
}
if (JSONObject.NULL.equals(value)) {
return;
}
jsonWithStringValues.put(k, value.toString());
});
JSONObject jsonObject = JsonUtils.getJsonObject(config, payload);
instrumentation.captureDurationSince(jsonParserMetrics.getJsonParseTimeTakenMetric(), instant);
return new JsonOdpfParsedMessage(jsonWithStringValues);
return new JsonOdpfParsedMessage(jsonObject);
} catch (JSONException ex) {
throw new IOException("invalid json error", ex);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.odpf.depot.message.json;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.json.JsonOrgJsonProvider;
import io.odpf.depot.config.OdpfSinkConfig;
import io.odpf.depot.message.OdpfMessageSchema;
import io.odpf.depot.message.ParsedOdpfMessage;
Expand Down Expand Up @@ -32,8 +35,17 @@ public void validate(OdpfSinkConfig config) {
@Override
public Map<String, Object> getMapping(OdpfMessageSchema schema) {
if (jsonObject == null || jsonObject.isEmpty()) {
return Collections.emptyMap();
return Collections.emptyMap();
}
return jsonObject.toMap();
}

public Object getFieldByName(String name, OdpfMessageSchema odpfMessageSchema) {
String jsonPathName = "$." + name;
Configuration configuration = Configuration.builder()
.jsonProvider(new JsonOrgJsonProvider())
.build();
JsonPath jsonPath = JsonPath.compile(jsonPathName);
return jsonPath.read(jsonObject, configuration);
}
}
2 changes: 2 additions & 0 deletions src/main/java/io/odpf/depot/message/proto/ProtoField.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.odpf.depot.message.proto;

import com.google.protobuf.DescriptorProtos;
import lombok.EqualsAndHashCode;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@EqualsAndHashCode
public class ProtoField {
private String name;
private String typeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.odpf.depot.message.OdpfMessageSchema;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Properties;

@EqualsAndHashCode
public class ProtoOdpfMessageSchema implements OdpfMessageSchema {

@Getter
Expand All @@ -21,13 +23,13 @@ public ProtoOdpfMessageSchema(ProtoField protoField) throws IOException {
this(protoField, createProperties(protoField));
}

public ProtoOdpfMessageSchema(ProtoField protoField, Properties properties) throws IOException {
public ProtoOdpfMessageSchema(ProtoField protoField, Properties properties) {
this.protoField = protoField;
this.properties = properties;
}

@Override
public Properties getSchema() throws IOException {
public Properties getSchema() {
return this.properties;
}

Expand Down
Loading