Skip to content

Commit

Permalink
refactor CloudFormation implementation (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
whummer committed Feb 18, 2019
1 parent c5f0a56 commit a403747
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 141 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ script:
- make docker-push-master

after_success:
- make coveralls
- make coveralls || true

notifications:
email: false
9 changes: 4 additions & 5 deletions localstack/plugins.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from localstack.services.infra import (register_plugin, Plugin,
start_s3, start_sns, start_ses, start_apigateway,
start_elasticsearch_service, start_lambda, start_redshift, start_firehose,
start_cloudwatch, start_cloudformation, start_dynamodbstreams, start_route53,
start_s3, start_sns, start_ses, start_apigateway, start_elasticsearch_service, start_lambda,
start_redshift, start_firehose, start_cloudwatch, start_dynamodbstreams, start_route53,
start_ssm, start_sts, start_secretsmanager)
from localstack.services.apigateway import apigateway_listener
from localstack.services.cloudformation import cloudformation_listener
from localstack.services.cloudformation import cloudformation_listener, cloudformation_starter
from localstack.services.dynamodb import dynamodb_listener, dynamodb_starter
from localstack.services.kinesis import kinesis_listener, kinesis_starter
from localstack.services.sns import sns_listener
Expand Down Expand Up @@ -62,7 +61,7 @@ def register_localstack_plugins():
register_plugin(Plugin('route53',
start=start_route53))
register_plugin(Plugin('cloudformation',
start=start_cloudformation,
start=cloudformation_starter.start_cloudformation,
listener=cloudformation_listener.UPDATE_CLOUDFORMATION))
register_plugin(Plugin('cloudwatch',
start=start_cloudwatch))
Expand Down
86 changes: 13 additions & 73 deletions localstack/services/cloudformation/cloudformation_listener.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import re
import uuid
import logging
import xmltodict
import requests
from requests.models import Response, Request
from requests.models import Response
from six.moves.urllib import parse as urlparse
from localstack.constants import DEFAULT_REGION, TEST_AWS_ACCOUNT_ID
from localstack.utils.common import to_str
Expand Down Expand Up @@ -55,14 +52,23 @@ def stack_exists(stack_name):
return False


# TODO - deprecated - remove!
def create_stack(req_data):
stack_name = req_data.get('StackName')[0]
if stack_exists(stack_name):
message = 'The resource with the name requested already exists.'
return error_response(message, error_type='AlreadyExists')
# create stack
cloudformation_service = aws_stack.connect_to_service('cloudformation')
template = template_deployer.template_to_json(req_data.get('TemplateBody')[0])
cloudformation_service.create_stack(StackName=stack_name,
TemplateBody=template)
# now run the actual deployment
template_deployer.deploy_template(template, stack_name)
return True


# TODO - deprecated - remove!
def create_change_set(req_data):
cs_name = req_data.get('ChangeSetName')[0]
change_set_uuid = uuid.uuid4()
Expand All @@ -73,6 +79,7 @@ def create_change_set(req_data):
return response


# TODO - deprecated - remove!
def describe_change_set(req_data):
cs_arn = req_data.get('ChangeSetName')[0]
cs_details = CHANGE_SETS.get(cs_arn)
Expand All @@ -87,6 +94,7 @@ def describe_change_set(req_data):
return response


# TODO - deprecated - remove!
def execute_change_set(req_data):
cs_arn = req_data.get('ChangeSetName')[0]
stack_name = req_data.get('StackName')[0]
Expand Down Expand Up @@ -142,79 +150,11 @@ def forward_request(self, method, path, data, headers):
action = req_data.get('Action')[0]

if req_data:
if action == 'CreateStack':
return create_stack(req_data)
if action == 'CreateChangeSet':
return create_change_set(req_data)
elif action == 'DescribeChangeSet':
return describe_change_set(req_data)
elif action == 'ExecuteChangeSet':
return execute_change_set(req_data)
elif action == 'UpdateStack' and req_data.get('TemplateURL'):
# Temporary fix until the moto CF backend can handle TemplateURL (currently fails)
url = re.sub(r'https?://s3\.amazonaws\.com', aws_stack.get_local_service_url('s3'),
req_data.get('TemplateURL')[0])
req_data['TemplateBody'] = requests.get(url).content
modified_data = urlparse.urlencode(req_data, doseq=True)
return Request(data=modified_data, headers=headers, method=method)
elif action == 'ValidateTemplate':
if action == 'ValidateTemplate':
return validate_template(req_data)

return True

def return_response(self, method, path, data, headers, response):
req_data = None
if method == 'POST' and path == '/':
req_data = urlparse.parse_qs(to_str(data))
action = req_data.get('Action')[0]

if req_data:
if action == 'DescribeStackResources':
if response.status_code < 300:
response_dict = xmltodict.parse(response.content)['DescribeStackResourcesResponse']
resources = response_dict['DescribeStackResourcesResult']['StackResources']
if not resources:
# Check if stack exists
stack_name = req_data.get('StackName')[0]
cloudformation_client = aws_stack.connect_to_service('cloudformation')
try:
cloudformation_client.describe_stacks(StackName=stack_name)
except Exception:
return error_response('Stack with name %s does not exist' % stack_name, code=404)
if action == 'DescribeStackResource':
if response.status_code >= 500:
# fix an error in moto where it fails with 500 if the stack does not exist
return error_response('Stack resource does not exist', code=404)
if action == 'ListStackResources':
response_dict = xmltodict.parse(response.content, force_list=('member'))['ListStackResourcesResponse']
resources = response_dict['ListStackResourcesResult']['StackResourceSummaries']
if resources:
sqs_client = aws_stack.connect_to_service('sqs')
content_str = content_str_original = to_str(response.content)
new_response = Response()
new_response.status_code = response.status_code
new_response.headers = response.headers
for resource in resources['member']:
if resource['ResourceType'] == 'AWS::SQS::Queue':
try:
queue_name = resource['PhysicalResourceId']
queue_url = sqs_client.get_queue_url(QueueName=queue_name)['QueueUrl']
except Exception:
stack_name = req_data.get('StackName')[0]
return error_response('Stack with name %s does not exist' % stack_name, code=404)
content_str = re.sub(resource['PhysicalResourceId'], queue_url, content_str)
new_response._content = content_str
if content_str_original != new_response._content:
# if changes have been made, return patched response
new_response.headers['content-length'] = len(new_response._content)
return new_response
elif action in ('CreateStack', 'UpdateStack'):
if response.status_code >= 400:
return response
# run the actual deployment
template = template_deployer.template_to_json(req_data.get('TemplateBody')[0])
template_deployer.deploy_template(template, req_data.get('StackName')[0])


# instantiate listener
UPDATE_CLOUDFORMATION = ProxyListenerCloudFormation()
100 changes: 100 additions & 0 deletions localstack/services/cloudformation/cloudformation_starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import sys
import logging
from moto.s3 import models as s3_models
from moto.server import main as moto_main
from moto.dynamodb import models as dynamodb_models
from moto.cloudformation import parsing
from boto.cloudformation.stack import Output
from localstack.config import PORT_CLOUDFORMATION
from localstack.constants import DEFAULT_PORT_CLOUDFORMATION_BACKEND
from localstack.services.infra import get_service_protocol, start_proxy_for_service, do_run
from localstack.utils.aws import aws_stack
from localstack.utils.cloudformation import template_deployer

LOG = logging.getLogger(__name__)


def start_cloudformation(port=PORT_CLOUDFORMATION, asynchronous=False, update_listener=None):
backend_port = DEFAULT_PORT_CLOUDFORMATION_BACKEND
cmd = 'python "%s" cloudformation -p %s -H 0.0.0.0' % (__file__, backend_port)
print('Starting mock CloudFormation (%s port %s)...' % (get_service_protocol(), port))
start_proxy_for_service('dynamodb', port, backend_port, update_listener)
env_vars = {'PYTHONPATH': ':'.join(sys.path)}
return do_run(cmd, asynchronous, env_vars=env_vars)


def apply_patches():
""" Apply patches to make LocalStack seamlessly interact with the moto backend.
TODO: Eventually, these patches should be contributed to the upstream repo! """

# Patch S3Backend.get_key method in moto to use S3 API from LocalStack

def get_key(self, bucket_name, key_name, version_id=None):
s3_client = aws_stack.connect_to_service('s3')
value = s3_client.get_object(Bucket=bucket_name, Key=key_name)['Body'].read()
return s3_models.FakeKey(name=key_name, value=value)

s3_models.S3Backend.get_key = get_key

# Patch parse_and_create_resource method in moto to deploy resources in LocalStack

def parse_and_create_resource(logical_id, resource_json, resources_map, region_name):
# parse and get final resource JSON
resource_tuple = parsing.parse_resource(logical_id, resource_json, resources_map)
if not resource_tuple:
return None
_, resource_json, _ = resource_tuple

# create resource definition and store CloudFormation metadata in moto
resource = parse_and_create_resource_orig(logical_id, resource_json, resources_map, region_name)

# deploy resource in LocalStack
stack_name = resources_map.get('AWS::StackName')
resource_wrapped = {logical_id: resource_json}
if template_deployer.should_be_deployed(logical_id, resource_wrapped, stack_name):
LOG.debug('Deploying CloudFormation resource: %s' % resource_json)
template_deployer.deploy_resource(logical_id, resource_wrapped, stack_name=stack_name)
return resource

parse_and_create_resource_orig = parsing.parse_and_create_resource
parsing.parse_and_create_resource = parse_and_create_resource

# Patch CloudFormation parse_output(..) method to fix a bug in moto

def parse_output(output_logical_id, output_json, resources_map):
try:
return parse_output_orig(output_logical_id, output_json, resources_map)
except KeyError:
output = Output()
output.key = output_logical_id
output.value = None
output.description = output_json.get('Description')
return output

parse_output_orig = parsing.parse_output
parsing.parse_output = parse_output

# Patch DynamoDB get_cfn_attribute(..) method to fix a bug in moto

def get_cfn_attribute(self, attribute_name):
try:
return get_cfn_attribute_orig(self, attribute_name)
except Exception:
if attribute_name == 'Arn':
return aws_stack.dynamodb_table_arn(table_name=self.name)
raise

get_cfn_attribute_orig = dynamodb_models.Table.get_cfn_attribute
dynamodb_models.Table.get_cfn_attribute = get_cfn_attribute


def main():
# patch moto implementation
apply_patches()

# start API
sys.exit(moto_main())


if __name__ == '__main__':
main()
28 changes: 13 additions & 15 deletions localstack/services/infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import warnings
import pkgutil
from localstack import constants, config
from localstack.constants import (ENV_DEV, DEFAULT_REGION, LOCALSTACK_VENV_FOLDER,
DEFAULT_PORT_S3_BACKEND, DEFAULT_PORT_APIGATEWAY_BACKEND,
DEFAULT_PORT_SNS_BACKEND, DEFAULT_PORT_CLOUDFORMATION_BACKEND)
from localstack.constants import (
ENV_DEV, DEFAULT_REGION, LOCALSTACK_VENV_FOLDER, DEFAULT_PORT_S3_BACKEND,
DEFAULT_PORT_APIGATEWAY_BACKEND, DEFAULT_PORT_SNS_BACKEND)
from localstack.config import (USE_SSL, PORT_ROUTE53, PORT_S3,
PORT_FIREHOSE, PORT_LAMBDA, PORT_SNS, PORT_REDSHIFT, PORT_CLOUDWATCH,
PORT_DYNAMODBSTREAMS, PORT_SES, PORT_ES, PORT_CLOUDFORMATION, PORT_APIGATEWAY,
PORT_SSM, PORT_SECRETSMANAGER, PORT_STS)
PORT_DYNAMODBSTREAMS, PORT_SES, PORT_ES, PORT_APIGATEWAY, PORT_SSM,
PORT_SECRETSMANAGER, PORT_STS)
from localstack.utils import common, persistence
from localstack.utils.common import (run, TMP_THREADS, in_ci, run_cmd_safe,
TIMESTAMP_FORMAT, FuncThread, ShellCommandThread, mkdir)
Expand All @@ -33,8 +33,6 @@
SIGNAL_HANDLERS_SETUP = False
# maps plugin scope ("services", "commands") to flags which indicate whether plugins have been loaded
PLUGINS_LOADED = {}
# flag to indicate whether we've received and processed the stop signal
INFRA_STOPPED = False

