Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.google.protobuf.DynamicMessage;
import io.odpf.depot.common.Tuple;
import io.odpf.depot.config.OdpfSinkConfig;
import io.odpf.depot.exception.UnknownFieldsException;
import io.odpf.depot.exception.ConfigurationException;
import io.odpf.depot.exception.UnknownFieldsException;
import io.odpf.depot.message.OdpfMessageSchema;
import io.odpf.depot.message.ParsedOdpfMessage;
import io.odpf.depot.message.proto.converter.fields.NestedProtoField;
Expand Down Expand Up @@ -129,19 +129,22 @@ private void addRepeatedFields(Map<String, Object> row, Object value, List<Objec


public Object getFieldByName(String name, OdpfMessageSchema odpfMessageSchema) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need odpfMessageSchema argument? I see that two implementations of ParsedOdpfMessage doesn't use this argument anymore.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be needed for others. Because the schema is not part of the message. we need to supply from outside.

if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Invalid field config : name can not be empty");
}
String[] keys = name.split("\\.");
Map<String, Object> fields = getMapping(odpfMessageSchema);
for (String key: keys) {
Object localValue = fields.get(key);
if (localValue == null) {
throw new ConfigurationException("Invalid field config : " + name);
Object currentValue = dynamicMessage;
for (String key : keys) {
if (!(currentValue instanceof DynamicMessage)) {
Comment thread
h4rikris marked this conversation as resolved.
throw new IllegalArgumentException("Invalid field config : " + name);
}
if (localValue instanceof Map) {
fields = (Map<String, Object>) localValue;
} else {
return localValue;
DynamicMessage message = (DynamicMessage) currentValue;
Descriptors.FieldDescriptor descriptor = message.getDescriptorForType().findFieldByName(key);
if (descriptor == null) {
throw new IllegalArgumentException("Invalid field config : " + name);
}
currentValue = message.getField(descriptor);
}
return fields;
return currentValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.api.client.util.DateTime;
import com.google.protobuf.*;
import io.odpf.depot.*;
import io.odpf.depot.exception.ConfigurationException;
import io.odpf.depot.message.OdpfMessageSchema;
import io.odpf.depot.message.ParsedOdpfMessage;
import io.odpf.stencil.Parser;
Expand Down Expand Up @@ -347,7 +346,29 @@ public void shouldThrowExceptionIfColumnIsNotPresentInProto() throws IOException
TestNestedMessageBQ nestedMessage = TestNestedMessageBQ.newBuilder().setNestedId("test").setSingleMessage(message1).build();
ProtoOdpfParsedMessage protoOdpfParsedMessage = new ProtoOdpfParsedMessage(protoParser.parse(nestedMessage.toByteArray()));
Assert.assertEquals("test", protoOdpfParsedMessage.getFieldByName("nested_id", odpfMessageSchema));
ConfigurationException configurationException = assertThrows(ConfigurationException.class, () -> protoOdpfParsedMessage.getFieldByName("single_message.order_id", odpfMessageSchema));
Assert.assertEquals("Invalid field config : single_message.order_id", configurationException.getMessage());
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> protoOdpfParsedMessage.getFieldByName("single_message.order_id", odpfMessageSchema));
Assert.assertEquals("Invalid field config : single_message.order_id", exception.getMessage());
}

@Test
public void shouldThrowExceptionIfColumnIsNotNested() throws IOException {
TestMessageBQ message1 = TestProtoUtil.generateTestMessage(now);
Parser protoParser = StencilClientFactory.getClient().getParser(TestNestedMessageBQ.class.getName());
OdpfMessageSchema odpfMessageSchema = odpfMessageParser.getSchema("io.odpf.depot.TestNestedMessageBQ", descriptorsMap);
TestNestedMessageBQ nestedMessage = TestNestedMessageBQ.newBuilder().setNestedId("test").setSingleMessage(message1).build();
ProtoOdpfParsedMessage protoOdpfParsedMessage = new ProtoOdpfParsedMessage(protoParser.parse(nestedMessage.toByteArray()));
Assert.assertEquals("test", protoOdpfParsedMessage.getFieldByName("nested_id", odpfMessageSchema));
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> protoOdpfParsedMessage.getFieldByName("nested_id.order_id", odpfMessageSchema));
Assert.assertEquals("Invalid field config : nested_id.order_id", exception.getMessage());
}


