Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

488 lines (423 sloc) 17.145 kb
## Amazon CloudFront support
## Author: Michal Ludvig <michal@logix.cz>
## http://www.logix.cz/michal
## License: GPL Version 2
import sys
import base64
import time
import httplib
from logging import debug, info, warning, error
try:
from hashlib import md5, sha1
except ImportError:
from md5 import md5
import sha as sha1
import hmac
try:
import xml.etree.ElementTree as ET
except ImportError:
import elementtree.ElementTree as ET
from Config import Config
from Exceptions import *
from Utils import getTreeFromXml, appendXmlTextNode, getDictFromTree, dateS3toPython
from S3Uri import S3Uri, S3UriS3
def output(message):
sys.stdout.write(message + "\n")
def pretty_output(label, message):
#label = ("%s " % label).ljust(20, ".")
label = ("%s:" % label).ljust(15)
output("%s %s" % (label, message))
class DistributionSummary(object):
## Example:
##
## <DistributionSummary>
## <Id>1234567890ABC</Id>
## <Status>Deployed</Status>
## <LastModifiedTime>2009-01-16T11:49:02.189Z</LastModifiedTime>
## <DomainName>blahblahblah.cloudfront.net</DomainName>
## <Origin>example.bucket.s3.amazonaws.com</Origin>
## <Enabled>true</Enabled>
## </DistributionSummary>
def __init__(self, tree):
if tree.tag != "DistributionSummary":
raise ValueError("Expected <DistributionSummary /> xml, got: <%s />" % tree.tag)
self.parse(tree)
def parse(self, tree):
self.info = getDictFromTree(tree)
self.info['Enabled'] = (self.info['Enabled'].lower() == "true")
def uri(self):
return S3Uri("cf://%s" % self.info['Id'])
class DistributionList(object):
## Example:
##
## <DistributionList xmlns="http://cloudfront.amazonaws.com/doc/2008-06-30/">
## <Marker />
## <MaxItems>100</MaxItems>
## <IsTruncated>false</IsTruncated>
## <DistributionSummary>
## ... handled by DistributionSummary() class ...
## </DistributionSummary>
## </DistributionList>
def __init__(self, xml):
tree = getTreeFromXml(xml)
if tree.tag != "DistributionList":
raise ValueError("Expected <DistributionList /> xml, got: <%s />" % tree.tag)
self.parse(tree)
def parse(self, tree):
self.info = getDictFromTree(tree)
## Normalise some items
self.info['IsTruncated'] = (self.info['IsTruncated'].lower() == "true")
self.dist_summs = []
for dist_summ in tree.findall(".//DistributionSummary"):
self.dist_summs.append(DistributionSummary(dist_summ))
class Distribution(object):
## Example:
##
## <Distribution xmlns="http://cloudfront.amazonaws.com/doc/2008-06-30/">
## <Id>1234567890ABC</Id>
## <Status>InProgress</Status>
## <LastModifiedTime>2009-01-16T13:07:11.319Z</LastModifiedTime>
## <DomainName>blahblahblah.cloudfront.net</DomainName>
## <DistributionConfig>
## ... handled by DistributionConfig() class ...
## </DistributionConfig>
## </Distribution>
def __init__(self, xml):
tree = getTreeFromXml(xml)
if tree.tag != "Distribution":
raise ValueError("Expected <Distribution /> xml, got: <%s />" % tree.tag)
self.parse(tree)
def parse(self, tree):
self.info = getDictFromTree(tree)
## Normalise some items
self.info['LastModifiedTime'] = dateS3toPython(self.info['LastModifiedTime'])
self.info['DistributionConfig'] = DistributionConfig(tree = tree.find(".//DistributionConfig"))
def uri(self):
return S3Uri("cf://%s" % self.info['Id'])
class DistributionConfig(object):
## Example:
##
## <DistributionConfig>
## <Origin>somebucket.s3.amazonaws.com</Origin>
## <CallerReference>s3://somebucket/</CallerReference>
## <Comment>http://somebucket.s3.amazonaws.com/</Comment>
## <Enabled>true</Enabled>
## </DistributionConfig>
EMPTY_CONFIG = "<DistributionConfig><Origin/><CallerReference/><Enabled>true</Enabled></DistributionConfig>"
xmlns = "http://cloudfront.amazonaws.com/doc/2008-06-30/"
def __init__(self, xml = None, tree = None):
if not xml:
xml = DistributionConfig.EMPTY_CONFIG
if not tree:
tree = getTreeFromXml(xml)
if tree.tag != "DistributionConfig":
raise ValueError("Expected <DistributionConfig /> xml, got: <%s />" % tree.tag)
self.parse(tree)
def parse(self, tree):
self.info = getDictFromTree(tree)
self.info['Enabled'] = (self.info['Enabled'].lower() == "true")
if not self.info.has_key("CNAME"):
self.info['CNAME'] = []
if type(self.info['CNAME']) != list:
self.info['CNAME'] = [self.info['CNAME']]
self.info['CNAME'] = [cname.lower() for cname in self.info['CNAME']]
if not self.info.has_key("Comment"):
self.info['Comment'] = ""
def __str__(self):
tree = ET.Element("DistributionConfig")
tree.attrib['xmlns'] = DistributionConfig.xmlns
## Retain the order of the following calls!
appendXmlTextNode("Origin", self.info['Origin'], tree)
appendXmlTextNode("CallerReference", self.info['CallerReference'], tree)
for cname in self.info['CNAME']:
appendXmlTextNode("CNAME", cname.lower(), tree)
if self.info['Comment']:
appendXmlTextNode("Comment", self.info['Comment'], tree)
appendXmlTextNode("Enabled", str(self.info['Enabled']).lower(), tree)
return ET.tostring(tree)
class CloudFront(object):
operations = {
"CreateDist" : { 'method' : "POST", 'resource' : "" },
"DeleteDist" : { 'method' : "DELETE", 'resource' : "/%(dist_id)s" },
"GetList" : { 'method' : "GET", 'resource' : "" },
"GetDistInfo" : { 'method' : "GET", 'resource' : "/%(dist_id)s" },
"GetDistConfig" : { 'method' : "GET", 'resource' : "/%(dist_id)s/config" },
"SetDistConfig" : { 'method' : "PUT", 'resource' : "/%(dist_id)s/config" },
}
## Maximum attempts of re-issuing failed requests
_max_retries = 5
def __init__(self, config):
self.config = config
## --------------------------------------------------
## Methods implementing CloudFront API
## --------------------------------------------------
def GetList(self):
response = self.send_request("GetList")
response['dist_list'] = DistributionList(response['data'])
if response['dist_list'].info['IsTruncated']:
raise NotImplementedError("List is truncated. Ask s3cmd author to add support.")
## TODO: handle Truncated
return response
def CreateDistribution(self, uri, cnames_add = [], comment = None):
dist_config = DistributionConfig()
dist_config.info['Enabled'] = True
dist_config.info['Origin'] = uri.host_name()
dist_config.info['CallerReference'] = str(uri)
if comment == None:
dist_config.info['Comment'] = uri.public_url()
else:
dist_config.info['Comment'] = comment
for cname in cnames_add:
if dist_config.info['CNAME'].count(cname) == 0:
dist_config.info['CNAME'].append(cname)
request_body = str(dist_config)
debug("CreateDistribution(): request_body: %s" % request_body)
response = self.send_request("CreateDist", body = request_body)
response['distribution'] = Distribution(response['data'])
return response
def ModifyDistribution(self, cfuri, cnames_add = [], cnames_remove = [],
comment = None, enabled = None):
if cfuri.type != "cf":
raise ValueError("Expected CFUri instead of: %s" % cfuri)
# Get current dist status (enabled/disabled) and Etag
info("Checking current status of %s" % cfuri)
response = self.GetDistConfig(cfuri)
dc = response['dist_config']
if enabled != None:
dc.info['Enabled'] = enabled
if comment != None:
dc.info['Comment'] = comment
for cname in cnames_add:
if dc.info['CNAME'].count(cname) == 0:
dc.info['CNAME'].append(cname)
for cname in cnames_remove:
while dc.info['CNAME'].count(cname) > 0:
dc.info['CNAME'].remove(cname)
response = self.SetDistConfig(cfuri, dc, response['headers']['etag'])
return response
def DeleteDistribution(self, cfuri):
if cfuri.type != "cf":
raise ValueError("Expected CFUri instead of: %s" % cfuri)
# Get current dist status (enabled/disabled) and Etag
info("Checking current status of %s" % cfuri)
response = self.GetDistConfig(cfuri)
if response['dist_config'].info['Enabled']:
info("Distribution is ENABLED. Disabling first.")
response['dist_config'].info['Enabled'] = False
response = self.SetDistConfig(cfuri, response['dist_config'],
response['headers']['etag'])
warning("Waiting for Distribution to become disabled.")
warning("This may take several minutes, please wait.")
while True:
response = self.GetDistInfo(cfuri)
d = response['distribution']
if d.info['Status'] == "Deployed" and d.info['Enabled'] == False:
info("Distribution is now disabled")
break
warning("Still waiting...")
time.sleep(10)
headers = {}
headers['if-match'] = response['headers']['etag']
response = self.send_request("DeleteDist", dist_id = cfuri.dist_id(),
headers = headers)
return response
def GetDistInfo(self, cfuri):
if cfuri.type != "cf":
raise ValueError("Expected CFUri instead of: %s" % cfuri)
response = self.send_request("GetDistInfo", dist_id = cfuri.dist_id())
response['distribution'] = Distribution(response['data'])
return response
def GetDistConfig(self, cfuri):
if cfuri.type != "cf":
raise ValueError("Expected CFUri instead of: %s" % cfuri)
response = self.send_request("GetDistConfig", dist_id = cfuri.dist_id())
response['dist_config'] = DistributionConfig(response['data'])
return response
def SetDistConfig(self, cfuri, dist_config, etag = None):
if etag == None:
debug("SetDistConfig(): Etag not set. Fetching it first.")
etag = self.GetDistConfig(cfuri)['headers']['etag']
debug("SetDistConfig(): Etag = %s" % etag)
request_body = str(dist_config)
debug("SetDistConfig(): request_body: %s" % request_body)
headers = {}
headers['if-match'] = etag
response = self.send_request("SetDistConfig", dist_id = cfuri.dist_id(),
body = request_body, headers = headers)
return response
## --------------------------------------------------
## Low-level methods for handling CloudFront requests
## --------------------------------------------------
def send_request(self, op_name, dist_id = None, body = None, headers = {}, retries = _max_retries):
operation = self.operations[op_name]
if body:
headers['content-type'] = 'text/plain'
request = self.create_request(operation, dist_id, headers)
conn = self.get_connection()
debug("send_request(): %s %s" % (request['method'], request['resource']))
conn.request(request['method'], request['resource'], body, request['headers'])
http_response = conn.getresponse()
response = {}
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = dict(http_response.getheaders())
response["data"] = http_response.read()
conn.close()
debug("CloudFront: response: %r" % response)
if response["status"] >= 500:
e = CloudFrontError(response)
if retries:
warning(u"Retrying failed request: %s" % op_name)
warning(unicode(e))
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
return self.send_request(op_name, dist_id, body, retries - 1)
else:
raise e
if response["status"] < 200 or response["status"] > 299:
raise CloudFrontError(response)
return response
def create_request(self, operation, dist_id = None, headers = None):
resource = self.config.cloudfront_resource + (
operation['resource'] % { 'dist_id' : dist_id })
if not headers:
headers = {}
if headers.has_key("date"):
if not headers.has_key("x-amz-date"):
headers["x-amz-date"] = headers["date"]
del(headers["date"])
if not headers.has_key("x-amz-date"):
headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())
signature = self.sign_request(headers)
headers["Authorization"] = "AWS "+self.config.access_key+":"+signature
request = {}
request['resource'] = resource
request['headers'] = headers
request['method'] = operation['method']
return request
def sign_request(self, headers):
string_to_sign = headers['x-amz-date']
signature = base64.encodestring(hmac.new(self.config.secret_key, string_to_sign, sha1).digest()).strip()
debug(u"CloudFront.sign_request('%s') = %s" % (string_to_sign, signature))
return signature
def get_connection(self):
if self.config.proxy_host != "":
raise ParameterError("CloudFront commands don't work from behind a HTTP proxy")
return httplib.HTTPSConnection(self.config.cloudfront_host)
def _fail_wait(self, retries):
# Wait a few seconds. The more it fails the more we wait.
return (self._max_retries - retries + 1) * 3
class Cmd(object):
"""
Class that implements CloudFront commands
"""
class Options(object):
cf_cnames_add = []
cf_cnames_remove = []
cf_comment = None
cf_enable = None
def option_list(self):
return [opt for opt in dir(self) if opt.startswith("cf_")]
def update_option(self, option, value):
setattr(Cmd.options, option, value)
options = Options()
@staticmethod
def info(args):
cf = CloudFront(Config())
if not args:
response = cf.GetList()
for d in response['dist_list'].dist_summs:
pretty_output("Origin", S3UriS3.httpurl_to_s3uri(d.info['Origin']))
pretty_output("DistId", d.uri())
pretty_output("DomainName", d.info['DomainName'])
pretty_output("Status", d.info['Status'])
pretty_output("Enabled", d.info['Enabled'])
output("")
else:
cfuris = []
for arg in args:
cfuris.append(S3Uri(arg))
if cfuris[-1].type != 'cf':
raise ParameterError("CloudFront URI required instead of: %s" % arg)
for cfuri in cfuris:
response = cf.GetDistInfo(cfuri)
d = response['distribution']
dc = d.info['DistributionConfig']
pretty_output("Origin", S3UriS3.httpurl_to_s3uri(dc.info['Origin']))
pretty_output("DistId", d.uri())
pretty_output("DomainName", d.info['DomainName'])
pretty_output("Status", d.info['Status'])
pretty_output("CNAMEs", ", ".join(dc.info['CNAME']))
pretty_output("Comment", dc.info['Comment'])
pretty_output("Enabled", dc.info['Enabled'])
pretty_output("Etag", response['headers']['etag'])
@staticmethod
def create(args):
cf = CloudFront(Config())
buckets = []
for arg in args:
uri = S3Uri(arg)
if uri.type != "s3":
raise ParameterError("Bucket can only be created from a s3:// URI instead of: %s" % arg)
if uri.object():
raise ParameterError("Use s3:// URI with a bucket name only instead of: %s" % arg)
if not uri.is_dns_compatible():
raise ParameterError("CloudFront can only handle lowercase-named buckets.")
buckets.append(uri)
if not buckets:
raise ParameterError("No valid bucket names found")
for uri in buckets:
info("Creating distribution from: %s" % uri)
response = cf.CreateDistribution(uri, cnames_add = Cmd.options.cf_cnames_add,
comment = Cmd.options.cf_comment)
d = response['distribution']
dc = d.info['DistributionConfig']
output("Distribution created:")
pretty_output("Origin", S3UriS3.httpurl_to_s3uri(dc.info['Origin']))
pretty_output("DistId", d.uri())
pretty_output("DomainName", d.info['DomainName'])
pretty_output("CNAMEs", ", ".join(dc.info['CNAME']))
pretty_output("Comment", dc.info['Comment'])
pretty_output("Status", d.info['Status'])
pretty_output("Enabled", dc.info['Enabled'])
pretty_output("Etag", response['headers']['etag'])
@staticmethod
def delete(args):
cf = CloudFront(Config())
cfuris = []
for arg in args:
cfuris.append(S3Uri(arg))
if cfuris[-1].type != 'cf':
raise ParameterError("CloudFront URI required instead of: %s" % arg)
for cfuri in cfuris:
response = cf.DeleteDistribution(cfuri)
if response['status'] >= 400:
error("Distribution %s could not be deleted: %s" % (cfuri, response['reason']))
output("Distribution %s deleted" % cfuri)
@staticmethod
def modify(args):
cf = CloudFront(Config())
cfuri = S3Uri(args.pop(0))
if cfuri.type != 'cf':
raise ParameterError("CloudFront URI required instead of: %s" % arg)
if len(args):
raise ParameterError("Too many parameters. Modify one Distribution at a time.")
response = cf.ModifyDistribution(cfuri,
cnames_add = Cmd.options.cf_cnames_add,
cnames_remove = Cmd.options.cf_cnames_remove,
comment = Cmd.options.cf_comment,
enabled = Cmd.options.cf_enable)
if response['status'] >= 400:
error("Distribution %s could not be modified: %s" % (cfuri, response['reason']))
output("Distribution modified: %s" % cfuri)
response = cf.GetDistInfo(cfuri)
d = response['distribution']
dc = d.info['DistributionConfig']
pretty_output("Origin", S3UriS3.httpurl_to_s3uri(dc.info['Origin']))
pretty_output("DistId", d.uri())
pretty_output("DomainName", d.info['DomainName'])
pretty_output("Status", d.info['Status'])
pretty_output("CNAMEs", ", ".join(dc.info['CNAME']))
pretty_output("Comment", dc.info['Comment'])
pretty_output("Enabled", dc.info['Enabled'])
pretty_output("Etag", response['headers']['etag'])
Jump to Line
Something went wrong with that request. Please try again.