Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 33: load_hooks function #39

Merged
merged 4 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions prism/infra/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# Standard library imports
import pandas as pd
from typing import Any, Optional
from pathlib import Path

# Prism-specific imports
from prism.cli.base import get_project_dir
from prism.infra import project as prism_project
import prism.constants
import prism.exceptions
Expand Down Expand Up @@ -131,3 +133,26 @@ def dbt_ref(self,

df = dbt_project.handle_ref(target_1, target_2, target_version)
return df


# Function to load hooks in a script or environment
def load_hooks(project_dir: Optional[Path] = None):
"""
Load the PrismHooks associated with the current project
"""
if project_dir is None:
project_dir = get_project_dir()
project = prism_project.PrismProject(
project_dir=project_dir,
user_context={},
which="run"
)
project.setup()

# Hooks object
hooks = PrismHooks(project)

# Print a warning if the hooks are empty
if hooks.project.adapters_object_dict == {}:
print("WARNING: Your hooks are empty! Create a profile YAML to populate your hooks") # noqa
return hooks
71 changes: 71 additions & 0 deletions prism/tests/integration/test_load_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Load hooks unit test

Table of Contents:
- Imports
- Test case class definition
"""

###########
# Imports #
###########

# Standard library imports
import os
from pathlib import Path

# Prism imports
from prism.infra.hooks import PrismHooks, load_hooks
from prism.profiles import snowflake, pyspark, postgres
from prism.tests.integration import integration_test_class

# Paths
TEST_PROJECTS = Path(__file__).parent / 'test_projects'


##############################
# Test case class definition #
##############################

class TestLoadHooks(integration_test_class.IntegrationTestCase):

def test_load_hooks_in_project(self):
"""
`load_hook` produces a PrismHooks object when the user is in a project directory
"""
PROJECT_PATH = TEST_PROJECTS / '013_hooks_sql_spark'
os.chdir(PROJECT_PATH)

# Load hooks
hooks = load_hooks()
adapter_dict = hooks.project.adapters_object_dict

# Test
self.assertTrue(isinstance(hooks, PrismHooks))
self.assertTrue("snowflake_base" in adapter_dict.keys())
self.assertTrue("pyspark_base" in adapter_dict.keys())
self.assertTrue("postgres_base" in adapter_dict.keys())

self.assertTrue(isinstance(adapter_dict["snowflake_base"], snowflake.Snowflake))
self.assertTrue(isinstance(adapter_dict["pyspark_base"], pyspark.Pyspark))
self.assertTrue(isinstance(adapter_dict["postgres_base"], postgres.Postgres))

# Cleanup
hooks.project.cleanup(hooks.project.run_context)
self._set_up_wkdir()

def test_load_hooks_not_in_project(self):
"""
`load_hook` produces a PrismHooks object when the user is in a project directory
"""
PROJECT_PATH = TEST_PROJECTS / '005_simple_project_no_null_tasks'
os.chdir(PROJECT_PATH)

# Load hooks
hooks = load_hooks()
self.assertTrue(isinstance(hooks, PrismHooks))
self.assertEqual({}, hooks.project.adapters_object_dict)

# Cleanup
hooks.project.cleanup(hooks.project.run_context)
self._set_up_wkdir()

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# Task
@task(
targets=[
target(type=JSON, loc=prism_project.OUTPUT / 'astros.json'),
target(type=JSON, loc=prism_project.OUTPUT / 'todos.json'),
target(type=Txt, loc=prism_project.OUTPUT / 'second_target.txt')
],
)
def extract(tasks, hooks):
url = "http://api.open-notify.org/astros.json"
url = "https://jsonplaceholder.typicode.com/todos"
resp = requests.get(url)
json_dict = json.loads(resp.text)
second_target_str = "second target"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ def load(tasks, hooks):
data, _ = tasks.ref("extract.py")

# Names
names = {}
for ppl in data["people"]:

# Formatted
name = ppl["name"].lower().replace(" ", "_")
names[f"{name}.txt"] = ppl["name"]
todos = {}
for todo in data:
todos[f"todo_{todo['id']}.txt"] = todo['title']

# Return
return names
return todos

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

# Task
@task(
targets=[target(type=JSON, loc=prism_project.OUTPUT / 'astros.json')],
targets=[target(type=JSON, loc=prism_project.OUTPUT / 'todos.json')],
)
def extract(tasks, hooks):
url = "http://api.open-notify.org/astros.json"
url = "https://jsonplaceholder.typicode.com/todos"
resp = requests.get(url)
return json.loads(resp.text)
61 changes: 19 additions & 42 deletions prism/tests/integration/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,42 +1142,30 @@ def test_decorator_tasks_with_targets(self):
self.assertEqual(' | '.join(expected_events), runtask_run_results)

# Check output of 'extract' task
self.assertTrue(Path(wkdir / 'output' / 'astros.json').is_file())
self.assertTrue(Path(wkdir / 'output' / 'todos.json').is_file())
self.assertTrue(Path(wkdir / 'output' / 'second_target.txt').is_file())

# Astros JSON file
with open(Path(wkdir / 'output' / 'astros.json'), 'r') as f:
astros_str = f.read()
astros_dict = json.loads(astros_str)
self.assertEqual(astros_dict["message"], "success")
with open(Path(wkdir / 'output' / 'todos.json'), 'r') as f:
todos_str = f.read()
todos_dict = json.loads(todos_str)
self.assertEqual(len(todos_dict), 200)

# Dummy second target text file
with open(Path(wkdir / 'output' / 'second_target.txt'), 'r') as f:
second_target = f.read()
self.assertEqual(second_target, "second target")

# Check output of 'load' task
names = [
"Sergey Prokopyev",
"Dmitry Petelin",
"Frank Rubio",
"Jing Haiping",
"Gui Haichow",
"Zhu Yangzhu",
"Jasmin Moghbeli",
"Andreas Mogensen",
"Satoshi Furukawa",
"Konstantin Borisov",
]
for n in names:
formatted_name = n.lower().replace(" ", "_")
self.assertTrue(Path(wkdir / 'output' / f'{formatted_name}.txt').is_file())
with open(Path(wkdir / 'output' / f'{formatted_name}.txt'), 'r') as f:
for i in range(1, 201):
self.assertTrue(Path(wkdir / 'output' / f'todo_{i}.txt').is_file())
with open(Path(wkdir / 'output' / f'todo_{i}.txt'), 'r') as f:
contents = f.read()
self.assertEqual(contents, n)
self.assertEqual(contents, todos_dict[i - 1]['title'])

# Remove the .compiled directory, if it exists
self._remove_compiled_dir(wkdir)
self._remove_files_in_output(wkdir)

# Set up the working directory
self._set_up_wkdir()
Expand Down Expand Up @@ -1230,31 +1218,20 @@ def test_decorator_tasks_with_retries(self):
self.assertEqual(' | '.join(expected_events), runtask_run_results)

# Check output of 'extract' task
self.assertTrue(Path(wkdir / 'output' / 'astros.json').is_file())
with open(Path(wkdir / 'output' / 'astros.json'), 'r') as f:
astros_str = f.read()
astros_dict = json.loads(astros_str)
self.assertEqual(astros_dict["message"], "success")
self.assertTrue(Path(wkdir / 'output' / 'todos.json').is_file())
with open(Path(wkdir / 'output' / 'todos.json'), 'r') as f:
todos_str = f.read()
todos_dict = json.loads(todos_str)
self.assertEqual(len(todos_dict), 200)

# Output of 'load' task was not created
names = [
"Andrey Fedyaev",
"Deng Qingming",
"Dmitry Petelin",
"Fei Junlong",
"Frank Rubio",
"Sergey Prokopyev",
"Stephen Bowen",
"Sultan Alneyadi",
"Warren Hoburg",
"Zhang Lu",
]
for n in names:
formatted_name = n.lower().replace(" ", "_")
self.assertFalse(Path(wkdir / 'output' / f'{formatted_name}.txt').is_file())
# Check output of 'load' task
for i in range(1, 201):
self.assertFalse(Path(wkdir / 'output' / f'todo_{i}.txt').is_file())

# Remove the .compiled directory, if it exists
self._remove_compiled_dir(wkdir)
self._remove_files_in_output(wkdir)

# Set up the working directory
self._set_up_wkdir()
Expand Down
Loading