@Test
public void shouldThrowExceptionIfFieldIsEmpty() throws IOException {
Comment thread
h4rikris marked this conversation as resolved.
OdpfMessageSchema odpfMessageSchema = odpfMessageParser.getSchema("io.odpf.depot.TestMessageBQ", descriptorsMap);
ProtoOdpfParsedMessage protoOdpfParsedMessage = new ProtoOdpfParsedMessage(dynamicMessage);
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> protoOdpfParsedMessage.getFieldByName("", odpfMessageSchema));
Assert.assertEquals("Invalid field config : name can not be empty", exception.getMessage());
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.odpf.depot.redis.parsers;

import com.google.protobuf.Descriptors;
import io.odpf.depot.*;
import io.odpf.depot.TestKey;
import io.odpf.depot.TestMessage;
import io.odpf.depot.TestNestedMessage;
import io.odpf.depot.TestNestedRepeatedMessage;
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.exception.ConfigurationException;
import io.odpf.depot.message.*;
import io.odpf.depot.message.OdpfMessage;
import io.odpf.depot.message.OdpfMessageSchema;
import io.odpf.depot.message.ParsedOdpfMessage;
import io.odpf.depot.message.SinkConnectorSchemaMessageMode;
import io.odpf.depot.message.proto.ProtoOdpfMessageParser;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.redis.client.entry.RedisEntry;
Expand All @@ -27,20 +32,19 @@

@RunWith(MockitoJUnitRunner.class)
public class RedisKeyValueEntryParserTest {
private final Map<String, Descriptors.Descriptor> descriptorsMap = new HashMap<String, Descriptors.Descriptor>() {{
put(String.format("%s", TestKey.class.getName()), TestKey.getDescriptor());
put(String.format("%s", TestMessage.class.getName()), TestMessage.getDescriptor());
put(String.format("%s", TestNestedMessage.class.getName()), TestNestedMessage.getDescriptor());
put(String.format("%s", TestNestedRepeatedMessage.class.getName()), TestNestedRepeatedMessage.getDescriptor());
}};
@Mock
private RedisSinkConfig redisSinkConfig;
@Mock
private StatsDReporter statsDReporter;
private RedisEntryParser redisKeyValueEntryParser;

private OdpfMessageSchema schema;
private ParsedOdpfMessage parsedOdpfMessage;
private final Map<String, Descriptors.Descriptor> descriptorsMap = new HashMap<String, Descriptors.Descriptor>() {{
put(String.format("%s", TestKey.class.getName()), TestKey.getDescriptor());
put(String.format("%s", TestMessage.class.getName()), TestMessage.getDescriptor());
put(String.format("%s", TestNestedMessage.class.getName()), TestNestedMessage.getDescriptor());
put(String.format("%s", TestNestedRepeatedMessage.class.getName()), TestNestedRepeatedMessage.getDescriptor());
}};

private void redisSinkSetup(String template, String field) throws IOException {
when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.KEYVALUE);
Expand Down Expand Up @@ -70,8 +74,8 @@ public void shouldConvertParsedOdpfMessageToRedisKeyValueEntry() throws IOExcept
@Test
public void shouldThrowExceptionForInvalidKeyValueDataFieldName() throws IOException {
redisSinkSetup("test-key", "random-field");
ConfigurationException configurationException =
assertThrows(ConfigurationException.class, () -> redisKeyValueEntryParser.getRedisEntry(parsedOdpfMessage));
assertEquals("Invalid field config : random-field", configurationException.getMessage());
IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> redisKeyValueEntryParser.getRedisEntry(parsedOdpfMessage));
assertEquals("Invalid field config : random-field", exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.odpf.depot.TestNestedMessage;
import io.odpf.depot.TestNestedRepeatedMessage;
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.exception.ConfigurationException;
import io.odpf.depot.message.OdpfMessage;
import io.odpf.depot.message.OdpfMessageSchema;
import io.odpf.depot.message.ParsedOdpfMessage;
Expand Down Expand Up @@ -75,8 +74,8 @@ public void shouldConvertParsedOdpfMessageToRedisListEntry() throws IOException
@Test
public void shouldThrowExceptionForInvalidKeyValueDataFieldName() throws IOException {
redisSinkSetup("test-key", "random-field");
ConfigurationException configurationException =
assertThrows(ConfigurationException.class, () -> redisListEntryParser.getRedisEntry(parsedOdpfMessage));
assertEquals("Invalid field config : random-field", configurationException.getMessage());
IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> redisListEntryParser.getRedisEntry(parsedOdpfMessage));
assertEquals("Invalid field config : random-field", exception.getMessage());
}
}