Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

_key and _type #106

Merged
merged 11 commits (base and head branch names lost in page extraction) on
Jun 12, 2019
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Note that the top-most release is changes in the unreleased master branch on Git
### Changed
- `Arche.report_all()` does not shorten report by default, added `short` parameter.
- `expand=True` which enables nested data flattening is more than 100x faster and consumes ~2x less memory than before, #94
- `_type` field is dropped from dataframe by default
### Fixed
- `Arche.glance()`, #88
- Item links in Schema validation errors, #89
Expand Down
14 changes: 11 additions & 3 deletions src/arche/arche.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ def validate_with_json_schema(self) -> None:
"""Run JSON schema check and output results. It will try to find all errors, but
there are no guarantees. Slower than `check_with_json_schema()`
"""
res = schema_rules.validate(self.schema, self.source_items.raw)
res = schema_rules.validate(
self.schema, self.source_items.raw, self.source_items.df.index
)
self.save_result(res)
res.show()

Expand All @@ -173,14 +175,20 @@ def glance(self) -> None:
only the first error per item. Usable for big jobs as it's about 100x faster than
`validate_with_json_schema()`.
"""
res = schema_rules.validate(self.schema, self.source_items.raw, fast=True)
res = schema_rules.validate(
self.schema, self.source_items.raw, self.source_items.df.index, fast=True
)
self.save_result(res)
res.show()

def run_schema_rules(self) -> None:
if not self.schema:
return
self.save_result(schema_rules.validate(self.schema, self.source_items.raw))
self.save_result(
schema_rules.validate(
self.schema, self.source_items.raw, self.source_items.df.index
)
)

tagged_fields = sr.Tags().get(self.schema)
target_columns = (
Expand Down
4 changes: 3 additions & 1 deletion src/arche/data_quality_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def create_figures(self, items):

validation_errors = self.report.results.get(
"JSON Schema Validation",
schema_rules.validate(self.schema, raw_items=items.raw, fast=False),
schema_rules.validate(
self.schema, raw_items=items.raw, keys=items.df.index, fast=False
),
).get_errors_count()

garbage_symbols_result = self.report.results.get(
Expand Down
14 changes: 6 additions & 8 deletions src/arche/readers/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ def __len__(self) -> int:
def flat_df(self) -> pd.DataFrame:
if self._flat_df is None:
if self.expand:
self._flat_df = pd.DataFrame(flatten(i) for i in self.raw)
self._flat_df["_key"] = self.df.get(
"_key", [str(i) for i in range(len(self))]
)
flat_df = pd.DataFrame(flatten(i) for i in self.raw)
flat_df.index = self.df.index
self._flat_df = flat_df.drop(columns=["_key", "_type"], errors="ignore")
else:
self._flat_df = self.df
return self._flat_df
Expand Down Expand Up @@ -63,9 +62,6 @@ def origin_column_name(self, new: str) -> str:

@classmethod
def from_df(cls, df: pd.DataFrame, expand: bool = True):
if "_key" not in df.columns:
df["_key"] = df.index
df["_key"] = df["_key"].apply(str)
return cls(raw=np.array(df.to_dict("records")), df=df, expand=expand)

@classmethod
Expand All @@ -88,7 +84,9 @@ def __init__(
self.expand = expand
raw = self.fetch_data()
df = pd.DataFrame(list(raw))
df["_key"] = self.format_keys(df["_key"])
df.index = self.format_keys(df["_key"])
df.index.name = None
df = df.drop(columns=["_key", "_type"], errors="ignore")
super().__init__(raw=raw, df=df, expand=expand)

@property
Expand Down
12 changes: 6 additions & 6 deletions src/arche/rules/duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def check_items(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
errors = {}
name_field = name_fields[0]
url_field = url_fields[0]
df = df[[name_field, url_field, "_key"]]
df = df[[name_field, url_field]]
duplicates = df[df[[name_field, url_field]].duplicated(keep=False)]
if duplicates.empty:
return result
Expand All @@ -30,7 +30,7 @@ def check_items(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
msg = (
f"same '{d[name_field].iloc[0]}' name and '{d[url_field].iloc[0]}' url"
)
errors[msg] = list(d["_key"])
errors[msg] = list(d.index)
result.add_error(
f"{len(duplicates)} duplicate(s) with same name and url", errors=errors
)
Expand All @@ -53,10 +53,10 @@ def check_uniqueness(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
err_keys = set()
for field in unique_fields:
result.items_count = df[field].count()
duplicates = df[df[field].duplicated(keep=False)][[field, "_key"]]
duplicates = df[df[field].duplicated(keep=False)][[field]]
errors = {}
for _, d in duplicates.groupby([field]):
keys = list(d["_key"])
keys = list(d.index)
msg = f"same '{d[field].iloc[0]}' {field}"
errors[msg] = keys
err_keys = err_keys.union(keys)
Expand All @@ -79,7 +79,7 @@ def find_by(df: pd.DataFrame, columns: List[str]) -> Result:
result = Result("Items Uniqueness By Columns")
result.items_count = len(df)
df = df.dropna(subset=columns, how="all")
duplicates = df[df[columns].duplicated(keep=False)][columns + ["_key"]]
duplicates = df[df[columns].duplicated(keep=False)][columns]
if duplicates.empty:
return result

Expand All @@ -89,7 +89,7 @@ def find_by(df: pd.DataFrame, columns: List[str]) -> Result:
msg = "same"
for c in columns:
msg = f"{msg} '{d[c].iloc[0]}' {c}"
errors[msg] = list(d["_key"])
errors[msg] = list(d.index)

result.add_error(
f"{len(duplicates)} duplicate(s) with same {', '.join(columns)}", errors=errors
Expand Down
7 changes: 5 additions & 2 deletions src/arche/rules/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from arche.rules.result import Result
from arche.tools.json_schema_validator import JsonSchemaValidator
import numpy as np
import pandas as pd


def validate(schema: Schema, raw_items: RawItems, fast: bool = False) -> Result:
def validate(
schema: Schema, raw_items: RawItems, keys: pd.Index, fast: bool = False
) -> Result:
"""Run JSON schema validation against data.

