Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command to automatically prune tiles of interest #176

Merged
merged 14 commits into from Mar 24, 2017
Merged
8 changes: 7 additions & 1 deletion logging.conf.sample
@@ -1,5 +1,5 @@
[loggers]
keys=root,process,seed,intersect,drain,enqueue_tiles_of_interest,dump_tiles_of_interest,wof_process_neighbourhoods,query
keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,wof_process_neighbourhoods,query

[handlers]
keys=consoleHandler
Expand All @@ -11,6 +11,12 @@ keys=simpleFormatter
level=WARNING
handlers=consoleHandler

[logger_prune_tiles_of_interest]
level=INFO
handlers=consoleHandler
qualName=prune_tiles_of_interest
propagate=0

[logger_enqueue_tiles_of_interest]
level=INFO
handlers=consoleHandler
Expand Down
62 changes: 62 additions & 0 deletions tilequeue/command.py
Expand Up @@ -21,6 +21,7 @@
from tilequeue.tile import coord_int_zoom_up
from tilequeue.tile import coord_marshall_int
from tilequeue.tile import coord_unmarshall_int
from tilequeue.tile import create_coord
from tilequeue.tile import parse_expired_coord_string
from tilequeue.tile import seed_tiles
from tilequeue.tile import tile_generator_for_multiple_bounds
Expand Down Expand Up @@ -948,6 +949,65 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
logger.info('%d tiles of interest processed' % n_toi)


def tilequeue_prune_tiles_of_interest(cfg, peripherals):
logger = make_logger(cfg, 'prune_tiles_of_interest')
logger.info('Pruning tiles of interest')

logger.info('Fetching tiles of interest ...')
tiles_of_interest = peripherals.redis_cache_index.fetch_tiles_of_interest()
n_toi = len(tiles_of_interest)
logger.info('Fetching tiles of interest ... done. %s found', n_toi)

logger.info('Fetching tiles recently requested ...')
import psycopg2

redshift_uri = cfg.yml.get('redshift_uri')
assert redshift_uri, ("A redshift connection URI must "
"be present in the config yaml")

tiles_recently_requested = set()
with psycopg2.connect(redshift_uri) as conn:
with conn.cursor() as cur:
cur.execute("""
select x, y, z
from tile_traffic_v4
where (date >= dateadd(day, -30, current_date))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to make this time window configurable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 071cc52.

and (z between 0 and 16)
and (x between 0 and pow(2,z)-1)
and (y between 0 and pow(2,z)-1)
group by z, x, y
order by z, x, y
);""")
n_trr = cur.rowcount
for (x, y, z) in cur:
coord = create_coord(x, y, z)
coord_int = coord_marshall_int(coord)
tiles_recently_requested.add(coord_int)

logger.info('Fetching tiles recently requested ... done. %s found', n_trr)

logger.info('Computing tiles of interest to remove ...')
toi_to_remove = tiles_of_interest - tiles_recently_requested
logger.info('Computing tiles of interest to remove ... done. %s found',
len(toi_to_remove))

logger.info('Removing tiles from TOI and S3 ...')

def delete_tile_of_interest(coord_int):
# Remove from the redis toi set
peripherals.redis_cache_index.remove_tile_of_interest(coord_int)

# Remove the tile from S3

pass

for coord_int in toi_to_remove:
# FIXME: Think about doing this in a thread/process pool
delete_tile_of_interest(coord_int)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also think about formalizing this a bit more and doing this out of process. What's the order of the amount that we've been managing in the past, several million?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't calculated the number of tiles that would be removed. I can do that now.

I was thinking about putting together an SQS queue and a worker process to do the deletes, but that seemed heavy-handed. Maybe a lambda task to do the delete?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about putting together an SQS queue and a worker process to do the deletes, but that seemed heavy-handed. Maybe a lambda task to do the delete?

I was thinking the same. It's operationally heavier, but I think we'll need something like that if we want to scale past multiple processes/threads on a single instance.

Maybe a good use case for batch?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is a comprehensive list of event sources for lambda, but thinking about it more I think that's a reasonable option. One idea is that we can split up the list into groups of 10k or so, push those groups to a location on s3, and have lambda listen to that. Lambda would perform the delete and remove that object from s3.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tested this on dev and each run of the delete_tile_of_interest() function (which deletes 1000 tiles at a time) takes ~2 seconds. With ~9 million tiles to remove, that'll take ~5 hours or so. Subsequent runs should be faster.


logger.info('Removing tiles from TOI and S3 ... done')


def tilequeue_tile_sizes(cfg, peripherals):
# find averages, counts, and medians for metro extract tiles
assert cfg.metro_extract_url
Expand Down Expand Up @@ -1237,6 +1297,8 @@ def tilequeue_main(argv_args=None):
create_command_parser(tilequeue_dump_tiles_of_interest)),
('enqueue-tiles-of-interest',
create_command_parser(tilequeue_enqueue_tiles_of_interest)),
('prune-tiles-of-interest',
create_command_parser(tilequeue_prune_tiles_of_interest)),
('tile-size', create_command_parser(tilequeue_tile_sizes)),
('wof-process-neighbourhoods', create_command_parser(
tilequeue_process_wof_neighbourhoods)),
Expand Down
4 changes: 4 additions & 0 deletions tilequeue/tile.py
Expand Up @@ -33,6 +33,10 @@ def deserialize_coord(coord_string):
return coord


def create_coord(x, y, z):
return Coordinate(row=x, column=y, zoom=z)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

column and row transposed

Coordinate(column=x, row=y, zoom=z)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 254ac0f.



def parse_expired_coord_string(coord_string):
# we use the same format in the queue as the expired tile list from
# osm2pgsql
Expand Down