Skip to content

Commit

Permalink
Merge 784a498 into 2f86202
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrunner committed Nov 18, 2015
2 parents 2f86202 + 784a498 commit d75bb20
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 41 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Expand Up @@ -43,6 +43,7 @@ install:
- pip install bsddb3
- pip install -r requirements.txt
- pip install -r dev-requirements.txt
- pip install .
- touch tilecloud_chain/OpenLayers.js

script:
Expand All @@ -58,5 +59,5 @@ after_failure:

after_success:
# Report coverage results to coveralls.io
- sudo pip install coveralls
- pip install coveralls
- coveralls
20 changes: 20 additions & 0 deletions tilecloud_chain/__init__.py
Expand Up @@ -931,6 +931,12 @@ def get_geoms(self, layer, extent=None):
connection.close()
return layer_geoms

def add_local_process_filter(self): # pragma: no cover
self.ifilter(LocalProcessFilter(
self.config["ec2"]["number_process"],
self.options.local_process_number
))

def get_geoms_filter(self, layer, grid, geoms, queue_store=None):
return IntersectGeometryFilter(
grid=grid,
Expand Down Expand Up @@ -1371,6 +1377,20 @@ def __call__(self, tile):
return tile


class LocalProcessFilter: # pragma: no cover

def __init__(self, nb_process, process_nb):
self.nb_process = nb_process
self.process_nb = int(process_nb)

def filter(self, tilecoord):
nb = tilecoord.z + tilecoord.x / tilecoord.n + tilecoord.y / tilecoord.n
return nb % self.nb_process == self.process_nb

def __call__(self, tile):
return tile if self.filter(tile.tilecoord) else None


class IntersectGeometryFilter:

def __init__(self, grid, geoms=None, queue_store=None, px_buffer=0):
Expand Down
173 changes: 136 additions & 37 deletions tilecloud_chain/amazon.py
Expand Up @@ -6,7 +6,10 @@
import boto
import re
import socket
from os import environ
import subprocess
from os import path, environ
from time import sleep
from threading import Thread
from boto import sns
from datetime import timedelta
from subprocess import Popen, PIPE
Expand All @@ -19,6 +22,13 @@
logger = logging.getLogger(__name__)


def _get_path():
directory = path.dirname(sys.argv[0])
if len(directory) != 0: # pragma: no cover
directory += '/'
return directory


def main():
parser = ArgumentParser(
description='Used to generate the tiles from Amazon EC2, '
Expand Down Expand Up @@ -62,6 +72,14 @@ def main():
'--shutdown', default=False, action="store_true",
help='Shut done the remote host after the task.'
)
parser.add_argument(
'--wait', default=False, action="store_true",
help='Wait that all the tasks will finish.'
)
parser.add_argument(
'--local', default=False, action="store_true",
help='Run the generation locally'
)

options = parser.parse_args()
gene = TileGeneration(options.config, options, layer_name=options.layer)
Expand Down Expand Up @@ -94,7 +112,7 @@ def main():
else:
host = options.host

if options.geodata and 'geodata_folder' in gene.config['ec2']:
if not options.local and options.geodata and 'geodata_folder' in gene.config['ec2']:
print("==== Sync geodata ====")
ssh_options = ''
if 'ssh_options' in gene.config['ec2']: # pragma: no cover
Expand All @@ -106,7 +124,7 @@ def main():
host + ':' + gene.config['ec2']['geodata_folder']
])

if options.deploy_code:
if options.deploy_code and not options.local:
print("==== Sync and build code ====")
error = gene.validate(gene.config['ec2'], 'ec2', 'code_folder', required=True)
if error:
Expand All @@ -122,22 +140,23 @@ def main():
run_local(cmd)

