From 897e2b6c410af86a99efea42dd3ee436d1814caa Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 22 Sep 2020 17:56:08 +0800 Subject: [PATCH 01/30] fix deployment issue in multi envs --- maro/utils/utils.py | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/maro/utils/utils.py b/maro/utils/utils.py index c63f61f81..0db65742f 100644 --- a/maro/utils/utils.py +++ b/maro/utils/utils.py @@ -82,22 +82,30 @@ def deploy(hide_info=True): error_list = [] try: clean_deployment_folder() - for target_dir, source_dir in target_source_pairs: - shutil.copytree(source_dir, target_dir) - # deploy success + # deploy started version_info = configparser.ConfigParser() version_info["MARO_DATA"] = {} version_info["MARO_DATA"]["version"] = __data_version__ version_info["MARO_DATA"]["deploy_time"] = str(int(time.time())) + version_info["MARO_DATA"]["deploy_status"] = "started" + with io.open(version_file_path, "w") as version_file: + version_info.write(version_file) + + for target_dir, source_dir in target_source_pairs: + shutil.copytree(source_dir, target_dir) + # deploy success + version_info["MARO_DATA"]["deploy_status"] = "deployed" with io.open(version_file_path, "w") as version_file: version_info.write(version_file) info_list.append("Data files for MARO deployed.") except Exception as e: + # deploy success error_list.append(f"An issue occured while deploying meta files for MARO. {e} Please run 'maro meta deploy' to deploy the data files.") - - for target_dir, _ in target_source_pairs: - if os.path.exists(target_dir): - shutil.rmtree(target_dir) + version_info["MARO_DATA"]["deploy_status"] = "failed" + with io.open(version_file_path, "w") as version_file: + version_info.write(version_file) + clean_deployment_folder() + finally: if len(error_list) > 0: for error in error_list: @@ -110,14 +118,15 @@ def deploy(hide_info=True): def check_deployment_status(): ret = False if os.path.exists(version_file_path): - with io.open(version_file_path, "r") as version_file: - version_info = configparser.ConfigParser() - version_info.read(version_file) - if "MARO_DATA" in version_info \ - and "deploy_time" in version_info["MARO_DATA"] \ - and "version" in version_info["MARO_DATA"] \ - and version_info["MARO_DATA"]["version"] == __data_version__: - ret = True + version_info = configparser.ConfigParser() + version_info.read(version_file_path) + if "MARO_DATA" in version_info \ + and "deploy_time" in version_info["MARO_DATA"] \ + and "version" in version_info["MARO_DATA"] \ + and "deploy_status" in version_info["MARO_DATA"] \ + and version_info["MARO_DATA"]["version"] == __data_version__ \ + and version_info["MARO_DATA"]["deploy_status"] != "failed" : + ret = True return ret From 79a995483dcd112bb4b3de6d24383671d54ff6f8 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 22 Sep 2020 21:07:11 +0800 Subject: [PATCH 02/30] fix typo --- maro/utils/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/maro/utils/utils.py b/maro/utils/utils.py index 0db65742f..4cd8684cf 100644 --- a/maro/utils/utils.py +++ b/maro/utils/utils.py @@ -99,13 +99,13 @@ def deploy(hide_info=True): version_info.write(version_file) info_list.append("Data files for MARO deployed.") except Exception as e: - # deploy success + # deploy failed error_list.append(f"An issue occured while deploying meta files for MARO. {e} Please run 'maro meta deploy' to deploy the data files.") version_info["MARO_DATA"]["deploy_status"] = "failed" with io.open(version_file_path, "w") as version_file: version_info.write(version_file) clean_deployment_folder() - + finally: if len(error_list) > 0: for error in error_list: From d9e117a08e559469aab4bf7b2894a46111c7723e Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 22 Sep 2020 21:15:28 +0800 Subject: [PATCH 03/30] fix ~/.maro not exist issue in build --- maro/utils/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/maro/utils/utils.py b/maro/utils/utils.py index 1d0cb0c54..cc1469b1f 100644 --- a/maro/utils/utils.py +++ b/maro/utils/utils.py @@ -84,6 +84,7 @@ def deploy(hide_info=True): clean_deployment_folder() # Deployment started. + os.makedirs(os.path.dirname(version_file_path), exist_ok=True) version_info = configparser.ConfigParser() version_info["MARO_DATA"] = {} version_info["MARO_DATA"]["version"] = __data_version__ From b94ff95720b9cd87eff53118b1a8b1e12df16b96 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 22 Sep 2020 21:32:29 +0800 Subject: [PATCH 04/30] skip deploy when build --- maro/utils/utils.py | 5 ++++- setup.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/maro/utils/utils.py b/maro/utils/utils.py index cc1469b1f..82c8cba39 100644 --- a/maro/utils/utils.py +++ b/maro/utils/utils.py @@ -121,7 +121,10 @@ def deploy(hide_info=True): def check_deployment_status(): ret = False - if os.path.exists(version_file_path): + skip_deployment = os.environ.get("SKIP_DEPLOY", "FALSE") + if skip_deployment == "TRUE": + ret = True + elif os.path.exists(version_file_path): version_info = configparser.ConfigParser() version_info.read(version_file_path) if "MARO_DATA" in version_info \ diff --git a/setup.py b/setup.py index e64bcb06b..ded55ef71 100644 --- a/setup.py +++ b/setup.py @@ -8,6 +8,9 @@ from setuptools.command.develop import develop import sys +# set environment variable to skip development process of MARO +os.environ["SKIP_DEPLOY"] = "TRUE" + from maro import __version__ # root path to backend From d0c94f43208f522d6ca9531fe1deee40f7e7f941 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 22 Sep 2020 22:33:52 +0800 Subject: [PATCH 05/30] update for comments --- maro/utils/utils.py | 2 +- setup.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/maro/utils/utils.py b/maro/utils/utils.py index 82c8cba39..1749fec59 100644 --- a/maro/utils/utils.py +++ b/maro/utils/utils.py @@ -121,7 +121,7 @@ def deploy(hide_info=True): def check_deployment_status(): ret = False - skip_deployment = os.environ.get("SKIP_DEPLOY", "FALSE") + skip_deployment = os.environ.get("SKIP_DEPLOYMENT", "FALSE") if skip_deployment == "TRUE": ret = True elif os.path.exists(version_file_path): diff --git a/setup.py b/setup.py index ded55ef71..cfb2eca29 100644 --- a/setup.py +++ b/setup.py @@ -8,8 +8,8 @@ from setuptools.command.develop import develop import sys -# set environment variable to skip development process of MARO -os.environ["SKIP_DEPLOY"] = "TRUE" +# Set environment variable to skip deployment process of MARO +os.environ["SKIP_DEPLOYMENT"] = "TRUE" from maro import __version__ From 13668711a2715ce179fa576f20e6f787f539cbd8 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 11:42:40 +0800 Subject: [PATCH 06/30] temporarily disable weather info --- maro/cli/data_pipeline/citi_bike.py | 6 ++++-- .../scenarios/citi_bike/business_engine.py | 18 +++++++++++------- maro/utils/utils.py | 4 ++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index dc76cf150..1794a50a7 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -364,7 +364,8 @@ class CitiBikeTopology(DataTopology): def __init__(self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) - self._data_pipeline["weather"] = WeatherPipeline(topology, weather_source, is_temp) + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # self._data_pipeline["weather"] = WeatherPipeline(topology, weather_source, is_temp) self._is_temp = is_temp def __del__(self): @@ -585,7 +586,8 @@ def __init__(self, topology: str, config_path: str, is_temp: bool = False): with open(config_path) as fp: cfg = safe_load(fp) self._data_pipeline["trip"] = CitiBikeToyPipeline(start_time=cfg["start_time"], end_time=cfg["end_time"], stations=cfg["stations"], trips=cfg["trips"], topology=topology, is_temp=is_temp) - self._data_pipeline["weather"] = WeatherToyPipeline(topology=topology, start_time=cfg["start_time"], end_time=cfg["end_time"], is_temp=is_temp) + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # self._data_pipeline["weather"] = WeatherToyPipeline(topology=topology, start_time=cfg["start_time"], end_time=cfg["end_time"], is_temp=is_temp) else: logger.warning(f"Config file {config_path} for toy topology {topology} not found.") diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index bf2bdaf7d..4641e35cc 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -203,7 +203,8 @@ def _init(self): if (not os.path.exists(weather_data_path)) or (not os.path.exists(trip_data_path)): self._build_temp_data() - self._weather_lut = WeatherTable(self._conf["weather_data"], self._time_zone) + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # self._weather_lut = WeatherTable(self._conf["weather_data"], self._time_zone) self._trip_reader = BinaryReader(self._conf["trip_data"]) @@ -301,7 +302,8 @@ def _update_station_extra_features(self, tick: int): self._last_date = cur_datetime - weather_info = self._weather_lut[cur_datetime] + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # weather_info = self._weather_lut[cur_datetime] weekday = cur_datetime.weekday() holiday = cur_datetime in self._us_holidays @@ -310,9 +312,10 @@ def _update_station_extra_features(self, tick: int): weather = 0 temperature = 0 - if weather_info is not None: - weather = weather_info.weather - temperature = weather_info.temp + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # if weather_info is not None: + # weather = weather_info.weather + # temperature = weather_info.temp for station in self._stations: station.weekday = weekday @@ -464,8 +467,9 @@ def _build_temp_data(self): self._citi_bike_data_pipeline.build() build_folders = self._citi_bike_data_pipeline.get_build_folders() trip_folder = build_folders["trip"] - weather_folder = build_folders["weather"] - self._conf["weather_data"] = chagne_file_path(self._conf["weather_data"], weather_folder) + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # weather_folder = build_folders["weather"] + # self._conf["weather_data"] = chagne_file_path(self._conf["weather_data"], weather_folder) self._conf["trip_data"] = chagne_file_path(self._conf["trip_data"], trip_folder) self._conf["stations_init_data"] = chagne_file_path(self._conf["stations_init_data"], trip_folder) self._conf["distance_adj_data"] = chagne_file_path(self._conf["distance_adj_data"], trip_folder) diff --git a/maro/utils/utils.py b/maro/utils/utils.py index 1749fec59..4b57410aa 100644 --- a/maro/utils/utils.py +++ b/maro/utils/utils.py @@ -68,8 +68,8 @@ def set_seeds(seed): target_source_pairs = [ (os.path.expanduser("~/.maro/data/citi_bike/meta"), os.path.join(project_root, "simulator/scenarios/citi_bike/meta")), - (os.path.expanduser("~/.maro/data/ecr/meta"), - os.path.join(project_root, "simulator/scenarios/ecr/meta")), + (os.path.expanduser("~/.maro/data/cim/meta"), + os.path.join(project_root, "simulator/scenarios/cim/meta")), (os.path.expanduser("~/.maro/lib/k8s"), os.path.join(project_root, "cli/k8s/lib")), (os.path.expanduser("~/.maro/lib/grass"), From afdb600d1c2bca39a8f8b13558786624f0ded5c9 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 11:50:21 +0800 Subject: [PATCH 07/30] replace ecr with cim in setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index dfa4bea08..4463ced1f 100644 --- a/setup.py +++ b/setup.py @@ -121,7 +121,7 @@ packages=find_packages(), include_package_data=True, package_data={ - "maro.simulator.scenarios.ecr": ["topologies/*/*.yml", "meta/*.yml"], + "maro.simulator.scenarios.cim": ["topologies/*/*.yml", "meta/*.yml"], "maro.simulator.scenarios.citi_bike": ["topologies/*/*.yml", "meta/*.yml"], "maro.cli.k8s": ["lib/**/*"], "maro.cli.grass": ["lib/**/*"], From 5f6dd920fc5971a5e0756aaa98b19cfc0ff02f7f Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 11:54:45 +0800 Subject: [PATCH 08/30] replace ecr in manifest --- MANIFEST.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MANIFEST.in b/MANIFEST.in index 0ab99dcc1..2ddb7a72f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -5,5 +5,5 @@ prune examples prune samples include maro/backends/*.pxd -include maro/simulator/scenarios/ecr/topologies/*/*.yml +include maro/simulator/scenarios/cim/topologies/*/*.yml include maro/simulator/scenarios/citi_bike/topologies/*/*.yml From 92b31c01128647ad4d60073b50951e19177093b6 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 12:10:35 +0800 Subject: [PATCH 09/30] remove weather check when read data --- maro/simulator/scenarios/citi_bike/business_engine.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index 4641e35cc..ce96d6c89 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -200,7 +200,9 @@ def _init(self): if trip_data_path.startswith("~"): trip_data_path = os.path.expanduser(trip_data_path) - if (not os.path.exists(weather_data_path)) or (not os.path.exists(trip_data_path)): + # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # if (not os.path.exists(weather_data_path)) or (not os.path.exists(trip_data_path)): + if not os.path.exists(trip_data_path): self._build_temp_data() # Weather data source changed, temporarily disable, will enable it later when new data source is available. From aa8567c5d82664ebc9d91182f12e12bf79f6e8a6 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 12:21:57 +0800 Subject: [PATCH 10/30] fix station id issue --- maro/simulator/scenarios/citi_bike/business_engine.py | 2 +- maro/simulator/scenarios/citi_bike/station.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index ce96d6c89..591d1614b 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -254,7 +254,7 @@ def _init_stations(self, stations_states: list): # get related station, and set the init states station = self._stations[state.index] - station.set_init_state(state.bikes, state.capacity) + station.set_init_state(state.bikes, state.capacity, state.id) def _init_adj_matrix(self): # our distance adj diff --git a/maro/simulator/scenarios/citi_bike/station.py b/maro/simulator/scenarios/citi_bike/station.py index b516a1fcd..2186f86d7 100644 --- a/maro/simulator/scenarios/citi_bike/station.py +++ b/maro/simulator/scenarios/citi_bike/station.py @@ -18,6 +18,7 @@ class Station(NodeBase): fulfillment = NodeAttribute("i") capacity = NodeAttribute("i") + id = NodeAttribute("i") # additional features weekday = NodeAttribute("i2") @@ -34,11 +35,13 @@ class Station(NodeBase): def __init__(self): self._init_capacity = 0 # internal use for reset self._init_bikes = 0 # internal use for reset + self._id = 0 # original id in data file - def set_init_state(self, bikes:int, capacity:int): + def set_init_state(self, bikes:int, capacity:int, id: int): """set initialize state, usually for 1st using""" self._init_bikes = bikes self._init_capacity = capacity + self._id = id self.reset() @@ -48,6 +51,7 @@ def reset(self): self.capacity = self._init_capacity self.bikes = self._init_bikes self.min_bikes = self._init_bikes + self.id = self._id def _on_bikes_changed(self, value: int): """Update min bikes after bikes changed""" From 12d59acfbb043c1202ecb55a655a90157d4d2000 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 12:30:24 +0800 Subject: [PATCH 11/30] fix format --- maro/simulator/scenarios/citi_bike/station.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maro/simulator/scenarios/citi_bike/station.py b/maro/simulator/scenarios/citi_bike/station.py index 2186f86d7..c21eac545 100644 --- a/maro/simulator/scenarios/citi_bike/station.py +++ b/maro/simulator/scenarios/citi_bike/station.py @@ -37,7 +37,7 @@ def __init__(self): self._init_bikes = 0 # internal use for reset self._id = 0 # original id in data file - def set_init_state(self, bikes:int, capacity:int, id: int): + def set_init_state(self, bikes:int, capacity:int, id:int): """set initialize state, usually for 1st using""" self._init_bikes = bikes self._init_capacity = capacity From e2d4d435c5a473666d19ff66ae7667526ad210a8 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 23 Sep 2020 13:51:03 +0800 Subject: [PATCH 12/30] add TODO in comments --- maro/cli/data_pipeline/citi_bike.py | 4 ++-- maro/simulator/scenarios/citi_bike/business_engine.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 1794a50a7..21bf58ba0 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -364,7 +364,7 @@ class CitiBikeTopology(DataTopology): def __init__(self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # self._data_pipeline["weather"] = WeatherPipeline(topology, weather_source, is_temp) self._is_temp = is_temp @@ -586,7 +586,7 @@ def __init__(self, topology: str, config_path: str, is_temp: bool = False): with open(config_path) as fp: cfg = safe_load(fp) self._data_pipeline["trip"] = CitiBikeToyPipeline(start_time=cfg["start_time"], end_time=cfg["end_time"], stations=cfg["stations"], trips=cfg["trips"], topology=topology, is_temp=is_temp) - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # self._data_pipeline["weather"] = WeatherToyPipeline(topology=topology, start_time=cfg["start_time"], end_time=cfg["end_time"], is_temp=is_temp) else: logger.warning(f"Config file {config_path} for toy topology {topology} not found.") diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index 591d1614b..c269ed152 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -200,12 +200,12 @@ def _init(self): if trip_data_path.startswith("~"): trip_data_path = os.path.expanduser(trip_data_path) - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # if (not os.path.exists(weather_data_path)) or (not os.path.exists(trip_data_path)): if not os.path.exists(trip_data_path): self._build_temp_data() - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # self._weather_lut = WeatherTable(self._conf["weather_data"], self._time_zone) self._trip_reader = BinaryReader(self._conf["trip_data"]) @@ -304,7 +304,7 @@ def _update_station_extra_features(self, tick: int): self._last_date = cur_datetime - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # weather_info = self._weather_lut[cur_datetime] weekday = cur_datetime.weekday() @@ -314,7 +314,7 @@ def _update_station_extra_features(self, tick: int): weather = 0 temperature = 0 - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # if weather_info is not None: # weather = weather_info.weather # temperature = weather_info.temp @@ -469,7 +469,7 @@ def _build_temp_data(self): self._citi_bike_data_pipeline.build() build_folders = self._citi_bike_data_pipeline.get_build_folders() trip_folder = build_folders["trip"] - # Weather data source changed, temporarily disable, will enable it later when new data source is available. + # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. # weather_folder = build_folders["weather"] # self._conf["weather_data"] = chagne_file_path(self._conf["weather_data"], weather_folder) self._conf["trip_data"] = chagne_file_path(self._conf["trip_data"], trip_folder) From 5bee6aef3d46542f6f0eeff86e87f12dfea06554 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Thu, 24 Sep 2020 18:03:07 +0800 Subject: [PATCH 13/30] add noaa weather source --- maro/cli/data_pipeline/citi_bike.py | 77 +++++++++++++++++-- ...{ecr.stops.meta.yml => cim.stops.meta.yml} | 0 .../scenarios/citi_bike/business_engine.py | 22 ++---- .../scenarios/citi_bike/meta/source_urls.yml | 62 +++++++++++++++ 4 files changed, 141 insertions(+), 20 deletions(-) rename maro/simulator/scenarios/cim/meta/{ecr.stops.meta.yml => cim.stops.meta.yml} (100%) diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 21bf58ba0..1b9139fb6 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -364,8 +364,7 @@ class CitiBikeTopology(DataTopology): def __init__(self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # self._data_pipeline["weather"] = WeatherPipeline(topology, weather_source, is_temp) + self._data_pipeline["weather"] = NOAAWeatherPipeline(topology, weather_source, is_temp) self._is_temp = is_temp def __del__(self): @@ -586,8 +585,7 @@ def __init__(self, topology: str, config_path: str, is_temp: bool = False): with open(config_path) as fp: cfg = safe_load(fp) self._data_pipeline["trip"] = CitiBikeToyPipeline(start_time=cfg["start_time"], end_time=cfg["end_time"], stations=cfg["stations"], trips=cfg["trips"], topology=topology, is_temp=is_temp) - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # self._data_pipeline["weather"] = WeatherToyPipeline(topology=topology, start_time=cfg["start_time"], end_time=cfg["end_time"], is_temp=is_temp) + self._data_pipeline["weather"] = WeatherToyPipeline(topology=topology, start_time=cfg["start_time"], end_time=cfg["end_time"], is_temp=is_temp) else: logger.warning(f"Config file {config_path} for toy topology {topology} not found.") @@ -617,7 +615,74 @@ def __init__(self, is_temp: bool = False): self._conf = safe_load(fp) for topology in self._conf["trips"].keys(): if topology.startswith("toy"): - self.topologies[topology] = CitiBikeToyTopology(topology=topology, config_path=self._conf["trips"][topology]["toy_meta_path"], is_temp=is_temp) + self.topologies[topology] = CitiBikeToyTopology(topology=topology, + config_path=self._conf["trips"][topology]["toy_meta_path"], + is_temp=is_temp) else: self.topologies[topology] = CitiBikeTopology(topology=topology, trip_source=self._conf["trips"][topology]["trip_remote_url"], - station_info=self._conf["station_info"]["ny_station_info_url"], weather_source=self._conf["weather"]["ny_weather_url"], is_temp=is_temp) + station_info=self._conf["station_info"]["ny_station_info_url"], + weather_source=self._conf["weather"][topology]["noaa_weather_url"], + is_temp=is_temp) + + +class NOAAWeatherPipeline(WeatherPipeline): + + def __init__(self, topology: str, source: str, is_temp: bool = False): + """ + Generate weather data bin for the specified topology from frontierweather.com. + Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. + Folder structure: + ~/.maro + /data/citi_bike/[topology] + /_build bin data file + /source + /_download original data file + /_clean cleaned data file + /temp download temp file + + Args: + topology(str): topology name of the data file + source(str): source url of original data file + is_temp(bool): (optional) if the data file is temporary + """ + super().__init__(topology, source, is_temp) + + def clean(self): + super().clean() + if os.path.exists(self._download_file): + self._new_file_list.append(self._clean_file) + logger.info_green("Cleaning weather data") + self._preprocess(input_file=self._download_file, output_file=self._clean_file) + else: + logger.warning(f"Not found downloaded weather data: {self._download_file}") + + def _weather(self, row): + water = row["PRCP"] if row["PRCP"] is not None else 0.0 + + snow = row["SNOW"] if row["SNOW"] is not None else 0.0 + + if snow > 0.0 and water > 0: + return WeatherPipeline.WeatherEnum.SLEET.value + elif water > 0.0: + return WeatherPipeline.WeatherEnum.RAINY.value + elif snow > 0.0: + return WeatherPipeline.WeatherEnum.SNOWY.value + else: + return WeatherPipeline.WeatherEnum.SUNNY.value + + def _preprocess(self, input_file: str, output_file: str): + data: pd.DataFrame = pd.DataFrame() + + with open(input_file, "rt") as fp: + org_data = pd.read_csv(fp) + org_data["PRCP"] = pd.to_numeric(org_data["PRCP"], errors="coerce", downcast="integer") + org_data["SNOW"] = pd.to_numeric(org_data["SNOW"], errors="coerce", downcast="integer") + org_data["TMAX"] = pd.to_numeric(org_data["TMAX"], errors="coerce", downcast="integer") + org_data["TMIN"] = pd.to_numeric(org_data["TMIN"], errors="coerce", downcast="integer") + + data["date"] = org_data["DATE"] + data["weather"] = org_data.apply(self._weather, axis=1) + data["temp"] = (org_data["TMAX"] + org_data["TMIN"])/2 + + with open(output_file, mode="w", encoding="utf-8", newline="") as f: + data.to_csv(f, index=False, header=True) diff --git a/maro/simulator/scenarios/cim/meta/ecr.stops.meta.yml b/maro/simulator/scenarios/cim/meta/cim.stops.meta.yml similarity index 100% rename from maro/simulator/scenarios/cim/meta/ecr.stops.meta.yml rename to maro/simulator/scenarios/cim/meta/cim.stops.meta.yml diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index 1c9ab259c..274622343 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -202,13 +202,10 @@ def _init(self): if trip_data_path.startswith("~"): trip_data_path = os.path.expanduser(trip_data_path) - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # if (not os.path.exists(weather_data_path)) or (not os.path.exists(trip_data_path)): - if not os.path.exists(trip_data_path): + if (not os.path.exists(weather_data_path)) or (not os.path.exists(trip_data_path)): self._build_temp_data() - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # self._weather_lut = WeatherTable(self._conf["weather_data"], self._time_zone) + self._weather_lut = WeatherTable(self._conf["weather_data"], self._time_zone) self._trip_reader = BinaryReader(self._conf["trip_data"]) @@ -305,8 +302,7 @@ def _update_station_extra_features(self, tick: int): self._last_date = cur_datetime - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # weather_info = self._weather_lut[cur_datetime] + weather_info = self._weather_lut[cur_datetime] weekday = cur_datetime.weekday() holiday = cur_datetime in self._us_holidays @@ -315,10 +311,9 @@ def _update_station_extra_features(self, tick: int): weather = 0 temperature = 0 - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # if weather_info is not None: - # weather = weather_info.weather - # temperature = weather_info.temp + if weather_info is not None: + weather = weather_info.weather + temperature = weather_info.temp for station in self._stations: station.weekday = weekday @@ -471,9 +466,8 @@ def _build_temp_data(self): self._citi_bike_data_pipeline.build() build_folders = self._citi_bike_data_pipeline.get_build_folders() trip_folder = build_folders["trip"] - # TODO: Weather data source changed, temporarily disable, will enable it later when new data source is available. - # weather_folder = build_folders["weather"] - # self._conf["weather_data"] = chagne_file_path(self._conf["weather_data"], weather_folder) + weather_folder = build_folders["weather"] + self._conf["weather_data"] = chagne_file_path(self._conf["weather_data"], weather_folder) self._conf["trip_data"] = chagne_file_path(self._conf["trip_data"], trip_folder) self._conf["stations_init_data"] = chagne_file_path(self._conf["stations_init_data"], trip_folder) self._conf["distance_adj_data"] = chagne_file_path(self._conf["distance_adj_data"], trip_folder) diff --git a/maro/simulator/scenarios/citi_bike/meta/source_urls.yml b/maro/simulator/scenarios/citi_bike/meta/source_urls.yml index 2b64057d2..5c8b7a012 100644 --- a/maro/simulator/scenarios/citi_bike/meta/source_urls.yml +++ b/maro/simulator/scenarios/citi_bike/meta/source_urls.yml @@ -69,5 +69,67 @@ trips: weather: ny_weather_url: "http://www.frontierweather.com/historicaldataonly/KNYC_daily.txt" + ny.201801: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-01-01&endDate=2018-02-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201802: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-02-01&endDate=2018-03-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201803: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-03-01&endDate=2018-04-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201804: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-04-01&endDate=2018-05-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201805: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-05-01&endDate=2018-06-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201806: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-06-01&endDate=2018-07-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201807: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-07-01&endDate=2018-08-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201808: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-08-01&endDate=2018-09-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201809: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-09-01&endDate=2018-10-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201810: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-10-01&endDate=2018-11-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201811: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-11-01&endDate=2018-12-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201812: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-12-01&endDate=2019-01-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201901: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-01-01&endDate=2019-02-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201902: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-02-01&endDate=2019-03-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201903: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-03-01&endDate=2019-04-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201904: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-04-01&endDate=2019-05-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201905: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-05-01&endDate=2019-06-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201906: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-06-01&endDate=2019-07-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201907: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-07-01&endDate=2019-08-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201908: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-08-01&endDate=2019-09-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201909: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-09-01&endDate=2019-10-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201910: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-10-01&endDate=2019-11-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201911: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-11-01&endDate=2019-12-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.201912: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2019-12-01&endDate=2020-01-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.202001: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2020-01-01&endDate=2020-02-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.202002: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2020-02-01&endDate=2020-03-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.202003: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2020-03-01&endDate=2020-04-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.202004: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2020-04-01&endDate=2020-05-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.202005: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2020-05-01&endDate=2020-06-01&boundingBox=40.78,-74.0,40.76,-73.7" + ny.202006: + noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2020-06-01&endDate=2020-07-01&boundingBox=40.78,-74.0,40.76,-73.7" + + station_info: ny_station_info_url: "https://gbfs.citibikenyc.com/gbfs/en/station_information.json" \ No newline at end of file From 8b2d9180a19f03cbc3e9dd4f355344a36260d619 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Thu, 24 Sep 2020 22:01:04 +0800 Subject: [PATCH 14/30] fix weather reset and weather comment --- maro/cli/data_pipeline/citi_bike.py | 2 +- maro/simulator/scenarios/citi_bike/business_engine.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 1b9139fb6..22072b214 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -629,7 +629,7 @@ class NOAAWeatherPipeline(WeatherPipeline): def __init__(self, topology: str, source: str, is_temp: bool = False): """ - Generate weather data bin for the specified topology from frontierweather.com. + Generate weather data bin for the specified topology from ncei.noaa.gov. Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index 274622343..0e19ca13e 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -162,6 +162,8 @@ def reset(self): self._decision_strategy.reset() + self._last_date = None + def get_agent_idx_list(self) -> List[int]: return [station.index for station in self._stations] @@ -296,7 +298,7 @@ def _tick_2_date(self, tick: int): def _update_station_extra_features(self, tick: int): """update features that not related to trips""" cur_datetime = self._tick_2_date(tick) - + if self._last_date == cur_datetime: return From b33d755c604fcd9be305effdb94238d64e0f4af9 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Fri, 25 Sep 2020 11:01:08 +0800 Subject: [PATCH 15/30] add comment for weather data url --- maro/simulator/scenarios/citi_bike/meta/source_urls.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maro/simulator/scenarios/citi_bike/meta/source_urls.yml b/maro/simulator/scenarios/citi_bike/meta/source_urls.yml index 5c8b7a012..a2d88918a 100644 --- a/maro/simulator/scenarios/citi_bike/meta/source_urls.yml +++ b/maro/simulator/scenarios/citi_bike/meta/source_urls.yml @@ -68,7 +68,7 @@ trips: weather: ny_weather_url: "http://www.frontierweather.com/historicaldataonly/KNYC_daily.txt" - + # Use www.ncei.noaa.gov's daily-summaries data of "US NY NY CENTRAL PARK, NY US" station (USW00094728) to represent the weather conditions in New York City. ny.201801: noaa_weather_url: "https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&dataTypes=PRCP,SNOW,TMAX,TMIN,AWND&stations=USW00094728&startDate=2018-01-01&endDate=2018-02-01&boundingBox=40.78,-74.0,40.76,-73.7" ny.201802: From 6bf42435361eaf1511c83f530320cbd38b288f72 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Fri, 25 Sep 2020 11:15:25 +0800 Subject: [PATCH 16/30] some format update --- maro/simulator/scenarios/citi_bike/business_engine.py | 2 +- maro/simulator/scenarios/citi_bike/meta/source_urls.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/maro/simulator/scenarios/citi_bike/business_engine.py b/maro/simulator/scenarios/citi_bike/business_engine.py index 0e19ca13e..de4c056c4 100644 --- a/maro/simulator/scenarios/citi_bike/business_engine.py +++ b/maro/simulator/scenarios/citi_bike/business_engine.py @@ -298,7 +298,7 @@ def _tick_2_date(self, tick: int): def _update_station_extra_features(self, tick: int): """update features that not related to trips""" cur_datetime = self._tick_2_date(tick) - + if self._last_date == cur_datetime: return diff --git a/maro/simulator/scenarios/citi_bike/meta/source_urls.yml b/maro/simulator/scenarios/citi_bike/meta/source_urls.yml index a2d88918a..e5b0a207f 100644 --- a/maro/simulator/scenarios/citi_bike/meta/source_urls.yml +++ b/maro/simulator/scenarios/citi_bike/meta/source_urls.yml @@ -132,4 +132,4 @@ weather: station_info: - ny_station_info_url: "https://gbfs.citibikenyc.com/gbfs/en/station_information.json" \ No newline at end of file + ny_station_info_url: "https://gbfs.citibikenyc.com/gbfs/en/station_information.json" From c8c4c6f999393aa793561187bcff76d854f1a39b Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Sun, 27 Sep 2020 20:45:52 +0800 Subject: [PATCH 17/30] add fall back function in weather download --- maro/cli/data_pipeline/base.py | 69 +++++++++++------ maro/cli/data_pipeline/citi_bike.py | 115 +++++++++++++++------------- maro/utils/logger.py | 3 +- 3 files changed, 111 insertions(+), 76 deletions(-) diff --git a/maro/cli/data_pipeline/base.py b/maro/cli/data_pipeline/base.py index 50611f898..3bbefe697 100644 --- a/maro/cli/data_pipeline/base.py +++ b/maro/cli/data_pipeline/base.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from maro.cli.data_pipeline.utils import convert, download_file, StaticParameter, generate_name_with_uuid +from maro.utils.exception.cli_exception import CommandError from maro.utils.logger import CliLogger logger = CliLogger(name=__name__) @@ -22,12 +23,11 @@ class DataPipeline(ABC): _meta_file_name = "" def __init__(self, scenario: str, topology: str, source: str, is_temp: bool = False): - """ - Base class of data pipeline. + """Base class of data pipeline. Generate scenario/topology specific data for the business engine. General workflow: - Step 1: Download the original data file from the source to download folder - Step 2: Generate the clean data in clean folder + Step 1: Download the original data file from the source to download folder. + Step 2: Generate the clean data in clean folder. Step 3: Build a binary data file in build folder. The folder structer is: ~/.maro @@ -39,10 +39,10 @@ def __init__(self, scenario: str, topology: str, source: str, is_temp: bool = Fa Args: - scenario(str): the scenario of the data - topology(str): the topology of the scenario - source(str): the original source of data file - is_temp(bool): (optional) if the data file is temporary + scenario(str): the scenario of the data. + topology(str): the topology of the scenario. + source(str): the original source of data file. + is_temp(bool): (optional) if the data file is temporary. """ self._scenario = scenario self._topology = topology @@ -75,8 +75,13 @@ def __init__(self, scenario: str, topology: str, source: str, is_temp: bool = Fa def build_folder(self): return self._build_folder - def download(self, is_force: bool): - """download the original data file""" + def download(self, is_force: bool, fall_back: callable = None): + """Download the original data file. + + Args: + is_force(bool): If forced re-download the data file. + fall_back(callable): (optional)Fallback function to execute when download failed. + """ self._new_folder_list.append(self._download_folder) os.makedirs(self._download_folder, exist_ok=True) @@ -85,35 +90,43 @@ def download(self, is_force: bool): if (not is_force) and os.path.exists(self._download_file): logger.info_green("File already exists, skipping download.") else: - logger.info_green(f"Downloading data from {self._source} to {self._download_file}") - download_file(source=self._source, destination=self._download_file) + logger.info_green(f"Downloading data from {self._source} to {self._download_file}.") + try: + download_file(source=self._source, destination=self._download_file) + except Exception as e: + logger.warning_yellow(f"Failed to download from {self._source} to {self._download_file}.") + if fall_back is not None: + logger.warning_yellow(f"Calling fall_back function: {fall_back}.") + fall_back() + else: + raise CommandError("generate", f"Download error: {e}.") def clean(self): - """clean the original data file""" + """Clean the original data file.""" self._new_folder_list.append(self._clean_folder) os.makedirs(self._clean_folder, exist_ok=True) self._new_folder_list.append(self._build_folder) os.makedirs(self._build_folder, exist_ok=True) def build(self): - """build the cleaned data file to binary data file""" + """Build the cleaned data file to binary data file.""" self._new_file_list.append(self._build_file) if os.path.exists(self._clean_file): - logger.info_green(f"Building binary data from {self._clean_file} to {self._build_file}") + logger.info_green(f"Building binary data from {self._clean_file} to {self._build_file}.") convert(meta=self._build_meta_file, file=[self._clean_file], output=self._build_file) else: - logger.info_green(f"Not found cleaned data: {self._clean_file}") + logger.warning_yellow(f"Not found cleaned data: {self._clean_file}.") def remove_file(self): - """remove the temporary files""" + """Remove the temporary files.""" for new_file in self._new_file_list: if os.path.exists(new_file): os.remove(new_file) self._new_file_list.clear() def remove_folder(self): - """remove the temporary folders""" + """Remove the temporary folders.""" for new_folder in self._new_folder_list: if os.path.exists(new_folder): shutil.rmtree(new_folder) @@ -123,30 +136,42 @@ def remove_folder(self): class DataTopology(ABC): def __init__(self): - """ - Data topology manage multi data pipelines for a specified topology of a research scenario. - """ + """Data topology manage multi data pipelines for a specified topology of a research scenario.""" self._data_pipeline = {} - def get_build_folders(self): + def get_build_folders(self)-> dict: + """Get the build file folders of all data pipelines for the topology. + + Returns: + dict : dictionary of build folders, keys are data pipeline names, values + are paths of the build folders. + """ ret = {} for pipeline in self._data_pipeline: ret[pipeline] = self._data_pipeline[pipeline].build_folder return ret def download(self, is_force: bool = False): + """Download the original data files of all data pipelines. + + Args: + is_force(bool): If forced re-download the data file. + """ for pipeline in self._data_pipeline: self._data_pipeline[pipeline].download(is_force) def clean(self): + """Clean the original data files of all data pipelines.""" for pipeline in self._data_pipeline: self._data_pipeline[pipeline].clean() def build(self): + """Build the cleaned data files of all data pipelines to binary data file.""" for pipeline in self._data_pipeline: self._data_pipeline[pipeline].build() def remove(self): + """Remove the temporary files and folders of all data pipelines.""" for pipeline in self._data_pipeline: self._data_pipeline[pipeline].remove_file() for pipeline in self._data_pipeline: diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 22072b214..c7bc7d8c9 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -40,8 +40,7 @@ class CitiBikePipeline(DataPipeline): _meta_file_name = "trips.yml" def __init__(self, topology: str, source: str, station_info: str, is_temp: bool = False): - """ - Generate citi_bike data bin and other necessary files for the specified topology from specified source. + """Generate citi_bike data bin and other necessary files for the specified topology from specified source. They will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro @@ -70,29 +69,29 @@ def __init__(self, topology: str, source: str, station_info: str, is_temp: bool self._common_data = {} def download(self, is_force: bool = False): - """download the zip file""" + """Download the zip file.""" super().download(is_force) self._new_file_list.append(self._station_info_file) if (not is_force) and os.path.exists(self._station_info_file): logger.info_green("File already exists, skipping download.") else: - logger.info_green(f"Downloading trip data from {self._station_info} to {self._station_info_file}") + logger.info_green(f"Downloading trip data from {self._station_info} to {self._station_info_file}.") download_file(source=self._station_info, destination=self._station_info_file) def clean(self): - """unzip the csv file and process it for building binary file""" + """Unzip the csv file and process it for building binary file.""" super().clean() - logger.info_green("Cleaning trip data") + logger.info_green("Cleaning trip data.") if os.path.exists(self._download_file): # unzip - logger.info_green("Unzip start") + logger.info_green("Unzip start.") with zipfile.ZipFile(self._download_file, "r") as zip_ref: for filename in zip_ref.namelist(): # Only one csv file is expected. if filename.endswith(".csv") and (not (filename.startswith("__MACOSX") or filename.startswith("."))): - logger.info_green(f"Unzip {filename} from {self._download_file}") + logger.info_green(f"Unzip {filename} from {self._download_file}.") zip_ref.extractall(self._clean_folder, [filename]) unzip_file = os.path.join(self._clean_folder, filename) @@ -100,10 +99,10 @@ def clean(self): self._preprocess(unzipped_file=unzip_file) break else: - logger.warning(f"Not found downloaded trip data: {self._download_file}") + logger.warning(f"Not found downloaded trip data: {self._download_file}.") def _read_common_data(self): - """read and full init data and existed stations""" + """Read and full init data and existed stations.""" full_stations = None @@ -133,7 +132,7 @@ def _read_common_data(self): self._common_data["full_dock_num"] = self._common_data["full_stations"]["capacity"].sum() def _read_src_file(self, file: str): - """read and return processed rows""" + """Read and return processed rows.""" ret = [] if os.path.exists(file): @@ -228,9 +227,9 @@ def _process_distance(self, station_info: pd.DataFrame): def _preprocess(self, unzipped_file: str): self._read_common_data() - logger.info_green("Reading raw data") + logger.info_green("Reading raw trip data.") org_data = self._read_src_file(file=unzipped_file) - logger.info_green("Processing trip data") + logger.info_green("Processing trip data.") trip_data, used_bikes, in_data_station, stations_existed = self._process_src_file(src_data=org_data) self._new_file_list.append(self._clean_file) @@ -240,12 +239,12 @@ def _preprocess(self, unzipped_file: str): with open(self._clean_file, mode="w", encoding="utf-8", newline="") as f: trip_data.to_csv(f, index=False, header=True) - logger.info_green("Processing init data") + logger.info_green("Processing station info data.") station_info = self._process_current_topo_station_info(stations_existed=stations_existed, used_bikes=used_bikes, loc_ref=in_data_station) with open(self._station_meta_file, mode="w", encoding="utf-8", newline="") as f: station_info.to_csv(f, index=False, header=True) - logger.info_green("Processing distance data") + logger.info_green("Processing station distance data.") station_distance = self._process_distance(station_info=station_info) with open(self._distance_file, mode="w", encoding="utf-8", newline="") as f: station_distance.to_csv(f, index=False, header=True) @@ -270,8 +269,7 @@ class WeatherEnum(Enum): SLEET = 3 def __init__(self, topology: str, source: str, is_temp: bool = False): - """ - Generate weather data bin for the specified topology from frontierweather.com. + """Generate weather data bin for the specified topology from frontierweather.com. Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro @@ -295,10 +293,10 @@ def clean(self): super().clean() if os.path.exists(self._download_file): self._new_file_list.append(self._clean_file) - logger.info_green("Cleaning weather data") + logger.info_green("Cleaning weather data.") self._preprocess(input_file=self._download_file, output_file=self._clean_file) else: - logger.warning(f"Not found downloaded weather data: {self._download_file}") + logger.warning(f"Not found downloaded weather data: {self._download_file}.") def _weather(self, row: dict): water_str = row["Precipitation Water Equiv"] @@ -351,8 +349,7 @@ def _preprocess(self, input_file: str, output_file: str): class CitiBikeTopology(DataTopology): - """ - Data topology for a predefined topology of citi_bike scenario. + """Data topology for a predefined topology of citi_bike scenario. Args: topology(str): topology name of the data file @@ -383,8 +380,7 @@ class CitiBikeToyPipeline(DataPipeline): _meta_file_name = "trips.yml" def __init__(self, start_time: str, end_time: str, stations: list, trips: list, topology: str, is_temp: bool = False): - """ - Generate synthetic business events and station initialization distribution for Citi Bike scenario, from the predefined toy topologies. + """Generate synthetic business events and station initialization distribution for Citi Bike scenario, from the predefined toy topologies. Folder structure: ~/.maro /data/citi_bike/[topology] @@ -409,10 +405,11 @@ def __init__(self, start_time: str, end_time: str, stations: list, trips: list, self._station_meta_file = os.path.join(self._build_folder, self._station_meta_file_name) def download(self, is_force: bool): + """Toy datapipeline not need download process.""" pass def _station_dict_to_pd(self, station_dict): - """convert dictionary of station information to pd series""" + """Convert dictionary of station information to pd series.""" return pd.Series( [ station_dict["id"], @@ -424,7 +421,7 @@ def _station_dict_to_pd(self, station_dict): index=["station_index", "capacity", "init", "latitude", "longitude"]) def _gen_stations(self): - """generate station meta csv""" + """Generate station meta csv.""" self._new_file_list.append(self._station_meta_file) stations = pd.Series(self._stations).apply(self._station_dict_to_pd) @@ -438,7 +435,7 @@ def _gen_stations(self): return stations def _gen_trip(self, tick): - """generate trip record""" + """Generate trip record.""" ret_list = [] cur_probability = random.uniform(0, 1) for trip in self._trips: @@ -454,7 +451,7 @@ def _gen_trip(self, tick): return ret_list def _gen_trips(self): - """generate trip records csv files""" + """Generate trip records csv files.""" cur_tick = pd.to_datetime(self._start_time) end_tick = pd.to_datetime(self._end_time) @@ -475,7 +472,7 @@ def _gen_trips(self): return trips_df def _gen_distance(self, station_init: pd.DataFrame): - """generate distance metrix csv file""" + """Generate distance metrix csv file.""" distance_adj = pd.DataFrame(0, index=station_init["station_index"], columns=station_init["station_index"], dtype=np.float) look_up_df = station_init[["latitude", "longitude"]] distance_df = distance_adj.apply(lambda x: pd.DataFrame(x).apply(lambda y: geopy.distance.distance( @@ -487,7 +484,8 @@ def _gen_distance(self, station_init: pd.DataFrame): return distance_df def clean(self): - logger.info_green(f"Generating trip data for topology {self._topology} .") + """Clean the original data file.""" + logger.info_green(f"Generating trip data for topology {self._topology}.") super().clean() stations = self._gen_stations() self._gen_trips() @@ -497,8 +495,7 @@ def clean(self): class WeatherToyPipeline(WeatherPipeline): def __init__(self, topology: str, start_time: str, end_time: str, is_temp: bool = False): - """ - Generate weather data bin for the specified topology from frontierweather.com. + """Generate weather data bin for the specified topology from frontierweather.com. It will be generated in ~/.maro/data/citi_bike/[topology]/_build. folder structure: ~/.maro @@ -510,20 +507,22 @@ def __init__(self, topology: str, start_time: str, end_time: str, is_temp: bool /temp download temp file Args: - topology(str): topology name of the data file - start_time(str): start time of the toy data - end_time(str): end time of the toy data - is_temp(bool): (optional) if the data file is temporary + topology(str): topology name of the data file. + start_time(str): start time of the toy data. + end_time(str): end time of the toy data. + is_temp(bool): (optional) if the data file is temporary. """ super().__init__(topology, "", is_temp) self._start_time = start_time self._end_time = end_time def download(self, is_force: bool): + """Toy datapipeline not need download process.""" pass def clean(self): - logger.info_green("Cleaning weather data") + """clean the original data file.""" + logger.info_green("Cleaning weather data.") DataPipeline.clean(self) self._new_file_list.append(self._clean_file) self._preprocess(output_file=self._clean_file) @@ -568,13 +567,12 @@ def _preprocess(self, output_file: str): class CitiBikeToyTopology(DataTopology): - """ - Data topology for a predefined toy topology of citi_bike scenario. + """Data topology for a predefined toy topology of citi_bike scenario. Args: - topology(str): topology name of the data file - config_path(str): config file path of the topology - is_temp(bool): (optional) if the data file is temporary + topology(str): topology name of the data file. + config_path(str): config file path of the topology. + is_temp(bool): (optional) if the data file is temporary. """ def __init__(self, topology: str, config_path: str, is_temp: bool = False): super().__init__() @@ -597,11 +595,10 @@ def __del__(self): self.remove() class CitiBikeProcess: - """ - Contains all predefined data topologies of citi_bike scenario. + """Contains all predefined data topologies of citi_bike scenario. Args: - is_temp(bool): (optional) if the data file is temporary + is_temp(bool): (optional) if the data file is temporary. """ meta_file_name = "source_urls.yml" meta_root = os.path.join(StaticParameter.data_root, "citi_bike/meta") @@ -628,8 +625,7 @@ def __init__(self, is_temp: bool = False): class NOAAWeatherPipeline(WeatherPipeline): def __init__(self, topology: str, source: str, is_temp: bool = False): - """ - Generate weather data bin for the specified topology from ncei.noaa.gov. + """Generate weather data bin for the specified topology from ncei.noaa.gov. Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro @@ -641,20 +637,25 @@ def __init__(self, topology: str, source: str, is_temp: bool = False): /temp download temp file Args: - topology(str): topology name of the data file - source(str): source url of original data file - is_temp(bool): (optional) if the data file is temporary + topology(str): topology name of the data file. + source(str): source url of original data file. + is_temp(bool): (optional) if the data file is temporary. """ super().__init__(topology, source, is_temp) + def download(self, is_force: bool): + """download the original data file.""" + super().download(is_force, self._gen_fall_back_file) + def clean(self): - super().clean() + """Clean the original data file.""" + DataPipeline.clean(self) if os.path.exists(self._download_file): self._new_file_list.append(self._clean_file) - logger.info_green("Cleaning weather data") + logger.info_green("Cleaning weather data.") self._preprocess(input_file=self._download_file, output_file=self._clean_file) else: - logger.warning(f"Not found downloaded weather data: {self._download_file}") + logger.warning(f"Not found downloaded weather data: {self._download_file}.") def _weather(self, row): water = row["PRCP"] if row["PRCP"] is not None else 0.0 @@ -683,6 +684,14 @@ def _preprocess(self, input_file: str, output_file: str): data["date"] = org_data["DATE"] data["weather"] = org_data.apply(self._weather, axis=1) data["temp"] = (org_data["TMAX"] + org_data["TMIN"])/2 - + data.dropna(inplace=True) with open(output_file, mode="w", encoding="utf-8", newline="") as f: data.to_csv(f, index=False, header=True) + + def _gen_fall_back_file(self): + fall_back_content = [ + "\"STATION\",\"DATE\",\"AWND\",\"PRCP\",\"SNOW\",\"TMAX\",\"TMIN\"\n", + ",,,,,,\n" + ] + with open(self._download_file, mode="w", encoding="utf-8", newline="") as f: + f.writelines(fall_back_content) diff --git a/maro/utils/logger.py b/maro/utils/logger.py index 30773aa64..c664b5c68 100644 --- a/maro/utils/logger.py +++ b/maro/utils/logger.py @@ -46,7 +46,8 @@ class LogFormat(Enum): } FORMAT_NAME_TO_STDOUT_FORMAT = { - LogFormat.cli_info: logging.Formatter(fmt='%(message)s'), + LogFormat.cli_info: logging.Formatter( + fmt='%(asctime)s | %(tag)s | %(message)s', datefmt='%H:%M:%S'), } PROGRESS = 60 # progress of training, we give it a highest level From 1eb330c2622c7b022d1539aedc8d79b04c888d8f Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Mon, 28 Sep 2020 19:17:25 +0800 Subject: [PATCH 18/30] update comment --- maro/cli/data_pipeline/base.py | 47 ++++---- maro/cli/data_pipeline/citi_bike.py | 168 ++++++++++++++-------------- 2 files changed, 110 insertions(+), 105 deletions(-) diff --git a/maro/cli/data_pipeline/base.py b/maro/cli/data_pipeline/base.py index 3bbefe697..e3bbc1fa5 100644 --- a/maro/cli/data_pipeline/base.py +++ b/maro/cli/data_pipeline/base.py @@ -14,6 +14,28 @@ logger = CliLogger(name=__name__) class DataPipeline(ABC): + """Base class of data pipeline. + Generate scenario/topology specific data for the business engine. + General workflow: + Step 1: Download the original data file from the source to download folder. + Step 2: Generate the clean data in clean folder. + Step 3: Build a binary data file in build folder. + The folder structer is: + ~/.maro + /data/[scenario]/[topology] + /_download original data file + /_clean cleaned data file + /_build bin data file and other necessory files + /meta meta files for data pipeline + + + Args: + scenario(str): the scenario of the data. + topology(str): the topology of the scenario. + source(str): the original source of data file. + is_temp(bool): (optional) if the data file is temporary. + """ + _download_file_name = "" _clean_file_name = "" @@ -23,27 +45,6 @@ class DataPipeline(ABC): _meta_file_name = "" def __init__(self, scenario: str, topology: str, source: str, is_temp: bool = False): - """Base class of data pipeline. - Generate scenario/topology specific data for the business engine. - General workflow: - Step 1: Download the original data file from the source to download folder. - Step 2: Generate the clean data in clean folder. - Step 3: Build a binary data file in build folder. - The folder structer is: - ~/.maro - /data/[scenario]/[topology] - /_download original data file - /_clean cleaned data file - /_build bin data file and other necessory files - /meta meta files for data pipeline - - - Args: - scenario(str): the scenario of the data. - topology(str): the topology of the scenario. - source(str): the original source of data file. - is_temp(bool): (optional) if the data file is temporary. - """ self._scenario = scenario self._topology = topology self._is_temp = is_temp @@ -134,9 +135,9 @@ def remove_folder(self): class DataTopology(ABC): - + """Data topology manage multi data pipelines for a specified topology of a research scenario.""" + def __init__(self): - """Data topology manage multi data pipelines for a specified topology of a research scenario.""" self._data_pipeline = {} def get_build_folders(self)-> dict: diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index c7bc7d8c9..9905774d8 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -27,6 +27,23 @@ class CitiBikePipeline(DataPipeline): + """Generate citi_bike data bin and other necessary files for the specified topology from specified source. + They will be generated in ~/.maro/data/citi_bike/[topology]/_build. + Folder structure: + ~/.maro + /data/citi_bike/[topology] + /_build bin data file and other necessory files + /source + /_download original data files + /_clean cleaned data files + /temp download temp files + + Args: + topology(str): topology name of the data files + source(str): source url of original data file + station_info(str): source url of station info file + is_temp(bool): (optional) if the data file is temporary + """ _download_file_name = "trips.zip" _station_info_file_name = "full_station.json" @@ -40,23 +57,6 @@ class CitiBikePipeline(DataPipeline): _meta_file_name = "trips.yml" def __init__(self, topology: str, source: str, station_info: str, is_temp: bool = False): - """Generate citi_bike data bin and other necessary files for the specified topology from specified source. - They will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: - ~/.maro - /data/citi_bike/[topology] - /_build bin data file and other necessory files - /source - /_download original data files - /_clean cleaned data files - /temp download temp files - - Args: - topology(str): topology name of the data files - source(str): source url of original data file - station_info(str): source url of station info file - is_temp(bool): (optional) if the data file is temporary - """ super().__init__("citi_bike", topology, source, is_temp) self._station_info = station_info @@ -251,6 +251,22 @@ def _preprocess(self, unzipped_file: str): class WeatherPipeline(DataPipeline): + """Generate weather data bin for the specified topology from frontierweather.com. + Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. + Folder structure: + ~/.maro + /data/citi_bike/[topology] + /_build bin data file + /source + /_download original data file + /_clean cleaned data file + /temp download temp file + + Args: + topology(str): topology name of the data file + source(str): source url of original data file + is_temp(bool): (optional) if the data file is temporary + """ _last_day_temp = None # used to fill the temp for days which have no temp info @@ -269,22 +285,6 @@ class WeatherEnum(Enum): SLEET = 3 def __init__(self, topology: str, source: str, is_temp: bool = False): - """Generate weather data bin for the specified topology from frontierweather.com. - Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: - ~/.maro - /data/citi_bike/[topology] - /_build bin data file - /source - /_download original data file - /_clean cleaned data file - /temp download temp file - - Args: - topology(str): topology name of the data file - source(str): source url of original data file - is_temp(bool): (optional) if the data file is temporary - """ super().__init__("citi_bike", topology, source, is_temp) self._common_data = {} @@ -358,6 +358,7 @@ class CitiBikeTopology(DataTopology): weather_source(str): original source url of weather data is_temp(bool): (optional) if the data file is temporary """ + def __init__(self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) @@ -370,7 +371,22 @@ def __del__(self): class CitiBikeToyPipeline(DataPipeline): - + """Generate synthetic business events and station initialization distribution for Citi Bike scenario, from the predefined toy topologies. + Folder structure: + ~/.maro + /data/citi_bike/[topology] + /_build bin data file and other necessory files + + + Args: + start_time(str): start time of the toy data + end_time(str): end time of the toy data + stations(list): list of stations info + trips(list): list of trips probability + topology(str): topology name of the data files + is_temp(bool): (optional) if the data file is temporary + """ + _clean_file_name = "trips.csv" _build_file_name = "trips.bin" @@ -380,21 +396,6 @@ class CitiBikeToyPipeline(DataPipeline): _meta_file_name = "trips.yml" def __init__(self, start_time: str, end_time: str, stations: list, trips: list, topology: str, is_temp: bool = False): - """Generate synthetic business events and station initialization distribution for Citi Bike scenario, from the predefined toy topologies. - Folder structure: - ~/.maro - /data/citi_bike/[topology] - /_build bin data file and other necessory files - - - Args: - start_time(str): start time of the toy data - end_time(str): end time of the toy data - stations(list): list of stations info - trips(list): list of trips probability - topology(str): topology name of the data files - is_temp(bool): (optional) if the data file is temporary - """ super().__init__("citi_bike", topology, "", is_temp) self._start_time = start_time self._end_time = end_time @@ -493,25 +494,26 @@ def clean(self): class WeatherToyPipeline(WeatherPipeline): + """Generate weather data bin for the specified topology from frontierweather.com. + It will be generated in ~/.maro/data/citi_bike/[topology]/_build. + folder structure: + ~/.maro + /data/citi_bike/[topology] + /_build bin data file + /source + /_download original data file + /_clean cleaned data file + /temp download temp file + + Args: + topology(str): topology name of the data file. + start_time(str): start time of the toy data. + end_time(str): end time of the toy data. + is_temp(bool): (optional) if the data file is temporary. + """ def __init__(self, topology: str, start_time: str, end_time: str, is_temp: bool = False): - """Generate weather data bin for the specified topology from frontierweather.com. - It will be generated in ~/.maro/data/citi_bike/[topology]/_build. - folder structure: - ~/.maro - /data/citi_bike/[topology] - /_build bin data file - /source - /_download original data file - /_clean cleaned data file - /temp download temp file - - Args: - topology(str): topology name of the data file. - start_time(str): start time of the toy data. - end_time(str): end time of the toy data. - is_temp(bool): (optional) if the data file is temporary. - """ + super().__init__(topology, "", is_temp) self._start_time = start_time self._end_time = end_time @@ -574,6 +576,7 @@ class CitiBikeToyTopology(DataTopology): config_path(str): config file path of the topology. is_temp(bool): (optional) if the data file is temporary. """ + def __init__(self, topology: str, config_path: str, is_temp: bool = False): super().__init__() self._is_temp = is_temp @@ -600,6 +603,7 @@ class CitiBikeProcess: Args: is_temp(bool): (optional) if the data file is temporary. """ + meta_file_name = "source_urls.yml" meta_root = os.path.join(StaticParameter.data_root, "citi_bike/meta") @@ -623,24 +627,24 @@ def __init__(self, is_temp: bool = False): class NOAAWeatherPipeline(WeatherPipeline): + """Generate weather data bin for the specified topology from ncei.noaa.gov. + Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. + Folder structure: + ~/.maro + /data/citi_bike/[topology] + /_build bin data file + /source + /_download original data file + /_clean cleaned data file + /temp download temp file + + Args: + topology(str): topology name of the data file. + source(str): source url of original data file. + is_temp(bool): (optional) if the data file is temporary. + """ def __init__(self, topology: str, source: str, is_temp: bool = False): - """Generate weather data bin for the specified topology from ncei.noaa.gov. - Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: - ~/.maro - /data/citi_bike/[topology] - /_build bin data file - /source - /_download original data file - /_clean cleaned data file - /temp download temp file - - Args: - topology(str): topology name of the data file. - source(str): source url of original data file. - is_temp(bool): (optional) if the data file is temporary. - """ super().__init__(topology, source, is_temp) def download(self, is_force: bool): From f832ca467918467be517cb9a8c4c63a6f75ed02e Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 30 Sep 2020 11:33:50 +0800 Subject: [PATCH 19/30] update for comments --- maro/cli/data_pipeline/base.py | 18 ++++---- maro/cli/data_pipeline/citi_bike.py | 69 ++++++++++++++++------------- 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/maro/cli/data_pipeline/base.py b/maro/cli/data_pipeline/base.py index e3bbc1fa5..b70cf32da 100644 --- a/maro/cli/data_pipeline/base.py +++ b/maro/cli/data_pipeline/base.py @@ -15,12 +15,13 @@ class DataPipeline(ABC): """Base class of data pipeline. + Generate scenario/topology specific data for the business engine. General workflow: Step 1: Download the original data file from the source to download folder. Step 2: Generate the clean data in clean folder. Step 3: Build a binary data file in build folder. - The folder structer is: + The folder structure is: ~/.maro /data/[scenario]/[topology] /_download original data file @@ -28,12 +29,11 @@ class DataPipeline(ABC): /_build bin data file and other necessory files /meta meta files for data pipeline - Args: - scenario(str): the scenario of the data. - topology(str): the topology of the scenario. - source(str): the original source of data file. - is_temp(bool): (optional) if the data file is temporary. + scenario(str): The scenario of the data. + topology(str): The topology of the scenario. + source(str): The original source of data file. + is_temp(bool): (optional) If the data file is temporary. """ _download_file_name = "" @@ -81,7 +81,7 @@ def download(self, is_force: bool, fall_back: callable = None): Args: is_force(bool): If forced re-download the data file. - fall_back(callable): (optional)Fallback function to execute when download failed. + fall_back(callable): (optional) Fallback function to execute when download failed. """ self._new_folder_list.append(self._download_folder) os.makedirs(self._download_folder, exist_ok=True) @@ -136,7 +136,7 @@ def remove_folder(self): class DataTopology(ABC): """Data topology manage multi data pipelines for a specified topology of a research scenario.""" - + def __init__(self): self._data_pipeline = {} @@ -144,7 +144,7 @@ def get_build_folders(self)-> dict: """Get the build file folders of all data pipelines for the topology. Returns: - dict : dictionary of build folders, keys are data pipeline names, values + dict: Dictionary of build folders, keys are data pipeline names, values are paths of the build folders. """ ret = {} diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 9905774d8..40bca874b 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -28,6 +28,7 @@ class CitiBikePipeline(DataPipeline): """Generate citi_bike data bin and other necessary files for the specified topology from specified source. + They will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro @@ -39,10 +40,10 @@ class CitiBikePipeline(DataPipeline): /temp download temp files Args: - topology(str): topology name of the data files - source(str): source url of original data file - station_info(str): source url of station info file - is_temp(bool): (optional) if the data file is temporary + topology(str): Topology name of the data files + source(str): Source url of original data file + station_info(str): Source url of station info file + is_temp(bool): (optional) If the data file is temporary """ _download_file_name = "trips.zip" @@ -252,6 +253,7 @@ def _preprocess(self, unzipped_file: str): class WeatherPipeline(DataPipeline): """Generate weather data bin for the specified topology from frontierweather.com. + Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro @@ -263,9 +265,9 @@ class WeatherPipeline(DataPipeline): /temp download temp file Args: - topology(str): topology name of the data file - source(str): source url of original data file - is_temp(bool): (optional) if the data file is temporary + topology(str): Topology name of the data file + source(str): Source url of original data file + is_temp(bool): (optional) If the data file is temporary """ _last_day_temp = None # used to fill the temp for days which have no temp info @@ -290,6 +292,7 @@ def __init__(self, topology: str, source: str, is_temp: bool = False): self._common_data = {} def clean(self): + """Clean the original data file.""" super().clean() if os.path.exists(self._download_file): self._new_file_list.append(self._clean_file) @@ -352,11 +355,11 @@ class CitiBikeTopology(DataTopology): """Data topology for a predefined topology of citi_bike scenario. Args: - topology(str): topology name of the data file - trip_source(str): original source url of citi_bike data - station_info(str): current status station info of the stations - weather_source(str): original source url of weather data - is_temp(bool): (optional) if the data file is temporary + topology(str): Topology name of the data file + trip_source(str): Original source url of citi_bike data + station_info(str): Current status station info of the stations + weather_source(str): Original source url of weather data + is_temp(bool): (optional) If the data file is temporary """ def __init__(self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): @@ -372,19 +375,19 @@ def __del__(self): class CitiBikeToyPipeline(DataPipeline): """Generate synthetic business events and station initialization distribution for Citi Bike scenario, from the predefined toy topologies. + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file and other necessory files - Args: - start_time(str): start time of the toy data - end_time(str): end time of the toy data - stations(list): list of stations info - trips(list): list of trips probability - topology(str): topology name of the data files - is_temp(bool): (optional) if the data file is temporary + start_time(str): Start time of the toy data + end_time(str): End time of the toy data + stations(list): List of stations info + trips(list): List of trips probability + topology(str): Topology name of the data files + is_temp(bool): (optional) If the data file is temporary """ _clean_file_name = "trips.csv" @@ -495,8 +498,9 @@ def clean(self): class WeatherToyPipeline(WeatherPipeline): """Generate weather data bin for the specified topology from frontierweather.com. + It will be generated in ~/.maro/data/citi_bike/[topology]/_build. - folder structure: + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file @@ -506,10 +510,10 @@ class WeatherToyPipeline(WeatherPipeline): /temp download temp file Args: - topology(str): topology name of the data file. - start_time(str): start time of the toy data. - end_time(str): end time of the toy data. - is_temp(bool): (optional) if the data file is temporary. + topology(str): Topology name of the data file. + start_time(str): Start time of the toy data. + end_time(str): End time of the toy data. + is_temp(bool): (optional) If the data file is temporary. """ def __init__(self, topology: str, start_time: str, end_time: str, is_temp: bool = False): @@ -572,9 +576,9 @@ class CitiBikeToyTopology(DataTopology): """Data topology for a predefined toy topology of citi_bike scenario. Args: - topology(str): topology name of the data file. - config_path(str): config file path of the topology. - is_temp(bool): (optional) if the data file is temporary. + topology(str): Topology name of the data file. + config_path(str): Config file path of the topology. + is_temp(bool): (optional) If the data file is temporary. """ def __init__(self, topology: str, config_path: str, is_temp: bool = False): @@ -601,7 +605,7 @@ class CitiBikeProcess: """Contains all predefined data topologies of citi_bike scenario. Args: - is_temp(bool): (optional) if the data file is temporary. + is_temp(bool): (optional) If the data file is temporary. """ meta_file_name = "source_urls.yml" @@ -628,6 +632,7 @@ def __init__(self, is_temp: bool = False): class NOAAWeatherPipeline(WeatherPipeline): """Generate weather data bin for the specified topology from ncei.noaa.gov. + Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. Folder structure: ~/.maro @@ -639,16 +644,16 @@ class NOAAWeatherPipeline(WeatherPipeline): /temp download temp file Args: - topology(str): topology name of the data file. - source(str): source url of original data file. - is_temp(bool): (optional) if the data file is temporary. + topology(str): Topology name of the data file. + source(str): Source url of original data file. + is_temp(bool): (optional) If the data file is temporary. """ def __init__(self, topology: str, source: str, is_temp: bool = False): super().__init__(topology, source, is_temp) def download(self, is_force: bool): - """download the original data file.""" + """Download the original data file.""" super().download(is_force, self._gen_fall_back_file) def clean(self): From fe58f36312ab52e819edb579326f7c083869fc08 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 30 Sep 2020 11:44:57 +0800 Subject: [PATCH 20/30] update comment --- maro/cli/data_pipeline/citi_bike.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 40bca874b..5b3c91dd9 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -527,7 +527,7 @@ def download(self, is_force: bool): pass def clean(self): - """clean the original data file.""" + """Clean the original data file.""" logger.info_green("Cleaning weather data.") DataPipeline.clean(self) self._new_file_list.append(self._clean_file) From 2aaa305028dd8788263d33571b4236459df00dd2 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 30 Sep 2020 23:22:55 +0800 Subject: [PATCH 21/30] add period --- maro/cli/data_pipeline/citi_bike.py | 44 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 504b56a38..4c5339281 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -34,10 +34,10 @@ class CitiBikePipeline(DataPipeline): /temp download temp files Args: - topology(str): Topology name of the data files - source(str): Source url of original data file - station_info(str): Source url of station info file - is_temp(bool): (optional) If the data file is temporary + topology(str): Topology name of the data files. + source(str): Source url of original data file. + station_info(str): Source url of station info file. + is_temp(bool): (optional) If the data file is temporary. """ _download_file_name = "trips.zip" @@ -293,7 +293,7 @@ def _preprocess(self, unzipped_file: str): logger.info_green("Processing station info data.") station_info = self._process_current_topo_station_info( stations_existed=stations_existed, used_bikes=used_bikes, loc_ref=in_data_station - ) + ) with open(self._station_meta_file, mode="w", encoding="utf-8", newline="") as f: station_info.to_csv(f, index=False, header=True) @@ -317,9 +317,9 @@ class WeatherPipeline(DataPipeline): /temp download temp file Args: - topology(str): Topology name of the data file - source(str): Source url of original data file - is_temp(bool): (optional) If the data file is temporary + topology(str): Topology name of the data file. + source(str): Source url of original data file. + is_temp(bool): (optional) If the data file is temporary. """ _last_day_temp = None # used to fill the temp for days which have no temp info @@ -407,14 +407,16 @@ class CitiBikeTopology(DataTopology): """Data topology for a predefined topology of citi_bike scenario. Args: - topology(str): Topology name of the data file - trip_source(str): Original source url of citi_bike data - station_info(str): Current status station info of the stations - weather_source(str): Original source url of weather data - is_temp(bool): (optional) If the data file is temporary + topology(str): Topology name of the data file. + trip_source(str): Original source url of citi_bike data. + station_info(str): Current status station info of the stations. + weather_source(str): Original source url of weather data. + is_temp(bool): (optional) If the data file is temporary. """ - def __init__(self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): + def __init__( + self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False + ): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) self._data_pipeline["weather"] = NOAAWeatherPipeline(topology, weather_source, is_temp) @@ -434,12 +436,12 @@ class CitiBikeToyPipeline(DataPipeline): /_build bin data file and other necessory files Args: - start_time(str): Start time of the toy data - end_time(str): End time of the toy data - stations(list): List of stations info - trips(list): List of trips probability - topology(str): Topology name of the data files - is_temp(bool): (optional) If the data file is temporary + start_time(str): Start time of the toy data. + end_time(str): End time of the toy data. + stations(list): List of stations info. + trips(list): List of trips probability. + topology(str): Topology name of the data files. + is_temp(bool): (optional) If the data file is temporary. """ _clean_file_name = "trips.csv" @@ -538,7 +540,7 @@ def _gen_distance(self, station_init: pd.DataFrame): index=station_init["station_index"], columns=station_init["station_index"], dtype=np.float - ) + ) look_up_df = station_init[["latitude", "longitude"]] distance_df = distance_adj.apply(lambda x: pd.DataFrame(x).apply(lambda y: geopy.distance.distance( (look_up_df.at[x.name, "latitude"], look_up_df.at[x.name, "longitude"]), From 187539351512bba1b9d57cf4a25ae50fe19ce8aa Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 30 Sep 2020 23:35:11 +0800 Subject: [PATCH 22/30] fix for pylint --- maro/cli/data_pipeline/base.py | 2 +- maro/cli/data_pipeline/citi_bike.py | 34 ++++++++++++++--------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/maro/cli/data_pipeline/base.py b/maro/cli/data_pipeline/base.py index 7c061fd97..86287b7d0 100644 --- a/maro/cli/data_pipeline/base.py +++ b/maro/cli/data_pipeline/base.py @@ -108,7 +108,7 @@ def clean(self): os.makedirs(self._build_folder, exist_ok=True) def build(self): - """Build the cleaned data file to binary data file.""" + """Build the cleaned data file to binary data file.""" self._new_file_list.append(self._build_file) if os.path.exists(self._clean_file): logger.info_green(f"Building binary data from {self._clean_file} to {self._build_file}.") diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 4c5339281..00342af2c 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -24,7 +24,7 @@ class CitiBikePipeline(DataPipeline): """Generate citi_bike data bin and other necessary files for the specified topology from specified source. They will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file and other necessory files @@ -245,8 +245,7 @@ def _process_src_file(self, src_data: pd.DataFrame): return trip_data, used_bikes, in_data_station, stations_existed def _process_current_topo_station_info( - self, stations_existed: pd.DataFrame, used_bikes: int, loc_ref: pd.DataFrame - ): + self, stations_existed: pd.DataFrame, used_bikes: int, loc_ref: pd.DataFrame): data_station_init = stations_existed.join( self._common_data["full_stations"][["station_id", "capacity"]].set_index("station_id"), on="station_id" @@ -294,7 +293,7 @@ def _preprocess(self, unzipped_file: str): station_info = self._process_current_topo_station_info( stations_existed=stations_existed, used_bikes=used_bikes, loc_ref=in_data_station ) - with open(self._station_meta_file, mode="w", encoding="utf-8", newline="") as f: + with open(self._station_meta_file, mode="w", encoding="utf-8", newline="") as f: station_info.to_csv(f, index=False, header=True) logger.info_green("Processing station distance data.") @@ -307,7 +306,7 @@ class WeatherPipeline(DataPipeline): """Generate weather data bin for the specified topology from frontierweather.com. Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file @@ -415,8 +414,7 @@ class CitiBikeTopology(DataTopology): """ def __init__( - self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False - ): + self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) self._data_pipeline["weather"] = NOAAWeatherPipeline(topology, weather_source, is_temp) @@ -428,9 +426,10 @@ def __del__(self): class CitiBikeToyPipeline(DataPipeline): - """Generate synthetic business events and station initialization distribution for Citi Bike scenario, from the predefined toy topologies. + """Generate synthetic business events and station initialization distribution for Citi Bike scenario, + from the predefined toy topologies. - Folder structure: + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file and other necessory files @@ -453,8 +452,7 @@ class CitiBikeToyPipeline(DataPipeline): _meta_file_name = "trips.yml" def __init__( - self, start_time: str, end_time: str, stations: list, trips: list, topology: str, is_temp: bool = False - ): + self, start_time: str, end_time: str, stations: list, trips: list, topology: str, is_temp: bool = False): super().__init__("citi_bike", topology, "", is_temp) self._start_time = start_time self._end_time = end_time @@ -536,9 +534,9 @@ def _gen_trips(self): def _gen_distance(self, station_init: pd.DataFrame): """Generate distance metrix csv file.""" distance_adj = pd.DataFrame( - 0, - index=station_init["station_index"], - columns=station_init["station_index"], + 0, + index=station_init["station_index"], + columns=station_init["station_index"], dtype=np.float ) look_up_df = station_init[["latitude", "longitude"]] @@ -565,7 +563,7 @@ class WeatherToyPipeline(WeatherPipeline): """Generate weather data bin for the specified topology from frontierweather.com. It will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file @@ -716,7 +714,7 @@ class NOAAWeatherPipeline(WeatherPipeline): """Generate weather data bin for the specified topology from ncei.noaa.gov. Generated files will be generated in ~/.maro/data/citi_bike/[topology]/_build. - Folder structure: + Folder structure: ~/.maro /data/citi_bike/[topology] /_build bin data file @@ -774,9 +772,9 @@ def _preprocess(self, input_file: str, output_file: str): data["date"] = org_data["DATE"] data["weather"] = org_data.apply(self._weather, axis=1) - data["temp"] = (org_data["TMAX"] + org_data["TMIN"])/2 + data["temp"] = (org_data["TMAX"] + org_data["TMIN"]) / 2 data.dropna(inplace=True) - with open(output_file, mode="w", encoding="utf-8", newline="") as f: + with open(output_file, mode="w", encoding="utf-8", newline="") as f: data.to_csv(f, index=False, header=True) def _gen_fall_back_file(self): From 6a65e3c21b2f802202fe0c2095a8f8fe14e05fc3 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 30 Sep 2020 23:46:30 +0800 Subject: [PATCH 23/30] update for pylint check --- maro/cli/data_pipeline/base.py | 6 +++--- maro/cli/data_pipeline/citi_bike.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/maro/cli/data_pipeline/base.py b/maro/cli/data_pipeline/base.py index 86287b7d0..21da01299 100644 --- a/maro/cli/data_pipeline/base.py +++ b/maro/cli/data_pipeline/base.py @@ -137,9 +137,9 @@ class DataTopology(ABC): def __init__(self): self._data_pipeline = {} - def get_build_folders(self)-> dict: + def get_build_folders(self) -> dict: """Get the build file folders of all data pipelines for the topology. - + Returns: dict: Dictionary of build folders, keys are data pipeline names, values are paths of the build folders. @@ -164,7 +164,7 @@ def clean(self): self._data_pipeline[pipeline].clean() def build(self): - """Build the cleaned data files of all data pipelines to binary data file.""" + """Build the cleaned data files of all data pipelines to binary data file.""" for pipeline in self._data_pipeline: self._data_pipeline[pipeline].build() diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index 00342af2c..7aa2fd1cc 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -245,7 +245,7 @@ def _process_src_file(self, src_data: pd.DataFrame): return trip_data, used_bikes, in_data_station, stations_existed def _process_current_topo_station_info( - self, stations_existed: pd.DataFrame, used_bikes: int, loc_ref: pd.DataFrame): + self, stations_existed: pd.DataFrame, used_bikes: int, loc_ref: pd.DataFrame): data_station_init = stations_existed.join( self._common_data["full_stations"][["station_id", "capacity"]].set_index("station_id"), on="station_id" @@ -414,7 +414,7 @@ class CitiBikeTopology(DataTopology): """ def __init__( - self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): + self, topology: str, trip_source: str, station_info: str, weather_source: str, is_temp: bool = False): super().__init__() self._data_pipeline["trip"] = CitiBikePipeline(topology, trip_source, station_info, is_temp) self._data_pipeline["weather"] = NOAAWeatherPipeline(topology, weather_source, is_temp) @@ -452,7 +452,7 @@ class CitiBikeToyPipeline(DataPipeline): _meta_file_name = "trips.yml" def __init__( - self, start_time: str, end_time: str, stations: list, trips: list, topology: str, is_temp: bool = False): + self, start_time: str, end_time: str, stations: list, trips: list, topology: str, is_temp: bool = False): super().__init__("citi_bike", topology, "", is_temp) self._start_time = start_time self._end_time = end_time From d5b851b55b53b89d73c711c941378b5bd357da53 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Fri, 16 Oct 2020 15:41:19 +0800 Subject: [PATCH 24/30] update data pipeline docs and data version --- docs/source/scenarios/citi_bike.rst | 24 ++++++++++++------------ maro/__misc__.py | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/source/scenarios/citi_bike.rst b/docs/source/scenarios/citi_bike.rst index 8b09db207..f14d72849 100644 --- a/docs/source/scenarios/citi_bike.rst +++ b/docs/source/scenarios/citi_bike.rst @@ -563,19 +563,19 @@ For the example above, the directory structure should be like: |-- ~/.maro |-- data | |-- citi_bike - | |-- [topology] - | |-- .build # bin data file - | |-- .source - | |-- .download # original data file - | |-- .clean # cleaned data file - |-- temp # download temp file - -Convert Command -~~~~~~~~~~~~~~~ + | |-- .build # bin data file + | |-- [topology] # topology + | |-- .source + | |-- .download # original data file + | |-- .clean # cleaned data file + |-- temp # download temp file + +Build Command +~~~~~~~~~~~~~ -The data ``convert`` command is used to convert the CSV data files to binary data +The data ``build`` command is used to build the CSV data files to binary data files that the simulator needs. Currently, there are three arguments for the data -``convert`` command: +``build`` command: * ``--meta``\ : required, used to specify the path of the meta file. The source columns that to be converted and the data type of each columns should be @@ -587,7 +587,7 @@ files that the simulator needs. Currently, there are three arguments for the dat .. code-block:: sh - maro data convert --meta ~/.maro/data/citibike/meta/trips.yml --file ~/.maro/data/citibike/source/_clean/ny201801/trip.csv --output ~/.maro/data/citibike/_build/ny201801/trip.bin + maro data build --meta ~/.maro/data/citibike/meta/trips.yml --file ~/.maro/data/citibike/source/_clean/ny201801/trip.csv --output ~/.maro/data/citibike/_build/ny201801/trip.bin Environment Interface ^^^^^^^^^^^^^^^^^^^^^ diff --git a/maro/__misc__.py b/maro/__misc__.py index 0686a28ea..e65e87ecc 100644 --- a/maro/__misc__.py +++ b/maro/__misc__.py @@ -4,4 +4,4 @@ __version__ = "0.1.1a10" -__data_version__ = "0.1" +__data_version__ = "0.2" From 97b7501ffd9f5e4521699082ec308b524d5bd309 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Mon, 19 Oct 2020 10:22:25 +0800 Subject: [PATCH 25/30] update for comments --- docs/source/scenarios/citi_bike.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/source/scenarios/citi_bike.rst b/docs/source/scenarios/citi_bike.rst index f14d72849..7c703b607 100644 --- a/docs/source/scenarios/citi_bike.rst +++ b/docs/source/scenarios/citi_bike.rst @@ -516,7 +516,10 @@ Data Preparation ^^^^^^^^^^^^^^^^ To start the simulation of Citi Bike scenario, users need to first generate the -related data. Below is the introduction to the related commands: +related data. (If the simulator did not detect the related data, it will automatically +try to generate the related data in a temp folder, and remove the temp data after the +experiment. So we encourage the users to generate the related data manually first.) +Below is the introduction to the related commands: Environment List Command ~~~~~~~~~~~~~~~~~~~~~~~~ From fac0844ffb12e297428962fb065bd0597d18d9f4 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Mon, 19 Oct 2020 17:11:03 +0800 Subject: [PATCH 26/30] update for comments --- docs/source/scenarios/citi_bike.rst | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/docs/source/scenarios/citi_bike.rst b/docs/source/scenarios/citi_bike.rst index 7c703b607..1b25b9685 100644 --- a/docs/source/scenarios/citi_bike.rst +++ b/docs/source/scenarios/citi_bike.rst @@ -515,11 +515,17 @@ Quick Start Data Preparation ^^^^^^^^^^^^^^^^ -To start the simulation of Citi Bike scenario, users need to first generate the -related data. (If the simulator did not detect the related data, it will automatically -try to generate the related data in a temp folder, and remove the temp data after the -experiment. So we encourage the users to generate the related data manually first.) -Below is the introduction to the related commands: +To start the simulation of Citi Bike scenario, users have two options for the data preparation: + +* If the topology data is not generated in advance, the system will automatically download and + process the relevant data when the environment is created. The data will be stored in a + temporary folder and be automatically deleted after the experiment. + +* Before creating the environment, users can also manually download and generate relevant data. + This approach will save you a lot of time if you need to conduct several experiments on the + same topology. Therefore, we encourage you to generate the relevant data manually first. + +The following is the introduction to related commands: Environment List Command ~~~~~~~~~~~~~~~~~~~~~~~~ From 56a3396cca75fe55127e82293f636bd5ca9c10f3 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 20 Oct 2020 11:45:16 +0800 Subject: [PATCH 27/30] update for lint --- .../grass/executors/grass_azure_executor.py | 48 ++++++++++++------- maro/cli/k8s/executors/k8s_azure_executor.py | 32 +++++++------ maro/cli/maro.py | 18 ++++--- maro/utils/logger.py | 14 ++++-- 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/maro/cli/grass/executors/grass_azure_executor.py b/maro/cli/grass/executors/grass_azure_executor.py index 5517fe541..0fa156f46 100644 --- a/maro/cli/grass/executors/grass_azure_executor.py +++ b/maro/cli/grass/executors/grass_azure_executor.py @@ -19,8 +19,10 @@ from maro.cli.utils.details import (load_cluster_details, save_cluster_details, load_job_details, save_job_details, load_schedule_details, save_schedule_details) from maro.cli.utils.executors.azure_executor import AzureExecutor -from maro.cli.utils.naming import (generate_cluster_id, generate_job_id, generate_component_id, generate_node_name, - get_valid_file_name) +from maro.cli.utils.naming import ( + generate_cluster_id, generate_job_id, generate_component_id, generate_node_name, + get_valid_file_name +) from maro.cli.utils.params import GlobalParams, GlobalPaths from maro.cli.utils.subprocess import SubProcess from maro.cli.utils.validation import validate_and_fill_dict @@ -194,18 +196,30 @@ def _init_master(self): self.grass_executor.retry_until_connected(node_ip_address=master_public_ip_address) # Create folders - sync_mkdir(remote_path=GlobalPaths.MARO_GRASS_LIB, - admin_username=admin_username, node_ip_address=master_public_ip_address) - sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}", - admin_username=admin_username, node_ip_address=master_public_ip_address) - sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/data", - admin_username=admin_username, node_ip_address=master_public_ip_address) - sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/images", - admin_username=admin_username, node_ip_address=master_public_ip_address) - sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs", - admin_username=admin_username, node_ip_address=master_public_ip_address) - sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/schedules", - admin_username=admin_username, node_ip_address=master_public_ip_address) + sync_mkdir( + remote_path=GlobalPaths.MARO_GRASS_LIB, + admin_username=admin_username, node_ip_address=master_public_ip_address + ) + sync_mkdir( + remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}", + admin_username=admin_username, node_ip_address=master_public_ip_address + ) + sync_mkdir( + remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/data", + admin_username=admin_username, node_ip_address=master_public_ip_address + ) + sync_mkdir( + remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/images", + admin_username=admin_username, node_ip_address=master_public_ip_address + ) + sync_mkdir( + remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs", + admin_username=admin_username, node_ip_address=master_public_ip_address + ) + sync_mkdir( + remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/schedules", + admin_username=admin_username, node_ip_address=master_public_ip_address + ) # Copy required files copy_files_to_node( @@ -698,8 +712,10 @@ def _count_running_containers(node_details: dict): # maro grass image - def push_image(self, image_name: str, image_path: str, remote_context_path: str, - remote_image_name: str): + def push_image( + self, image_name: str, image_path: str, remote_context_path: str, + remote_image_name: str + ): # Load details cluster_details = self.cluster_details admin_username = cluster_details['user']['admin_username'] diff --git a/maro/cli/k8s/executors/k8s_azure_executor.py b/maro/cli/k8s/executors/k8s_azure_executor.py index 90181113a..cb4363e37 100644 --- a/maro/cli/k8s/executors/k8s_azure_executor.py +++ b/maro/cli/k8s/executors/k8s_azure_executor.py @@ -239,8 +239,8 @@ def _create_k8s_secret(self): # Create k8s secret command = f'kubectl create secret generic {cluster_id}-k8s-secret ' \ - f'--from-literal=azurestorageaccountname={cluster_id}st ' \ - f'--from-literal=azurestorageaccountkey={storage_key}' + f'--from-literal=azurestorageaccountname={cluster_id}st ' \ + f'--from-literal=azurestorageaccountkey={storage_key}' _ = SubProcess.run(command) logger.debug(command) @@ -442,9 +442,9 @@ def push_data(self, local_path: str, remote_dir: str): if not target_dir.startswith("/"): raise CliException("Invalid remote path") copy_command = f'azcopy copy ' \ - f'"{source_path}" ' \ - f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{target_dir}?{sas}" ' \ - f'--recursive=True' + f'"{source_path}" ' \ + f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{target_dir}?{sas}" ' \ + f'--recursive=True' _ = SubProcess.run(copy_command) def pull_data(self, local_dir: str, remote_path: str): @@ -464,9 +464,9 @@ def pull_data(self, local_dir: str, remote_path: str): if not source_path.startswith("/"): raise CliException("Invalid remote path") copy_command = f'azcopy copy ' \ - f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{source_path}?{sas}" ' \ - f'"{os.path.expanduser(target_dir)}" ' \ - f'--recursive=True' + f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{source_path}?{sas}" ' \ + f'"{os.path.expanduser(target_dir)}" ' \ + f'--recursive=True' _ = SubProcess.run(copy_command) def remove_data(self, remote_path: str): @@ -481,8 +481,8 @@ def remove_data(self, remote_path: str): # Remove data copy_command = f'azcopy remove ' \ - f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_path}?{sas}" ' \ - f'--recursive=True' + f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_path}?{sas}" ' \ + f'--recursive=True' _ = SubProcess.run(copy_command) def _check_and_get_account_sas(self): @@ -545,13 +545,13 @@ def _start_job(self, job_details: dict): # Apply k8s config command = f"kubectl apply -f " \ - f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml" + f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml" _ = SubProcess.run(command) def stop_job(self, job_name: str): # Stop job command = f"kubectl delete -f " \ - f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml" + f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml" _ = SubProcess.run(command) @staticmethod @@ -613,8 +613,10 @@ def _create_k8s_job_config(self, job_name: str) -> dict: return k8s_job_config - def _create_k8s_container_config(self, job_details: dict, k8s_container_config_template: dict, - component_type: str, component_index: int): + def _create_k8s_container_config( + self, job_details: dict, k8s_container_config_template: dict, + component_type: str, component_index: int + ): # Copy config k8s_container_config = deepcopy(k8s_container_config_template) @@ -717,7 +719,7 @@ def _build_image_address(self, image_name: str) -> str: def list_job(self): # Get jobs details - command = f"kubectl get jobs -o=json" + command = "kubectl get jobs -o=json" return_str = SubProcess.run(command) job_details_list = json.loads(return_str)["items"] jobs_details = {} diff --git a/maro/cli/maro.py b/maro/cli/maro.py index 119829538..f2a03944c 100644 --- a/maro/cli/maro.py +++ b/maro/cli/maro.py @@ -15,7 +15,7 @@ from maro.utils.logger import CliLogger MARO_BANNER = """ - __ __ _ ____ ___ + __ __ _ ____ ___ | \/ | / \ | _ \ / _ \ | |\/| | / _ \ | |_) | | | | | | | |/ ___ \| _ <| |_| | @@ -43,10 +43,12 @@ def main(): # maro env parser_env = subparsers.add_parser( 'env', - help=('Get all environment-related information, ' - 'such as the supported scenarios, topologies. ' - 'And it is also responsible to generate data to the specific environment, ' - 'which has external data dependency.'), + help=( + 'Get all environment-related information, ' + 'such as the supported scenarios, topologies. ' + 'And it is also responsible to generate data to the specific environment, ' + 'which has external data dependency.' + ), parents=[global_parser] ) parser_env.set_defaults(func=_help_func(parser=parser_env)) @@ -804,8 +806,10 @@ def load_parser_data(prev_parser: ArgumentParser, global_parser: ArgumentParser) default=None, required=False, help=("Specified start timestamp (in UTC) for binary file, " - "then this timestamp will be considered as tick=0 for binary reader, " - "this can be used to adjust the reader pipeline.")) + "then this timestamp will be considered as tick=0 for binary reader, " + "this can be used to adjust the reader pipeline." + ) + ) build_cmd_parser.set_defaults(func=convert) diff --git a/maro/utils/logger.py b/maro/utils/logger.py index 280b3ec23..b4ab792dc 100644 --- a/maro/utils/logger.py +++ b/maro/utils/logger.py @@ -112,8 +112,10 @@ class Logger(object): ``DEBUG``. """ - def __init__(self, tag: str, format_: LogFormat = LogFormat.full, dump_folder: str = cwd, dump_mode: str = 'w', - extension_name: str = 'log', auto_timestamp: bool = True, stdout_level="INFO"): + def __init__( + self, tag: str, format_: LogFormat = LogFormat.full, dump_folder: str = cwd, dump_mode: str = 'w', + extension_name: str = 'log', auto_timestamp: bool = True, stdout_level="INFO" + ): self._file_format = FORMAT_NAME_TO_FILE_FORMAT[format_] self._stdout_format = FORMAT_NAME_TO_STDOUT_FORMAT[format_] \ if format_ in FORMAT_NAME_TO_STDOUT_FORMAT else \ @@ -207,9 +209,11 @@ def critical(self, msg, *args): class InternalLogger(Logger): """An internal logger uses for recording the internal system's log.""" - def __init__(self, component_name: str, tag: str = "maro_internal", format_: LogFormat = LogFormat.internal, - dump_folder: str = None, dump_mode: str = 'a', extension_name: str = 'log', - auto_timestamp: bool = False): + def __init__( + self, component_name: str, tag: str = "maro_internal", format_: LogFormat = LogFormat.internal, + dump_folder: str = None, dump_mode: str = 'a', extension_name: str = 'log', + auto_timestamp: bool = False + ): current_time = f"{datetime.now().strftime('%Y%m%d%H%M')}" self._dump_folder = dump_folder if dump_folder else \ os.path.join(os.path.expanduser("~"), ".maro/log", current_time, str(os.getpid())) From 999979f6fc29367966706cbcbfbad42893478c75 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 20 Oct 2020 11:54:22 +0800 Subject: [PATCH 28/30] update for lint --- maro/cli/maro.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/maro/cli/maro.py b/maro/cli/maro.py index f2a03944c..582181f08 100644 --- a/maro/cli/maro.py +++ b/maro/cli/maro.py @@ -805,11 +805,11 @@ def load_parser_data(prev_parser: ArgumentParser, global_parser: ArgumentParser) type=int, default=None, required=False, - help=("Specified start timestamp (in UTC) for binary file, " - "then this timestamp will be considered as tick=0 for binary reader, " - "this can be used to adjust the reader pipeline." - ) - ) + help=""" + Specified start timestamp (in UTC) for binary file, + then this timestamp will be considered as tick=0 for binary reader, + this can be used to adjust the reader pipeline. + """) build_cmd_parser.set_defaults(func=convert) From 6fa22e8359f11d00530f6d49a59074e6d8d688c6 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Tue, 20 Oct 2020 14:11:19 +0800 Subject: [PATCH 29/30] update for lint --- maro/cli/maro.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/maro/cli/maro.py b/maro/cli/maro.py index 582181f08..9d0d4a500 100644 --- a/maro/cli/maro.py +++ b/maro/cli/maro.py @@ -806,8 +806,8 @@ def load_parser_data(prev_parser: ArgumentParser, global_parser: ArgumentParser) default=None, required=False, help=""" - Specified start timestamp (in UTC) for binary file, - then this timestamp will be considered as tick=0 for binary reader, + Specified start timestamp (in UTC) for binary file, + then this timestamp will be considered as tick=0 for binary reader, this can be used to adjust the reader pipeline. """) From 89f108ccbd69dca1a10fd6c23300a8e977865cf7 Mon Sep 17 00:00:00 2001 From: pocket5084 Date: Wed, 21 Oct 2020 14:37:33 +0800 Subject: [PATCH 30/30] revert data version to 0.1 --- maro/__misc__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maro/__misc__.py b/maro/__misc__.py index cd38f5bc1..5f55447fa 100644 --- a/maro/__misc__.py +++ b/maro/__misc__.py @@ -4,4 +4,4 @@ __version__ = "0.1.1a11" -__data_version__ = "0.2" +__data_version__ = "0.1"