Firehose kinesis source (#875)
derrick-mink-sp authored and whummer committed Aug 17, 2018
1 parent 5ad9414 commit 9180ea5
Showing 4 changed files with 95 additions and 15 deletions.
53 changes: 43 additions & 10 deletions localstack/services/firehose/firehose_api.py
@@ -14,6 +14,7 @@
 from localstack.utils.aws import aws_responses
 from localstack.utils.aws.aws_stack import get_s3_client, firehose_stream_arn, connect_elasticsearch
 from boto3.dynamodb.types import TypeDeserializer
+from localstack.utils.kinesis import kinesis_connector
 
 APP_NAME = 'firehose_api'
 app = Flask(APP_NAME)
@@ -50,8 +51,16 @@ def put_records(stream_name, records):
             es = connect_elasticsearch()
             for record in records:
                 obj_id = uuid.uuid4()
-                data = base64.b64decode(record['Data'])
+
+                # DirectPut
+                if 'Data' in record:
+                    data = base64.b64decode(record['Data'])
+                # KinesisAsSource
+                elif 'data' in record:
+                    data = base64.b64decode(record['data'])
+
                 body = json.loads(data)
+
                 try:
                     es.create(index=es_index, doc_type=es_type, id=obj_id, body=body)
                 except Exception as e:
@@ -63,7 +72,14 @@ def put_records(stream_name, records):
             prefix = s3_dest.get('Prefix', '')
             s3 = get_s3_client()
             for record in records:
-                data = base64.b64decode(record['Data'])
+
+                # DirectPut
+                if 'Data' in record:
+                    data = base64.b64decode(record['Data'])
+                # KinesisAsSource
+                elif 'data' in record:
+                    data = base64.b64decode(record['data'])
+
                 obj_name = str(uuid.uuid4())
                 obj_path = '%s%s%s' % (prefix, '' if prefix.endswith('/') else '/', obj_name)
                 try:
@@ -86,7 +102,7 @@ def get_destination(stream_name, destination_id):
 
 
 def update_destination(stream_name, destination_id,
-        s3_update=None, elasticsearch_update=None, version_id=None):
+                       s3_update=None, elasticsearch_update=None, version_id=None):
     dest = get_destination(stream_name, destination_id)
     if elasticsearch_update:
         if 'ESDestinationDescription' not in dest:
@@ -101,8 +117,15 @@ def update_destination(stream_name, destination_id,
     return dest
 
 
-def create_stream(stream_name, s3_destination=None, elasticsearch_destination=None):
+def process_records(records, shard_id, fh_d_stream):
+    put_records(fh_d_stream, records)
+
+
+def create_stream(stream_name, delivery_stream_type='DirectPut', delivery_stream_type_configuration=None,
+                  s3_destination=None, elasticsearch_destination=None):
     stream = {
+        'DeliveryStreamType': delivery_stream_type,
+        'KinesisStreamSourceConfiguration': delivery_stream_type_configuration,
         'HasMoreDestinations': False,
         'VersionId': '1',
         'CreateTimestamp': time.time(),
@@ -114,10 +137,18 @@ def create_stream(stream_name, s3_destination=None, elasticsearch_destination=None):
     DELIVERY_STREAMS[stream_name] = stream
     if elasticsearch_destination:
         update_destination(stream_name=stream_name,
-            destination_id=short_uid(),
-            elasticsearch_update=elasticsearch_destination)
+                           destination_id=short_uid(),
+                           elasticsearch_update=elasticsearch_destination)
     if s3_destination:
         update_destination(stream_name=stream_name, destination_id=short_uid(), s3_update=s3_destination)
+
+    if delivery_stream_type == 'KinesisStreamAsSource':
+        kinesis_stream_name = delivery_stream_type_configuration.get('KinesisStreamARN').split('/')[1]
+        kinesis_connector.listen_to_kinesis(stream_name=kinesis_stream_name,
+                                            fh_d_stream=stream_name,
+                                            listener_func=process_records,
+                                            wait_until_started=True,
+                                            ddb_lease_table_suffix='-firehose')
     return stream


@@ -164,8 +195,10 @@ def post_request():
     elif action == '%s.CreateDeliveryStream' % ACTION_HEADER_PREFIX:
         stream_name = data['DeliveryStreamName']
         response = create_stream(stream_name,
-            s3_destination=data.get('S3DestinationConfiguration'),
-            elasticsearch_destination=data.get('ElasticsearchDestinationConfiguration'))
+            delivery_stream_type=data.get('DeliveryStreamType'),
+            delivery_stream_type_configuration=data.get('KinesisStreamSourceConfiguration'),
+            s3_destination=data.get('S3DestinationConfiguration'),
+            elasticsearch_destination=data.get('ElasticsearchDestinationConfiguration'))
     elif action == '%s.DeleteDeliveryStream' % ACTION_HEADER_PREFIX:
         stream_name = data['DeliveryStreamName']
         response = delete_stream(stream_name)
@@ -198,10 +231,10 @@ def post_request():
         destination_id = data['DestinationId']
         s3_update = data['S3DestinationUpdate'] if 'S3DestinationUpdate' in data else None
         update_destination(stream_name=stream_name, destination_id=destination_id,
-            s3_update=s3_update, version_id=version_id)
+                           s3_update=s3_update, version_id=version_id)
         es_update = data['ESDestinationUpdate'] if 'ESDestinationUpdate' in data else None
         update_destination(stream_name=stream_name, destination_id=destination_id,
-            es_update=es_update, version_id=version_id)
+                           es_update=es_update, version_id=version_id)
         response = {}
     else:
         response = error_response('Unknown action "%s"' % action, code=400, error_type='InvalidAction')
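Taken together, these changes let a delivery stream created with DeliveryStreamType='KinesisStreamAsSource' subscribe to the named Kinesis stream and forward whatever arrives there to its destinations. Below is a minimal client-side sketch against LocalStack; the endpoint ports, stream and bucket names, and ARNs are illustrative assumptions, not part of this commit:

import json
import time

import boto3

# Hypothetical 2018-era LocalStack service ports; adjust to your setup.
s3 = boto3.client('s3', endpoint_url='http://localhost:4572', region_name='us-east-1')
kinesis = boto3.client('kinesis', endpoint_url='http://localhost:4568', region_name='us-east-1')
firehose = boto3.client('firehose', endpoint_url='http://localhost:4573', region_name='us-east-1')

# Source Kinesis stream and target bucket (names are placeholders).
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
s3.create_bucket(Bucket='test-bucket')

# Delivery stream that reads from Kinesis instead of accepting direct puts.
firehose.create_delivery_stream(
    DeliveryStreamName='test-delivery-stream',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'RoleARN': 'arn:aws:iam::000000000000:role/firehose',  # placeholder role
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:000000000000:stream/test-stream'
    },
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::000000000000:role/firehose',  # placeholder role
        'BucketARN': 'arn:aws:s3:::test-bucket',
        'Prefix': 'testdata'
    })

# Records put to the source stream should now land under the S3 prefix.
kinesis.put_record(StreamName='test-stream',
                   Data=json.dumps({'test': 'data'}).encode('utf-8'),
                   PartitionKey='partitionkey1')
time.sleep(3)
print(s3.list_objects(Bucket='test-bucket').get('Contents'))

The integration test added at the bottom of this commit exercises the same flow through the aws_stack test helpers.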
4 changes: 2 additions & 2 deletions localstack/utils/kinesis/kinesis_connector.py
@@ -394,7 +394,7 @@ def receive_msg(records, checkpointer, shard_id):
 def listen_to_kinesis(stream_name, listener_func=None, processor_script=None,
         events_file=None, endpoint_url=None, log_file=None, configs={}, env=None,
         ddb_lease_table_suffix=None, env_vars={}, kcl_log_level=DEFAULT_KCL_LOG_LEVEL,
-        log_subscribers=[], wait_until_started=False):
+        log_subscribers=[], wait_until_started=False, fh_d_stream=None):
     """
     High-level function that allows to subscribe to a Kinesis stream
     and receive events in a listener function. A KCL client process is
@@ -410,7 +410,7 @@ def listen_to_kinesis(stream_name, listener_func=None, processor_script=None,
         run('rm -f %s' % events_file)
         # start event reader thread (this process)
         ready_mutex = threading.Semaphore(0)
-        thread = EventFileReaderThread(events_file, listener_func, ready_mutex=ready_mutex)
+        thread = EventFileReaderThread(events_file, listener_func, ready_mutex=ready_mutex, fh_d_stream=fh_d_stream)
         thread.start()
         # Wait until the event reader thread is ready (to avoid 'Connection refused' error on the UNIX socket)
         ready_mutex.acquire()
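Note that the new fh_d_stream kwarg is only threaded through listen_to_kinesis into the reader thread; the listener callback decides what to do with it. A sketch of the resulting call shape, with hypothetical stream names (it assumes a running LocalStack Kinesis endpoint, since listen_to_kinesis spawns a KCL consumer process):

from localstack.utils.kinesis import kinesis_connector

def process_records(records, shard_id, fh_d_stream):
    # fh_d_stream carries the delivery stream name passed below, telling
    # the listener which Firehose stream these Kinesis records should feed.
    print('forwarding %d records to %s' % (len(records), fh_d_stream))

kinesis_connector.listen_to_kinesis(
    stream_name='test-kinesis-stream',
    listener_func=process_records,
    fh_d_stream='test-delivery-stream',
    wait_until_started=True,
    ddb_lease_table_suffix='-firehose')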
9 changes: 6 additions & 3 deletions localstack/utils/kinesis/kinesis_util.py
@@ -10,12 +10,13 @@
 
 
 class EventFileReaderThread(FuncThread):
-    def __init__(self, events_file, callback, ready_mutex=None):
+    def __init__(self, events_file, callback, ready_mutex=None, fh_d_stream=None):
         FuncThread.__init__(self, self.retrieve_loop, None)
         self.running = True
         self.events_file = events_file
         self.callback = callback
         self.ready_mutex = ready_mutex
+        self.fh_d_stream = fh_d_stream
 
     def retrieve_loop(self, params):
         sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -46,13 +47,15 @@ def handle_connection(self, conn):
                     records = event['records']
                     shard_id = event['shard_id']
                     method_args = inspect.getargspec(self.callback)[0]
-                    if len(method_args) > 1:
+                    if len(method_args) > 2:
+                        self.callback(records, shard_id=shard_id, fh_d_stream=self.fh_d_stream)
+                    elif len(method_args) > 1:
                         self.callback(records, shard_id=shard_id)
                     else:
                         self.callback(records)
                 except Exception as e:
                     LOGGER.warning("Unable to process JSON line: '%s': %s %s. Callback: %s" %
-                        (truncate(line), e, traceback.format_exc(), self.callback))
+                                   (truncate(line), e, traceback.format_exc(), self.callback))
         conn.close()
 
     def stop(self, quiet=True):
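The dispatch in handle_connection keeps older listeners working: callbacks declaring three or more positional arguments receive the new fh_d_stream keyword, two-argument callbacks keep the (records, shard_id) shape, and one-argument callbacks get records alone. A standalone sketch of the pattern; note the commit itself uses inspect.getargspec, which was removed in Python 3.11, so this sketch substitutes getfullargspec for the same effect:

import inspect

def dispatch(callback, records, shard_id, fh_d_stream):
    # Count the callback's declared positional args to pick a call shape,
    # mirroring EventFileReaderThread.handle_connection above.
    method_args = inspect.getfullargspec(callback)[0]
    if len(method_args) > 2:
        callback(records, shard_id=shard_id, fh_d_stream=fh_d_stream)
    elif len(method_args) > 1:
        callback(records, shard_id=shard_id)
    else:
        callback(records)

def new_listener(records, shard_id, fh_d_stream):
    print('got %d records for %s' % (len(records), fh_d_stream))

def legacy_listener(records, shard_id):
    print('got %d records on shard %s' % (len(records), shard_id))

# Three-argument listeners receive the delivery stream name; older
# two-argument listeners continue to work unchanged.
dispatch(new_listener, [{'data': 'dGVzdA=='}], 'shardId-000', 'test-delivery-stream')
dispatch(legacy_listener, [{'data': 'dGVzdA=='}], 'shardId-000', 'test-delivery-stream')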
44 changes: 44 additions & 0 deletions tests/integration/test_integration.py
@@ -70,6 +70,50 @@ def test_firehose_s3():
     testutil.assert_objects(json.loads(to_str(test_data)), all_objects)
 
 
+def test_firehose_kinesis_to_s3():
+    kinesis = aws_stack.connect_to_service('kinesis')
+    s3_resource = aws_stack.connect_to_resource('s3')
+    firehose = aws_stack.connect_to_service('firehose')
+
+    aws_stack.create_kinesis_stream(TEST_STREAM_NAME, delete=True)
+
+    s3_prefix = '/testdata'
+    test_data = '{"test": "firehose_data_%s"}' % short_uid()
+
+    # create Firehose stream
+    stream = firehose.create_delivery_stream(
+        DeliveryStreamType='KinesisStreamAsSource',
+        KinesisStreamSourceConfiguration={
+            'RoleARN': aws_stack.iam_resource_arn('firehose'),
+            'KinesisStreamARN': aws_stack.kinesis_stream_arn(TEST_STREAM_NAME)
+        },
+        DeliveryStreamName=TEST_FIREHOSE_NAME,
+        S3DestinationConfiguration={
+            'RoleARN': aws_stack.iam_resource_arn('firehose'),
+            'BucketARN': aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),
+            'Prefix': s3_prefix
+        }
+    )
+    assert stream
+    assert TEST_FIREHOSE_NAME in firehose.list_delivery_streams()['DeliveryStreamNames']
+
+    # create target S3 bucket
+    s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)
+
+    # put records
+    kinesis.put_record(
+        Data=to_bytes(test_data),
+        PartitionKey='testId',
+        StreamName=TEST_STREAM_NAME
+    )
+
+    time.sleep(3)
+
+    # check records in target bucket
+    all_objects = testutil.list_all_s3_objects()
+    testutil.assert_objects(json.loads(to_str(test_data)), all_objects)
+
+
 def test_kinesis_lambda_sns_ddb_streams():
 
     ddb_lease_table_suffix = '-kclapp'
