Merge pull request #1481 from jwhitlock/datamap-even-more-logging-840
Datamap script: Limit process usage, handle Ctrl-C, more logging tweaks
jwhitlock committed Jan 15, 2021
2 parents eaccd16 + 3ac10c3 commit e2b7c8b
Showing 1 changed file with 143 additions and 107 deletions.
250 changes: 143 additions & 107 deletions ichnaea/scripts/datamap.py
@@ -104,6 +104,8 @@ def generate(
tiles_dir = os.path.abspath(os.path.join(output_dir, "tiles"))

if create:
LOG.debug("Generating tiles from datamap tables...")

# Export datamap table to CSV files
if not os.path.isdir(csv_dir):
os.mkdir(csv_dir)
@@ -116,6 +118,9 @@
LOG.debug(
f"Exported {row_count:,} rows in {export_timer.duration_s:0.1f} seconds"
)
if result["row_count"] == 0:
LOG.debug("No rows to export, so no tiles to generate.")
return result

# Convert CSV files to per-table quadtrees
if os.path.isdir(quadtree_dir):
@@ -145,15 +150,24 @@
result["tile_count"] = tile_count
result["render_duration_s"] = render_timer.duration_s
LOG.debug(
f"Rendered {tile_count} tiles in {render_timer.duration_s:0.1f} seconds"
f"Rendered {tile_count:,} tiles in {render_timer.duration_s:0.1f} seconds"
)

if upload:
LOG.debug(f"Syncing tiles to S3 bucket {bucket_name}...")

# Determine the sync plan by comparing S3 to the local tiles
# This function times itself
plan, unchanged_count = get_sync_plan(bucket_name, tiles_dir)

# Sync local tiles with S3 bucket
# Double concurrency since I/O rather than CPU bound
with Pool(processes=concurrency * 2) as pool, Timer() as sync_timer:
# Max tasks to free accumulated memory from the S3 clients
with Pool(
processes=concurrency * 2, maxtasksperchild=1000
) as pool, Timer() as sync_timer:
sync_counts = sync_tiles(
pool, bucket_name, tiles_dir, max_zoom, raven_client
pool, plan, bucket_name, tiles_dir, max_zoom, raven_client
)

result["sync_duration_s"] = sync_timer.duration_s
@@ -163,7 +177,8 @@ def generate(
f"{sync_counts['tile_new']:,} new, "
f"{sync_counts['tile_changed']:,} changed, "
f"{sync_counts['tile_deleted']:,} deleted, "
f"{sync_counts['tile_unchanged']:,} unchanged"
f"{sync_counts['tile_failed']:,} failed, "
f"{unchanged_count:,} unchanged"
)

upload_status_file(bucket_name, result)
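
The upload branch above now computes the sync plan before opening the pool, doubles the worker count because the S3 sync is I/O bound rather than CPU bound, and passes maxtasksperchild=1000 so workers are recycled and the memory held by their cached S3 clients is released. A minimal sketch of that pool configuration, with a hypothetical sync_one worker that is not part of this diff:

    # Sketch only: sync_one stands in for the real upload/update tasks.
    from multiprocessing import Pool

    def sync_one(path):
        # Placeholder for an I/O-bound S3 upload of a single tile.
        return ("tile_new", 1)

    def run_sync(paths, concurrency=4):
        # Double the process count for I/O-bound work, and recycle each
        # worker after 1000 tasks so per-process state (such as cached S3
        # clients) is periodically freed.
        with Pool(processes=concurrency * 2, maxtasksperchild=1000) as pool:
            return pool.map(sync_one, paths)
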
@@ -189,8 +204,8 @@ def on_success(job_rows):
def on_progress(tables_complete, table_percent):
nonlocal result_rows
LOG.debug(
f" Exported {result_rows} row{'' if result_rows==1 else 's'}"
f" from {tables_complete} table{'' if tables_complete == 0 else 's'}"
f" Exported {result_rows:,} row{'' if result_rows==1 else 's'}"
f" from {tables_complete:,} table{'' if tables_complete==1 else 's'}"
f" ({table_percent:0.1%})"
)

@@ -229,6 +244,9 @@ def watch_jobs(
job_resp = job.get()
if on_success:
on_success(job_resp)
except KeyboardInterrupt:
# Skip Raven for Ctrl-C, reraise to halt execution
raise
except Exception as e:
if raven_client:
raven_client.captureException()
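
The new except KeyboardInterrupt branch lets Ctrl-C escape watch_jobs instead of being swallowed by the generic handler that reports to Raven. A minimal sketch of the ordering, assuming only that raven_client exposes the captureException() call seen above:

    def wait_for(job, raven_client=None):
        # Mirror the handler order in watch_jobs(): let Ctrl-C propagate
        # untouched, report anything else and carry on.
        try:
            return job.get()
        except KeyboardInterrupt:
            # Skip Raven for Ctrl-C; re-raise so the script halts.
            raise
        except Exception:
            if raven_client:
                raven_client.captureException()
            return None
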
@@ -251,7 +269,7 @@ def on_progress(converted, percent):
if converted == 1:
LOG.debug(f" Converted 1 CSV to a quadtree ({percent:0.1%})")
else:
LOG.debug(f" Converted {converted} CSVs to quadtrees ({percent:0.1%})")
LOG.debug(f" Converted {converted:,} CSVs to quadtrees ({percent:0.1%})")

watch_jobs(jobs, on_progress=on_progress)
return len(jobs)
@@ -289,7 +307,107 @@ def render_tiles(pool, shapes_dir, tiles_dir, max_zoom):
return tile_count


def export_to_csv(filename, tablename, _db=None, _session=None):
def get_sync_plan(bucket_name, tiles_dir, bucket_prefix="tiles/"):
"""Compare S3 bucket and tiles directory to determine the sync plan."""

# Get objects currently in the S3 bucket
with Timer() as obj_timer:
objects = get_current_objects(bucket_name, bucket_prefix)
LOG.debug(
f"Found {len(objects):,} existing tiles in bucket {bucket_name},"
f" /{bucket_prefix} in {obj_timer.duration_s:0.1f} seconds"
)

# Determine what actions we are taking for each
with Timer() as action_timer:
actions, unchanged_count = get_sync_actions(tiles_dir, objects)
LOG.debug(
f"Completed sync actions in {action_timer.duration_s:0.1f} seconds,"
f" {len(actions['upload']):,} new"
f" tile{'' if len(actions['upload']) == 1 else 's'} to upload,"
f" {len(actions['update']):,} changed"
f" tile{'' if len(actions['update']) == 1 else 's'} to update,"
f" {len(actions['delete']):,} orphaned"
f" tile{'' if len(actions['delete']) == 1 else 's'} to delete,"
f" and {unchanged_count:,} unchanged"
f" tile{'' if unchanged_count == 1 else 's'}"
)

return actions, unchanged_count
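
get_sync_plan logs its own timings through Timer. The real helper lives elsewhere in ichnaea and is not shown in this diff; a context manager consistent with the duration_s attribute used here could look like this sketch:

    import time

    class Timer:
        # Minimal stand-in for the Timer used above: measure elapsed
        # wall-clock time and expose it as duration_s when the block exits.
        def __enter__(self):
            self._start = time.monotonic()
            self.duration_s = 0.0
            return self

        def __exit__(self, exc_type, exc, tb):
            self.duration_s = time.monotonic() - self._start
            return False
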


def sync_tiles(
pool,
plan,
bucket_name,
tiles_dir,
max_zoom,
raven_client,
bucket_prefix="tiles/",
delete_batch_size=100,
):
"""Execute the plan to sync the local tiles to S3 bucket objects."""

result = {
"tile_new": 0,
"tile_changed": 0,
"tile_deleted": 0,
"tile_failed": 0,
}

# Queue the sync plan actions
jobs = []
for path in plan["upload"]:
jobs.append(
pool.apply_async(upload_file, (path, bucket_name, bucket_prefix, tiles_dir))
)
for path in plan["update"]:
jobs.append(
pool.apply_async(update_file, (path, bucket_name, bucket_prefix, tiles_dir))
)
total = len(plan["upload"]) + len(plan["update"])

# Queue the delete actions in batches
for paths in chunked(plan["delete"], delete_batch_size):
total += len(paths)
jobs.append(pool.apply_async(delete_files, (paths, bucket_name, bucket_prefix)))

# Watch sync jobs until completion
def on_success(job_result):
nonlocal result
tile_result, count = job_result
result[tile_result] += count

def on_error(exception):
nonlocal result
LOG.error(f"Exception while syncing: {exception}")
result["tile_failed"] += 1 # Would be wrong if a delete fails

def on_progress(jobs_complete, job_total):
nonlocal result, total
count = sum(result.values())
percent = count / total
LOG.debug(f" Synced {count:,} file{'' if count == 1 else 's'} ({percent:.1%})")

watch_jobs(jobs, on_progress=on_progress, on_success=on_success, on_error=on_error)
return result
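
Uploads and updates are queued one file per task, while deletes are grouped with chunked so a single task can remove up to delete_batch_size keys per S3 request. Assuming chunked behaves like more_itertools.chunked, the batching amounts to this sketch:

    def chunked(items, size):
        # Yield fixed-size lists, with a final shorter batch if the item
        # count is not a multiple of size.
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == size:
                yield batch
                batch = []
        if batch:
            yield batch

    # Example: list(chunked(range(5), 2)) == [[0, 1], [2, 3], [4]]
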


def upload_status_file(bucket_name, runtime_data, bucket_prefix="tiles/"):
"""Upload the status file to S3"""

data = {"updated": util.utcnow().isoformat()}
data.update(runtime_data)
s3_client().put_object(
Body=dumps(data),
Bucket=bucket_name,
CacheControl="max-age=3600, public",
ContentType="application/json",
Key=bucket_prefix + "data.json",
)


def export_to_csv(filename, tablename, _session=None):
"""Export a datamap table to a CSV file."""
stmt = text(
"""\
@@ -306,7 +424,7 @@ def export_to_csv(filename, tablename, _db=None, _session=None):
)
)

db = configure_db("ro", _db=_db, pool=False)
db = configure_db("ro", pool=False)
min_grid = b""
limit = 200000
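
export_to_csv pages through the datamap table by keyset (the binary grid column) rather than by OFFSET, resuming each query after the last grid seen. The actual SQL is elided above; a generic sketch of the pattern, with illustrative table and column names:

    from sqlalchemy import text

    def iter_rows(session, tablename, limit=200_000):
        # Keyset pagination: each page starts strictly after the last grid
        # key of the previous page, so no OFFSET scan is needed.
        min_grid = b""
        while True:
            rows = session.execute(
                text(
                    f"SELECT grid, num FROM {tablename}"  # trusted table name
                    " WHERE grid > :min_grid ORDER BY grid LIMIT :limit"
                ),
                {"min_grid": min_grid, "limit": limit},
            ).fetchall()
            if not rows:
                break
            yield from rows
            min_grid = rows[-1][0]
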

@@ -470,100 +588,6 @@ def generate_tile(
pngquant.wait()


def sync_tiles(
pool,
bucket_name,
tiles_dir,
max_zoom,
raven_client,
bucket_prefix="tiles/",
progress_seconds=5.0,
delete_batch_size=100,
):
"""Sync the local tiles to S3 bucket objects."""

# Get objects currently in the S3 bucket
with Timer() as obj_timer:
objects = get_current_objects(bucket_name, bucket_prefix)
LOG.debug(
f"Found {len(objects):,} existing tiles in bucket {bucket_name},"
f" /{bucket_prefix} in {obj_timer.duration_s:0.1f} seconds"
)

# Determine what actions we are taking for each
with Timer() as action_timer:
actions = get_sync_actions(tiles_dir, objects)
LOG.debug(
f"Completed sync plan in {action_timer.duration_s:0.1f} seconds,"
f" {len(actions['upload']):,} new"
f" tile{'' if len(actions['upload']) == 1 else 's'} to upload,"
f" {len(actions['update']):,} changed"
f" tile{'' if len(actions['update']) == 1 else 's'} to update,"
f" {len(actions['delete']):,} orphaned"
f" tile{'' if len(actions['delete']) == 1 else 's'} to delete,"
f" and {len(actions['none']):,} unchanged"
f" tile{'' if len(actions['none']) == 1 else 's'}"
)
result = {
"tile_new": 0,
"tile_changed": 0,
"tile_deleted": 0,
"tile_unchanged": 0,
}

# Queue the upload actions
jobs = []
for path in actions["upload"]:
jobs.append(
pool.apply_async(upload_file, (path, bucket_name, bucket_prefix, tiles_dir))
)
for path in actions["update"]:
jobs.append(
pool.apply_async(update_file, (path, bucket_name, bucket_prefix, tiles_dir))
)
total = len(actions["upload"]) + len(actions["update"])

# Queue the delete actions in batches
for paths in chunked(actions["delete"], delete_batch_size):
total += len(paths)
jobs.append(pool.apply_async(delete_files, (paths, bucket_name, bucket_prefix)))

# Watch sync jobs until completion
def on_success(job_result):
nonlocal result
tile_result, count = job_result
result[tile_result] += count

def on_error(exception):
nonlocal result
LOG.error(f"Exception while syncing: {exception}")
result["tile_failed"] = result.get("tile_failed", 0) + 1

def on_progress(jobs_complete, job_total):
nonlocal result, total
count = sum(result.values())
percent = count / total
LOG.debug(f" Synced {count:,} file{'' if count == 1 else 's'} ({percent:.1%})")

watch_jobs(jobs, on_progress=on_progress, on_success=on_success, on_error=on_error)
result["tile_unchanged"] = len(actions["none"])
return result


def upload_status_file(bucket_name, runtime_data, bucket_prefix="tiles/"):
"""Upload the status file to S3"""

data = {"updated": util.utcnow().isoformat()}
data.update(runtime_data)
s3_client().put_object(
Body=dumps(data),
Bucket=bucket_name,
CacheControl="max-age=3600, public",
ContentType="application/json",
Key=bucket_prefix + "data.json",
)


def s3_client():
"""
Initialize the s3 bucket client.
Expand All @@ -578,6 +602,12 @@ def s3_client():
return S3_CLIENT


def reset_s3_client():
"""Clear the S3 client, to free memory."""
global S3_CLIENT
S3_CLIENT = None
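
Each worker process caches one boto3 client in the module-level S3_CLIENT, and the new reset_s3_client() drops that cache so its memory can be reclaimed; this pairs with the pool's maxtasksperchild recycling the workers themselves. The body of s3_client() is elided in this hunk; a typical lazy-cache shape (an assumption, not the project's exact code) would be:

    import boto3

    S3_CLIENT = None

    def s3_client():
        # Create the client once per process and reuse it; boto3 clients
        # hold connection pools, so repeated construction is wasteful.
        global S3_CLIENT
        if S3_CLIENT is None:
            S3_CLIENT = boto3.client("s3")
        return S3_CLIENT
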


def get_current_objects(bucket_name, bucket_prefix):
"""Get names, sizes, and MD5 signatures of objects in the bucket."""

@@ -612,8 +642,8 @@ def get_sync_actions(tiles_dir, objects):
"upload": [],
"update": [],
"delete": [],
"none": [],
}
unchanged_count = 0
remaining_objects = set(objects.keys())

for png in get_png_entries(tiles_dir):
@@ -634,14 +664,14 @@
if changed:
actions["update"].append(obj_name)
else:
actions["none"].append(obj_name)
unchanged_count += 1
else:
# New object
actions["upload"].append(obj_name)

# Any remaining objects should be deleted
actions["delete"] = sorted(remaining_objects)
return actions
return actions, unchanged_count
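
The planner classifies each tile by comparing the local directory with the bucket listing: missing from S3 means upload, present but different means update, present only in S3 means delete, and identical tiles now just increment unchanged_count instead of filling a large "none" list. The comparison itself is elided above; one common way to detect a changed tile (an assumption here, not necessarily what get_sync_actions does) is to compare the local MD5 against the object's ETag:

    import hashlib

    def is_changed(local_path, s3_etag):
        # For non-multipart uploads the S3 ETag is the hex MD5 of the object
        # body, so a mismatch indicates the tile content changed.
        with open(local_path, "rb") as fh:
            local_md5 = hashlib.md5(fh.read()).hexdigest()
        return local_md5 != s3_etag.strip('"')
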


def upload_file(path, bucket_name, bucket_prefix, tiles_dir):
@@ -859,6 +889,7 @@ def main(_argv=None, _raven_client=None, _bucket_name=None):

# Generate and upload the tiles
success = True
interrupted = False
result = {}
try:
with Timer() as timer:
@@ -881,6 +912,9 @@
upload=upload,
concurrency=concurrency,
)
except KeyboardInterrupt:
interrupted = True
success = False
except Exception:
raven_client.captureException()
success = False
@@ -892,7 +926,9 @@
task = "generation"
else:
task = "upload"
if success:
if interrupted:
complete = "interrupted"
elif success:
complete = "complete"
else:
complete = "failed"
