Skip to content

Commit

Permalink
Add initial support for S3 bucket notifications; fix subprocess32 installation
Browse files Browse the repository at this point in the history
  • Loading branch information
whummer committed Apr 17, 2017
1 parent dc1a5e8 commit 5f0304d
Show file tree
Hide file tree
Showing 17 changed files with 279 additions and 102 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ LABEL authors="Waldemar Hummer (whummer@atlassian.com), Gianluca Bortoli (giallo

# install general packages
RUN apk update && \
apk add --update autoconf automake build-base ca-certificates git libffi-dev libtool make nodejs openssl openssl-dev python python-dev py-pip supervisor zip && \
apk add --update autoconf automake build-base ca-certificates git libffi-dev libtool linux-headers make nodejs openssl openssl-dev python python-dev py-pip supervisor zip && \
update-ca-certificates

# set workdir
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ make web

## Change Log

* v0.3.10: Add initial support for S3 bucket notifications; fix subprocess32 installation
* v0.3.9: Make services/ports configurable via $SERVICES; add tests for Firehose+S3
* v0.3.8: Fix Elasticsearch via local bind and proxy; refactoring; improve error logging
* v0.3.5: Fix lambda handler name; fix host name for S3 API; install web libs on pip install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,53 +45,47 @@ public LocalstackTestRunner(Class<?> klass) throws InitializationError {
/* SERVICE ENDPOINTS */

public static String getEndpointS3() {
ensureInstallation();
return getEndpoint("s3");
return ensureInstallationAndGetEndpoint("s3");
}

public static String getEndpointKinesis() {
ensureInstallation();
return getEndpoint("kinesis");
return ensureInstallationAndGetEndpoint("kinesis");
}

public static String getEndpointLambda() {
ensureInstallation();
return getEndpoint("lambda");
return ensureInstallationAndGetEndpoint("lambda");
}

public static String getEndpointDynamoDB() {
ensureInstallation();
return getEndpoint("dynamodb");
return ensureInstallationAndGetEndpoint("dynamodb");
}

public static String getEndpointDynamoDBStreams() {
ensureInstallation();
return getEndpoint("dynamodbstreams");
return ensureInstallationAndGetEndpoint("dynamodbstreams");
}

public static String getEndpointAPIGateway() {
ensureInstallation();
return getEndpoint("apigateway");
return ensureInstallationAndGetEndpoint("apigateway");
}

public static String getEndpointElasticsearch() {
ensureInstallation();
return getEndpoint("elasticsearch");
return ensureInstallationAndGetEndpoint("elasticsearch");
}

public static String getEndpointFirehose() {
ensureInstallation();
return getEndpoint("firehose");
return ensureInstallationAndGetEndpoint("firehose");
}

public static String getEndpointSNS() {
ensureInstallation();
return getEndpoint("sns");
return ensureInstallationAndGetEndpoint("sns");
}

public static String getEndpointSQS() {
ensureInstallation();
return getEndpoint("sns");
return ensureInstallationAndGetEndpoint("sqs");
}

/** Returns the local Redshift endpoint URL, ensuring the localstack installation exists first. */
public static String getEndpointRedshift() {
return ensureInstallationAndGetEndpoint("redshift");
}

/* UTILITY METHODS */
Expand All @@ -116,8 +110,12 @@ private static void killProcess(Process p) {
p.destroyForcibly();
}

private static String getEndpoint(String service) {
/**
 * Ensures the localstack installation is present, then resolves the endpoint URL
 * for the given service name. Shared helper backing all public getEndpointXxx() methods.
 */
private static String ensureInstallationAndGetEndpoint(String service) {
ensureInstallation();
return getEndpoint(service);
}

private static String getEndpoint(String service) {
String regex = ".*DEFAULT_PORT_" + service.toUpperCase() + "\\s*=\\s*([0-9]+).*";
String port = Pattern.compile(regex, Pattern.DOTALL | Pattern.MULTILINE).matcher(CONFIG_FILE_CONTENT).replaceAll("$1");
return "http://" + LOCALHOST + ":" + port + "/";
Expand Down
7 changes: 4 additions & 3 deletions localstack/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
DEFAULT_PORT_APIGATEWAY_BACKEND = 4579
DEFAULT_PORT_KINESIS_BACKEND = 4580
DEFAULT_PORT_DYNAMODB_BACKEND = 4581
DEFAULT_PORT_SNS_BACKEND = 4582
DEFAULT_PORT_ELASTICSEARCH_BACKEND = 4583
DEFAULT_PORT_S3_BACKEND = 4582
DEFAULT_PORT_SNS_BACKEND = 4583
DEFAULT_PORT_ELASTICSEARCH_BACKEND = 4584

DEFAULT_PORT_WEB_UI = 8080

Expand All @@ -56,7 +57,7 @@
BIND_HOST = '0.0.0.0'

# AWS user account ID used for tests
TEST_AWS_ACCOUNT_ID = '123456789'
TEST_AWS_ACCOUNT_ID = '000000000000'
os.environ['TEST_AWS_ACCOUNT_ID'] = TEST_AWS_ACCOUNT_ID

# root code folder
Expand Down
1 change: 0 additions & 1 deletion localstack/mock/.gitignore

This file was deleted.

33 changes: 0 additions & 33 deletions localstack/mock/apis/firehose_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,6 @@ def get_s3_client():
s3={'addressing_style': 'path'}))


def add_s3_destination(stream_name, bucket_name, path_prefix):
stream = get_stream(stream_name)
dest = {
"DestinationId": str(uuid.uuid4()),
"S3DestinationDescription": {
"RoleARN": role_arn(stream_name),
"BucketARN": s3_bucket_arn(bucket_name),
"Prefix": path_prefix,
"BufferingHints": {
"IntervalInSeconds": 60,
"SizeInMBs": 5
},
"EncryptionConfiguration": {
"NoEncryptionConfig": "NoEncryption"
},
"CompressionFormat": "UNCOMPRESSED",
"CloudWatchLoggingOptions": {
"Enabled": False
}
}
}
stream['Destinations'].append(dest)
try:
s3 = get_s3_client()
s3.create_bucket(Bucket=bucket_name)
except Exception, e:
print("WARN: Unable to create S3 bucket %s: %s" % (bucket_name, traceback.format_exc(e)))
try:
print(e.response)
except Exception, e:
pass


def put_record(stream_name, record):
return put_records(stream_name, [record])

Expand Down
6 changes: 4 additions & 2 deletions localstack/mock/apis/lambda_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import base64
import threading
import imp
import cStringIO
from flask import Flask, Response, jsonify, request, make_response
from datetime import datetime
from localstack.constants import *
Expand Down Expand Up @@ -86,7 +87,8 @@ def add_event_source(function_name, source_arn):
def process_kinesis_records(records, stream_name):
# feed records into listening lambdas
try:
sources = get_event_sources(source_arn=aws_stack.kinesis_stream_arn(stream_name))
stream_arn = aws_stack.kinesis_stream_arn(stream_name)
sources = get_event_sources(source_arn=stream_arn)
for source in sources:
arn = source['FunctionArn']
lambda_function = lambda_arn_to_function[arn]
Expand All @@ -100,7 +102,7 @@ def process_kinesis_records(records, stream_name):
})
run_lambda(lambda_function, event=event, context={}, lambda_cwd=lambda_cwd)
except Exception, e:
print(traceback.format_exc(e))
print(traceback.format_exc())


