Skip to content

Commit

Permalink
Add run only variation of run method
Browse files Browse the repository at this point in the history
  • Loading branch information
Dominik Gmiterko authored and dee-gmiterko committed Jun 24, 2020
1 parent 56c347f commit 5d1509c
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 18 deletions.
41 changes: 30 additions & 11 deletions contessa/consistency_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Table,
ResultTable,
ConsistencyCheck,
CheckResult,
)
from contessa.settings import TIME_FILTER_DEFAULT
from contessa.utils import compose_where_time_filter
Expand Down Expand Up @@ -41,7 +42,7 @@ def run(
method: str,
left_check_table: Dict,
right_check_table: Dict,
result_table: Dict,
result_table: Optional[Dict] = None,
columns: Optional[List[str]] = None,
time_filter: str = TIME_FILTER_DEFAULT,
left_custom_sql: str = None,
Expand All @@ -56,7 +57,6 @@ def run(

left_check_table = Table(**left_check_table)
right_check_table = Table(**right_check_table)
result_table = ResultTable(**result_table, model_cls=self.model_cls)
context = self.get_context(left_check_table, right_check_table, context)

result = self.do_consistency_check(
Expand All @@ -70,9 +70,16 @@ def run(
context,
)

quality_check_class = create_default_check_class(result_table)
self.right_conn.ensure_table(quality_check_class.__table__)
self.upsert(quality_check_class, result)
if result_table:
result_table = ResultTable(**result_table, model_cls=self.model_cls)
quality_check_class = create_default_check_class(result_table)
self.right_conn.ensure_table(quality_check_class.__table__)
self.upsert(quality_check_class, result)
return result

obj = CheckResult()
obj.init_row_consistency(**result)
return obj

@staticmethod
def get_context(
Expand All @@ -88,7 +95,8 @@ def get_context(
"right_table_fullname": right_check_table.fullname,
"task_ts": datetime.now(),
}
ctx_defaults.update(context)
if context:
ctx_defaults.update(context)
return ctx_defaults

def do_consistency_check(
Expand Down Expand Up @@ -136,19 +144,30 @@ def do_consistency_check(
)
right_result = self.run_query(self.right_conn, right_sql, context)

valid, passed, failed = self.compare_results(left_result, right_result)

return {
"check": {"type": method, "description": "", "name": "consistency",},
"status": "valid"
if self.compare_results(left_result, right_result)
else "invalid",
"check": {
"type": method,
"description": "",
"name": "consistency",
"passed": passed,
"failed": failed,
},
"status": "valid" if valid else "invalid",
"left_table_name": left_check_table.fullname,
"right_table_name": right_check_table.fullname,
"time_filter": time_filter,
"context": context,
}

def compare_results(self, left_result, right_result):
    """
    Compare rows fetched from the left and right tables.

    Both results are reduced to sets, so duplicate rows and row order are
    ignored.

    :param left_result: iterable of hashable rows from the left query
    :param right_result: iterable of hashable rows from the right query
    :return: tuple ``(valid, passed, failed)`` where ``passed`` is the number
        of distinct rows present on both sides, ``failed`` is the number of
        distinct rows present on exactly one side and ``valid`` is True when
        ``failed`` is zero
    """
    left_rows = set(left_result)
    right_rows = set(right_result)
    # Rows found on both sides pass; rows found on only one side fail.
    passed = len(left_rows & right_rows)
    failed = len(left_rows ^ right_rows)
    return failed == 0, passed, failed

def construct_default_query(self, table_name, column, time_filter, context):
time_filter = compose_where_time_filter(time_filter, context["task_ts"])
Expand Down
52 changes: 52 additions & 0 deletions contessa/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def init_row(
right_table_name: str,
time_filter=None,
context: Dict = None,
**_,
):
"""
Set result to consistency check object.
Expand Down Expand Up @@ -294,3 +295,54 @@ class MyTableQualityCheck(QualityCheck):
cls.metadata.schema = result_table.schema_name
cls.__table__.schema = result_table.schema_name
return cls


class CheckResult:
    """
    Plain in-memory result of a check.

    Used in place of a result-table model when a check is run without a
    result table, so nothing is persisted and the caller only gets the
    computed numbers back.
    """

    def init_row(
        self,
        rule: "Rule",
        results: pd.Series,
        conn: "Connector",
        context: Dict = None,
    ):
        """
        Set result of a quality rule to this object.

        :param rule: executed rule; its name, type, description and
            time_filter are copied to this object
        :param results: one boolean per checked record, True meaning passed
        :param conn: not used here, accepted so the call signature stays the
            same for every result class
        :param context: not used here
        :raises ValueError: if `results` contains any null value
        """
        if results.isnull().any():
            raise ValueError("In results of rule.apply can't be any Null values.")

        self.rule_name = rule.name
        self.rule_type = rule.type
        self.rule_description = rule.description

        self.total_records = results.shape[0]
        # Element-wise comparison on purpose - `is False` / `not results`
        # would not work on a Series.
        self.failed = results[results == False].shape[0]  # noqa: E712
        self.passed = results[results == True].shape[0]  # noqa: E712

        self._set_time_filter_and_percentages(rule.time_filter)
        self.status = "invalid" if self.failed > 0 else "valid"

    def init_row_consistency(
        self,
        check: Dict,
        status: str,
        time_filter=None,
        context: Dict = None,
        **_,
    ):
        """
        Set result of a consistency check to this object.

        Extra keyword arguments are ignored, so the whole result dict of a
        consistency check (which also carries `left_table_name` and
        `right_table_name`) can be passed as `**result`.

        :param check: dict with keys "type", "name", "description",
            "passed" and "failed"
        :param status: status of the check, stored as is
        :param time_filter: stored as is when it is a string, otherwise
            dumped to JSON (so None is stored as "null")
        :param context: not used here
        """
        self.rule_type = check["type"]
        self.rule_name = check["name"]
        self.rule_description = check["description"]

        self.total_records = check["passed"] + check["failed"]
        self.passed = check["passed"]
        self.failed = check["failed"]

        self._set_time_filter_and_percentages(time_filter)
        self.status = status

    def _set_time_filter_and_percentages(self, time_filter):
        """Store time filter as string and compute passed/failed percentages."""
        if isinstance(time_filter, str):
            self.time_filter = time_filter
        else:
            self.time_filter = json.dumps(time_filter)
        self.failed_percentage = self._perc(self.failed, self.total_records)
        self.passed_percentage = self._perc(self.passed, self.total_records)

    def _perc(self, a, b):
        """Return `a` as a percentage of `b`, or 0 when division by zero is raised."""
        try:
            return (a / b) * 100
        except ZeroDivisionError:
            return 0
29 changes: 22 additions & 7 deletions contessa/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
from contessa.base_rules import Rule
from contessa.db import Connector
from contessa.executor import get_executor, refresh_executors
from contessa.models import create_default_check_class, Table, ResultTable, QualityCheck
from contessa.models import (
create_default_check_class,
Table,
ResultTable,
QualityCheck,
CheckResult,
)
from contessa.normalizer import RuleNormalizer
from contessa.rules import get_rule_cls

Expand All @@ -25,22 +31,30 @@ def run(
self,
raw_rules: List[Dict[str, str]],
check_table: Dict,
result_table: Dict, # todo - docs for quality name, maybe defaults..
result_table: Optional[
Dict
] = None, # todo - docs for quality name, maybe defaults..
context: Optional[Dict] = None,
):
check_table = Table(**check_table)
result_table = ResultTable(**result_table, model_cls=self.model_cls)
context = self.get_context(check_table, context)

normalized_rules = self.normalize_rules(raw_rules)
refresh_executors(check_table, self.conn, context)
quality_check_class = self.get_quality_check_class(result_table)
self.conn.ensure_table(quality_check_class.__table__)

if result_table:
result_table = ResultTable(**result_table, model_cls=self.model_cls)
quality_check_class = self.get_quality_check_class(result_table)
self.conn.ensure_table(quality_check_class.__table__)
else:
quality_check_class = CheckResult

rules = self.build_rules(normalized_rules)
objs = self.do_quality_checks(quality_check_class, rules, context)

self.conn.upsert(objs)
if result_table:
self.conn.upsert(objs)
return objs

@staticmethod
def get_context(check_table: Table, context: Optional[Dict] = None) -> Dict:
Expand All @@ -51,7 +65,8 @@ def get_context(check_table: Table, context: Optional[Dict] = None) -> Dict:
"table_fullname": check_table.fullname,
"task_ts": datetime.now(), # todo - is now() ok ?
}
ctx_defaults.update(context)
if context:
ctx_defaults.update(context)
return ctx_defaults

def normalize_rules(self, raw_rules):
Expand Down

0 comments on commit 5d1509c

Please sign in to comment.