Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update and prettify documentation #38

Merged
merged 6 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .github/workflows/ci-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,10 @@ jobs:
pip install tox tox-gh-actions
- name: Install docker
run: |
# Workaround for https://github.com/actions/runner-images/issues/8104
brew remove --ignore-dependencies qemu
curl -o ./qemu.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/dc0669eca9479e9eeb495397ba3a7480aaa45c2e/Formula/qemu.rb
brew install ./qemu.rb

brew install docker
colima start
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Test with tox
Expand Down
2 changes: 1 addition & 1 deletion prism/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#############

# Version number
VERSION = '0.2.5'
VERSION = '0.2.6'

# Root directory of project
ROOT_DIR = str(Path(os.path.dirname(__file__)).parent)
Expand Down
1 change: 0 additions & 1 deletion prism/docs/build/54968a39190c43d592b9.svg

This file was deleted.

Binary file removed prism/docs/build/737ad70b3f2d3a9b5f6e.ico
Binary file not shown.
Binary file added prism/docs/build/ae8a93980ebb6c55123b.ico
Binary file not shown.
Binary file added prism/docs/build/d4df11de40d39920ff8c.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions prism/docs/build/index.html

Large diffs are not rendered by default.

91 changes: 86 additions & 5 deletions prism/infra/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,65 @@ class TaskManifest:
"""

def __init__(self):
    """
    Create an empty task manifest with `targets`, `tasks` and `refs` sections.
    """
    # `tasks` is a (possibly nested) dictionary rather than a flat list: `add_task`
    # stores task names under module keys and "<dir_name>/" keys.
    self.manifest_dict: Dict[str, Any] = {"targets": {}, "tasks": {}, "refs": {}}

def update_tasks_dir_key(self,
    key: str,
    level: "Dict[str, Any] | None" = None,
):
    """
    Return the dictionary stored under `key` in `level`, creating an empty one first
    if `key` is not present yet.

    args:
        key: key to look up / create (`add_task` passes "<dir_name>/")
        level: dictionary to update in place. If omitted, a new empty dictionary is
            used for this call only.
    returns:
        the value stored at `level[key]`
    """
    # Build the default per call. A `{}` default in the signature is created once
    # and would be shared -- and mutated -- by every call that omits `level`.
    # NOTE: test against `None`, not truthiness; an empty dictionary passed by the
    # caller must still be updated in place.
    if level is None:
        level = {}
    return level.setdefault(key, {})

def add_task(self,
    task_module: Path,
    task_name: str
):
    """
    Register `task_name` under its module in the manifest's `tasks` dictionary.

    We want the `tasks` key in our manifest to be structured as follows
    "tasks": {
        "<module_name>": [
            "task_name1",
            "task_name2"
        ],
        "<dir_name>/": {
            "<nested_module_name1>": [
                "nested_task_name3",
                "nested_task_name4"
            ]
        }
        ...
    }

    args:
        task_module: path of the module containing the task. A trailing `.py` is
            removed, and every "/"-separated parent directory becomes a nested
            "<dir_name>/" key.
        task_name: name of the task to record under the module's key
    returns:
        None
    """
    task_module_no_py = re.sub(r'\.py$', '', str(task_module))

    # Every path component except the last one is a directory; the last one is the
    # module name. A module that does not live in a directory has no directory
    # components, so it is keyed at the top level of `tasks`.
    *dir_names, module_name = task_module_no_py.split("/")

    # Create necessary nested directory keys, descending one level per directory
    level = self.manifest_dict["tasks"]
    for dir_name in dir_names:
        level = self.update_tasks_dir_key(f"{dir_name}/", level)

    # Update the module / task name
    level.setdefault(module_name, []).append(task_name)

def add_refs(self,
target_module: Path,
Expand Down Expand Up @@ -65,16 +115,47 @@ class Manifest:

def __init__(self, task_manifests: "List[TaskManifest] | None" = None):
    """
    Build the project manifest by merging the given task manifests.

    args:
        task_manifests: task manifests whose `targets`, `tasks` and `refs` are
            merged into this manifest. If omitted, the manifest starts out empty.
    """
    self.manifest_dict: Dict[str, Any] = {
        "targets": {}, "prism_project": "", "tasks": {}, "refs": {}
    }

    # Create the default list per instance. A `[]` default in the signature would
    # be one shared object stored on every instance built without arguments.
    self.task_manifests = [] if task_manifests is None else task_manifests

    # Iterate through task manifests and add to manifest. `tasks` is a nested
    # dictionary, so it goes through `self.update` instead of `dict.update`.
    for task_manifest in self.task_manifests:
        self.manifest_dict["targets"].update(task_manifest.manifest_dict["targets"])
        self.update(self.manifest_dict["tasks"], task_manifest.manifest_dict["tasks"])
        self.manifest_dict["refs"].update(task_manifest.manifest_dict["refs"])

def update(self,
    manifest_dict: Dict[str, Any],
    task_manifest_dict: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Merge the contents of `task_manifest_dict` into `manifest_dict`, in place. The
    `tasks` key within the manifest.json can have a bunch of nested dictionaries, so
    nested dictionaries are merged level by level rather than overwritten.

    Lists and dictionaries taken from `task_manifest_dict` are copied, so later
    merges never modify the task manifest they came from.

    args:
        manifest_dict: manifest dictionary
        task_manifest_dict: task manifest dictionary
    returns:
        updated manifest_dict
    """
    # Pairs of (destination, source) dictionaries that still have to be merged.
    # An explicit work list walks the nesting without recursion.
    pending = [(manifest_dict, task_manifest_dict)]
    while pending:
        dest, src = pending.pop()
        for key, value in src.items():
            if key not in dest:
                # New key: store a copy instead of the task manifest's own object
                if isinstance(value, dict):
                    dest[key] = {}
                    pending.append((dest[key], value))
                elif isinstance(value, list):
                    dest[key] = list(value)
                else:
                    dest[key] = value

            # Existing list of task names: append only the names not yet present
            elif isinstance(dest[key], list):
                for item in value:
                    if item not in dest[key]:
                        dest[key].append(item)

            # If the value is a dictionary and the manifest already has this
            # dictionary, then we'll need to merge one level further down.
            elif isinstance(dest[key], dict):
                pending.append((dest[key], value))
    return manifest_dict

