Skip to content

Commit

Permalink
Merge pull request #38 from runprism/docs-update
Browse files Browse the repository at this point in the history
Update and prettify documentation
  • Loading branch information
prism-admin committed Oct 2, 2023
2 parents 8bc79a8 + 6ac2ead commit 6e366a9
Show file tree
Hide file tree
Showing 23 changed files with 397 additions and 17 deletions.
9 changes: 2 additions & 7 deletions .github/workflows/ci-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,10 @@ jobs:
pip install tox tox-gh-actions
- name: Install docker
run: |
# Workaround for https://github.com/actions/runner-images/issues/8104
brew remove --ignore-dependencies qemu
curl -o ./qemu.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/dc0669eca9479e9eeb495397ba3a7480aaa45c2e/Formula/qemu.rb
brew install ./qemu.rb
brew install docker
colima start
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Test with tox
Expand Down
2 changes: 1 addition & 1 deletion prism/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#############

# Version number
VERSION = '0.2.5'
VERSION = '0.2.6'

# Root directory of project
ROOT_DIR = str(Path(os.path.dirname(__file__)).parent)
Expand Down
1 change: 0 additions & 1 deletion prism/docs/build/54968a39190c43d592b9.svg

This file was deleted.

Binary file removed prism/docs/build/737ad70b3f2d3a9b5f6e.ico
Binary file not shown.
Binary file added prism/docs/build/ae8a93980ebb6c55123b.ico
Binary file not shown.
Binary file added prism/docs/build/d4df11de40d39920ff8c.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions prism/docs/build/index.html

Large diffs are not rendered by default.

91 changes: 86 additions & 5 deletions prism/infra/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,65 @@ class TaskManifest:
"""

def __init__(self):
self.manifest_dict: Dict[str, Any] = {"targets": {}, "tasks": [], "refs": {}}
self.manifest_dict: Dict[str, Any] = {"targets": {}, "tasks": {}, "refs": {}}

def update_tasks_dir_key(self,
key: str,
level: Dict[str, Any] = {},
):
if key not in level.keys():
level[key] = {}
return level[key]

def add_task(self,
task_module: Path,
task_name: str
):
"""
We want the `tasks` key in our manifest to be structured as follows
"tasks": {
"<module_name>": [
"task_name1",
"task_name2"
],
"<dir_name>/" {
"<nested_module_name1>": [
"nested_task_name3",
"nested_task_name3"
]
}
...
}
"""
task_module_no_py = re.sub(r'\.py$', '', str(task_module))
processed_task_name = f"{task_module_no_py}.{task_name}"
self.manifest_dict["tasks"].append(processed_task_name)

# Determine if the task exists in a directory
flag_in_dir = False
task_module_no_py_split = task_module_no_py.split("/")
if len(task_module_no_py_split) > 1:
flag_in_dir = True

# If the task lives in a module, then the module name should be the key
if not flag_in_dir:
if task_module_no_py in self.manifest_dict["tasks"].keys():
self.manifest_dict["tasks"][task_module_no_py].append(task_name)
else:
self.manifest_dict["tasks"][task_module_no_py] = [task_name]

# If task lives in a nested directory, then the directory name should be the
# first key.
else:

# Create necessary nested directory keys
base_level = self.manifest_dict["tasks"]
for _k in task_module_no_py_split[:-1]:
base_level = self.update_tasks_dir_key(f"{_k}/", base_level)

# Update the module / task name
if task_module_no_py_split[-1] in base_level.keys():
base_level[task_module_no_py_split[-1]].append(task_name)
else:
base_level[task_module_no_py_split[-1]] = [task_name]

def add_refs(self,
target_module: Path,
Expand Down Expand Up @@ -65,16 +115,47 @@ class Manifest:

def __init__(self, task_manifests: List[TaskManifest] = []):
self.manifest_dict: Dict[str, Any] = {
"targets": {}, "prism_project": "", "tasks": [], "refs": {}
"targets": {}, "prism_project": "", "tasks": {}, "refs": {}
}
self.task_manifests = task_manifests

# Iterate through task manifests and add to manifest
for mm in self.task_manifests:
self.manifest_dict["targets"].update(mm.manifest_dict["targets"])
self.manifest_dict["tasks"].extend(mm.manifest_dict["tasks"])
self.update(self.manifest_dict["tasks"], mm.manifest_dict["tasks"])
self.manifest_dict["refs"].update(mm.manifest_dict["refs"])

def update(self,
manifest_dict: Dict[str, Any],
task_manifest_dict: Dict[str, Any],
) -> Dict[str, Any]:
"""
Recursive function to update `manifest_dict` with the contents of
`task_manifest_dict`. We need a recursive function, because the `tasks` key
within the manifest.json can have a bunch of nested dictionaries.
args:
manifest_dict: manifest dictionary
task_manifest_dict: task manifest dictionary
returns:
updated manifest_dict
"""
# Iterate through the task manifest's contents. Note that they should only have
# one key within `tasks`.
for k, v in task_manifest_dict.items():
if k not in manifest_dict.keys():
manifest_dict[k] = v
elif isinstance(manifest_dict[k], list):
for _item in v:
if _item not in manifest_dict[k]:
manifest_dict[k].append(_item)

# If the value is a dictionary and the manifest already has this dictionary,
# then we'll need to recursively update the manifest's dictionary.
elif isinstance(manifest_dict[k], dict):
self.update(manifest_dict[k], v)
return manifest_dict

def add_prism_project(self, prism_project_data: str):
self.manifest_dict["prism_project"] = prism_project_data

Expand Down
66 changes: 66 additions & 0 deletions prism/tests/integration/test_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
###########

# Standard library imports
import json
import os
from pathlib import Path
import shutil
from typing import Any, Dict

# Prism imports
import prism.tests.integration.integration_test_class as integration_test_class
Expand Down Expand Up @@ -326,3 +328,67 @@ def test_project_with_bad_local_tasks(self):
# Set up wkdir for the next test case
shutil.rmtree(Path(wkdir / '.compiled'))
self._set_up_wkdir()

def test_project_with_nested_dir(self):
"""
`manifest` is compiled as expected when a project has nested directories
"""
def _compile_test(wkdir: Path, expected_tasks: Dict[str, Any]):
# Remove compiled directory, if it exists
os.chdir(wkdir)
self._remove_compiled_dir(wkdir)

# Compile the project
args = ['compile']
compile_run = self._run_prism(args)
compile_run_results = compile_run.get_results()
self.assertEqual(
' | '.join(simple_project_expected_events),
compile_run_results
)

# Check that .compiled directory is not created
self.assertTrue(Path(wkdir / '.compiled').is_dir())
self.assertTrue(Path(wkdir / '.compiled' / 'manifest.json').is_file())

# Open the manifest
with open(Path(wkdir / '.compiled' / 'manifest.json'), 'r') as f:
manifest_json = json.loads(f.read())
tasks = manifest_json["tasks"]
self.assertEqual(expected_tasks, tasks)

# Set up wkdir for the next test case
shutil.rmtree(Path(wkdir / '.compiled'))
self._set_up_wkdir()

# ------------------------------------------------------------------------------
# Just one nested directory

wkdir = Path(TEST_PROJECTS) / '010_project_nested_module_dirs'
expected_tasks = {
"extract/": {
"module01": ["Task01"],
"module02": ["Task02"],
},
"load/": {
"module03": ["Task03"],
},
"module04": ["Task04"]
}
_compile_test(wkdir, expected_tasks)

# ------------------------------------------------------------------------------
# Multiple nested directories

wkdir = Path(TEST_PROJECTS) / '010a_multiple_nested_dirs'
expected_tasks = {
"extract/": {
"module01": ["Task01"],
"module02": ["Task02"],
"load/": {
"module03": ["Task03"],
},
},
"module04": ["Task04"]
}
_compile_test(wkdir, expected_tasks)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "7eafd19a",
"metadata": {},
"source": [
"**Use this notebook for developing code before productionizing it within tasks**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b771bf53",
"metadata": {},
"outputs": [],
"source": [
"## CODE HERE..."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello from task 1!
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello from task 1!
Hello from task 2!
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Prism project
"""