# default backend host address
DEFAULT_BACKEND_HOST = '127.0.0.1'
Expand Down Expand Up @@ -153,9 +151,9 @@ def start_sns(port=PORT_SNS, asynchronous=False, update_listener=None):
backend_port=DEFAULT_PORT_SNS_BACKEND, update_listener=update_listener)


def start_cloudformation(port=PORT_CLOUDFORMATION, asynchronous=False, update_listener=None):
return start_moto_server('cloudformation', port, name='CloudFormation', asynchronous=asynchronous,
backend_port=DEFAULT_PORT_CLOUDFORMATION_BACKEND, update_listener=update_listener)
# def start_cloudformation(port=PORT_CLOUDFORMATION, asynchronous=False, update_listener=None):
# return start_moto_server('cloudformation', port, name='CloudFormation', asynchronous=asynchronous,
# backend_port=DEFAULT_PORT_CLOUDFORMATION_BACKEND, update_listener=update_listener)


def start_cloudwatch(port=PORT_CLOUDWATCH, asynchronous=False):
Expand Down Expand Up @@ -216,6 +214,7 @@ def setup_logging():
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('botocore').setLevel(logging.ERROR)
logging.getLogger('s3transfer').setLevel(logging.INFO)
logging.getLogger('elasticsearch').setLevel(logging.ERROR)


