Skip to content

Commit

Permalink
Merge pull request #24 from runprism/dev
Browse files Browse the repository at this point in the history
v0.2.0rc2
  • Loading branch information
mtrivedi50 committed Jul 20, 2023
2 parents 86988ca + 3372236 commit 65abebb
Show file tree
Hide file tree
Showing 309 changed files with 5,480 additions and 3,750 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ Create a skeleton below with your project structure.
```
project
|-- prism_project.py
|-- modules
|-- module01.py
|-- tasks
|-- task01.py
...
...
```
Expand Down
10 changes: 5 additions & 5 deletions prism/agents/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def check_conf_keys(self,
def prepare_paths_for_copy(self, project: PrismProject, tmpdir: str):
"""
Prism projects often rely on more than just their own directory. They can import
functions / modules or reference configuration files from other directories. We
functions / tasks or reference configuration files from other directories. We
need to copy all relevant directories into our agent, and we need to ensure that
these paths have the same relative location to the project directory.
Expand Down Expand Up @@ -174,7 +174,7 @@ def construct_command(self):
log_level = self.args.log_level
vars = self.args.vars
context = self.args.context
modules = self.args.modules
tasks = self.args.tasks
all_upstream = self.args.all_upstream
all_downstream = self.args.all_downstream

Expand All @@ -185,14 +185,14 @@ def construct_command(self):
f"{k}={v}" for k, v in vars.items()
])
context_cmd = "" if context == '{}' else f"--context '{context}'"
modules_cmd = "" if modules is None else " " .join([
f"--module {m.replace('.py', '')}" for m in modules
tasks_cmd = "" if tasks is None else " " .join([
f"--task {m}" for m in tasks
])
all_upstream_cmd = "" if not all_upstream else "--all-upstream"
all_downstream_cmd = "" if not all_downstream else "--all-downstream"

# Full command
full_cmd = f"prism run {full_tb_cmd} {log_level_cmd} {vars_cmd} {context_cmd} {modules_cmd} {all_upstream_cmd} {all_downstream_cmd}" # noqa: E501
full_cmd = f"prism run {full_tb_cmd} {log_level_cmd} {vars_cmd} {context_cmd} {tasks_cmd} {all_upstream_cmd} {all_downstream_cmd}" # noqa: E501
return full_cmd

def parse_environment_variables(self, agent_conf: Dict[str, Any]) -> Dict[str, str]:
Expand Down
4 changes: 2 additions & 2 deletions prism/agents/docker_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def prepare_paths_for_copy(self,
):
"""
Prism projects often rely on more than just their own directory. They can import
functions / modules or reference configuration files from other directories. We
functions / tasks or reference configuration files from other directories. We
need to copy all relevant directories into our agent, and we need to ensure that
these paths have the same relative location to the project directory.
Expand Down Expand Up @@ -366,7 +366,7 @@ def parse_base_image(self,
return agent_conf["image"]

# If the user doesn't specify a base image, then use the default image specified
# in the Prism constants module.
# in the Prism constants task.
else:
return prism.constants.DEFAULT_DOCKER_IMAGE

Expand Down
23 changes: 18 additions & 5 deletions prism/agents/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ def get_all_project_paths(self,
):
"""
Prism projects often rely on more than just their own directory. They can import
functions / modules or reference configuration files from other directories. We
functions / tasks or reference configuration files from other directories. We
need to copy all relevant directories into our agent, and we need to ensure that
these paths have the same relative location to the project directory.
Expand Down Expand Up @@ -866,7 +866,7 @@ def stream_logs(self,
break
self._log_output(color, which, output)

return process.stdout, process.stderr
return process.stdout, process.stderr, process.returncode

def apply(self):
"""
Expand Down Expand Up @@ -917,8 +917,17 @@ def apply(self):
]

# Open a subprocess and stream the logs
return_code = self.stream_logs(cmd, prism.ui.AGENT_WHICH_BUILD, "build")
return return_code
_, err, returncode = self.stream_logs(cmd, prism.ui.AGENT_WHICH_BUILD, "build")

# Log anything from stderr that was printed in the project
for line in err.readlines():
prism.prism_logging.DEFAULT_LOGGER.agent( # type: ignore
f"{prism.ui.AGENT_EVENT}{self.instance_name}{prism.ui.AGENT_WHICH_BUILD}[build]{prism.ui.RESET} | {line.rstrip()}" # noqa: E501
)

# Return the returncode. Return a dictionary in order to avoid confusing this
# output with the output of an event manager.
return {"return_code": returncode}

def run(self):
"""
Expand Down Expand Up @@ -947,14 +956,18 @@ def run(self):
'-d', str(self.project.project_dir),
'-c', full_cmd,
]
out, _ = self.stream_logs(cmd, prism.ui.AGENT_WHICH_RUN, "run")
out, _, returncode = self.stream_logs(cmd, prism.ui.AGENT_WHICH_RUN, "run")

# Log anything from stdout that was printed in the project
for line in out.readlines():
prism.prism_logging.DEFAULT_LOGGER.agent( # type: ignore
f"{prism.ui.AGENT_EVENT}{self.instance_name}{prism.ui.AGENT_WHICH_RUN}[run]{prism.ui.RESET} | {line.rstrip()}" # noqa: E501
)

# Return the returncode. Return a dictionary in order to avoid confusing this
# output with the output of an event manager.
return {"return_code": returncode}

def delete(self):
"""
Delete all resources associated with agent. This includes:
Expand Down
4 changes: 3 additions & 1 deletion prism/agents/scripts/apply.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ echo "Updating remote project and file paths"
# Copy project directory and other copy paths into the EC2 instance
ssh -i ${pem_path} ${user}@${public_dns_name} "sudo mkdir -p .${project_dir}; sudo chmod 777 -R .${project_dir}"
scp -r -i ${pem_path} ${project_dir} ${user}@${public_dns_name}:.${project_dir}
echo "Copied project directory into instance"

IFS=',' read -ra array <<< "${copy_paths}"
for path in "${array[@]}"; do
Expand All @@ -68,6 +69,7 @@ for path in "${array[@]}"; do

# Copy
scp -r -i ${pem_path} ${path} ${user}@${public_dns_name}:.${path%/*} 2> scp.log
echo "Copied path ${path} into instance"
done

# Environment variables. Environment variable are passed a comma-separated list of
Expand All @@ -85,12 +87,12 @@ for keyvalue in "${env_array[@]}"; do
else
ssh -i ${pem_path} ${user}@${public_dns_name} "echo 'export ${key}=${value}' >> ~/.bashrc"
fi
echo "Updated environment variable ${key}=${value}"
done

# Reload .bashrc to update environment variables
ssh -i ${pem_path} ${user}@${public_dns_name} "source ~/.bashrc"

# Move all folders into the root folder
ssh -i ${pem_path} ${user}@${public_dns_name} 'cd ~ && for dir in */; do sudo mv $dir ../../$dir ; done'

echo "Done updating remote project and file paths"
27 changes: 21 additions & 6 deletions prism/cli/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,19 @@ def build_agent(self,

elif args.which == "agent-delete":
agent.delete()
return agent

# Otherwise, we either want to build the agent (`prism agent apply`) or build &
# run the agent (`prism agent build`). We run the agent in a separate function,
# so focus on just building it for now.
else:
agent.apply()
return agent
return agent.apply()

def run_agent(self, agent: Agent):
"""
Thin wrapper around the agent `run` function.
"""
agent.run()
return
return agent.run()

def run(self) -> prism.cli.base.TaskRunReturnResult:
"""
Expand Down Expand Up @@ -231,7 +230,7 @@ def run(self) -> prism.cli.base.TaskRunReturnResult:
build_event_to_fire = build_manager_output.event_to_fire
event_list = build_manager_output.event_list

# Log an error if one occurs
# Log any Python error that we encounter
if agent == 0:
event_list = fire_console_event(
build_event_to_fire,
Expand All @@ -241,6 +240,14 @@ def run(self) -> prism.cli.base.TaskRunReturnResult:
event_list = self.fire_tail_event(event_list)
return prism.cli.base.TaskRunReturnResult(event_list, True)

# If the `apply` script exits with a non-zero return code, then exit
if isinstance(agent, dict) and agent["return_code"] != 0:
event_list = fire_console_event(
prism.prism_logging.SeparatorEvent(),
event_list
)
return prism.cli.base.TaskRunReturnResult(event_list, True)

# ------------------------------------------------------------------------------
# Execute the agent

Expand Down Expand Up @@ -271,7 +278,7 @@ def run(self) -> prism.cli.base.TaskRunReturnResult:
agent_event_to_fire = agent_event_manager_output.event_to_fire
event_list = agent_event_manager_output.event_list

# Log an error if one occurs
# Log any Python error that we encounter
if agent_output == 0:
event_list = fire_console_event(
agent_event_to_fire,
Expand All @@ -281,6 +288,14 @@ def run(self) -> prism.cli.base.TaskRunReturnResult:
event_list = self.fire_tail_event(event_list)
return prism.cli.base.TaskRunReturnResult(event_list, True)

# If the `run` script exits with a non-zero return code, then return
if isinstance(agent_output, dict) and agent_output["return_code"] != 0:
event_list = fire_console_event(
prism.prism_logging.SeparatorEvent(),
event_list
)
return prism.cli.base.TaskRunReturnResult(event_list, True)

# Now, we're done streaming logs
event_list = fire_empty_line_event()
event_list = fire_console_event(
Expand Down
28 changes: 20 additions & 8 deletions prism/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,26 @@ def get_project_dir():
while cwd != root_path:
project_file = cwd / 'prism_project.py'
if project_file.is_file():
if not Path(cwd / 'modules').is_dir():
modules_dir_not_found_msg = ' '.join([
'modules directory not found in project directory or any',
'of its parents',
])
raise prism.exceptions.ModulesDirNotFoundException(
modules_dir_not_found_msg
)

# Check if `tasks` and/or `modules` directory exist
tasks_dir = Path(cwd) / 'tasks'
modules_dir = Path(cwd) / 'modules'
if not tasks_dir.is_dir():

# If the `modules`` directory exists, then skip... we'll throw a warning
# later
if modules_dir.is_dir():
pass

# Otherwise, throw an error
else:
tasks_dir_not_found_msg = ' '.join([
'tasks directory not found in project directory or any',
'of its parents',
])
raise prism.exceptions.TasksDirNotFoundException(
tasks_dir_not_found_msg
)
return cwd
else:
cwd = cwd.parent
Expand Down
Loading

0 comments on commit 65abebb

Please sign in to comment.