def get_event_sources(func_name=None, source_arn=None):
Expand Down
3 changes: 3 additions & 0 deletions localstack/mock/generic_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def forward(self, method):
response = self.method(proxy_url, data=self.data_string,
headers=forward_headers, proxies=proxies)
self.send_response(response.status_code)
# copy headers from response
for header_key, header_value in response.headers.iteritems():
self.send_header(header_key, header_value)
self.end_headers()
self.wfile.write(response.text)
if self.proxy.update_listener:
Expand Down
19 changes: 13 additions & 6 deletions localstack/mock/infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from localstack.mock import generic_proxy, install
from localstack.mock.install import ROOT_PATH
from localstack.mock.apis import firehose_api, lambda_api, dynamodbstreams_api, es_api
from localstack.mock.proxy import apigateway_listener, dynamodb_listener, kinesis_listener, sns_listener
from localstack.mock.proxy import (apigateway_listener,
dynamodb_listener, kinesis_listener, sns_listener, s3_listener)
from localstack.mock.generic_proxy import GenericProxy

# flag to indicate whether signal handlers have been set up already
Expand Down Expand Up @@ -113,9 +114,11 @@ def start_apigateway(port=PORT_APIGATEWAY, async=False, update_listener=None):
return do_run(cmd, async)


def start_s3(port=PORT_S3, async=False):
cmd = '%s/bin/moto_server s3 -p %s -H %s' % (LOCALSTACK_VENV_FOLDER, port, constants.BIND_HOST)
def start_s3(port=PORT_S3, async=False, update_listener=None):
backend_port = DEFAULT_PORT_S3_BACKEND
cmd = '%s/bin/moto_server s3 -p %s -H %s' % (LOCALSTACK_VENV_FOLDER, backend_port, constants.BIND_HOST)
print("Starting mock S3 server (port %s)..." % port)
start_proxy(port, backend_port, update_listener)
return do_run(cmd, async)