for cmd in gene.config['ec2']['build_cmds']:
run_remote(cmd % environ, host, project_dir, gene)
run(options, cmd % environ, host, project_dir, gene)
if 'apache_content' in gene.config['ec2'] and 'apache_config' in gene.config['ec2']:
run_remote(
run(
options,
'echo %s > %s' % (
gene.config['ec2']['apache_content'],
gene.config['ec2']['apache_config']
), host, project_dir, gene
)
run_remote('sudo apache2ctl graceful', host, project_dir, gene)
run(options, 'sudo apache2ctl graceful', host, project_dir, gene)

# deploy
if options.deploy_database:
if options.deploy_database and not options.local:
_deploy(gene, host)

if options.deploy_code or options.deploy_database \
or options.geodata:
or options.geodata and not options.local:
# TODO not implemented yet
create_snapshot(host, gene)

Expand All @@ -146,13 +165,15 @@ def main():
arguments.extend(['--role', 'local'])
arguments.extend(['--time', str(options.time)])

project_dir = gene.config['ec2']['code_folder']
project_dir = None if options.local else gene.config['ec2']['code_folder']
processes = []
for i in range(gene.config['ec2']['number_process']):
processes.append(
run_remote_process(
'.build/venv/bin/generate_tiles ' +
' '.join([str(a) for a in arguments]), host, project_dir, gene
"%sgenerate_tiles %s" % (
_get_path(),
' '.join([str(a) for a in arguments])
), host, project_dir, gene
)
)

Expand Down Expand Up @@ -194,45 +215,86 @@ def main():
tile_size: %0.3f''' % (mean_time_ms, mean_time_ms, mean_size_kb))

if options.shutdown: # pragma: no cover
run_remote('sudo shutdown 0', host, project_dir, gene)
run(options, 'sudo shutdown 0', host, project_dir, gene)
sys.exit(0)

if options.fill_queue: # pragma: no cover
if options.fill_queue and not options.local: # pragma: no cover
print("==== Till queue ====")
# TODO test
arguments = _get_arguments(options)
arguments.extend(['--role', 'master'])
arguments.extend(['--role', 'master', '--quiet'])

project_dir = gene.config['ec2']['code_folder']
run_remote(
'.build/venv/bin/generate_tiles ' +
' '.join([str(a) for a in arguments]), host, project_dir, gene
run_remote_process(
options,
"%sgenerate_tiles %s" % (
_get_path(),
' '.join([str(a) for a in arguments])
), host, project_dir, gene
)
sleep(5)
attributes = gene.get_sqs_queue().get_attributes()
print(
"\rTiles to generate: %s/%s" % (
attributes['ApproximateNumberOfMessages'],
attributes['ApproximateNumberOfMessagesNotVisible'],
)
)

if options.tiles_gen: # pragma: no cover
print("==== Generate tiles ====")
# TODO test

if options.wait and not options.local:
print("")

class Status(Thread):
def run(self): # pragma: no cover
while True:
attributes = gene.get_sqs_queue().get_attributes()
print(
"\rTiles to generate/generating: %s/%s" % (
attributes['ApproximateNumberOfMessages'],
attributes['ApproximateNumberOfMessagesNotVisible'],
)
)

sleep(1)
status_thread = Status()
status_thread.setDaemon(True)
status_thread.start()

arguments = _get_arguments(options)
arguments.extend(['--role', 'slave'])
arguments.append("--daemonize")
arguments.extend(['--quiet'])
if not options.local:
arguments.extend(['--role', 'slave'])

project_dir = gene.config['ec2']['code_folder']
processes = []
project_dir = None if options.local else gene.config['ec2']['code_folder']
threads = []
for i in range(gene.config['ec2']['number_process']):
processes.append(
if options.local:
threads.append(run_local_process(
"%sgenerate_tiles --local-process-number %i %s" % (
_get_path(),
i, ' '.join([str(a) for a in arguments])
)
))
else:
run_remote_process(
'.build/venv/bin/generate_tiles ' +
' '.join([str(a) for a in arguments]), host, project_dir, gene)
)
"%sgenerate_tiles %s" % (
_get_path(),
' '.join([str(a) for a in arguments])
), host, project_dir, gene
)

if options.shutdown:
for p in processes:
p.communicate() # wait process end
else:
print('Tile generation started in background')
print('Tile generation started')

if options.shutdown:
run_remote('sudo shutdown 0')
run(options, 'sudo shutdown 0')

if options.wait and options.local:
while len(threads) > 0:
threads = [t for t in threads if t.is_alive()]
sleep(1)

if 'sns' in gene.config:
if 'region' in gene.config['sns']:
Expand Down Expand Up @@ -303,11 +365,22 @@ def run_local(cmd):

logger.debug('Run: %s.' % ' '.join([quote(c) for c in cmd]))
result = Popen(cmd, stdout=PIPE, stderr=PIPE).communicate()
logger.info(result[0])
logger.error(result[1])
if len(result[0]) != 0: # pragma: no cover
logger.info(result[0])
if len(result[1]) != 0:
logger.error(result[1])
return result


def run_local_process(cmd):
if type(cmd) != list:
cmd = cmd.split(' ')
logger.debug('Run: %s.' % ' '.join([quote(c) for c in cmd]))
task = Run(cmd)
task.start()
return task


def run_remote_process(remote_cmd, host, project_dir, gene):
cmd = ['ssh']
if 'ssh_options' in gene.config['ec2']: # pragma: no cover
Expand Down Expand Up @@ -335,10 +408,36 @@ def run_remote_process(remote_cmd, host, project_dir, gene):
return Popen(cmd, stdout=PIPE, stderr=PIPE)


def run_remote(remote_cmd, host, project_dir, gene):
class Run(Thread):
def __init__(self, cmd):
Thread.__init__(self)
if type(cmd) != list: # pragma: no cover
cmd = cmd.split(' ')
self.cmd = cmd

def run(self):
subprocess.call(self.cmd)


def run(options, cmd, host, project_dir, gene):
if options.local: # pragma: no cover
if type(cmd) != list:
cmd = cmd.split(' ')
subprocess.call(cmd)
else:
result = run_remote_process(cmd, host, project_dir, gene).communicate()
if len(result[0]) != 0:
logger.info(result[0])
if len(result[1]) != 0:
logger.error(result[1])


def run_remote(remote_cmd, host, project_dir, gene): # pragma: no cover
result = run_remote_process(remote_cmd, host, project_dir, gene).communicate()
logger.info(result[0])
logger.error(result[1])
if len(result[0]) != 0:
logger.info(result[0])
if len(result[1]) != 0:
logger.error(result[1])
return result


Expand Down
7 changes: 7 additions & 0 deletions tilecloud_chain/generate.py
Expand Up @@ -75,6 +75,9 @@ def gene(self, options, gene, layer):
gene.init_tilecoords()
gene.add_geom_filter()

if options.local_process_number is not None: # pragma: no cover
gene.add_local_process_filter()

elif options.role == 'slave':
# Get the metatiles from the SQS queue
gene.set_store(sqs_tilestore) # pragma: no cover
Expand Down Expand Up @@ -386,6 +389,10 @@ def main():
help='local/master/slave, master to file the queue and '
'slave to generate the tiles'
)
parser.add_argument(
"--local-process-number", default=None,
help="The number of process that we run in parallel"
)
parser.add_argument(
'--daemonize', default=False, action="store_true",
help='run as a daemon'
Expand Down
2 changes: 1 addition & 1 deletion tilecloud_chain/server.py
Expand Up @@ -41,7 +41,7 @@
from tilecloud_chain import TileGeneration

if sys.version_info.major >= 3:
buffer = memoryview
buffer = memoryview # pragma: no cover
else:
memoryview = buffer

Expand Down
2 changes: 1 addition & 1 deletion tilecloud_chain/tests/__init__.py
Expand Up @@ -34,7 +34,7 @@ def assert_result_equals(self, result, expected, regex=False):
else:
log.info(" %i %s" % (i, result[i]))
raise e
self.assertEqual(len(expected), len(result))
self.assertEqual(len(expected), len(result), repr(result))

def run_cmd(self, cmd, main_func):
old_stdout = sys.stdout
Expand Down

0 comments on commit d75bb20

Please sign in to comment.