def add_prism_project(self, prism_project_data: str):
self.manifest_dict["prism_project"] = prism_project_data

Expand Down
66 changes: 66 additions & 0 deletions prism/tests/integration/test_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
###########

# Standard library imports
import json
import os
from pathlib import Path
import shutil
from typing import Any, Dict

# Prism imports
import prism.tests.integration.integration_test_class as integration_test_class
Expand Down Expand Up @@ -326,3 +328,67 @@ def test_project_with_bad_local_tasks(self):
# Set up wkdir for the next test case
shutil.rmtree(Path(wkdir / '.compiled'))
self._set_up_wkdir()

def test_project_with_nested_dir(self):
    """
    `manifest` is compiled as expected when a project has nested directories
    """
    def _compile_test(wkdir: Path, expected_tasks: Dict[str, Any]):
        """
        Compile the project in `wkdir` and check that the `tasks` key of the
        resulting manifest.json equals `expected_tasks`.
        """
        # Remove compiled directory, if it exists
        os.chdir(wkdir)
        self._remove_compiled_dir(wkdir)

        # Compile the project
        args = ['compile']
        compile_run = self._run_prism(args)
        compile_run_results = compile_run.get_results()
        self.assertEqual(
            ' | '.join(simple_project_expected_events),
            compile_run_results
        )

        # Check that the .compiled directory and its manifest.json were created
        compiled_dir = wkdir / '.compiled'
        manifest_path = compiled_dir / 'manifest.json'
        self.assertTrue(compiled_dir.is_dir())
        self.assertTrue(manifest_path.is_file())

        # Open the manifest
        with open(manifest_path, 'r') as f:
            manifest_json = json.load(f)
        tasks = manifest_json["tasks"]
        self.assertEqual(expected_tasks, tasks)

        # Set up wkdir for the next test case
        shutil.rmtree(compiled_dir)
        self._set_up_wkdir()

    # ------------------------------------------------------------------------------
    # Just one nested directory

    wkdir = Path(TEST_PROJECTS) / '010_project_nested_module_dirs'
    expected_tasks = {
        "extract/": {
            "module01": ["Task01"],
            "module02": ["Task02"],
        },
        "load/": {
            "module03": ["Task03"],
        },
        "module04": ["Task04"]
    }
    _compile_test(wkdir, expected_tasks)

    # ------------------------------------------------------------------------------
    # Multiple nested directories

    wkdir = Path(TEST_PROJECTS) / '010a_multiple_nested_dirs'
    expected_tasks = {
        "extract/": {
            "module01": ["Task01"],
            "module02": ["Task02"],
            "load/": {
                "module03": ["Task03"],
            },
        },
        "module04": ["Task04"]
    }
    _compile_test(wkdir, expected_tasks)
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "7eafd19a",
"metadata": {},
"source": [
"**Use this notebook for developing code before productionizing it within tasks**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b771bf53",
"metadata": {},
"outputs": [],
"source": [
"## CODE HERE..."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello from task 1!
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello from task 1!
Hello from task 2!
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Prism project
"""

# Imports
import logging
from pathlib import Path
from prism.admin import generate_run_id, generate_run_slug


# Project metadata
NAME = ""
AUTHOR = ""
VERSION = ""
DESCRIPTION = """
"""

# Admin. Both values are generated by `prism.admin` helpers when this file is
# imported.
RUN_ID = generate_run_id() # don't delete this!
SLUG = generate_run_slug() # don't delete this!


# sys.path config. This gives your tasks access to local tasks / packages that exist
# outside of your project structure. The entries below are the directory containing
# this file and that directory's parent.
SYS_PATH_CONF = [
Path(__file__).parent,
Path(__file__).parent.parent,
]


# Thread count: number of workers to use to execute tasks concurrently. If set to 1,
# then 1 task is run at a time.
THREADS = 1


# Profile YML path and profile name
PROFILE_YML_PATH = Path(__file__).parent / 'profile.yml'
PROFILE = None # name of profile within `profile.yml`


# Logger
PRISM_LOGGER = logging.getLogger("PRISM_LOGGER")


# Other variables / parameters. Make sure to capitalize all of these!
VAR_1 = {'a': 'b'}
VAR_2 = 200
VAR_3 = '2015-01-01'

# Paths, all relative to the directory containing this file
WKDIR = Path(__file__).parent
DATA = WKDIR / 'data'
OUTPUT = WKDIR / 'output'
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
###########
# Imports #
###########

# Prism infrastructure imports
import prism.task
import prism.target
import prism.decorators


####################
# Class definition #
####################

class Task03(prism.task.PrismTask):

    # Run
    def run(self, tasks, hooks):
        """
        Append this task's greeting, on a new line, to the output of
        `extract/module02.py`.

        args:
            tasks: used to reference output of other tasks --> tasks.ref('...')
            hooks: built-in Prism hooks (not used by this task). These include:
                - hooks.dbt_ref --> for getting dbt tasks as a pandas DataFrame
                - hooks.sql     --> for executing sql query using an adapter in
                                    profile YML
                - hooks.spark   --> for accessing SparkSession
        returns:
            output of `extract/module02.py`, a newline, and this task's greeting
        """
        upstream = tasks.ref('extract/module02.py')
        greeting = 'Hello from task 3!'
        return upstream + '\n' + greeting
Loading
Loading