Expand Down Expand Up @@ -292,8 +295,10 @@ def check_aws_credentials():
assert credentials


def start_infra(async=False, dynamodb_update_listener=None, kinesis_update_listener=None,
apigateway_update_listener=None, sns_update_listener=None, apis=None):
def start_infra(async=False,
dynamodb_update_listener=None, kinesis_update_listener=None,
apigateway_update_listener=None, sns_update_listener=None,
s3_update_listener=None, apis=None):
try:
if not apis:
apis = SERVICE_PORTS.keys()
Expand All @@ -305,6 +310,8 @@ def start_infra(async=False, dynamodb_update_listener=None, kinesis_update_liste
apigateway_update_listener = apigateway_listener.update_apigateway
if not sns_update_listener:
sns_update_listener = sns_listener.update_sns
if not s3_update_listener:
s3_update_listener = s3_listener.update_s3
# set environment
os.environ['AWS_REGION'] = DEFAULT_REGION
os.environ['ENV'] = ENV_DEV
Expand All @@ -328,7 +335,7 @@ def start_infra(async=False, dynamodb_update_listener=None, kinesis_update_liste
thread = start_elasticsearch_service(async=True)
sleep_time = max(sleep_time, 5)
if 's3' in apis:
thread = start_s3(async=True)
thread = start_s3(async=True, update_listener=s3_update_listener)
sleep_time = max(sleep_time, 3)
if 'sns' in apis:
thread = start_sns(async=True, update_listener=sns_update_listener)
Expand Down
119 changes: 119 additions & 0 deletions localstack/mock/proxy/s3_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import re
import urlparse
import logging
import json
import xml.etree.ElementTree as ET
from requests.models import Response
from localstack.constants import *
from localstack.utils.aws import aws_stack
from localstack.utils.common import timestamp, TIMESTAMP_FORMAT_MILLIS

# mappings for S3 bucket notifications
S3_NOTIFICATIONS = {}

# set up logger
LOGGER = logging.getLogger(__name__)

# XML namespace constants
XMLNS_S3 = 'http://s3.amazonaws.com/doc/2006-03-01/'


def match_event(event, action, api_method):
    """Check whether a configured notification event pattern matches an S3 action.

    The configured *event* string may contain '*' wildcards
    (e.g. 's3:ObjectCreated:*'); each wildcard matches any run of
    non-colon characters. Returns a match object (truthy) or None.
    """
    candidate = 's3:%s:%s' % (action, api_method)
    pattern = event.replace('*', '[^:]*')
    return re.match(pattern, candidate)


def get_event_message(event_name, bucket_name, file_name='testfile.txt', file_size=1024):
    """Build an S3 notification event payload for a single record.

    Based on: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
    Identity/eTag/version fields are fixed sample values.
    """
    bucket_info = {
        'name': bucket_name,
        'ownerIdentity': {
            'principalId': 'A3NL1KOZZKExample'
        },
        'arn': 'arn:aws:s3:::%s' % bucket_name
    }
    object_info = {
        'key': file_name,
        'size': file_size,
        'eTag': 'd41d8cd98f00b204e9800998ecf8427e',
        'versionId': '096fKKXTRTtl3on89fVO.nfljtsv6qko',
        'sequencer': '0055AED6DCD90281E5'
    }
    record = {
        'eventVersion': '2.0',
        'eventSource': 'aws:s3',
        'awsRegion': DEFAULT_REGION,
        'eventTime': timestamp(format=TIMESTAMP_FORMAT_MILLIS),
        'eventName': event_name,
        'userIdentity': {
            'principalId': 'AIDAJDPLRKLG7UEXAMPLE'
        },
        's3': {
            's3SchemaVersion': '1.0',
            'configurationId': 'testConfigRule',
            'bucket': bucket_info,
            'object': object_info
        }
    }
    return {'Records': [record]}


def send_notifications(method, bucket_name, object_path):
    """Send bucket notification(s) for an object PUT/DELETE on *bucket_name*.

    Looks up the notification configuration registered for the bucket and, if
    the configured event pattern matches, publishes the event message to each
    configured target (SQS queue, SNS topic, and/or Lambda function).
    """
    config = S3_NOTIFICATIONS.get(bucket_name)
    if not config:
        return
    action = {'PUT': 'ObjectCreated', 'DELETE': 'ObjectRemoved'}[method]
    # TODO: support more detailed methods, e.g., DeleteMarkerCreated
    # http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
    api_method = {'PUT': 'Put', 'DELETE': 'Delete'}[method]
    event_name = '%s:%s' % (action, api_method)
    if not match_event(config['Event'], action, api_method):
        return
    # report the actual object key (path without the leading slash) instead of
    # the dummy default file name
    object_key = object_path.lstrip('/')
    message = get_event_message(event_name=event_name, bucket_name=bucket_name,
        file_name=object_key)
    message = json.dumps(message)
    if config.get('Queue'):
        sqs_client = aws_stack.connect_to_service('sqs')
        sqs_client.send_message(QueueUrl=config['Queue'], MessageBody=message)
    if config.get('Topic'):
        sns_client = aws_stack.connect_to_service('sns')
        sns_client.publish(TopicArn=config['Topic'], Message=message)
    if config.get('CloudFunction'):
        lambda_client = aws_stack.connect_to_service('lambda')
        lambda_client.invoke(FunctionName=config['CloudFunction'], Payload=message)
    # 'any(...)' instead of 'not filter(...)': a filter object is always truthy
    # under Python 3, which would silently disable this warning
    if not any(config.get(n) for n in ('Queue', 'Topic', 'CloudFunction')):
        LOGGER.warn('Neither of Queue/Topic/CloudFunction defined for S3 notification.')


def get_xml_text(node, name, ns=None, default=None):
    """Return the text of child element *name* under *node*.

    If *ns* is given, the child is looked up with that XML namespace.
    Returns *default* when no such child exists.
    """
    tag = name if ns is None else '{%s}%s' % (ns, name)
    child = node.find(tag)
    return default if child is None else child.text


def update_s3(method, path, data, headers, response=None, return_forward_info=False):
    """Proxy listener for the S3 API.

    Intercepts 'PUT ?notification' requests to record the bucket's
    notification configuration in S3_NOTIFICATIONS, and fires notifications
    for object PUT/DELETE requests.
    """
    if return_forward_info:
        parsed = urlparse.urlparse(path)
        query = parsed.query
        path = parsed.path
        query_map = urlparse.parse_qs(query)
        if method == 'PUT' and (query == 'notification' or 'notification' in query_map):
            # PUT bucket notification configuration
            tree = ET.fromstring(data)
            queue_config = tree.find('{%s}QueueConfiguration' % XMLNS_S3)
            # find() returns None when the payload has no <QueueConfiguration>
            # element (e.g. Topic/CloudFunction-only configs); len(None) would
            # raise a TypeError, so guard explicitly
            if queue_config is not None and len(queue_config):
                bucket = path[1:]
                S3_NOTIFICATIONS[bucket] = {
                    'Id': get_xml_text(queue_config, 'Id'),
                    'Event': get_xml_text(queue_config, 'Event', ns=XMLNS_S3),
                    'Queue': get_xml_text(queue_config, 'Queue', ns=XMLNS_S3),
                    'Topic': get_xml_text(queue_config, 'Topic', ns=XMLNS_S3),
                    'CloudFunction': get_xml_text(queue_config, 'CloudFunction', ns=XMLNS_S3)
                }
            return True
        if method in ('PUT', 'DELETE') and '/' in path[1:]:
            # object-level PUT/DELETE: '/<bucket>/<key...>'
            parts = path[1:].split('/', 1)
            bucket_name = parts[0]
            object_path = '/%s' % parts[1]
            send_notifications(method, bucket_name, object_path)
3 changes: 2 additions & 1 deletion localstack/utils/aws/aws_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def connect_to_service(service_name, client=True, env=None, region_name=None, en
if not endpoint_url:
if env.region == REGION_LOCAL:
endpoint_url = get_local_service_url(service_name)
return method(service_name, region_name=env.region, endpoint_url=endpoint_url)
region = env.region if env.region != REGION_LOCAL else DEFAULT_REGION
return method(service_name, region_name=region, endpoint_url=endpoint_url)


class VelocityInput:
Expand Down

0 comments on commit 5f0304d

Please sign in to comment.