Skip to content

Commit

Permalink
Add recipe runner with tests (#26)
Browse files Browse the repository at this point in the history
Add method for resetting globs for clean test runs.
Should really be cleaned after each test, so this implementation
is not pretty.
  • Loading branch information
paalvibe committed Oct 14, 2019
1 parent 16c5c2e commit 20b05bc
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 46 deletions.
8 changes: 5 additions & 3 deletions birgitta/dataframe/dataiku.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Functions for loading and writing dataiku spark dataframes."""

import dataiku
import dataiku.spark as dkuspark
try: # Enable modules importing this module to load even if it isn't used # noqa 501
import dataiku
import dataiku.spark as dkuspark
except ImportError:
pass

__all__ = ['get', 'write']

Expand Down
18 changes: 12 additions & 6 deletions birgitta/glob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@


class Glob():
    """Namespace whose class attributes hold birgitta's global settings."""

    # Evaluated once, when the class body runs; reset() does not refresh it.
    TODAY = datetime.date.today()


def reset():
    """Put the settings stored on Glob back to their initial values.

    Called once below to initialise the settings, and callable again
    later (e.g. at the start of a test) to discard values set meanwhile.
    """
    # Values mentioned for BIRGITTA_DATASET_STORAGE:
    #   "DATAIKU" - store fixtures in Dataiku
    #   "MEM"     - store fixtures in dfs in memory
    #   "FILE"    - store fixtures in parquet files
    Glob.BIRGITTA_DATASET_STORAGE = "FILE"
    Glob.BIRGITTA_FILEBASED_DATASETS = None
    Glob.BIRGITTA_MEMBASED_DATASETS = None
    # A fresh dict on every reset, so earlier coverage entries are dropped.
    Glob.BIRGITTA_TEST_COVERAGE = {}
    Glob.BIRGITTA_DATASET_OVERRIDES = None


reset()  # Init glob


def get(key, default=None):
Expand Down
92 changes: 92 additions & 0 deletions birgitta/recipe/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# import pkgutil
import inspect
import os
import sys
import traceback

__all__ = ['exec_code', 'run']


def exec_code(code, globals_dict):
    """Execute the recipe code and handle error conditions.

    When execution fails, the error class and, if it can be located,
    the offending recipe line are printed before the exception is
    re-raised to the caller.

    Args:
        code (str): The code to be executed.
        globals_dict (dict): globals_dict as used by exec().

    Returns:
        Output of python exec() function (exec() always returns None).

    Raises:
        Exception: Whatever exception compiling or running the code
            raised; it is re-raised unchanged after being reported.
    """
    try:
        return exec(code, globals_dict)
    except SyntaxError as err:
        _report_recipe_error(code, err, err.lineno)
        raise
    except Exception as err:
        _report_recipe_error(code, err, _recipe_lineno(err.__traceback__))
        raise


def _recipe_lineno(tb):
    """Return the line of the innermost frame running exec()'d source.

    Source given to exec() as a string is compiled with the filename
    "<string>". Frames from other files (libraries the recipe calls) are
    skipped, since their line numbers do not index into the recipe text.
    Returns None when no such frame exists.
    """
    recipe_linenos = [frame.lineno
                      for frame in traceback.extract_tb(tb)
                      if frame.filename == "<string>"]
    return recipe_linenos[-1] if recipe_linenos else None


def _report_recipe_error(code, err, lineno):
    """Print the error class and, when lineno is usable, the recipe line."""
    error_class = err.__class__.__name__
    lines = code.splitlines()
    # lineno is 1-based, so the last line (lineno == len(lines)) is valid.
    # It can also be None, e.g. a SyntaxError without position info.
    if lineno is not None and 1 <= lineno <= len(lines):
        print("Recipe execution",
              error_class,
              "at recipe line:\n",
              lines[lineno - 1])
    else:
        print("Recipe execution",
              error_class)


def run(root_mod, recipe, storage, replacements=None):
    """Read a recipe file, prepare its code and execute it.

    The recipe is looked up relative to the directory containing
    root_mod. Its source is passed through prepare_code() and then
    executed by exec_code() with BIRGITTA_DATASET_STORAGE set to storage
    in the globals.

    Args:
        root_mod (module): The root module on which to base the path.
        recipe (str): Relative path to the recipe file from the module dir.
        storage (str): Storage type, either "DATAIKU" or "FILE".
        replacements (list): Optional list of text replacements to enable
            recipe debugging. Defaults to no replacements.
            Example on how to limit data amount:
            [
                {
                    "old": "dataframe.get(sql_context, ds_foo.name)",
                    "new": "dataframe.get(sql_context, ds_foo.name).limit(10)"
                }
            ]

    Returns:
        Output of python exec() function.

    Raises:
        ValueError: If storage is neither "DATAIKU" nor "FILE".
    """
    if storage not in ("DATAIKU", "FILE"):
        raise ValueError(f"Storage unknown {storage}")
    # None instead of a shared mutable [] default.
    if replacements is None:
        replacements = []
    mod_path = os.path.dirname(inspect.getfile(root_mod))
    recipe_path = f"{mod_path}/{recipe}"
    with open(recipe_path) as f:
        raw_code = f.read()
    code = prepare_code(raw_code, recipe, storage, replacements)
    globals_dict = {
        'BIRGITTA_DATASET_STORAGE': storage
    }
    return exec_code(code, globals_dict)


def prepare_code(code, recipe, storage, replacements):
    """Wrap recipe source with a storage-setting header and a footer.

    Args:
        code (str): The recipe source.
        recipe (str): Recipe name, used in the completion message.
        storage (str): Value set for BIRGITTA_DATASET_STORAGE via glob.set().
        replacements (list): Dicts with "old" and "new" keys; each "old"
            text in the source is replaced by "new", in list order.

    Returns:
        str: Header statements, the (replaced) recipe source and a final
        print statement announcing completion.
    """
    for replacement in replacements:
        code = code.replace(replacement["old"], replacement["new"])
    # The header must end with a newline; otherwise the first recipe line
    # is glued onto the glob.set(...) call. The stdlib `import glob` is
    # rebound by the following import and is kept only as it was.
    glob_stmts = f"""import glob
from birgitta import glob
glob.set("BIRGITTA_DATASET_STORAGE", "{storage}")
"""
    completed = f"""
print("=== Recipe {recipe} complete ===")"""
    return glob_stmts + code + completed
40 changes: 4 additions & 36 deletions birgitta/recipetest/localtest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""Functionality for running fixture based test cases.
"""
import os
import sys
import traceback
from functools import partial

import pytest
from birgitta import glob
from birgitta import timing
from birgitta.recipe import runner
from birgitta.recipetest import localtest
from birgitta.recipetest.coverage import report
from birgitta.recipetest.localtest import fixturing, assertion, script_prepend # noqa 401
Expand Down Expand Up @@ -154,44 +153,13 @@ def process_recipe(path,
tmpdir,
spark_session)
full_code = script_prepend.code() + code_w_reporting
execute_recipe(full_code, globals_dict)
timing.time("execute_recipe before exec")
runner.exec_code(full_code, globals_dict)
timing.time("execute_recipe after exec")


def dump_test_recipe(test_case, tmpdir, code):
    """Write the recipe code to "<tmpdir>/<test_case>.py" and print the path.

    Args:
        test_case: Name used as the file name, without extension.
        tmpdir: Directory the file is written into.
        code: The python source to write.
    """
    target = tmpdir + "/" + test_case + ".py"
    # Announce the location so the dumped recipe can be inspected.
    print("\nTest recipe python file:\n", repr(target), "\n")
    with open(target, "w") as recipe_file:
        recipe_file.write(code)


def execute_recipe(code, globals_dict):
    """Execute the recipe and handle error conditions.

    Timing marks are recorded before and, on success, after the exec()
    call. When execution fails, the error class and, if it can be
    located, the offending recipe line are printed before the exception
    is re-raised.

    Args:
        code (str): The recipe code to execute.
        globals_dict (dict): globals_dict as used by exec().

    Raises:
        Exception: Whatever exception executing the code raised.
    """
    try:
        timing.time("execute_recipe before exec")
        exec(code, globals_dict)
        timing.time("execute_recipe after exec")
    except Exception as err:
        error_class = err.__class__.__name__
        if isinstance(err, SyntaxError):
            lineno = err.lineno
        else:
            # Source passed to exec() as a string is compiled with the
            # filename "<string>". Use the innermost such frame, so a
            # failure inside a library is reported at the recipe line
            # that called it.
            recipe_linenos = [
                frame.lineno
                for frame in traceback.extract_tb(err.__traceback__)
                if frame.filename == "<string>"
            ]
            lineno = recipe_linenos[-1] if recipe_linenos else None
        lines = code.splitlines()
        # lineno is 1-based, so the last line is valid; it may be None.
        if lineno is not None and 1 <= lineno <= len(lines):
            print("Recipe execution",
                  error_class,
                  "at recipe line:\n",
                  lines[lineno - 1])
        else:
            print("Recipe execution",
                  error_class)
        raise
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from setuptools import setup


version = '0.1.6'
version = '0.1.7'
here = path.abspath(path.dirname(__file__))

long_description = """Birgitta is a Python ETL test and schema framework,
Expand Down
Empty file added tests/recipe/__init__.py
Empty file.
26 changes: 26 additions & 0 deletions tests/recipe/test_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
# from birgitta import spark
from birgitta import glob
from birgitta.recipe import runner
from examples.organizations.newsltd.projects import tribune


def test_run():
    """Running the example recipe with FILE storage raises TypeError."""
    glob.reset()  # start from freshly initialised glob settings
    recipe = "recipes/compute_filtered_contracts.py"
    with pytest.raises(TypeError):
        runner.run(tribune, recipe, "FILE")


def test_syntax_error():
    """A replacement that corrupts the recipe source raises SyntaxError."""
    glob.reset()  # start from freshly initialised glob settings
    # Rewrites the assignment target into text that is not valid python.
    corrupting_replacement = {
        "old": "sql_context = ",
        "new": "sql_contesdaf@_0=~> = ",
    }
    recipe = "recipes/compute_filtered_contracts.py"
    with pytest.raises(SyntaxError):
        runner.run(tribune, recipe, "FILE", [corrupting_replacement])

0 comments on commit 20b05bc

Please sign in to comment.