
Commit

Fix start (#113)
* Rewrite Source and add StoreSource mocks

* Separate start_index and start, support for collections, closes #112
manycoding committed Jun 17, 2019
1 parent b5543e3 commit fc5cf41
Showing 9 changed files with 157 additions and 103 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -20,12 +20,14 @@ Note that the top-most release is changes in the unreleased master branch on Git
- Data is consistent with Dash and Spidermon: `_type, _key` fields are dropped from dataframe, raw data, basic schema, #104, #106
- `df.index` now stores `_key` instead
- `basic_json_schema()` works with `deleted` jobs
- `start` is supported for Collections, #112
- `enum` is counted as a `category` tag, #18
### Fixed
- `Arche.glance()`, #88
- Item links in Schema validation errors, #89
- Empty NAN bars on category graphs, #93
- `data_quality_report()`, #95
- Wrong number of Collection Items if it contains item 0, #112
### Removed


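For context, here is a minimal usage sketch of the new `start` behaviour. The keys, counts, and start values below are made-up examples, not taken from this commit:

```python
from arche import Arche

# Job source: `start` stays an integer index into the job's items (job key is hypothetical)
job_check = Arche(source="112358/13/21", count=1_000, start=500)

# Collection source: `start` is now accepted as the `_key` of the item to begin reading from,
# including item "0" (the #112 fix); the collection key and start value are hypothetical
coll_check = Arche(source="112358/collections/s/pages", count=1_000, start="0")
```

Construction itself should not fetch anything: `source_items` is a lazy property, as the `arche.py` hunks below show.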
20 changes: 7 additions & 13 deletions src/arche/arche.py
@@ -24,8 +24,8 @@ def __init__(
source: Union[str, pd.DataFrame, RawItems],
schema: Optional[SchemaSource] = None,
target: Optional[Union[str, pd.DataFrame]] = None,
start: int = 0,
count: Optional[int] = None,
start: Union[str, int] = None,
filters: Optional[api.Filters] = None,
expand: bool = True,
):
@@ -34,8 +34,8 @@ def __init__(
source: a data source to validate, accepts job keys, pandas df, lists
schema: a JSON schema source used to run validation
target: a data source to compare with
start: an item number to start reading from
count: the number of items to read from start
start: an item key to start reading from
filters: Scrapinghub filtering, see
https://python-scrapinghub.readthedocs.io/en/latest/client/apidocs.html#scrapinghub.client.items.Items # noqa
expand: if True, use flattened data in garbage rules, affects performance
@@ -72,7 +72,7 @@ def __init__(
def source_items(self):
if not self._source_items:
self._source_items = self.get_items(
self.source, self.start, self.count, self.filters, self.expand
self.source, self.count, self.start, self.filters, self.expand
)
return self._source_items

@@ -82,7 +82,7 @@ def target_items(self):
return None
if not self._target_items:
self._target_items = self.get_items(
self.target, self.start, self.count, self.filters, self.expand
self.target, self.count, self.start, self.filters, self.expand
)
return self._target_items

@@ -100,8 +100,8 @@ def schema(self, schema_source):
@staticmethod
def get_items(
source: Union[str, pd.DataFrame, RawItems],
start: int,
count: Optional[int],
start: Union[str, int],
filters: Optional[api.Filters],
expand: bool,
) -> Union[JobItems, CollectionItems]:
@@ -110,15 +110,9 @@ def get_items(
elif isinstance(source, Iterable) and not isinstance(source, str):
return Items.from_array(source, expand=expand)
elif helpers.is_job_key(source):
return JobItems(
key=source, start=start, count=count, filters=filters, expand=expand
)
return JobItems(source, count, start, filters, expand)
elif helpers.is_collection_key(source):
if start:
raise ValueError("Collections API does not support 'start' parameter")
return CollectionItems(
key=source, count=count, filters=filters, expand=expand
)
return CollectionItems(source, count, start, filters, expand)
else:
raise ValueError(f"'{source}' is not a valid job or collection key")

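The net effect of `get_items` above: in-memory data goes straight to `Items.from_array`, while cloud keys are dispatched by their shape, with `start` now forwarded positionally to both readers. A rough sketch of the in-memory path (the sample records are made up and mirror the test fixtures):

```python
from arche import Arche

# Lists of dicts need no network access; `count`, `start` and `filters` are simply unused here
items = Arche.get_items(
    [
        {"_key": "0", "_type": "Item", "name": "foo"},
        {"_key": "1", "_type": "Item", "name": "bar"},
    ],
    count=None,
    start=None,
    filters=None,
    expand=True,
)

# Cloud keys are routed by shape (keys are assumptions for illustration):
#   "112358/13/21"               -> JobItems(key, count, start_index, filters, expand)
#   "112358/collections/s/pages" -> CollectionItems(key, count, start, filters, expand)
```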
25 changes: 21 additions & 4 deletions src/arche/readers/items.py
@@ -113,12 +113,13 @@ class JobItems(CloudItems):
def __init__(
self,
key: str,
start: int = 0,
count: Optional[int] = None,
start_index: int = 0,
filters: Optional[api.Filters] = None,
expand: bool = True,
):
self.start_index: int = start
self.start_index = start_index
self.start: int = f"{key}/{start_index}"
self._job: Job = None
super().__init__(key, count, filters, expand)

@@ -145,7 +146,9 @@ def job(self) -> Job:

def fetch_data(self) -> np.ndarray:
if self.filters or self.count < 200_000:
return api.get_items(self.key, self.count, self.start_index, self.filters)
return api.get_items(
self.key, self.count, self.start_index, self.start, self.filters
)
else:
return api.get_items_with_pool(self.key, self.count, self.start_index)

@@ -158,6 +161,17 @@ def format_keys(self, keys: pd.Series) -> pd.Series:


class CollectionItems(CloudItems):
def __init__(
self,
key: str,
count: Optional[int] = None,
start: Optional[str] = None,
filters: Optional[api.Filters] = None,
expand: bool = True,
):
self.start = start
super().__init__(key, count, filters, expand)

@property
def limit(self) -> int:
if not self._limit:
@@ -171,7 +185,10 @@ def count(self) -> int:
return self._count

def fetch_data(self) -> np.ndarray:
return api.get_items(self.key, self.count, 0, self.filters)
desc = f"Fetching from '{self.key.rsplit('/')[-1]}'"
return api.get_items(
self.key, self.count, 0, self.start, self.filters, desc=desc
)

def format_keys(self, keys: pd.Series) -> pd.Series:
"""Get full Scrapy Cloud url from `_key`
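In short, the two readers now treat `start` differently: `JobItems` keeps an integer `start_index` and derives an absolute item key from it, while `CollectionItems` stores the given item `_key` and passes it to the API unchanged. A small sketch of that derivation (the keys are illustrative assumptions):

```python
def job_start_key(job_key: str, start_index: int) -> str:
    # Mirrors JobItems.__init__ above: start = "<job key>/<start index>"
    return f"{job_key}/{start_index}"

assert job_start_key("112358/13/21", 50) == "112358/13/21/50"

# CollectionItems keeps `start` as given, e.g. "0", so a collection that contains
# item 0 no longer yields a wrong number of items (#112).
```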
22 changes: 11 additions & 11 deletions src/arche/tools/api.py
@@ -122,7 +122,7 @@ def get_source(source_key):


def get_items_with_pool(
source_key: str, count: int, start_index: int = 0, workers: int = 4
source_key: str, count: int, start_index: int, workers: int = 4
) -> np.ndarray:
"""Concurrently reads items from API using Pool
@@ -139,10 +139,12 @@
processes_count = min(max(helpers.cpus_count(), workers), active_connections_limit)
batch_size = math.ceil(count / processes_count)

start_idxs = range(start_index, start_index + count, batch_size)
start = [f"{source_key}/{i}" for i in start_idxs]
with Pool(processes_count) as p:
results = p.starmap(
partial(get_items, source_key, batch_size, p_bar=tqdm),
zip([i for i in range(start_index, start_index + count, batch_size)]),
zip(start_idxs, start),
)
return np.concatenate(results)

@@ -151,18 +153,16 @@ def get_items(
key: str,
count: int,
start_index: int,
start: str,
filters: Optional[Filters] = None,
p_bar: Union[tqdm, tqdm_notebook] = tqdm_notebook,
desc: Optional[str] = None,
) -> np.ndarray:
source = get_source(key)
items_iter = source.iter(
start=f"{key}/{start_index}", count=count, filter=filters, meta="_key"
)
items_iter = source.iter(start=start, count=count, filter=filters, meta="_key")

if p_bar:
items_iter = p_bar(
items_iter,
desc=f"Fetching {start_index}:{start_index+count} from {key}",
total=count,
unit_scale=1,
)
if not desc:
desc = f"Fetching {start_index}:{start_index+count} from {key}"
items_iter = p_bar(items_iter, desc=desc, total=count, unit_scale=1)
return np.asarray(list(items_iter))
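The pool helper above now precomputes both the integer offsets and the absolute start keys for its batches, and each worker receives one (index, key) pair via `starmap`. A worked sketch of the batching arithmetic (the job key and counts are assumptions):

```python
import math

source_key, count, start_index, processes_count = "112358/13/21", 1_000, 0, 4

batch_size = math.ceil(count / processes_count)                   # 250 items per worker
start_idxs = range(start_index, start_index + count, batch_size)  # 0, 250, 500, 750
start_keys = [f"{source_key}/{i}" for i in start_idxs]

print(list(start_idxs))  # [0, 250, 500, 750]
print(start_keys)        # ['112358/13/21/0', '112358/13/21/250', '112358/13/21/500', '112358/13/21/750']
```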
22 changes: 12 additions & 10 deletions src/arche/tools/schema.py
@@ -25,30 +25,32 @@ def basic_json_schema(data_source: str, items_numbers: List[int] = None) -> Sche


def create_json_schema(
source_key: str, item_numbers: Optional[List[int]] = None
source_key: str, items_numbers: Optional[List[int]] = None
) -> Schema:
if helpers.is_collection_key(source_key):
store = api.get_collection(source_key)
items_count = store.count()
start_mask = ""
elif helpers.is_job_key(source_key):
items_count = api.get_items_count(api.get_job(source_key))
start_mask = f"{source_key}/"
else:
raise ValueError(f"'{source_key}' is not a valid job or collection key")

if items_count == 0:
raise ValueError(f"'{source_key}' does not have any items")

item_n_err = "{} is a bad item number, choose numbers between 0 and {}"
if item_numbers:
item_numbers.sort()
if item_numbers[-1] >= items_count or item_numbers[0] < 0:
raise ValueError(item_n_err.format(item_numbers[-1], items_count - 1))
else:
item_numbers = set_item_no(items_count)
items_numbers = items_numbers or set_item_no(items_count)
if max(items_numbers) >= items_count or min(items_numbers) < 0:
raise ValueError(
f"Expected values between 0 and {items_count}, got '{items_numbers}'"
)

samples = []
for n in item_numbers:
item = api.get_items(source_key, start_index=n, count=1, p_bar=None)[0]
for n in items_numbers:
item = api.get_items(
source_key, count=1, start_index=n, start=f"{start_mask}{n}", p_bar=None
)[0]
item.pop("_type", None)
item.pop("_key", None)
samples.append(item)
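The `start_mask` introduced above is what lets the same sampling loop serve both source types: job samples need the job key prefixed to the item number, while collection samples pass the bare number as `start`. A short sketch of the keys it produces (the job key is an assumption):

```python
items_numbers = [0, 5, 9]

job_mask = "112358/13/21/"   # job source: start_mask = f"{source_key}/"
coll_mask = ""               # collection source: start_mask = ""

print([f"{job_mask}{n}" for n in items_numbers])   # ['112358/13/21/0', '112358/13/21/5', '112358/13/21/9']
print([f"{coll_mask}{n}" for n in items_numbers])  # ['0', '5', '9']
```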
69 changes: 43 additions & 26 deletions tests/conftest.py
@@ -15,6 +15,7 @@
{"_key": "112358/13/21/2", "_type": "Type", "price": 10, "name": "Yulia"},
{"_key": "112358/13/21/3", "_type": "Type", "price": 11, "name": "Vivien"},
]

DEFAULT_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema",
"required": ["name"],
@@ -64,8 +65,16 @@ def count(self) -> Optional[int]:
return self._count


def _is_filtered(x, by):
if by:
return x.get(by[0][0]) == by[0][1][0]
return True


class Source:
def __init__(self, items=None, stats=None):
def __init__(
self, items: Optional[List[Dict]] = None, stats: Optional[Dict] = None
):
self.items = items
if stats:
self._stats = stats
@@ -81,34 +90,42 @@ def stats(self):

def iter(self, **kwargs):
start = kwargs.get("start", 0)
count = kwargs.get("count", None)
counter = 0
if start:
start = int(start.split("/")[-1])
count = kwargs.get("count", len(self.items) - start)

# Scrapinghub API returns all possible items even if `count` is greater than available
if start + count > len(self.items):
limit = len(self.items)
else:
limit = start + count

if kwargs.get("filter"):
field_name = kwargs.get("filter")[0][0]
value = kwargs.get("filter")[0][1][0]
filtered_items = []

counter = 0
for index in range(start, limit):
if counter == limit:
return
if self.items[index].get(field_name) == value:
filtered_items.append(self.items[index])
counter += 1

for filtered_item in filtered_items:
yield filtered_item
else:
for index in range(start, limit):
yield self.items[index]
for item in self.items[start:]:
if counter == count:
return
if _is_filtered(item, kwargs.get("filter")):
counter += 1
yield item


class StoreSource:
def __init__(self, items: List[Dict] = None):
self.items = items

def count(self):
return len(self.items)

def iter(self, **kwargs):
start = kwargs.get("start", self.items[0].get("_key"))

def start_idx():
for i, item in enumerate(self.items):
if item.get("_key") == start:
return i

count = kwargs.get("count", None)
counter = 0
for item in self.items[start_idx() :]:
if counter == count:
return
if _is_filtered(item, kwargs.get("filter")):
counter += 1
yield item


@pytest.fixture(scope="function")
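The rewritten mocks can be exercised directly to see the two `start` semantics side by side. A sketch, assuming `Source` and `StoreSource` are importable from the conftest above and using made-up records:

```python
job_items = [
    {"_key": "112358/13/21/0", "price": 0},
    {"_key": "112358/13/21/1", "price": 1},
    {"_key": "112358/13/21/2", "price": 2},
]
job_source = Source(job_items)
# Job-style `start` is an absolute item key; the mock parses the trailing index out of it
assert list(job_source.iter(start="112358/13/21/1", count=1)) == job_items[1:2]

coll_items = [{"_key": "0", "v": 1}, {"_key": "1", "v": 2}, {"_key": "2", "v": 3}]
store = StoreSource(coll_items)
# Collection-style `start` matches an item `_key` exactly, so item "0" is reachable (the #112 case)
assert list(store.iter(start="0", count=2)) == coll_items[:2]
assert store.count() == 3
```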