Skip to content

Commit

Permalink
fix: return consistency results, add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Dominik Gmiterko authored and dee-gmiterko committed Dec 10, 2020
1 parent 679c46a commit 2067594
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 13 deletions.
4 changes: 2 additions & 2 deletions contessa/base_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ def apply(self, **kwargs):
raise NotImplementedError

def __str__(self):
    """Return a one-line, human-readable description of the rule.

    The time filter is appended as `` - <time_filter>`` only when one is
    set; otherwise the description ends right after the rule type.
    """
    # A non-empty f-string is always truthy, so the former
    # ``f" - {...}" or ""`` could never yield "" and an unset filter was
    # rendered as " - None". Test the filter itself instead.
    tf = f" - {self.time_filter}" if self.time_filter else ""
    return f"Rule {self.name} of type {self.type}{tf}"
25 changes: 17 additions & 8 deletions contessa/consistency_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def do_consistency_check(
)
right_result = self.run_query(self.right_conn, right_sql, context)

valid, passed, failed = self.compare_results(left_result, right_result)
valid, passed, failed = self.compare_results(left_result, right_result, method)

return {
"check": {
Expand All @@ -161,13 +161,22 @@ def do_consistency_check(
"context": context,
}

def compare_results(self, left_result, right_result):
left_set = set(left_result)
right_set = set(right_result)
common = left_set.intersection(right_set)
passed = len(common)
failed = (len(left_set) - len(common)) + (len(right_set) - len(common))
return failed == 0, passed, failed
def compare_results(self, left_result, right_result, method):
    """Compare the query results of both sides using ``method``.

    Returns a ``(valid, passed, failed)`` tuple; ``valid`` is True only
    when ``failed`` equals zero.

    Raises NotImplementedError when ``method`` is neither ``self.COUNT``
    nor ``self.DIFF``.
    """
    if method == self.COUNT:
        # Each side's count is read from the first column of its first row.
        lhs_total = left_result[0][0]
        rhs_total = right_result[0][0]
        matched = min(lhs_total, rhs_total)
        # Signed on purpose of the expression: the value is negative when
        # the right side holds the larger count.
        mismatched = (lhs_total - matched) - (rhs_total - matched)
        return mismatched == 0, matched, mismatched

    if method == self.DIFF:
        lhs_rows = set(left_result)
        rhs_rows = set(right_result)
        matched = len(lhs_rows & rhs_rows)
        # Rows present on exactly one side, counted from both directions.
        mismatched = (len(lhs_rows) - matched) + (len(rhs_rows) - matched)
        return mismatched == 0, matched, mismatched

    raise NotImplementedError(f"Method {method} not implemented")

def construct_default_query(self, table_name, column, time_filter, context):
time_filter = compose_where_time_filter(time_filter, context["task_ts"])
Expand Down
28 changes: 26 additions & 2 deletions contessa/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ class MyTableQualityCheck(QualityCheck):


class CheckResult:
rule_name: str
rule_type: str
rule_description: str
total_records: int
failed: int
passed: int
time_filter: str
failed_percentage: float
passed_percentage: float
status: str
context: Dict

def init_row(
self, rule: Rule, results: pd.Series, conn: Connector, context: Dict = None
):
Expand All @@ -319,15 +331,22 @@ def init_row(
self.failed_percentage = self._perc(self.failed, self.total_records)
self.passed_percentage = self._perc(self.passed, self.total_records)
self.status = "invalid" if self.failed > 0 else "valid"
self.context = context

def init_row_consistency(
self, check: Dict, status: str, time_filter=None, context: Dict = None,
self,
check: Dict,
status: str,
left_table_name,
right_table_name,
time_filter=None,
context: Dict = None,
):
self.rule_type = check["type"]
self.rule_name = check["name"]
self.rule_description = check["description"]

self.total_records = check["passed"] + check["failed"]
self.total_records = check["passed"] + abs(check["failed"])
self.passed = check["passed"]
self.failed = check["failed"]

Expand All @@ -339,6 +358,11 @@ def init_row_consistency(
self.passed_percentage = self._perc(self.passed, self.total_records)
self.status = status

context.update(
{"left_table_name": left_table_name, "right_table_name": right_table_name}
)
self.context = context

def _perc(self, a, b):
res = 0
try:
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from contessa import ConsistencyChecker


class TestDataQualityOperator(unittest.TestCase):
class TestConsistencyChecker(unittest.TestCase):
def setUp(self):
"""
Init a temporary table with some data.
Expand Down
95 changes: 95 additions & 0 deletions test/integration/test_return_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from contessa.db import Connector
from contessa.models import DQBase
from test.conftest import FakedDatetime
from test.integration.conftest import TEST_DB_URI
import unittest
from unittest import mock

from contessa import ConsistencyChecker


class TestReturnResults(unittest.TestCase):
    """Integration test for the result object returned by a consistency check."""

    def setUp(self):
        """
        Create the `tmp` and `hello` schemas with one booking table each and
        load four rows into the left table and three into the right one.
        """
        self.left_table_name = "raw_booking"
        self.right_table_name = "booking"
        self.ts_nodash = (
            FakedDatetime.now().isoformat().replace("-", "").replace(":", "")
        )
        self.now = FakedDatetime.now()

        sql = [
            # Drop BOTH schemas up front: the tables below are created with
            # IF NOT EXISTS, so a leftover table from an aborted run would
            # otherwise receive the INSERTs a second time.
            "DROP SCHEMA if exists tmp CASCADE;",
            "DROP SCHEMA if exists hello CASCADE;",
            "CREATE SCHEMA IF NOT EXISTS tmp;",
            "CREATE SCHEMA IF NOT EXISTS hello;",
            f"""
            CREATE TABLE IF NOT EXISTS tmp.{self.left_table_name}(
              id SERIAL PRIMARY KEY,
              src text,
              dst text,
              price int,
              turnover_after_refunds double precision,
              initial_price double precision,
              created_at timestamptz
            )
            """,
            f"""
            CREATE TABLE IF NOT EXISTS hello.{self.right_table_name}(
              id SERIAL PRIMARY KEY,
              src text,
              dst text,
              price int,
              turnover_after_refunds double precision,
              initial_price double precision,
              created_at timestamptz
            )
            """,
            f"""
            INSERT INTO tmp.{self.left_table_name}
              (src, dst, price, turnover_after_refunds, initial_price, created_at)
            VALUES
              ('BTS', NULL, 1, 100, 11, '2018-09-12T11:50:00'),
              (NULL, 'PEK', 33, 1.1, 13, '2018-01-12T15:50:00'),
              ('VIE', 'JFK', 4, 5.5, 23.4, '2018-09-11T11:50:00'),
              ('VIE', 'VIE', 4, 0.0, 0.0, '2018-09-11T11:50:00')
            """,
            f"""
            INSERT INTO hello.{self.right_table_name}
              (src, dst, price, turnover_after_refunds, initial_price, created_at)
            VALUES
              ('BTS', NULL, 1, 100, 11, '2018-09-12T11:50:00'),
              (NULL, 'PEK', 33, 1.1, 13, '2018-01-12T15:50:00'),
              ('VIE', 'JFK', 4, 5.5, 23.4, '2018-09-11T11:50:00')
            """,
        ]
        self.conn = Connector(TEST_DB_URI)
        for s in sql:
            self.conn.execute(s)

        self.consistency_checker = ConsistencyChecker(TEST_DB_URI)

    def tearDown(self):
        """
        Drop both schemas created in setUp and clear the DQBase metadata.
        """
        # Plain strings: there is nothing to interpolate here.
        self.conn.execute("DROP schema tmp CASCADE;")
        self.conn.execute("DROP schema hello CASCADE;")
        DQBase.metadata.clear()

    @mock.patch("contessa.executor.datetime", FakedDatetime)
    def test_execute_consistency(self):
        """
        A COUNT check over tables holding 4 and 3 rows is reported as invalid,
        and the result context carries both schema-qualified table names.
        """
        result = self.consistency_checker.run(
            self.consistency_checker.COUNT,
            left_check_table={"schema_name": "tmp", "table_name": self.left_table_name},
            right_check_table={
                "schema_name": "hello",
                "table_name": self.right_table_name,
            },
            context={"task_ts": self.now},
        )

        self.assertEqual(result.status, "invalid")
        self.assertEqual(result.context["left_table_name"], "tmp.raw_booking")
        self.assertEqual(result.context["right_table_name"], "hello.booking")

0 comments on commit 2067594

Please sign in to comment.