Merged
48 commits
897e2b6
fix deployment issue in multi envs
zhawan Sep 22, 2020
79a9954
fix typo
zhawan Sep 22, 2020
09cd441
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 22, 2020
d9e117a
fix ~/.maro not exist issue in build
zhawan Sep 22, 2020
b94ff95
skip deploy when build
zhawan Sep 22, 2020
d0c94f4
update for comments
zhawan Sep 22, 2020
f371cf2
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 23, 2020
30c0b9c
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 23, 2020
1366871
temporarily disable weather info
zhawan Sep 23, 2020
afdb600
replace ecr with cim in setup.py
zhawan Sep 23, 2020
5f6dd92
replace ecr in manifest
zhawan Sep 23, 2020
92b31c0
remove weather check when read data
zhawan Sep 23, 2020
aa8567c
fix station id issue
zhawan Sep 23, 2020
829429c
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 23, 2020
12d59ac
fix format
zhawan Sep 23, 2020
b65ddad
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 23, 2020
e2d4d43
add TODO in comments
zhawan Sep 23, 2020
e728f13
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 23, 2020
d8f14b2
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 24, 2020
ed8b68b
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 24, 2020
8f53aee
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 24, 2020
5bee6ae
add noaa weather source
zhawan Sep 24, 2020
ee5badd
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 24, 2020
8b2d918
fix weather reset and weather comment
zhawan Sep 24, 2020
b33d755
add comment for weather data url
zhawan Sep 25, 2020
c60112a
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 25, 2020
6bf4243
some format update
zhawan Sep 25, 2020
b5fcf7b
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 27, 2020
c8c4c6f
add fall back function in weather download
zhawan Sep 27, 2020
b06a920
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 28, 2020
1eb330c
update comment
zhawan Sep 28, 2020
f832ca4
update for comments
zhawan Sep 30, 2020
fe58f36
update comment
zhawan Sep 30, 2020
c43f4af
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 30, 2020
2aaa305
add period
zhawan Sep 30, 2020
1875393
fix for pylint
zhawan Sep 30, 2020
6a65e3c
update for pylint check
zhawan Sep 30, 2020
b2c874d
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Sep 30, 2020
5bba512
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Oct 10, 2020
d5b851b
update data pipeline docs and data version
zhawan Oct 16, 2020
1594b23
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Oct 16, 2020
97b7501
update for comments
zhawan Oct 19, 2020
152a484
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Oct 19, 2020
fac0844
update for comments
zhawan Oct 19, 2020
56a3396
update for lint
zhawan Oct 20, 2020
999979f
update for lint
zhawan Oct 20, 2020
6fa22e8
update for lint
zhawan Oct 20, 2020
03d2150
Merge remote-tracking branch 'upstream/v0.1' into v0.1
zhawan Oct 20, 2020
37 changes: 23 additions & 14 deletions docs/source/scenarios/citi_bike.rst
@@ -515,8 +515,17 @@ Quick Start
Data Preparation
^^^^^^^^^^^^^^^^

To start the simulation of Citi Bike scenario, users need to first generate the
related data. Below is the introduction to the related commands:
To start the simulation of Citi Bike scenario, users have two options for the data preparation:

* If the topology data is not generated in advance, the system will automatically download and
process the relevant data when the environment is created. The data will be stored in a
temporary folder and be automatically deleted after the experiment.

* Before creating the environment, users can also manually download and generate relevant data.
This approach will save you a lot of time if you need to conduct several experiments on the
same topology. Therefore, we encourage you to generate the relevant data manually first.

The following is the introduction to related commands:

Environment List Command
~~~~~~~~~~~~~~~~~~~~~~~~
@@ -563,19 +572,19 @@ For the example above, the directory structure should be like:
|-- ~/.maro
|-- data
| |-- citi_bike
| |-- [topology]
| |-- .build # bin data file
| |-- .source
| |-- .download # original data file
| |-- .clean # cleaned data file
|-- temp # download temp file

Convert Command
~~~~~~~~~~~~~~~
| |-- .build # bin data file
| |-- [topology] # topology
| |-- .source
| |-- .download # original data file
| |-- .clean # cleaned data file
|-- temp # download temp file

Build Command
~~~~~~~~~~~~~

The data ``convert`` command is used to convert the CSV data files to binary data
The data ``build`` command is used to build the CSV data files to binary data
files that the simulator needs. Currently, there are three arguments for the data
``convert`` command:
``build`` command:

* ``--meta``\ : required, used to specify the path of the meta file. The source
columns that to be converted and the data type of each columns should be
@@ -587,7 +596,7 @@ files that the simulator needs. Currently, there are three arguments for the dat

.. code-block:: sh

maro data convert --meta ~/.maro/data/citibike/meta/trips.yml --file ~/.maro/data/citibike/source/_clean/ny201801/trip.csv --output ~/.maro/data/citibike/_build/ny201801/trip.bin
maro data build --meta ~/.maro/data/citibike/meta/trips.yml --file ~/.maro/data/citibike/source/_clean/ny201801/trip.csv --output ~/.maro/data/citibike/_build/ny201801/trip.bin

Environment Interface
^^^^^^^^^^^^^^^^^^^^^
Expand Down
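For readers who script the renamed command, the following is a minimal sketch of assembling the `maro data build` invocation shown in the documentation hunk above. Only the `--meta`, `--file`, and `--output` arguments come from the diff; the helper name `build_command` is hypothetical, and expanding `~` up front is an assumption made so the argument list can be passed to `subprocess.run` without a shell.

```python
import os


def build_command(meta: str, source_csv: str, output_bin: str) -> list:
    """Return the argument list for `maro data build` (hypothetical helper)."""
    # "~" is only expanded by a shell, so expand it here for shell-free use.
    return [
        "maro", "data", "build",
        "--meta", os.path.expanduser(meta),
        "--file", os.path.expanduser(source_csv),
        "--output", os.path.expanduser(output_bin),
    ]


# Paths copied from the documentation example in the diff.
args = build_command(
    "~/.maro/data/citibike/meta/trips.yml",
    "~/.maro/data/citibike/source/_clean/ny201801/trip.csv",
    "~/.maro/data/citibike/_build/ny201801/trip.bin",
)
print(" ".join(args))
```

The list could then be handed to `subprocess.run(args, check=True)` on a machine where the `maro` CLI is installed.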
2 changes: 1 addition & 1 deletion maro/__misc__.py
@@ -4,4 +4,4 @@

__version__ = "0.1.1a11"

__data_version__ = "0.1"
__data_version__ = "0.2"
48 changes: 32 additions & 16 deletions maro/cli/grass/executors/grass_azure_executor.py
@@ -19,8 +19,10 @@
from maro.cli.utils.details import (load_cluster_details, save_cluster_details, load_job_details, save_job_details,
load_schedule_details, save_schedule_details)
from maro.cli.utils.executors.azure_executor import AzureExecutor
from maro.cli.utils.naming import (generate_cluster_id, generate_job_id, generate_component_id, generate_node_name,
get_valid_file_name)
from maro.cli.utils.naming import (
generate_cluster_id, generate_job_id, generate_component_id, generate_node_name,
get_valid_file_name
)
from maro.cli.utils.params import GlobalParams, GlobalPaths
from maro.cli.utils.subprocess import SubProcess
from maro.cli.utils.validation import validate_and_fill_dict
@@ -194,18 +196,30 @@ def _init_master(self):
self.grass_executor.retry_until_connected(node_ip_address=master_public_ip_address)

# Create folders
sync_mkdir(remote_path=GlobalPaths.MARO_GRASS_LIB,
admin_username=admin_username, node_ip_address=master_public_ip_address)
sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}",
admin_username=admin_username, node_ip_address=master_public_ip_address)
sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/data",
admin_username=admin_username, node_ip_address=master_public_ip_address)
sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/images",
admin_username=admin_username, node_ip_address=master_public_ip_address)
sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs",
admin_username=admin_username, node_ip_address=master_public_ip_address)
sync_mkdir(remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/schedules",
admin_username=admin_username, node_ip_address=master_public_ip_address)
sync_mkdir(
remote_path=GlobalPaths.MARO_GRASS_LIB,
admin_username=admin_username, node_ip_address=master_public_ip_address
)
sync_mkdir(
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}",
admin_username=admin_username, node_ip_address=master_public_ip_address
)
sync_mkdir(
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/data",
admin_username=admin_username, node_ip_address=master_public_ip_address
)
sync_mkdir(
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/images",
admin_username=admin_username, node_ip_address=master_public_ip_address
)
sync_mkdir(
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs",
admin_username=admin_username, node_ip_address=master_public_ip_address
)
sync_mkdir(
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/schedules",
admin_username=admin_username, node_ip_address=master_public_ip_address
)

# Copy required files
copy_files_to_node(
@@ -698,8 +712,10 @@ def _count_running_containers(node_details: dict):

# maro grass image

def push_image(self, image_name: str, image_path: str, remote_context_path: str,
remote_image_name: str):
def push_image(
self, image_name: str, image_path: str, remote_context_path: str,
remote_image_name: str
):
# Load details
cluster_details = self.cluster_details
admin_username = cluster_details['user']['admin_username']
Expand Down
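The six reformatted `sync_mkdir` calls in the `_init_master` hunk above differ only in `remote_path`. As a rough sketch of that pattern (not a change made in this PR), the cluster folder paths could be generated from one list. The helper name `cluster_folders` and the sample root `~/.maro/clusters` are assumptions for illustration; the actual value of `GlobalPaths.MARO_CLUSTERS` is not shown in the diff.

```python
def cluster_folders(clusters_root: str, cluster_name: str) -> list:
    """List the per-cluster remote folders created in _init_master (hypothetical helper)."""
    base = f"{clusters_root}/{cluster_name}"
    # Sub-folder names are taken from the sync_mkdir calls in the diff.
    return [base] + [f"{base}/{sub}" for sub in ("data", "images", "jobs", "schedules")]


# "~/.maro/clusters" stands in for GlobalPaths.MARO_CLUSTERS (assumed value).
paths = cluster_folders("~/.maro/clusters", "my_cluster")
for path in paths:
    # In the executor, each path would be passed to sync_mkdir(remote_path=path, ...).
    print(path)
```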
32 changes: 17 additions & 15 deletions maro/cli/k8s/executors/k8s_azure_executor.py
@@ -239,8 +239,8 @@ def _create_k8s_secret(self):

# Create k8s secret
command = f'kubectl create secret generic {cluster_id}-k8s-secret ' \
f'--from-literal=azurestorageaccountname={cluster_id}st ' \
f'--from-literal=azurestorageaccountkey={storage_key}'
f'--from-literal=azurestorageaccountname={cluster_id}st ' \
f'--from-literal=azurestorageaccountkey={storage_key}'
_ = SubProcess.run(command)
logger.debug(command)

@@ -442,9 +442,9 @@ def push_data(self, local_path: str, remote_dir: str):
if not target_dir.startswith("/"):
raise CliException("Invalid remote path")
copy_command = f'azcopy copy ' \
f'"{source_path}" ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{target_dir}?{sas}" ' \
f'--recursive=True'
f'"{source_path}" ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{target_dir}?{sas}" ' \
f'--recursive=True'
_ = SubProcess.run(copy_command)

def pull_data(self, local_dir: str, remote_path: str):
@@ -464,9 +464,9 @@ def pull_data(self, local_dir: str, remote_path: str):
if not source_path.startswith("/"):
raise CliException("Invalid remote path")
copy_command = f'azcopy copy ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{source_path}?{sas}" ' \
f'"{os.path.expanduser(target_dir)}" ' \
f'--recursive=True'
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{source_path}?{sas}" ' \
f'"{os.path.expanduser(target_dir)}" ' \
f'--recursive=True'
_ = SubProcess.run(copy_command)

def remove_data(self, remote_path: str):
@@ -481,8 +481,8 @@ def remove_data(self, remote_path: str):

# Remove data
copy_command = f'azcopy remove ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_path}?{sas}" ' \
f'--recursive=True'
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_path}?{sas}" ' \
f'--recursive=True'
_ = SubProcess.run(copy_command)

def _check_and_get_account_sas(self):
@@ -545,13 +545,13 @@ def _start_job(self, job_details: dict):

# Apply k8s config
command = f"kubectl apply -f " \
f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml"
f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml"
_ = SubProcess.run(command)

def stop_job(self, job_name: str):
# Stop job
command = f"kubectl delete -f " \
f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml"
f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/jobs/{job_name}/k8s_configs/jobs.yml"
_ = SubProcess.run(command)

@staticmethod
@@ -613,8 +613,10 @@ def _create_k8s_job_config(self, job_name: str) -> dict:

return k8s_job_config

def _create_k8s_container_config(self, job_details: dict, k8s_container_config_template: dict,
component_type: str, component_index: int):
def _create_k8s_container_config(
self, job_details: dict, k8s_container_config_template: dict,
component_type: str, component_index: int
):
# Copy config
k8s_container_config = deepcopy(k8s_container_config_template)

@@ -717,7 +719,7 @@ def _build_image_address(self, image_name: str) -> str:

def list_job(self):
# Get jobs details
command = f"kubectl get jobs -o=json"
command = "kubectl get jobs -o=json"
return_str = SubProcess.run(command)
job_details_list = json.loads(return_str)["items"]
jobs_details = {}
Expand Down
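The `list_job` hunk above parses the output of `kubectl get jobs -o=json` and reads its `items` array. The sketch below illustrates that parsing step on a hand-written sample payload. Keying the result by `metadata.name` is an assumption, since the diff cuts off before showing how `jobs_details` is filled, and the helper name `index_jobs_by_name` is hypothetical.

```python
import json

# Hand-written sample shaped like `kubectl get jobs -o=json` output (not real cluster data).
SAMPLE_OUTPUT = json.dumps({
    "apiVersion": "v1",
    "kind": "List",
    "items": [
        {"metadata": {"name": "job-a"}, "status": {"succeeded": 1}},
        {"metadata": {"name": "job-b"}, "status": {"active": 1}},
    ],
})


def index_jobs_by_name(return_str: str) -> dict:
    """Map each job's metadata.name to its full JSON object (hypothetical helper)."""
    job_details_list = json.loads(return_str)["items"]
    return {job["metadata"]["name"]: job for job in job_details_list}


jobs_details = index_jobs_by_name(SAMPLE_OUTPUT)
print(sorted(jobs_details))
```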
20 changes: 12 additions & 8 deletions maro/cli/maro.py
@@ -15,7 +15,7 @@
from maro.utils.logger import CliLogger

MARO_BANNER = """
__ __ _ ____ ___
 __ __ _ ____ ___
| \/ | / \ | _ \ / _ \
| |\/| | / _ \ | |_) | | | |
| | | |/ ___ \| _ <| |_| |
@@ -43,10 +43,12 @@ def main():
# maro env
parser_env = subparsers.add_parser(
'env',
help=('Get all environment-related information, '
'such as the supported scenarios, topologies. '
'And it is also responsible to generate data to the specific environment, '
'which has external data dependency.'),
help=(
'Get all environment-related information, '
'such as the supported scenarios, topologies. '
'And it is also responsible to generate data to the specific environment, '
'which has external data dependency.'
),
parents=[global_parser]
)
parser_env.set_defaults(func=_help_func(parser=parser_env))
@@ -803,9 +805,11 @@ def load_parser_data(prev_parser: ArgumentParser, global_parser: ArgumentParser)
type=int,
default=None,
required=False,
help=("Specified start timestamp (in UTC) for binary file, "
"then this timestamp will be considered as tick=0 for binary reader, "
"this can be used to adjust the reader pipeline."))
help="""
Specified start timestamp (in UTC) for binary file,
then this timestamp will be considered as tick=0 for binary reader,
this can be used to adjust the reader pipeline.
""")

build_cmd_parser.set_defaults(func=convert)

Expand Down
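The last hunk above swaps a parenthesized string concatenation for a triple-quoted `help` string, which adds newlines and indentation to the text. With argparse's default help formatter, runs of whitespace in help strings are collapsed before wrapping, so the rendered help should read the same. The sketch below checks this. The option name `--start-timestamp` is a placeholder, because the diff does not show the flag name.

```python
import argparse

parser = argparse.ArgumentParser(prog="maro data build")
parser.add_argument(
    "--start-timestamp",  # placeholder flag name, not shown in the diff
    type=int,
    default=None,
    required=False,
    help="""
    Specified start timestamp (in UTC) for binary file,
    then this timestamp will be considered as tick=0 for binary reader,
    this can be used to adjust the reader pipeline.
    """)

# Collapse the wrapped help output to a single line to compare the wording.
flattened_help = " ".join(parser.format_help().split())
print(flattened_help)

# The value is parsed as an integer timestamp.
parsed = parser.parse_args(["--start-timestamp", "1577836800"])
```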
14 changes: 9 additions & 5 deletions maro/utils/logger.py
@@ -112,8 +112,10 @@ class Logger(object):
``DEBUG``.
"""

def __init__(self, tag: str, format_: LogFormat = LogFormat.full, dump_folder: str = cwd, dump_mode: str = 'w',
extension_name: str = 'log', auto_timestamp: bool = True, stdout_level="INFO"):
def __init__(
self, tag: str, format_: LogFormat = LogFormat.full, dump_folder: str = cwd, dump_mode: str = 'w',
extension_name: str = 'log', auto_timestamp: bool = True, stdout_level="INFO"
):
self._file_format = FORMAT_NAME_TO_FILE_FORMAT[format_]
self._stdout_format = FORMAT_NAME_TO_STDOUT_FORMAT[format_] \
if format_ in FORMAT_NAME_TO_STDOUT_FORMAT else \
@@ -207,9 +209,11 @@ def critical(self, msg, *args):
class InternalLogger(Logger):
"""An internal logger uses for recording the internal system's log."""

def __init__(self, component_name: str, tag: str = "maro_internal", format_: LogFormat = LogFormat.internal,
dump_folder: str = None, dump_mode: str = 'a', extension_name: str = 'log',
auto_timestamp: bool = False):
def __init__(
self, component_name: str, tag: str = "maro_internal", format_: LogFormat = LogFormat.internal,
dump_folder: str = None, dump_mode: str = 'a', extension_name: str = 'log',
auto_timestamp: bool = False
):
current_time = f"{datetime.now().strftime('%Y%m%d%H%M')}"
self._dump_folder = dump_folder if dump_folder else \
os.path.join(os.path.expanduser("~"), ".maro/log", current_time, str(os.getpid()))
Expand Down
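The context lines of the `InternalLogger` hunk above derive a default dump folder from the current time and the process ID when `dump_folder` is not given. The following sketch isolates that computation so it can be checked with fixed inputs. The function name `default_dump_folder` is hypothetical, while the path layout and the `%Y%m%d%H%M` format are taken from the diff.

```python
import os
from datetime import datetime


def default_dump_folder(now: datetime, pid: int) -> str:
    """Rebuild the fallback log folder used when dump_folder is None (hypothetical helper)."""
    current_time = now.strftime("%Y%m%d%H%M")  # minute resolution, as in the diff
    return os.path.join(os.path.expanduser("~"), ".maro/log", current_time, str(pid))


folder = default_dump_folder(datetime(2020, 10, 20, 9, 30), 1234)
print(folder)
```

Because the timestamp has minute resolution and the PID is part of the path, processes started within the same minute write to separate folders.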