_key and _type #106

Merged · 11 commits · Jun 12, 2019
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -16,6 +16,8 @@ Note that the top-most release is changes in the unreleased master branch on Git
 ### Changed
 - `Arche.report_all()` does not shorten report by default, added `short` parameter.
 - `expand=True` which enables nested data flattening is more than 100x faster and consumes ~2x less memory than before, #94
+- Data is consistent with Dash and Spidermon: `_type, _key` fields are dropped from dataframe, raw data, basic schema, #104, #106
+- `df.index` now stores `_key` instead
 - `basic_json_schema()` works with `deleted` jobs
 ### Fixed
 - `Arche.glance()`, #88
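For context, a minimal sketch (pandas only, with invented job keys) of the behavior these changelog entries describe: the service fields leave the columns and keys move onto the index.

```python
import pandas as pd

# Invented items, shaped like raw Scrapy Cloud data.
raw = [
    {"_key": "112358/13/21/0", "_type": "Product", "name": "Plush Cthulhu"},
    {"_key": "112358/13/21/1", "_type": "Product", "name": "Plush Bear"},
]
df = pd.DataFrame(raw)

# After this PR: keys live on the index, service fields are dropped.
df.index = df["_key"].apply(str)
df.index.name = None
df = df.drop(columns=["_key", "_type"], errors="ignore")

print(df.index.tolist())    # ['112358/13/21/0', '112358/13/21/1']
print(df.columns.tolist())  # ['name']
```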
14 changes: 11 additions & 3 deletions src/arche/arche.py
@@ -164,7 +164,9 @@ def validate_with_json_schema(self) -> None:
         """Run JSON schema check and output results. It will try to find all errors, but
         there are no guarantees. Slower than `check_with_json_schema()`
         """
-        res = schema_rules.validate(self.schema, self.source_items.raw)
+        res = schema_rules.validate(
+            self.schema, self.source_items.raw, self.source_items.df.index
+        )
         self.save_result(res)
         res.show()

@@ -173,14 +175,20 @@ def glance(self) -> None:
         only the first error per item. Usable for big jobs as it's about 100x faster than
         `validate_with_json_schema()`.
         """
-        res = schema_rules.validate(self.schema, self.source_items.raw, fast=True)
+        res = schema_rules.validate(
+            self.schema, self.source_items.raw, self.source_items.df.index, fast=True
+        )
         self.save_result(res)
         res.show()

     def run_schema_rules(self) -> None:
         if not self.schema:
             return
-        self.save_result(schema_rules.validate(self.schema, self.source_items.raw))
+        self.save_result(
+            schema_rules.validate(
+                self.schema, self.source_items.raw, self.source_items.df.index
+            )
+        )

         tagged_fields = sr.Tags().get(self.schema)
         target_columns = (
4 changes: 3 additions & 1 deletion src/arche/data_quality_report.py
@@ -67,7 +67,9 @@ def create_figures(self, items):

         validation_errors = self.report.results.get(
             "JSON Schema Validation",
-            schema_rules.validate(self.schema, raw_items=items.raw, fast=False),
+            schema_rules.validate(
+                self.schema, raw_items=items.raw, keys=items.df.index, fast=False
+            ),
         ).get_errors_count()

         garbage_symbols_result = self.report.results.get(
14 changes: 6 additions & 8 deletions src/arche/readers/items.py
@@ -27,10 +27,9 @@ def __len__(self) -> int:
     def flat_df(self) -> pd.DataFrame:
         if self._flat_df is None:
             if self.expand:
-                self._flat_df = pd.DataFrame(flatten(i) for i in self.raw)
-                self._flat_df["_key"] = self.df.get(
-                    "_key", [str(i) for i in range(len(self))]
-                )
+                flat_df = pd.DataFrame(flatten(i) for i in self.raw)
+                flat_df.index = self.df.index
+                self._flat_df = flat_df.drop(columns=["_key", "_type"], errors="ignore")
             else:
                 self._flat_df = self.df
         return self._flat_df
@@ -63,9 +62,6 @@ def origin_column_name(self, new: str) -> str:

     @classmethod
     def from_df(cls, df: pd.DataFrame, expand: bool = True):
-        if "_key" not in df.columns:
-            df["_key"] = df.index
-        df["_key"] = df["_key"].apply(str)
         return cls(raw=np.array(df.to_dict("records")), df=df, expand=expand)

     @classmethod
@@ -88,7 +84,9 @@ def __init__(
         self.expand = expand
         raw = self.fetch_data()
         df = pd.DataFrame(list(raw))
-        df["_key"] = self.format_keys(df["_key"])
+        df.index = self.format_keys(df["_key"])
+        df.index.name = None
+        df = df.drop(columns=["_key", "_type"], errors="ignore")
         super().__init__(raw=raw, df=df, expand=expand)

     @property
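A rough sketch of the new `__init__` flow above, assuming `format_keys` normalizes raw keys to strings (the data and the helper body here are stand-ins, not the library's actual code):

```python
import pandas as pd

def format_keys(keys: pd.Series) -> pd.Series:
    # Stand-in for the real helper: normalize raw `_key` values to strings.
    return keys.apply(str)

raw = [{"_key": 0, "_type": "Item", "price": 9.99}]
df = pd.DataFrame(list(raw))
df.index = format_keys(df["_key"])  # keys move to the index
df.index.name = None                # keep the index unnamed, as the diff does
df = df.drop(columns=["_key", "_type"], errors="ignore")
print(df)  # a single `price` column, indexed by "0"
```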
2 changes: 1 addition & 1 deletion src/arche/report.py
@@ -112,4 +112,4 @@ def url(x: str) -> str:
         if keys.dtype == np.dtype("object") and "/" in keys.iloc[0]:
             sample = sample.apply(url)

-        return ", ".join(sample)
+        return ", ".join(sample.apply(str))
12 changes: 6 additions & 6 deletions src/arche/rules/duplicates.py
@@ -20,7 +20,7 @@ def check_items(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
     errors = {}
     name_field = name_fields[0]
     url_field = url_fields[0]
-    df = df[[name_field, url_field, "_key"]]
+    df = df[[name_field, url_field]]
     duplicates = df[df[[name_field, url_field]].duplicated(keep=False)]
     if duplicates.empty:
         return result
@@ -30,7 +30,7 @@ def check_items(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
         msg = (
             f"same '{d[name_field].iloc[0]}' name and '{d[url_field].iloc[0]}' url"
         )
-        errors[msg] = list(d["_key"])
+        errors[msg] = list(d.index)
     result.add_error(
         f"{len(duplicates)} duplicate(s) with same name and url", errors=errors
     )
@@ -53,10 +53,10 @@ def check_uniqueness(df: pd.DataFrame, tagged_fields: TaggedFields) -> Result:
     err_keys = set()
     for field in unique_fields:
         result.items_count = df[field].count()
-        duplicates = df[df[field].duplicated(keep=False)][[field, "_key"]]
+        duplicates = df[df[field].duplicated(keep=False)][[field]]
         errors = {}
         for _, d in duplicates.groupby([field]):
-            keys = list(d["_key"])
+            keys = list(d.index)
             msg = f"same '{d[field].iloc[0]}' {field}"
             errors[msg] = keys
             err_keys = err_keys.union(keys)
@@ -79,7 +79,7 @@ def find_by(df: pd.DataFrame, columns: List[str]) -> Result:
     result = Result("Items Uniqueness By Columns")
     result.items_count = len(df)
     df = df.dropna(subset=columns, how="all")
-    duplicates = df[df[columns].duplicated(keep=False)][columns + ["_key"]]
+    duplicates = df[df[columns].duplicated(keep=False)][columns]
     if duplicates.empty:
         return result

@@ -89,7 +89,7 @@ def find_by(df: pd.DataFrame, columns: List[str]) -> Result:
         msg = "same"
         for c in columns:
             msg = f"{msg} '{d[c].iloc[0]}' {c}"
-        errors[msg] = list(d["_key"])
+        errors[msg] = list(d.index)

     result.add_error(
         f"{len(duplicates)} duplicate(s) with same {', '.join(columns)}", errors=errors
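With keys on the index, duplicate reports fall out of `d.index` directly. A toy run of the same pattern (data invented for illustration):

```python
import pandas as pd

df = pd.DataFrame(
    {"name": ["a", "a", "b"], "url": ["u1", "u1", "u2"]},
    index=["0/0/0", "0/0/1", "0/0/2"],  # item keys now live here
)
errors = {}
duplicates = df[df[["name", "url"]].duplicated(keep=False)]
for _, d in duplicates.groupby(["name", "url"]):
    msg = f"same '{d['name'].iloc[0]}' name and '{d['url'].iloc[0]}' url"
    errors[msg] = list(d.index)
print(errors)  # {"same 'a' name and 'u1' url": ['0/0/0', '0/0/1']}
```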
12 changes: 7 additions & 5 deletions src/arche/rules/json_schema.py
@@ -1,11 +1,14 @@
 from arche.readers.items import RawItems
 from arche.readers.schema import Schema, Tag, TaggedFields
 from arche.rules.result import Result
-from arche.tools.json_schema_validator import JsonSchemaValidator
+from arche.tools.schema import fast_validate, full_validate
 import numpy as np
+import pandas as pd


-def validate(schema: Schema, raw_items: RawItems, fast: bool = False) -> Result:
+def validate(
+    schema: Schema, raw_items: RawItems, keys: pd.Index, fast: bool = False
+) -> Result:
     """Run JSON schema validation against data.

     Args:
@@ -14,11 +17,10 @@ def validate(schema: Schema, raw_items: RawItems, fast: bool = False) -> Result:
     Returns:
         Schema errors if any
     """
-    validator = JsonSchemaValidator(schema)
-    validator.run(raw_items, fast)
+    validate_func = fast_validate if fast else full_validate
+    errors = validate_func(schema, raw_items, keys)
     result = Result("JSON Schema Validation")

-    errors = validator.errors
     schema_result_message = (
         f"{len(raw_items)} items were checked, {len(errors)} error(s)"
     )
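A hedged usage sketch of the new signature; the schema, items, and keys below are invented, and the exact import path may differ:

```python
import pandas as pd
from arche.rules import json_schema as schema_rules

schema = {"type": "object", "required": ["name"]}
raw_items = [{"name": "a"}, {}]      # one valid item, one invalid
keys = pd.Index(["0/0/0", "0/0/1"])  # one key per raw item

result = schema_rules.validate(schema, raw_items, keys, fast=False)
result.show()  # should report the second item as missing "name"
```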
2 changes: 1 addition & 1 deletion src/arche/rules/others.py
@@ -96,7 +96,7 @@ def garbage_symbols(items: Items) -> Result:
         matches = items.flat_df[column].str.extractall(garbage, flags=re.IGNORECASE)
         matches = matches[["spaces", "html_entities", "css", "html_tags"]]
         if not matches.empty:
-            error_keys = items.flat_df.iloc[matches.unstack().index.values]["_key"]
+            error_keys = items.flat_df.loc[matches.unstack().index.values].index
             original_column = items.origin_column_name(column)
             bad_texts = matches.stack().value_counts().index.sort_values().tolist()
             error = (
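The `.iloc` to `.loc` switch follows from the index change: `extractall` preserves the original row labels, which are now string keys rather than positions. A contrived illustration:

```python
import pandas as pd

flat_df = pd.DataFrame({"text": ["ok", "<br> bad"]}, index=["0/0/0", "0/0/1"])
matches = flat_df["text"].str.extractall(r"(?P<html_tags><br>)")
labels = matches.unstack().index.values    # original labels, e.g. ['0/0/1']
print(flat_df.loc[labels].index.tolist())  # ['0/0/1']
```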
28 changes: 12 additions & 16 deletions src/arche/rules/price.py
@@ -27,7 +27,7 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):

     df_prices_less = pd.DataFrame(
         prices[prices[price_was_field] < prices[price_field]],
-        columns=["_key", price_was_field, price_field],
+        columns=[price_was_field, price_field],
     )

     price_less_percent = "{:.2%}".format(len(df_prices_less) / items_number)
@@ -37,12 +37,12 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
     result.add_error(
         f"{price_less_percent} ({len(df_prices_less)}) of "
         f"items with {price_was_field} < {price_field}",
-        detailed=f"{error}:\n{list(df_prices_less['_key'])}",
+        detailed=f"{error}:\n{list(df_prices_less.index)}",
     )

     df_prices_equals = pd.DataFrame(
         prices[prices[price_was_field] == prices[price_field]],
-        columns=["_key", price_was_field, price_field],
+        columns=[price_was_field, price_field],
     )
     price_equal_percent = "{:.2%}".format(len(df_prices_equals) / items_number)

@@ -54,7 +54,7 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):
         ),
         detailed=(
             f"Prices equal for {len(df_prices_equals)} items:\n"
-            f"{list(df_prices_equals['_key'])}"
+            f"{list(df_prices_equals.index)}"
         ),
     )

@@ -104,7 +104,7 @@ def compare_prices_for_same_urls(

     missing_detailed_messages = []
     for url in missing_urls:
-        key = target_df.loc[target_df[url_field] == url]["_key"].iloc[0]
+        key = target_df.loc[target_df[url_field] == url].index[0]
         missing_detailed_messages.append(f"Missing {url} from {key}")

     result.add_info(
@@ -135,8 +135,8 @@ def compare_prices_for_same_urls(
                 and ratio_diff(source_price, target_price) > 0.1
             ):
                 diff_prices_count += 1
-                source_key = source_df[source_df[url_field] == url]["_key"].iloc[0]
-                target_key = target_df[target_df[url_field] == url]["_key"].iloc[0]
+                source_key = source_df[source_df[url_field] == url].index[0]
+                target_key = target_df[target_df[url_field] == url].index[0]
                 msg = (
                     f"different prices for url: {url}\nsource price is {source_price} "
                     f"for {source_key}\ntarget price is {target_price} for {target_key}"
@@ -197,8 +197,8 @@ def compare_names_for_same_urls(
                 and target_name.strip() != "nan"
             ):
                 diff_names_count += 1
-                source_key = source_df[source_df[url_field] == url]["_key"].iloc[0]
-                target_key = target_df[target_df[url_field] == url]["_key"].iloc[0]
+                source_key = source_df[source_df[url_field] == url].index[0]
+                target_key = target_df[target_df[url_field] == url].index[0]
                 msg = (
                     f"different names for url: {url}\nsource name is {source_name} "
                     f"for {source_key}\ntarget name is {target_name} for {target_key}"
@@ -245,7 +245,7 @@ def compare_prices_for_same_names(

     detailed_messages = []
     for name in missing_names:
-        target_key = target_df.loc[target_df[name_field] == name]["_key"].iloc[0]
+        target_key = target_df.loc[target_df[name_field] == name].index[0]
         msg = f"Missing {name} from {target_key}"
         if product_url_field:
             url = target_df.loc[target_df[name_field] == name][product_url_field].iloc[
@@ -277,12 +277,8 @@ def compare_prices_for_same_names(
             if is_number(source_price) and is_number(target_price):
                 if ratio_diff(source_price, target_price) > 0.1:
                     count += 1
-                    source_key = source_df[source_df[name_field] == name]["_key"].iloc[
-                        0
-                    ]
-                    target_key = target_df[target_df[name_field] == name]["_key"].iloc[
-                        0
-                    ]
+                    source_key = source_df[source_df[name_field] == name].index[0]
+                    target_key = target_df[target_df[name_field] == name].index[0]
                     msg = (
                         f"different price for {name}\nsource price is {source_price} "
                         f"for {source_key}\ntarget price is {target_price} for {target_key}"
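Since keys sit on the index, every boolean-filter key lookup in this file collapses to `.index[0]`; a toy example with invented values:

```python
import pandas as pd

target_df = pd.DataFrame(
    {"url": ["http://a", "http://b"], "price": [10, 12]},
    index=["0/0/0", "0/0/1"],
)
key = target_df.loc[target_df["url"] == "http://b"].index[0]
print(key)  # 0/0/1
```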
83 changes: 0 additions & 83 deletions src/arche/tools/json_schema_validator.py

This file was deleted.
