Skip to content

Commit

Permalink
Merge pull request #39 from runprism/issue-33
Browse files Browse the repository at this point in the history
Issue 33: `load_hooks` function
  • Loading branch information
prism-admin committed Oct 26, 2023
2 parents 6e366a9 + 0b92481 commit e517a91
Show file tree
Hide file tree
Showing 22 changed files with 123 additions and 69 deletions.
25 changes: 25 additions & 0 deletions prism/infra/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# Standard library imports
import pandas as pd
from typing import Any, Optional
from pathlib import Path

# Prism-specific imports
from prism.cli.base import get_project_dir
from prism.infra import project as prism_project
import prism.constants
import prism.exceptions
Expand Down Expand Up @@ -131,3 +133,26 @@ def dbt_ref(self,

df = dbt_project.handle_ref(target_1, target_2, target_version)
return df


# Function to load hooks in a script or environment
def load_hooks(project_dir: Optional[Path] = None):
    """
    Create the PrismHooks for a project so that they can be used from a script
    or environment.

    args:
        project_dir: directory of the Prism project; when None, the directory
            is resolved with `get_project_dir()`
    returns:
        PrismHooks object built on top of the set-up PrismProject
    """
    resolved_dir = get_project_dir() if project_dir is None else project_dir

    # Instantiate the project in "run" mode with an empty user context
    prism_proj = prism_project.PrismProject(
        project_dir=resolved_dir, user_context={}, which="run"
    )
    prism_proj.setup()
    loaded_hooks = PrismHooks(prism_proj)

    # Let the user know when the project's adapter dictionary is empty
    no_adapters = loaded_hooks.project.adapters_object_dict == {}
    if no_adapters:
        print("WARNING: Your hooks are empty! Create a profile YAML to populate your hooks")  # noqa
    return loaded_hooks
71 changes: 71 additions & 0 deletions prism/tests/integration/test_load_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Load hooks integration test
Table of Contents:
- Imports
- Test case class definition
"""

###########
# Imports #
###########

# Standard library imports
import os
from pathlib import Path

# Prism imports
from prism.infra.hooks import PrismHooks, load_hooks
from prism.profiles import snowflake, pyspark, postgres
from prism.tests.integration import integration_test_class

# Paths
TEST_PROJECTS = Path(__file__).parent / 'test_projects'


##############################
# Test case class definition #
##############################

class TestLoadHooks(integration_test_class.IntegrationTestCase):
    """
    Integration tests for `load_hooks`, run against the projects stored in the
    `test_projects` directory.
    """

    def _cleanup_loaded_hooks(self, hooks):
        """
        Clean up the project behind `hooks` (when hooks were actually loaded)
        and reset the working directory. The working directory is reset even if
        the project cleanup raises, so a failure here cannot leak into the
        tests that run afterwards.

        args:
            hooks: PrismHooks returned by `load_hooks`, or None if loading failed
        """
        try:
            if hooks is not None:
                hooks.project.cleanup(hooks.project.run_context)
        finally:
            self._set_up_wkdir()

    def test_load_hooks_in_project(self):
        """
        `load_hooks` produces a PrismHooks object exposing the Snowflake,
        PySpark and Postgres adapters when the user is in a project directory
        """
        expected_adapters = {
            "snowflake_base": snowflake.Snowflake,
            "pyspark_base": pyspark.Pyspark,
            "postgres_base": postgres.Postgres,
        }
        os.chdir(TEST_PROJECTS / '013_hooks_sql_spark')

        hooks = None
        try:
            # Load hooks
            hooks = load_hooks()
            adapter_dict = hooks.project.adapters_object_dict

            # Test: every expected adapter is present before its type is checked
            self.assertIsInstance(hooks, PrismHooks)
            for adapter_name in expected_adapters:
                self.assertIn(adapter_name, adapter_dict)
            for adapter_name, adapter_cls in expected_adapters.items():
                self.assertIsInstance(adapter_dict[adapter_name], adapter_cls)
        finally:
            # Cleanup must run even when an assertion above fails
            self._cleanup_loaded_hooks(hooks)

    def test_load_hooks_not_in_project(self):
        """
        `load_hooks` still produces a PrismHooks object, but with an empty
        adapter dictionary, for a project that yields no adapters (presumably
        because the project has no profile YAML -- TODO confirm against the
        test project)
        """
        os.chdir(TEST_PROJECTS / '005_simple_project_no_null_tasks')

        hooks = None
        try:
            # Load hooks
            hooks = load_hooks()
            self.assertIsInstance(hooks, PrismHooks)
            self.assertEqual({}, hooks.project.adapters_object_dict)
        finally:
            # Cleanup must run even when an assertion above fails
            self._cleanup_loaded_hooks(hooks)

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# Task
@task(
targets=[
target(type=JSON, loc=prism_project.OUTPUT / 'astros.json'),
target(type=JSON, loc=prism_project.OUTPUT / 'todos.json'),
target(type=Txt, loc=prism_project.OUTPUT / 'second_target.txt')
],
)
def extract(tasks, hooks):
url = "http://api.open-notify.org/astros.json"
url = "https://jsonplaceholder.typicode.com/todos"
resp = requests.get(url)
json_dict = json.loads(resp.text)
second_target_str = "second target"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ def load(tasks, hooks):
data, _ = tasks.ref("extract.py")

# Names
names = {}
for ppl in data["people"]:

# Formatted
name = ppl["name"].lower().replace(" ", "_")
names[f"{name}.txt"] = ppl["name"]
todos = {}
for todo in data:
todos[f"todo_{todo['id']}.txt"] = todo['title']

# Return
return names
return todos

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

# Task
@task(
targets=[target(type=JSON, loc=prism_project.OUTPUT / 'astros.json')],
targets=[target(type=JSON, loc=prism_project.OUTPUT / 'todos.json')],
)
def extract(tasks, hooks):
url = "http://api.open-notify.org/astros.json"
url = "https://jsonplaceholder.typicode.com/todos"
resp = requests.get(url)
return json.loads(resp.text)
61 changes: 19 additions & 42 deletions prism/tests/integration/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,42 +1142,30 @@ def test_decorator_tasks_with_targets(self):
self.assertEqual(' | '.join(expected_events), runtask_run_results)

# Check output of 'extract' task
self.assertTrue(Path(wkdir / 'output' / 'astros.json').is_file())
self.assertTrue(Path(wkdir / 'output' / 'todos.json').is_file())
self.assertTrue(Path(wkdir / 'output' / 'second_target.txt').is_file())

# Todos JSON file
with open(Path(wkdir / 'output' / 'astros.json'), 'r') as f:
astros_str = f.read()
astros_dict = json.loads(astros_str)
self.assertEqual(astros_dict["message"], "success")
with open(Path(wkdir / 'output' / 'todos.json'), 'r') as f:
todos_str = f.read()
todos_dict = json.loads(todos_str)
self.assertEqual(len(todos_dict), 200)

# Dummy second target text file
with open(Path(wkdir / 'output' / 'second_target.txt'), 'r') as f:
second_target = f.read()
self.assertEqual(second_target, "second target")

# Check output of 'load' task
names = [
"Sergey Prokopyev",
"Dmitry Petelin",
"Frank Rubio",
"Jing Haiping",
"Gui Haichow",
"Zhu Yangzhu",
"Jasmin Moghbeli",
"Andreas Mogensen",
"Satoshi Furukawa",
"Konstantin Borisov",
]
for n in names:
formatted_name = n.lower().replace(" ", "_")
self.assertTrue(Path(wkdir / 'output' / f'{formatted_name}.txt').is_file())
with open(Path(wkdir / 'output' / f'{formatted_name}.txt'), 'r') as f:
for i in range(1, 201):
self.assertTrue(Path(wkdir / 'output' / f'todo_{i}.txt').is_file())
with open(Path(wkdir / 'output' / f'todo_{i}.txt'), 'r') as f:
contents = f.read()
self.assertEqual(contents, n)
self.assertEqual(contents, todos_dict[i - 1]['title'])

# Remove the .compiled directory, if it exists
self._remove_compiled_dir(wkdir)
self._remove_files_in_output(wkdir)

# Set up the working directory
self._set_up_wkdir()
Expand Down Expand Up @@ -1230,31 +1218,20 @@ def test_decorator_tasks_with_retries(self):
self.assertEqual(' | '.join(expected_events), runtask_run_results)

# Check output of 'extract' task
self.assertTrue(Path(wkdir / 'output' / 'astros.json').is_file())
with open(Path(wkdir / 'output' / 'astros.json'), 'r') as f:
astros_str = f.read()
astros_dict = json.loads(astros_str)
self.assertEqual(astros_dict["message"], "success")
self.assertTrue(Path(wkdir / 'output' / 'todos.json').is_file())
with open(Path(wkdir / 'output' / 'todos.json'), 'r') as f:
todos_str = f.read()
todos_dict = json.loads(todos_str)
self.assertEqual(len(todos_dict), 200)

# Output of 'load' task was not created
names = [
"Andrey Fedyaev",
"Deng Qingming",
"Dmitry Petelin",
"Fei Junlong",
"Frank Rubio",
"Sergey Prokopyev",
"Stephen Bowen",
"Sultan Alneyadi",
"Warren Hoburg",
"Zhang Lu",
]
for n in names:
formatted_name = n.lower().replace(" ", "_")
self.assertFalse(Path(wkdir / 'output' / f'{formatted_name}.txt').is_file())
# Check output of 'load' task
for i in range(1, 201):
self.assertFalse(Path(wkdir / 'output' / f'todo_{i}.txt').is_file())

# Remove the .compiled directory, if it exists
self._remove_compiled_dir(wkdir)
self._remove_files_in_output(wkdir)

# Set up the working directory
self._set_up_wkdir()
Expand Down

0 comments on commit e517a91

Please sign in to comment.