Expand Down Expand Up @@ -246,13 +245,13 @@ def is_debug():
return os.environ.get('DEBUG', '').strip() not in ['', '0', 'false']


def do_run(cmd, asynchronous, print_output=False):
def do_run(cmd, asynchronous, print_output=False, env_vars={}):
sys.stdout.flush()
if asynchronous:
if is_debug():
print_output = True
outfile = subprocess.PIPE if print_output else None
t = ShellCommandThread(cmd, outfile=outfile)
t = ShellCommandThread(cmd, outfile=outfile, env_vars=env_vars)
t.start()
TMP_THREADS.append(t)
return t
Expand Down Expand Up @@ -302,9 +301,9 @@ def start_local_api(name, port, method, asynchronous=False):


def stop_infra():
global INFRA_STOPPED
if INFRA_STOPPED:
if common.INFRA_STOPPED:
return
common.INFRA_STOPPED = True

event_publisher.fire_event(event_publisher.EVENT_STOP_INFRA)

Expand All @@ -315,7 +314,6 @@ def stop_infra():
time.sleep(2)
# TODO: optimize this (takes too long currently)
# check_infra(retries=2, expect_shutdown=True)
INFRA_STOPPED = True


def check_aws_credentials():
Expand Down

0 comments on commit a403747

Please sign in to comment.