_key and _type (#106)
* Use _key or index for schema errors

* Store _key in dataframe index

* Use index instead of _key in rules

* Drop _type only from cloud items

* Skip _type, _key from schema validation, closes #104

* Refactor validator to func
manycoding committed Jun 12, 2019
1 parent bc42578 commit 32927ac
Showing 22 changed files with 354 additions and 397 deletions.
CHANGES.md (2 additions, 0 deletions)

```diff
@@ -16,6 +16,8 @@ Note that the top-most release is changes in the unreleased master branch on GitHub
 ### Changed
 - `Arche.report_all()` does not shorten report by default, added `short` parameter.
 - `expand=True` which enables nested data flattening is more than 100x faster and consumes ~2x less memory than before, #94
+- Data is consistent with Dash and Spidermon: `_type, _key` fields are dropped from dataframe, raw data, basic schema, #104, #106
+- `df.index` now stores `_key` instead
 - `basic_json_schema()` works with `deleted` jobs
 ### Fixed
 - `Arche.glance()`, #88
```
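For notebook code built on Arche, the practical effect of these two entries is that item keys move from the `_key` column into the dataframe index. A minimal before/after sketch (the key value is invented):

```python
import pandas as pd

df = pd.DataFrame({"name": ["a"]}, index=["112358/13/21/0"])

# Before this commit, keys sat in a regular column:
# item = df[df["_key"] == "112358/13/21/0"]
# Now the key is the index label, so plain .loc lookup works:
item = df.loc["112358/13/21/0"]
print(item["name"])  # a
```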
src/arche/arche.py (11 additions, 3 deletions)

```diff
@@ -164,7 +164,9 @@ def validate_with_json_schema(self) -> None:
         """Run JSON schema check and output results. It will try to find all errors, but
         there are no guarantees. Slower than `check_with_json_schema()`
         """
-        res = schema_rules.validate(self.schema, self.source_items.raw)
+        res = schema_rules.validate(
+            self.schema, self.source_items.raw, self.source_items.df.index
+        )
         self.save_result(res)
         res.show()
 
@@ -173,14 +175,20 @@ def glance(self) -> None:
         only the first error per item. Usable for big jobs as it's about 100x faster than
         `validate_with_json_schema()`.
         """
-        res = schema_rules.validate(self.schema, self.source_items.raw, fast=True)
+        res = schema_rules.validate(
+            self.schema, self.source_items.raw, self.source_items.df.index, fast=True
+        )
         self.save_result(res)
         res.show()
 
     def run_schema_rules(self) -> None:
         if not self.schema:
             return
-        self.save_result(schema_rules.validate(self.schema, self.source_items.raw))
+        self.save_result(
+            schema_rules.validate(
+                self.schema, self.source_items.raw, self.source_items.df.index
+            )
+        )
 
         tagged_fields = sr.Tags().get(self.schema)
         target_columns = (
```
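All three call sites now pass `self.source_items.df.index` so that schema errors can be reported per item key. A sketch of the new call shape, assuming a plain dict and list stand in for `Schema` and `RawItems`:

```python
import pandas as pd
from arche.rules import json_schema as schema_rules  # module alias as used above

schema = {"type": "object", "required": ["name"]}      # hypothetical schema
raw_items = [{"name": "foo"}, {"price": 1}]            # hypothetical raw data
keys = pd.Index(["112358/13/21/0", "112358/13/21/1"])  # what df.index now holds

result = schema_rules.validate(schema, raw_items, keys, fast=False)
result.show()  # the failing item is reported by its key, not its position
```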
src/arche/data_quality_report.py (3 additions, 1 deletion)

```diff
@@ -67,7 +67,9 @@ def create_figures(self, items):
 
         validation_errors = self.report.results.get(
             "JSON Schema Validation",
-            schema_rules.validate(self.schema, raw_items=items.raw, fast=False),
+            schema_rules.validate(
+                self.schema, raw_items=items.raw, keys=items.df.index, fast=False
+            ),
         ).get_errors_count()
 
         garbage_symbols_result = self.report.results.get(
```
src/arche/readers/items.py (6 additions, 8 deletions)

```diff
@@ -27,10 +27,9 @@ def __len__(self) -> int:
     def flat_df(self) -> pd.DataFrame:
         if self._flat_df is None:
             if self.expand:
-                self._flat_df = pd.DataFrame(flatten(i) for i in self.raw)
-                self._flat_df["_key"] = self.df.get(
-                    "_key", [str(i) for i in range(len(self))]
-                )
+                flat_df = pd.DataFrame(flatten(i) for i in self.raw)
+                flat_df.index = self.df.index
+                self._flat_df = flat_df.drop(columns=["_key", "_type"], errors="ignore")
             else:
                 self._flat_df = self.df
         return self._flat_df
@@ -63,9 +62,6 @@ def origin_column_name(self, new: str) -> str:
 
     @classmethod
     def from_df(cls, df: pd.DataFrame, expand: bool = True):
-        if "_key" not in df.columns:
-            df["_key"] = df.index
-        df["_key"] = df["_key"].apply(str)
         return cls(raw=np.array(df.to_dict("records")), df=df, expand=expand)
 
     @classmethod
@@ -88,7 +84,9 @@ def __init__(
         self.expand = expand
         raw = self.fetch_data()
         df = pd.DataFrame(list(raw))
-        df["_key"] = self.format_keys(df["_key"])
+        df.index = self.format_keys(df["_key"])
+        df.index.name = None
+        df = df.drop(columns=["_key", "_type"], errors="ignore")
         super().__init__(raw=raw, df=df, expand=expand)
 
     @property
```
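Taken together, cloud items now produce a dataframe whose index holds the formatted `_key` values while the `_key` and `_type` columns are gone. The same transformation in standalone pandas, skipping `format_keys` and using invented keys:

```python
import pandas as pd

raw = [
    {"_key": "112358/13/21/0", "_type": "Product", "name": "a"},
    {"_key": "112358/13/21/1", "_type": "Product", "name": "b"},
]
df = pd.DataFrame(raw)
df.index = df["_key"]  # keys move into the index
df.index.name = None
df = df.drop(columns=["_key", "_type"], errors="ignore")

print(df.columns.tolist())  # ['name']
print(df.index.tolist())    # ['112358/13/21/0', '112358/13/21/1']
```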
src/arche/report.py (1 addition, 1 deletion)

```diff
@@ -112,4 +112,4 @@ def url(x: str) -> str:
         if keys.dtype == np.dtype("object") and "/" in keys.iloc[0]:
             sample = sample.apply(url)
 
-        return ", ".join(sample)
+        return ", ".join(sample.apply(str))
```
src/arche/rules/duplicates.py (6 additions, 6 deletions)

```diff
@@ -20,7 +20,7 @@ def check_items(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
     errors = {}
     name_field = name_fields[0]
     url_field = url_fields[0]
-    df = df[[name_field, url_field, "_key"]]
+    df = df[[name_field, url_field]]
     duplicates = df[df[[name_field, url_field]].duplicated(keep=False)]
     if duplicates.empty:
         return result
@@ -30,7 +30,7 @@ def check_items(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
         msg = (
             f"same '{d[name_field].iloc[0]}' name and '{d[url_field].iloc[0]}' url"
         )
-        errors[msg] = list(d["_key"])
+        errors[msg] = list(d.index)
     result.add_error(
         f"{len(duplicates)} duplicate(s) with same name and url", errors=errors
     )
@@ -53,10 +53,10 @@ def check_uniqueness(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
     err_keys = set()
     for field in unique_fields:
         result.items_count = df[field].count()
-        duplicates = df[df[field].duplicated(keep=False)][[field, "_key"]]
+        duplicates = df[df[field].duplicated(keep=False)][[field]]
         errors = {}
         for _, d in duplicates.groupby([field]):
-            keys = list(d["_key"])
+            keys = list(d.index)
             msg = f"same '{d[field].iloc[0]}' {field}"
             errors[msg] = keys
             err_keys = err_keys.union(keys)
@@ -79,7 +79,7 @@ def find_by(df: pd.DataFrame, columns: List[str]) -> Result:
     result = Result("Items Uniqueness By Columns")
     result.items_count = len(df)
     df = df.dropna(subset=columns, how="all")
-    duplicates = df[df[columns].duplicated(keep=False)][columns + ["_key"]]
+    duplicates = df[df[columns].duplicated(keep=False)][columns]
     if duplicates.empty:
         return result
 
@@ -89,7 +89,7 @@ def find_by(df: pd.DataFrame, columns: List[str]) -> Result:
         msg = "same"
         for c in columns:
             msg = f"{msg} '{d[c].iloc[0]}' {c}"
-        errors[msg] = list(d["_key"])
+        errors[msg] = list(d.index)
 
     result.add_error(
         f"{len(duplicates)} duplicate(s) with same {', '.join(columns)}", errors=errors
```
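Since keys live in the index, they survive column selection and `groupby` automatically, and `d.index` inside each group is exactly the set of duplicate keys. A standalone illustration with invented data:

```python
import pandas as pd

df = pd.DataFrame(
    {"name": ["a", "a", "b"], "url": ["u1", "u1", "u2"]},
    index=["112358/13/21/0", "112358/13/21/1", "112358/13/21/2"],
)
duplicates = df[df[["name", "url"]].duplicated(keep=False)]
errors = {}
for _, d in duplicates.groupby(["name", "url"]):
    msg = f"same '{d['name'].iloc[0]}' name and '{d['url'].iloc[0]}' url"
    errors[msg] = list(d.index)

print(errors)
# {"same 'a' name and 'u1' url": ['112358/13/21/0', '112358/13/21/1']}
```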
src/arche/rules/json_schema.py (7 additions, 5 deletions)

```diff
@@ -1,11 +1,14 @@
 from arche.readers.items import RawItems
 from arche.readers.schema import Schema, Tag, TaggedFields
 from arche.rules.result import Result
-from arche.tools.json_schema_validator import JsonSchemaValidator
+from arche.tools.schema import fast_validate, full_validate
 import numpy as np
+import pandas as pd
 
 
-def validate(schema: Schema, raw_items: RawItems, fast: bool = False) -> Result:
+def validate(
+    schema: Schema, raw_items: RawItems, keys: pd.Index, fast: bool = False
+) -> Result:
     """Run JSON schema validation against data.
 
     Args:
@@ -14,11 +17,10 @@ def validate(schema: Schema, raw_items: RawItems, fast: bool = False) -> Result:
     Returns:
         Schema errors if any
     """
-    validator = JsonSchemaValidator(schema)
-    validator.run(raw_items, fast)
+    validate_func = fast_validate if fast else full_validate
+    errors = validate_func(schema, raw_items, keys)
     result = Result("JSON Schema Validation")
 
-    errors = validator.errors
     schema_result_message = (
         f"{len(raw_items)} items were checked, {len(errors)} error(s)"
     )
```
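`fast_validate` and `full_validate` come from `arche.tools.schema`, which this page does not show. A rough sketch of the split they imply, assuming a `jsonschema`-based implementation that maps error messages to the keys of offending items: full validation walks every error of every item, while fast validation stops at the first error per item.

```python
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Set

import pandas as pd
from jsonschema import validators


def full_validate(
    schema: Dict[str, Any], raw_items: List[dict], keys: pd.Index
) -> DefaultDict[str, Set[Any]]:
    """Collect every schema error: error message -> keys of offending items."""
    errors: DefaultDict[str, Set[Any]] = defaultdict(set)
    validator = validators.validator_for(schema)(schema)
    for i, item in enumerate(raw_items):
        for error in validator.iter_errors(item):
            errors[error.message].add(keys[i])
    return errors


def fast_validate(
    schema: Dict[str, Any], raw_items: List[dict], keys: pd.Index
) -> DefaultDict[str, Set[Any]]:
    """Stop at the first schema error per item; much faster on bad data."""
    errors: DefaultDict[str, Set[Any]] = defaultdict(set)
    validator = validators.validator_for(schema)(schema)
    for i, item in enumerate(raw_items):
        error = next(validator.iter_errors(item), None)
        if error is not None:
            errors[error.message].add(keys[i])
    return errors
```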
src/arche/rules/others.py (1 addition, 1 deletion)

```diff
@@ -96,7 +96,7 @@ def garbage_symbols(items: Items) -> Result:
         matches = items.flat_df[column].str.extractall(garbage, flags=re.IGNORECASE)
         matches = matches[["spaces", "html_entities", "css", "html_tags"]]
         if not matches.empty:
-            error_keys = items.flat_df.iloc[matches.unstack().index.values]["_key"]
+            error_keys = items.flat_df.loc[matches.unstack().index.values].index
             original_column = items.origin_column_name(column)
             bad_texts = matches.stack().value_counts().index.sort_values().tolist()
             error = (
```
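The switch from `iloc` to `loc` is required, not cosmetic: `str.extractall` indexes its result by the original labels plus a `match` level, so after `unstack()` the values are index labels. Positional lookup only worked before because the old `RangeIndex` made labels and positions coincide. A small demonstration:

```python
import pandas as pd

s = pd.Series(["a  b", "clean"], index=["112358/13/21/0", "112358/13/21/1"])
matches = s.str.extractall(r"(?P<spaces>\s{2,})")
labels = matches.unstack().index.values
print(labels)  # ['112358/13/21/0'], i.e. labels, so .loc is the right lookup
```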
src/arche/rules/price.py (12 additions, 16 deletions)

```diff
@@ -27,7 +27,7 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
 
     df_prices_less = pd.DataFrame(
         prices[prices[price_was_field] < prices[price_field]],
-        columns=["_key", price_was_field, price_field],
+        columns=[price_was_field, price_field],
     )
 
     price_less_percent = "{:.2%}".format(len(df_prices_less) / items_number)
@@ -37,12 +37,12 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
         result.add_error(
             f"{price_less_percent} ({len(df_prices_less)}) of "
             f"items with {price_was_field} < {price_field}",
-            detailed=f"{error}:\n{list(df_prices_less['_key'])}",
+            detailed=f"{error}:\n{list(df_prices_less.index)}",
         )
 
     df_prices_equals = pd.DataFrame(
         prices[prices[price_was_field] == prices[price_field]],
-        columns=["_key", price_was_field, price_field],
+        columns=[price_was_field, price_field],
     )
     price_equal_percent = "{:.2%}".format(len(df_prices_equals) / items_number)
 
@@ -54,7 +54,7 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
             ),
             detailed=(
                 f"Prices equal for {len(df_prices_equals)} items:\n"
-                f"{list(df_prices_equals['_key'])}"
+                f"{list(df_prices_equals.index)}"
             ),
         )
 
@@ -104,7 +104,7 @@ def compare_prices_for_same_urls(
 
     missing_detailed_messages = []
     for url in missing_urls:
-        key = target_df.loc[target_df[url_field] == url]["_key"].iloc[0]
+        key = target_df.loc[target_df[url_field] == url].index[0]
         missing_detailed_messages.append(f"Missing {url} from {key}")
 
     result.add_info(
@@ -135,8 +135,8 @@ def compare_prices_for_same_urls(
             and ratio_diff(source_price, target_price) > 0.1
         ):
             diff_prices_count += 1
-            source_key = source_df[source_df[url_field] == url]["_key"].iloc[0]
-            target_key = target_df[target_df[url_field] == url]["_key"].iloc[0]
+            source_key = source_df[source_df[url_field] == url].index[0]
+            target_key = target_df[target_df[url_field] == url].index[0]
             msg = (
                 f"different prices for url: {url}\nsource price is {source_price} "
                 f"for {source_key}\ntarget price is {target_price} for {target_key}"
@@ -197,8 +197,8 @@ def compare_names_for_same_urls(
             and target_name.strip() != "nan"
         ):
             diff_names_count += 1
-            source_key = source_df[source_df[url_field] == url]["_key"].iloc[0]
-            target_key = target_df[target_df[url_field] == url]["_key"].iloc[0]
+            source_key = source_df[source_df[url_field] == url].index[0]
+            target_key = target_df[target_df[url_field] == url].index[0]
             msg = (
                 f"different names for url: {url}\nsource name is {source_name} "
                 f"for {source_key}\ntarget name is {target_name} for {target_key}"
@@ -245,7 +245,7 @@ def compare_prices_for_same_names(
 
     detailed_messages = []
    for name in missing_names:
-        target_key = target_df.loc[target_df[name_field] == name]["_key"].iloc[0]
+        target_key = target_df.loc[target_df[name_field] == name].index[0]
         msg = f"Missing {name} from {target_key}"
         if product_url_field:
             url = target_df.loc[target_df[name_field] == name][product_url_field].iloc[
@@ -277,12 +277,8 @@ def compare_prices_for_same_names(
         if is_number(source_price) and is_number(target_price):
             if ratio_diff(source_price, target_price) > 0.1:
                 count += 1
-                source_key = source_df[source_df[name_field] == name]["_key"].iloc[
-                    0
-                ]
-                target_key = target_df[target_df[name_field] == name]["_key"].iloc[
-                    0
-                ]
+                source_key = source_df[source_df[name_field] == name].index[0]
+                target_key = target_df[target_df[name_field] == name].index[0]
                 msg = (
                     f"different price for {name}\nsource price is {source_price} "
                     f"for {source_key}\ntarget price is {target_price} for {target_key}"
```
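Every change in this file is the same rewrite: `df[mask]["_key"].iloc[0]` becomes `df[mask].index[0]`, because a boolean filter keeps index labels, so the first matching key is simply the first label. For instance, with invented data:

```python
import pandas as pd

target_df = pd.DataFrame(
    {"url": ["u1", "u2"], "price": [10, 20]},
    index=["112358/13/21/0", "112358/13/21/1"],
)
print(target_df[target_df["url"] == "u2"].index[0])  # 112358/13/21/1
```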
src/arche/tools/json_schema_validator.py (0 additions, 83 deletions)

This file was deleted: the `JsonSchemaValidator` class is superseded by the `fast_validate` and `full_validate` functions imported from `arche.tools.schema` in `src/arche/rules/json_schema.py` above, per the commit bullet "Refactor validator to func".
(The remaining 12 changed files are not shown.)