# Imports
import logging
from pathlib import Path
from prism.admin import generate_run_id, generate_run_slug


# Project metadata
NAME = ""
AUTHOR = ""
VERSION = ""
DESCRIPTION = """
"""

# Admin
RUN_ID = generate_run_id() # don't delete this!
SLUG = generate_run_slug() # don't delete this!


# sys.path config. This gives your tasks access to local tasks / packages that exist
# outside of your project structure.
SYS_PATH_CONF = [
Path(__file__).parent,
Path(__file__).parent.parent,
]


# Thread count: number of workers to use to execute tasks concurrently. If set to 1,
# then 1 task is run at a time.
THREADS = 1


# Profile directory and name
PROFILE_YML_PATH = Path(__file__).parent / 'profile.yml'
PROFILE = None # name of profile within `profiles.yml`


# Logger
PRISM_LOGGER = logging.getLogger("PRISM_LOGGER")


# Other variables / parameters. Make sure to capitalize all of these!
VAR_1 = {'a': 'b'}
VAR_2 = 200
VAR_3 = '2015-01-01'

# Paths
WKDIR = Path(__file__).parent
DATA = WKDIR / 'data'
OUTPUT = WKDIR / 'output'
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
###########
# Imports #
###########

# Prism infrastructure imports
import prism.task
import prism.target
import prism.decorators


####################
# Class definition #
####################

class Task03(prism.task.PrismTask):

# Run
def run(self, tasks, hooks):
"""
Execute task.
args:
tasks: used to reference output of other tasks --> tasks.ref('...')
hooks: built-in Prism hooks. These include:
- hooks.dbt_ref --> for getting dbt tasks as a pandas DataFrame
- hooks.sql --> for executing sql query using an adapter in profile YML
- hooks.spark --> for accessing SparkSession
returns:
task output
"""
lines = tasks.ref('extract/module02.py')
return lines + '\n' + 'Hello from task 3!'
Loading

0 comments on commit 6e366a9

Please sign in to comment.