-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add config filter parameter to simulate_factors() #14
Changes from 6 commits
3142c63
ab8ab48
190ff12
3383cf5
006567d
49242a8
a1a33b8
aa35322
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,12 @@ | |
from multiprocessing import cpu_count, Process, Queue | ||
from pprint import pprint | ||
from threading import Thread | ||
try: | ||
from future_builtins import filter | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use
|
||
except ImportError: | ||
# Assume Python 3, which already has filter built in | ||
pass | ||
|
||
import json | ||
import os | ||
import random | ||
|
@@ -21,6 +27,7 @@ | |
consume_multi_progress) | ||
from desmod.timescale import parse_time, scale_time | ||
from desmod.tracer import TraceManager | ||
from desmod.workspacesync.s3 import S3Sync | ||
|
||
|
||
class SimEnvironment(simpy.Environment): | ||
|
@@ -124,11 +131,12 @@ def schedule(self, delay=0): | |
class _Workspace(object): | ||
"""Context manager for workspace directory management.""" | ||
def __init__(self, config): | ||
self.config = config | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I maintain that it is important to fully extract all configuration from
We also want |
||
self.workspace = config.setdefault('meta.sim.workspace', | ||
config.setdefault('sim.workspace', | ||
os.curdir)) | ||
self.overwrite = config.setdefault('sim.workspace.overwrite', False) | ||
self.prev_dir = os.getcwd() | ||
self.orig_dir = os.getcwd() | ||
|
||
def __enter__(self): | ||
if os.path.relpath(self.workspace) != os.curdir: | ||
|
@@ -140,7 +148,20 @@ def __enter__(self): | |
os.chdir(self.workspace) | ||
|
||
def __exit__(self, *exc): | ||
os.chdir(self.prev_dir) | ||
os.chdir(self.orig_dir) | ||
try: | ||
os.chdir(self.config['sim.workspace']) | ||
self.sync() | ||
finally: | ||
os.chdir(self.orig_dir) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I question whether it is right for It seems like this paradigm of changing to the multi-sim session level directory ( To put a fine point on it, what I'm suggesting is that the control-flow here should be:
Sorry I didn't pick up on this in my first review. |
||
|
||
def sync(self): | ||
if self.config.setdefault('sim.sync.s3.enable', False): | ||
artifacts = [] | ||
for root, _, files in os.walk(os.curdir, topdown=False): | ||
for filename in files: | ||
artifacts.append(os.path.join(root, filename)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a little curious that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I originally had this logic in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
S3Sync(self.config, artifacts).sync() | ||
|
||
|
||
def simulate(config, top_type, env_type=SimEnvironment, reraise=True, | ||
|
@@ -205,7 +226,7 @@ def simulate(config, top_type, env_type=SimEnvironment, reraise=True, | |
|
||
|
||
def simulate_factors(base_config, factors, top_type, | ||
env_type=SimEnvironment, jobs=None): | ||
env_type=SimEnvironment, jobs=None, config_filter=None): | ||
"""Run multi-factor simulations in separate processes. | ||
|
||
The `factors` are used to compose specialized config dictionaries for the | ||
|
@@ -220,15 +241,20 @@ def simulate_factors(base_config, factors, top_type, | |
:param top_type: The model's top-level Component subclass. | ||
:param env_type: :class:`SimEnvironment` subclass. | ||
:param int jobs: User-specified number of concurrent processes. | ||
:param function config_filter: | ||
A function that is passed a config dict and returns a bool indicating whether to keep that config. | ||
:returns: Sequence of result dictionaries for each simulation. | ||
|
||
""" | ||
configs = list(factorial_config(base_config, factors, 'meta.sim.special')) | ||
ws = base_config.setdefault('sim.workspace', os.curdir) | ||
overwrite = base_config.setdefault('sim.workspace.overwrite', False) | ||
|
||
for index, config in enumerate(configs): | ||
config['meta.sim.index'] = index | ||
config['meta.sim.workspace'] = os.path.join(ws, str(index)) | ||
if config_filter is not None: | ||
configs[:] = filter(config_filter, configs) | ||
if overwrite and os.path.relpath(ws) != os.curdir and os.path.isdir(ws): | ||
shutil.rmtree(ws) | ||
return simulate_many(configs, top_type, env_type, jobs) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
"""Synchronization of workspace artifacts to Amazon S3 cloud storage.""" | ||
import os | ||
|
||
|
||
class S3Sync(object): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like this could have been implemented as a regular function instead of as a class with a single method. I.e.
versus:
I don't see it as imperative that we change this. It is not a user-facing API and we could thus revisit later if desired. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main reason I went with this approach to perhaps have common set of methods for multiple backend synchronizers, which could register themselves and perhaps one day be called in a loop like so: for cls in sync_registry:
cls(config, artifacts).sync() That said, that's a lot of hand waving, and we only have one sync backend now, so I probably shouldn't be solving problems that don't exist yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even without using classes, each synchronization backend module could register its I do think there is at least one compelling case for having a class: extract and check the configuration before simulation. |
||
"""S3 workspace synchronization. | ||
|
||
The :class:`S3Sync` class implements a sync method that synchronizes the | ||
workspace artifacts to an S3 bucket with a configured key prefix. The | ||
s3 destination is of the format: /{prefix}/{workspace name}/{artifact path} | ||
|
||
:param dict config: A fully-initialized configuration dictionary. | ||
:param list artifacts: A list of artifact paths relative to the workspace | ||
directory. E.g., ["./0/results.yaml"]. | ||
|
||
""" | ||
|
||
MAX_THREADS = 10 | ||
|
||
def __init__(self, config, artifacts): | ||
self.client = None | ||
self.config = config | ||
self.workspace = config['sim.workspace'] | ||
self.artifacts = artifacts | ||
|
||
def _upload_artifact(self, artifact): | ||
dest = os.path.join( | ||
self.config['sim.sync.s3.prefix'], | ||
os.path.split(self.workspace)[1], | ||
(artifact[2:])) | ||
self.client.upload_file( | ||
artifact, self.config['sim.sync.s3.bucket'], dest) | ||
|
||
def sync(self): | ||
"""Concurrently upload the artifacts to s3.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A potential problem with I assume What is missing is any concept of removing files that exist in S3, but are not in the list of artifacts to synchronize. We would ostensibly need to interrogate S3 with However, doing that alone would leave extra per-simulation directories unaccounted for. E.g. using the same workspace, if I first run a multi-factor set of 10 simulations and then run again with only 5 simulations, even if we synchronized per-simulation directories 0..4 correctly, S3 would still contain bogus/stale directories 5..9. |
||
from concurrent.futures import ThreadPoolExecutor | ||
import boto3 | ||
|
||
self.config.setdefault('sim.sync.s3.prefix', '') | ||
self.client = boto3.client('s3') | ||
|
||
if len(self.artifacts) == 0: | ||
return | ||
|
||
futures = [] | ||
with ThreadPoolExecutor(max_workers=self.MAX_THREADS) as executor: | ||
for artifact in self.artifacts: | ||
futures.append( | ||
executor.submit(self._upload_artifact, artifact)) | ||
[future.result() for future in futures] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
boto3 >= 1.4.2 | ||
futures >= 3.1.1; python_version < '3.0' | ||
pyvcd >= 0.1.1 | ||
PyYAML >= 3.11 | ||
simpy >= 3.0.8 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ def config(): | |
'sim.vcd.start_time': '', | ||
'sim.vcd.stop_time': '', | ||
'sim.workspace': 'workspace', | ||
'sim.sync.s3.enable': False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like since |
||
'test.raise': False, | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
enumerate()
to completely unwind the changes in this function.