Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ def _make_store(cfg):
store_cfg = cfg.yml.get('store')
assert store_cfg, "Store was not configured, but is necessary."
credentials = cfg.subtree('aws credentials')
store = make_store(store_cfg, credentials=credentials)
logger = make_logger(cfg, 'process')
store = make_store(store_cfg, credentials=credentials, logger=logger)
return store


Expand Down
62 changes: 51 additions & 11 deletions tilequeue/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,67 @@ def parse_coordinate_from_path(path, extension, layer):
pass


# decorates a function to back off and retry
def _backoff_and_retry(ExceptionType, num_tries=5, retry_factor=2,
retry_interval=1, logger=None):
from time import sleep
from functools import wraps

def decorator(f):
@wraps(f)
def func(*args, **kwargs):
# do the first num_tries-1 attempts wrapped in something to catch
# any exceptions, optionally log them, and try again.
interval = retry_interval
factor = retry_factor

for _ in xrange(1, num_tries):
try:
return f(*args, **kwargs)

except ExceptionType as e:
if logger:
logger.warning("Failed. Backing off and retrying. "
"Error: %s" % str(e))

sleep(interval)
interval *= factor

# do final attempt without try-except, so we get the exception
# in normal code.
return f(*args, **kwargs)

return func
return decorator


class S3(object):

def __init__(
self, bucket, date_prefix, path, reduced_redundancy,
delete_retry_interval):
delete_retry_interval, logger):
self.bucket = bucket
self.date_prefix = date_prefix
self.path = path
self.reduced_redundancy = reduced_redundancy
self.delete_retry_interval = delete_retry_interval
self.logger = logger

def write_tile(self, tile_data, coord, format, layer):
key_name = s3_tile_key(
self.date_prefix, self.path, layer, coord, format.extension)
key = self.bucket.new_key(key_name)
key.set_contents_from_string(
tile_data,
headers={'Content-Type': format.mimetype},
policy='public-read',
reduced_redundancy=self.reduced_redundancy,
)

@_backoff_and_retry(Exception, logger=self.logger)
def write_to_s3():
key.set_contents_from_string(
tile_data,
headers={'Content-Type': format.mimetype},
policy='public-read',
reduced_redundancy=self.reduced_redundancy,
)

write_to_s3()

def read_tile(self, coord, format, layer):
key_name = s3_tile_key(
Expand Down Expand Up @@ -314,11 +354,11 @@ def list_tiles(self, format, layer):
def make_s3_store(bucket_name,
aws_access_key_id=None, aws_secret_access_key=None,
path='osm', reduced_redundancy=False, date_prefix='',
delete_retry_interval=60):
delete_retry_interval=60, logger=None):
conn = connect_s3(aws_access_key_id, aws_secret_access_key)
bucket = Bucket(conn, bucket_name)
s3_store = S3(bucket, date_prefix, path, reduced_redundancy,
delete_retry_interval)
delete_retry_interval, logger)
return s3_store


Expand Down Expand Up @@ -354,7 +394,7 @@ def write_tile_if_changed(store, tile_data, coord, format, layer):
return False


def make_store(yml, credentials={}):
def make_store(yml, credentials={}, logger=None):
store_type = yml.get('type')

if store_type == 'directory':
Expand All @@ -377,7 +417,7 @@ def make_store(yml, credentials={}):
return make_s3_store(
bucket, aws_access_key_id, aws_secret_access_key, path=path,
reduced_redundancy=reduced_redundancy, date_prefix=date_prefix,
delete_retry_interval=delete_retry_interval)
delete_retry_interval=delete_retry_interval, logger=logger)

else:
raise ValueError('Unrecognized store type: `{}`'.format(store_type))