Args:
Expand All @@ -14,7 +17,7 @@ def validate(schema: Schema, raw_items: RawItems, fast: bool = False) -> Result:
Returns:
Schema errors if any
"""
validator = JsonSchemaValidator(schema)
validator = JsonSchemaValidator(schema, keys)
validator.run(raw_items, fast)
result = Result("JSON Schema Validation")

Expand Down
2 changes: 1 addition & 1 deletion src/arche/rules/others.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def garbage_symbols(items: Items) -> Result:
matches = items.flat_df[column].str.extractall(garbage, flags=re.IGNORECASE)
matches = matches[["spaces", "html_entities", "css", "html_tags"]]
if not matches.empty:
error_keys = items.flat_df.iloc[matches.unstack().index.values]["_key"]
error_keys = items.flat_df.loc[matches.unstack().index.values].index
original_column = items.origin_column_name(column)
bad_texts = matches.stack().value_counts().index.sort_values().tolist()
error = (
Expand Down
28 changes: 12 additions & 16 deletions src/arche/rules/price.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):

df_prices_less = pd.DataFrame(
prices[prices[price_was_field] < prices[price_field]],
columns=["_key", price_was_field, price_field],
columns=[price_was_field, price_field],
)

price_less_percent = "{:.2%}".format(len(df_prices_less) / items_number)
Expand All @@ -37,12 +37,12 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
result.add_error(
f"{price_less_percent} ({len(df_prices_less)}) of "
f"items with {price_was_field} < {price_field}",
detailed=f"{error}:\n{list(df_prices_less['_key'])}",
detailed=f"{error}:\n{list(df_prices_less.index)}",
)

df_prices_equals = pd.DataFrame(
prices[prices[price_was_field] == prices[price_field]],
columns=["_key", price_was_field, price_field],
columns=[price_was_field, price_field],
)
price_equal_percent = "{:.2%}".format(len(df_prices_equals) / items_number)

Expand All @@ -54,7 +54,7 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
),
detailed=(
f"Prices equal for {len(df_prices_equals)} items:\n"
f"{list(df_prices_equals['_key'])}"
f"{list(df_prices_equals.index)}"
),
)

Expand Down Expand Up @@ -104,7 +104,7 @@ def compare_prices_for_same_urls(

missing_detailed_messages = []
for url in missing_urls:
key = target_df.loc[target_df[url_field] == url]["_key"].iloc[0]
key = target_df.loc[target_df[url_field] == url].index[0]
missing_detailed_messages.append(f"Missing {url} from {key}")

result.add_info(
Expand Down Expand Up @@ -135,8 +135,8 @@ def compare_prices_for_same_urls(
and ratio_diff(source_price, target_price) > 0.1
):
diff_prices_count += 1
source_key = source_df[source_df[url_field] == url]["_key"].iloc[0]
target_key = target_df[target_df[url_field] == url]["_key"].iloc[0]
source_key = source_df[source_df[url_field] == url].index[0]
target_key = target_df[target_df[url_field] == url].index[0]
msg = (
f"different prices for url: {url}\nsource price is {source_price} "
f"for {source_key}\ntarget price is {target_price} for {target_key}"
Expand Down Expand Up @@ -197,8 +197,8 @@ def compare_names_for_same_urls(
and target_name.strip() != "nan"
):
diff_names_count += 1
source_key = source_df[source_df[url_field] == url]["_key"].iloc[0]
target_key = target_df[target_df[url_field] == url]["_key"].iloc[0]
source_key = source_df[source_df[url_field] == url].index[0]
target_key = target_df[target_df[url_field] == url].index[0]
msg = (
f"different names for url: {url}\nsource name is {source_name} "
f"for {source_key}\ntarget name is {target_name} for {target_key}"
Expand Down Expand Up @@ -245,7 +245,7 @@ def compare_prices_for_same_names(

detailed_messages = []
for name in missing_names:
target_key = target_df.loc[target_df[name_field] == name]["_key"].iloc[0]
target_key = target_df.loc[target_df[name_field] == name].index[0]
msg = f"Missing {name} from {target_key}"
if product_url_field:
url = target_df.loc[target_df[name_field] == name][product_url_field].iloc[
Expand Down Expand Up @@ -277,12 +277,8 @@ def compare_prices_for_same_names(
if is_number(source_price) and is_number(target_price):
if ratio_diff(source_price, target_price) > 0.1:
count += 1
source_key = source_df[source_df[name_field] == name]["_key"].iloc[
0
]
target_key = target_df[target_df[name_field] == name]["_key"].iloc[
0
]
source_key = source_df[source_df[name_field] == name].index[0]
target_key = target_df[target_df[name_field] == name].index[0]
msg = (
f"different price for {name}\nsource price is {source_price} "
f"for {source_key}\ntarget price is {target_price} for {target_key}"
Expand Down
39 changes: 20 additions & 19 deletions src/arche/tools/json_schema_validator.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from collections import defaultdict
from typing import Any, Dict, Deque
from typing import Deque

from arche.readers.items import RawItems
from arche.readers.schema import Schema
import fastjsonschema
from jsonschema import FormatChecker, validators
import pandas as pd
from tqdm import tqdm_notebook


class JsonSchemaValidator:
def __init__(self, schema: Schema):
def __init__(self, schema: Schema, keys: pd.Index):
self.errors = defaultdict(set)
self.schema = schema
self.keys = keys

def run(self, raw_items: RawItems, fast: bool) -> None:
if fast:
Expand All @@ -23,30 +25,29 @@ def fast_validate(self, raw_items: RawItems) -> None:
"""Verify items one by one. It stops after the first error in an item in most cases.
Faster than jsonschema validation"""
validate = fastjsonschema.compile(self.schema)
for raw_item in tqdm_notebook(raw_items, desc="JSON Schema Validation"):
for i, raw_item in enumerate(
tqdm_notebook(raw_items, desc="Fast Schema Validation")
):
raw_item.pop("_key", None)
raw_item.pop("_type", None)
try:
validate(raw_item)
except fastjsonschema.JsonSchemaException as error:
self.errors[str(error)].add(raw_item["_key"])
self.errors[str(error)].add(self.keys[i])

def validate(self, raw_items: RawItems) -> None:
validator = validators.validator_for(self.schema)(self.schema)
validator.format_checker = FormatChecker()
for raw_item in tqdm_notebook(raw_items, desc="JSON Schema Validation"):
self.validate_item(raw_item, validator)

def validate_item(self, item: Dict[str, Any], validator) -> None:
"""Check a single item against jsonschema

Args:
item: a dict with item data
validator: a validator instance
"""
for e in validator.iter_errors(item):
error = self.format_validation_message(
e.message, e.path, e.schema_path, e.validator
)
self.errors[error].add(item["_key"])
for i, raw_item in enumerate(
tqdm_notebook(raw_items, desc="JSON Schema Validation")
):
raw_item.pop("_key", None)
raw_item.pop("_type", None)
for e in validator.iter_errors(raw_item):
error = self.format_validation_message(
e.message, e.path, e.schema_path, e.validator
)
self.errors[error].add(self.keys[i])

@staticmethod
def format_validation_message(
Expand Down
2 changes: 2 additions & 0 deletions src/arche/tools/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def create_json_schema(
samples = []
for n in item_numbers:
item = api.get_items(source_key, start_index=n, count=1, p_bar=None)[0]
item.pop("_type", None)
item.pop("_key", None)
samples.append(item)

return infer_schema(samples)
Expand Down
Loading