[LYFT][STRMCMP-1152] Code refactoring for s3AndKinesis PTransform (#49)
maghamravi committed Nov 30, 2020
1 parent bf905ae commit ec14008
Showing 4 changed files with 134 additions and 40 deletions.
25 changes: 23 additions & 2 deletions runners/flink/flink_runner.gradle
@@ -184,15 +184,36 @@ dependencies {
runtimeOnly project(":sdks:java:harness")
if (flink_version.startsWith('1.11')) {
compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.11-lyft20200811"
compile("com.lyft:streamingplatform-events-source:2.2-SNAPSHOT")
compile "com.amazonaws:aws-java-sdk-s3:1.10.6"
compile "org.apache.hadoop:hadoop-aws:2.8.3"
compile "com.lyft:streamingplatform-kinesis:2.2-SNAPSHOT"
compile("com.lyft:streamingplatform-kafka:2.2-SNAPSHOT") {
exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11"
exclude group: "org.apache.flink", module: "flink-streaming-java_2.11"
}
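// The exclusions below presumably keep events-source's heavyweight transitive
// dependencies (Hadoop, HBase, Tephra, DataNucleus, connection pools) off the
// Flink runner classpath.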
compile("com.lyft:streamingplatform-events-source:2.2-SNAPSHOT") {
exclude group: "com.lyft", module: "streamingplatform-kinesis"
exclude group: "com.google.guava", module: "guava"
exclude group: "com.google.protobuf", module: "protobuf-java"
exclude group: "co.cask.tephra", module: "tephra-api"
exclude group: "co.cask.tephra", module: "tephra-core"
exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0"
exclude group: "com.jolbox", module: "bonecp"
exclude group: "com.zaxxer", module: "HikariCP"
exclude group: "javolution", module: "javolution"
exclude group: "org.antlr", module: "antlr-runtime"
exclude group: "org.apache.hadoop", module: "hadoop-common"
exclude group: "org.apache.hadoop", module: "hadoop-hdfs"
exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core"
exclude group: "org.apache.hbase", module: "hbase-client"
exclude group: "org.datanucleus", module: "datanucleus-api-jdo"
exclude group: "org.datanucleus", module: "datanucleus-core"
exclude group: "org.datanucleus", module: "datanucleus-rdbms"
}
} else if (flink_version.startsWith('1.10')) {
compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508"
compile("com.lyft:streamingplatform-events-source:2.1-dryft-SNAPSHOT")
compile "com.amazonaws:aws-java-sdk-s3:1.10.6"
compile "org.apache.hadoop:hadoop-aws:2.8.3"
compile "com.lyft:streamingplatform-kinesis:2.1-SNAPSHOT"
compile("com.lyft:streamingplatform-kafka:2.1-SNAPSHOT") {
exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11"
@@ -403,6 +403,17 @@ private void translateS3AndKinesisInputs(
S3Config s3Config = getS3Config(userS3Config);
List<EventConfig> eventConfigs = getEventConfigs(events);

// If kinesis parallelism is left at its default (< 1), derive it as
// kinesis_parallelism = job_parallelism - s3.parallelism.
if (kinesisConfig.getParallelism() < 1) {
int parallelism = context.getExecutionEnvironment().getConfig().getParallelism();
KinesisConfig.Builder builder =
new KinesisConfig.Builder(kinesisConfig.getStreamName());
builder.withStreamStartMode(kinesisConfig.getStreamStartMode());
builder.withProperties(kinesisConfig.getProperties());
builder.withParallelism(parallelism - s3Config.parallelism);
kinesisConfig = builder.build();
}

SourceContext sourceContext =
new SourceContext.Builder()
.withStreamConfig(kinesisConfig)
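For concreteness, the defaulting rule above reduces to a few lines. A minimal sketch in Python (an editor's illustration mirroring the Java logic, not code from this commit):

def effective_kinesis_parallelism(requested, job_parallelism, s3_parallelism):
    # Anything below 1 (e.g. the -1 sentinel sent by the Python SDK) means
    # "unset": Kinesis gets whatever the job has left after the S3 source.
    if requested < 1:
        return job_parallelism - s3_parallelism
    return requested

# A job running at parallelism 8 with s3.parallelism = 1 reads Kinesis with 7 subtasks.
assert effective_kinesis_parallelism(-1, 8, 1) == 7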
@@ -445,12 +456,13 @@ List<EventConfig> getEventConfigs(List<Map<String, JsonNode>> events) {
EventConfig.Builder builder = new EventConfig.Builder(node.get("name").asText());

// Add lateness in sec
JsonNode latenessInSec = node.get("lateness_in_sec");
if (latenessInSec != null) {
builder = builder.withLatenessInSec(latenessInSec.asLong());
// TODO: refactor the builder to use a standard name.
JsonNode maxOutOfOrdernessMillis = node.get("max_out_of_orderness_millis");
if (maxOutOfOrdernessMillis != null) {
builder = builder.withLatenessInSec(maxOutOfOrdernessMillis.asLong() / 1000);
}

// Add lookback hours
// Add lookback days
JsonNode lookbackDays = node.get("lookback_days");
if (lookbackDays != null) {
builder = builder.withLookbackInDays(lookbackDays.asInt());
@@ -473,7 +485,7 @@ S3Config getS3Config(Map<String, JsonNode> userS3Config) {
}

// Add s3 lookback hours
JsonNode lookbackHours = userS3Config.get("lookback_hours");
JsonNode lookbackHours = userS3Config.get("lookback_threshold_hours");
if (lookbackHours != null) {
builder = builder.withLookbackHours(lookbackHours.asInt());
}
@@ -491,9 +503,8 @@ KinesisConfig getKinesisConfig(Map<String, JsonNode> userKinesisConfig, ObjectMapper mapper) {
new KinesisConfig.Builder(userKinesisConfig.get("stream").asText());
// Add kinesis parallelism
JsonNode kinesisParallelism = userKinesisConfig.get("parallelism");
if (kinesisParallelism != null) {
builder = builder.withParallelism(kinesisParallelism.asInt());
}
builder = builder.withParallelism(kinesisParallelism.asInt());

// Add kinesis properties
Map<String, String> kinesisProps =
mapper.convertValue(
4 changes: 2 additions & 2 deletions runners/flink/src/test/resources/s3_and_kinesis_config.json
@@ -2,13 +2,13 @@
"events": [
{
"name": "test_event",
"lateness_in_sec": 5,
"max_out_of_orderness_millis": 5000,
"lookback_days": 1
}
],
"s3": {
"parallelism": 1,
"lookback_hours": 12
"lookback_threshold_hours": 12
},
"kinesis": {
"stream": "kinesis_stream",
118 changes: 90 additions & 28 deletions sdks/python/apache_beam/io/lyft/s3_and_kinesis.py
@@ -2,32 +2,76 @@
import logging
import random
import string
from collections import namedtuple

from apache_beam import PTransform
from apache_beam.pvalue import PBegin
from apache_beam.pvalue import PCollection
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.window import GlobalWindows

Event = namedtuple('Event', 'name lateness_in_sec lookback_days')
S3Config = namedtuple('S3Config', 'parallelism lookback_hours')

class EventConfig(object):
"""
Configuration of an analytics event.
"""
DEFAULT_MAX_OUT_OF_ORDERNESS_MILLIS = 5_000  # maximum allowed event-time delay before an element is ignored

def __init__(self, name):
self.name = name # name of the analytics event.
self.max_out_of_orderness_millis = EventConfig.DEFAULT_MAX_OUT_OF_ORDERNESS_MILLIS
self.lookback_days = None  # how many days back to start consuming historical events.

def with_max_out_of_orderness_millis(self, out_of_orderness_millis):
"""
The interval between the maximum timestamp seen so far and the watermark that
is emitted. For example, if this is set to 1000ms, after seeing a record for
10:00:01 we will emit a watermark for 10:00:00, indicating that we believe that all
data from before that time has arrived.
"""
self.max_out_of_orderness_millis = out_of_orderness_millis
return self

def with_lookback_in_days(self, lookback_days):
self.lookback_days = lookback_days
return self
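The watermark behavior documented in with_max_out_of_orderness_millis is simple arithmetic. A minimal sketch (an editor's illustration, not part of this module; assumes millisecond epoch timestamps):

def watermark_for(max_event_time_ms, max_out_of_orderness_ms):
    # The emitted watermark trails the largest event timestamp seen so far,
    # declaring that all earlier data is assumed to have arrived.
    return max_event_time_ms - max_out_of_orderness_ms

# With a 1000 ms bound, an element stamped 10:00:01.000 advances the
# watermark only to 10:00:00.000, matching the docstring example above.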


class S3Config(object):
"""
S3 configuration.
"""
DEFAULT_S3_PARALLELISM = 1  # default parallelism for the S3 source connector.
DEFAULT_LOOKBACK_THRESHOLD_HOURS = 23  # default threshold, in hours, for consuming historical events from S3.

def __init__(self):
self.parallelism = S3Config.DEFAULT_S3_PARALLELISM
self.lookback_threshold_hours = S3Config.DEFAULT_LOOKBACK_THRESHOLD_HOURS

def with_parallelism(self, parallelism):
self.parallelism = parallelism
return self

def with_lookback_threshold_hours(self, lookback_threshold_hours):
self.lookback_threshold_hours = lookback_threshold_hours
return self


class S3AndKinesisInput(PTransform):
"""Custom composite transform that uses Kinesis and S3 as
input sources. This wraps the streamingplatform-dryft-sdk SourceConnector.
Only works with the portable Flink runner.
"""
DEFAULT_KINESIS_PARALLELISM = -1
DEFAULT_START_MODE = 'TRIM_HORIZON'

def __init__(self):
super().__init__()
self.events = []
self.s3_config = S3Config(None, None)
self.events_config = []
self.s3_config = S3Config()
self.source_name = 'S3andKinesis_' + self._get_random_source_name()
self.kinesis_properties = {'aws.region': 'us-east-1'}
self.stream_start_mode = 'TRIM_HORIZON'
self.kinesis_parallelism = 1
self.stream_start_mode = S3AndKinesisInput.DEFAULT_START_MODE
self.kinesis_parallelism = S3AndKinesisInput.DEFAULT_KINESIS_PARALLELISM

def expand(self, pbegin):
assert isinstance(pbegin, PBegin), (
@@ -40,15 +84,20 @@ def get_windowing(self, inputs):
def infer_output_type(self, unused_input_type):
return bytes

def with_event(self, event):
self.events.append(event)
def with_event_config(self, event_config):
"""
Append an EventConfig to the list of event configurations.
:param event_config: an instance of EventConfig
:return: this S3AndKinesisInput, for chaining
"""
self.events_config.append(event_config)
return self

def with_kinesis_stream_name(self, stream_name):
self.stream_name = stream_name
return self

# Defaults to 1
# Defaults to -1, a sentinel meaning "unset"; the Flink translator then derives
# job_parallelism - s3_parallelism.
def with_kinesis_parallelism(self, parallelism):
self.kinesis_parallelism = parallelism
return self
@@ -85,26 +134,36 @@ def from_runner_api_parameter(_unused_ptransform, spec_parameter, _unused_context):
payload = json.loads(spec_parameter)
instance.source_name = payload['source_name']
s3_config_dict = payload['s3']
instance.s3_config = S3Config(
parallelism=s3_config_dict.get('parallelism', None),
lookback_hours=s3_config_dict.get('lookback_hours', None)
)

lookback_threshold_hours = s3_config_dict.get('lookback_threshold_hours', S3Config.DEFAULT_LOOKBACK_THRESHOLD_HOURS)
s3_parallelism = s3_config_dict.get('parallelism', S3Config.DEFAULT_S3_PARALLELISM)
s3_config = S3Config()
s3_config.with_lookback_threshold_hours(lookback_threshold_hours)
s3_config.with_parallelism(s3_parallelism)

instance.s3_config = s3_config

kinesis_config_dict = payload['kinesis']
instance.stream_name = kinesis_config_dict.get('stream')
instance.kinesis_properties = kinesis_config_dict.get('properties', None)
instance.kinesis_parallelism = kinesis_config_dict.get('parallelism', None)
instance.stream_start_mode = kinesis_config_dict.get('stream_start_mode', None)
instance.kinesis_parallelism = kinesis_config_dict.get('parallelism',
S3AndKinesisInput.DEFAULT_KINESIS_PARALLELISM)
instance.stream_start_mode = kinesis_config_dict.get('stream_start_mode',
S3AndKinesisInput.DEFAULT_START_MODE)
events_list = payload['events']
instance.events = []
instance.events_config = []
for event in events_list:
assert event.get('name') is not None, "Event name must be set"
instance.events.append(
Event(
name=event.get('name'),
lateness_in_sec=event.get('lateness_in_sec', None),
lookback_days=event.get('lookback_days', None)
)
)
event_config = EventConfig(event.get('name'))
max_out_of_orderness_millis = event.get('max_out_of_orderness_millis',
EventConfig.DEFAULT_MAX_OUT_OF_ORDERNESS_MILLIS)
event_config.with_max_out_of_orderness_millis(max_out_of_orderness_millis)

lookback_days = event.get('lookback_days', None)
if lookback_days is not None:
event_config.with_lookback_in_days(lookback_days)
instance.events_config.append(event_config)

return instance

def to_runner_api_parameter(self, _unused_context):
@@ -122,14 +181,17 @@ def to_runner_api_parameter(self, _unused_context):
},
's3': {
'parallelism': self.s3_config.parallelism,
'lookback_hours': self.s3_config.lookback_hours
'lookback_threshold_hours': self.s3_config.lookback_threshold_hours
},
}

event_list_json = []
for e in self.events:
assert isinstance(e, Event), "expected instance of Event, but got %s" % type(e)
event_map = {'name': e.name, 'lateness_in_sec': e.lateness_in_sec, 'lookback_days': e.lookback_days}
for event_config in self.events_config:
event_map = {
'name': event_config.name,
'max_out_of_orderness_millis': event_config.max_out_of_orderness_millis,
'lookback_days': event_config.lookback_days
}
event_list_json.append(event_map)

json_map['events'] = event_list_json
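Putting the refactored Python API together, a hedged usage sketch (assumes a Beam pipeline on the portable Flink runner; this diff shows no with_s3_config setter, so s3_config is assigned directly, and values mirror the test config above):

import apache_beam as beam
from apache_beam.io.lyft.s3_and_kinesis import EventConfig, S3AndKinesisInput, S3Config

event = EventConfig('test_event') \
    .with_max_out_of_orderness_millis(5000) \
    .with_lookback_in_days(1)

source = S3AndKinesisInput() \
    .with_event_config(event) \
    .with_kinesis_stream_name('kinesis_stream') \
    .with_kinesis_parallelism(2)
source.s3_config = S3Config().with_parallelism(1).with_lookback_threshold_hours(12)

with beam.Pipeline() as pipeline:
    # The transform emits raw bytes in global windows; downstream steps decode them.
    raw_events = pipeline | 'ReadS3AndKinesis' >> source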
