Separate start_index and start, support for collections, closes #112
manycoding committed Jun 13, 2019
1 parent 09ff7c1 commit 274b897
Showing 8 changed files with 114 additions and 75 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -19,11 +19,13 @@ Note that the top-most release is changes in the unreleased master branch on Git
- Data is consistent with Dash and Spidermon: `_type, _key` fields are dropped from dataframe, raw data, basic schema, #104, #106
- `df.index` now stores `_key` instead
- `basic_json_schema()` works with `deleted` jobs
- `start` is supported for Collections, #112
### Fixed
- `Arche.glance()`, #88
- Item links in Schema validation errors, #89
- Empty NAN bars on category graphs, #93
- `data_quality_report()`, #95
- Wrong number of Collection Items if it contains item 0, #112
### Removed


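To illustrate the new Collections support described in the changelog above, a minimal usage sketch — the collection key, item key, and count are made-up values, and `start` is assumed to be the full item key to resume from:

```python
from arche import Arche

# Hypothetical collection key and item key; `start` resumes reading from that item.
a = Arche(
    source="112358/collections/s/pages",
    count=1_000,
    start="112358/collections/s/pages/20190101",
)
items = a.source_items  # fetches up to `count` items beginning at `start`
```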
20 changes: 7 additions & 13 deletions src/arche/arche.py
@@ -24,8 +24,8 @@ def __init__(
source: Union[str, pd.DataFrame, RawItems],
schema: Optional[sr.SchemaSource] = None,
target: Optional[Union[str, pd.DataFrame]] = None,
start: int = 0,
count: Optional[int] = None,
start: Union[str, int] = None,
filters: Optional[api.Filters] = None,
expand: bool = True,
):
@@ -34,8 +34,8 @@ def __init__(
source: a data source to validate, accepts job keys, pandas df, lists
schema: a JSON schema source used to run validation
target: a data source to compare with
start: an item number to start reading from
count: the amount of items to read from start
start: an item key to start reading from
filters: Scrapinghub filtering, see
https://python-scrapinghub.readthedocs.io/en/latest/client/apidocs.html#scrapinghub.client.items.Items # noqa
expand: if True, use flattened data in garbage rules, affects performance
@@ -72,7 +72,7 @@ def __init__(
def source_items(self):
if not self._source_items:
self._source_items = self.get_items(
self.source, self.start, self.count, self.filters, self.expand
self.source, self.count, self.start, self.filters, self.expand
)
return self._source_items

@@ -82,7 +82,7 @@ def target_items(self):
return None
if not self._target_items:
self._target_items = self.get_items(
self.target, self.start, self.count, self.filters, self.expand
self.target, self.count, self.start, self.filters, self.expand
)
return self._target_items

@@ -100,8 +100,8 @@ def schema(self, schema_source):
@staticmethod
def get_items(
source: Union[str, pd.DataFrame, RawItems],
start: int,
count: Optional[int],
start: Union[str, int],
filters: Optional[api.Filters],
expand: bool,
) -> Union[JobItems, CollectionItems]:
@@ -110,15 +110,9 @@ def get_items(
elif isinstance(source, Iterable) and not isinstance(source, str):
return Items.from_array(source, expand=expand)
elif helpers.is_job_key(source):
return JobItems(
key=source, start=start, count=count, filters=filters, expand=expand
)
return JobItems(source, count, start, filters, expand)
elif helpers.is_collection_key(source):
if start:
raise ValueError("Collections API does not support 'start' parameter")
return CollectionItems(
key=source, count=count, filters=filters, expand=expand
)
return CollectionItems(source, count, start, filters, expand)
else:
raise ValueError(f"'{source}' is not a valid job or collection key")

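Based on the `get_items` dispatch above, a hedged sketch of how job and collection keys are routed — the keys are illustrative, and both calls would hit the Scrapy Cloud API:

```python
from arche.arche import Arche

# Job key: `start` is treated as a numeric index into the job's items.
job_items = Arche.get_items("112358/13/21", count=100, start=0, filters=None, expand=True)

# Collection key: `start` is a full item key string (or None to read from the beginning).
col_items = Arche.get_items(
    "112358/collections/s/pages", count=100, start=None, filters=None, expand=True
)
```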
25 changes: 21 additions & 4 deletions src/arche/readers/items.py
@@ -113,12 +113,13 @@ class JobItems(CloudItems):
def __init__(
self,
key: str,
start: int = 0,
count: Optional[int] = None,
start_index: int = 0,
filters: Optional[api.Filters] = None,
expand: bool = True,
):
self.start_index: int = start
self.start_index = start_index
self.start: str = f"{key}/{start_index}"
self._job: Job = None
super().__init__(key, count, filters, expand)

@@ -145,7 +146,9 @@ def job(self) -> Job:

def fetch_data(self) -> np.ndarray:
if self.filters or self.count < 200_000:
return api.get_items(self.key, self.count, self.start_index, self.filters)
return api.get_items(
self.key, self.count, self.start_index, self.start, self.filters
)
else:
return api.get_items_with_pool(self.key, self.count, self.start_index)

@@ -158,6 +161,17 @@ def format_keys(self, keys: pd.Series) -> pd.Series:


class CollectionItems(CloudItems):
def __init__(
self,
key: str,
count: Optional[int] = None,
start: Optional[str] = None,
filters: Optional[api.Filters] = None,
expand: bool = True,
):
self.start = start
super().__init__(key, count, filters, expand)

@property
def limit(self) -> int:
if not self._limit:
@@ -171,7 +185,10 @@ def count(self) -> int:
return self._count

def fetch_data(self) -> np.ndarray:
return api.get_items(self.key, self.count, 0, self.filters)
desc = f"Fetching from '{self.key.rsplit('/')[-1]}'"
return api.get_items(
self.key, self.count, 0, self.start, self.filters, desc=desc
)

def format_keys(self, keys: pd.Series) -> pd.Series:
"""Get full Scrapy Cloud url from `_key`
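A sketch of the distinction the readers above introduce — `JobItems` derives the API start key from a numeric `start_index`, while `CollectionItems` accepts the key verbatim. Keys and counts are illustrative, and constructing either object fetches data from Scrapy Cloud:

```python
from arche.readers.items import CollectionItems, JobItems

# start_index=500 is turned into the start key "112358/13/21/500" internally.
job = JobItems("112358/13/21", count=1_000, start_index=500)

# Collections have no numeric index; pass the full item key (or None).
col = CollectionItems(
    "112358/collections/s/pages",
    count=1_000,
    start="112358/collections/s/pages/some_key",
)
```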
22 changes: 11 additions & 11 deletions src/arche/tools/api.py
@@ -122,7 +122,7 @@ def get_source(source_key):


def get_items_with_pool(
source_key: str, count: int, start_index: int = 0, workers: int = 4
source_key: str, count: int, start_index: int, workers: int = 4
) -> np.ndarray:
"""Concurrently reads items from API using Pool
@@ -139,10 +139,12 @@
processes_count = min(max(helpers.cpus_count(), workers), active_connections_limit)
batch_size = math.ceil(count / processes_count)

start_idxs = [i for i in range(start_index, start_index + count, batch_size)]
start = [f"{source_key}/{i}" for i in start_idxs]
with Pool(processes_count) as p:
results = p.starmap(
partial(get_items, source_key, batch_size, p_bar=tqdm),
zip([i for i in range(start_index, start_index + count, batch_size)]),
zip(start_idxs, start),
)
return np.concatenate(results)
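For illustration, with made-up numbers the batching above splits a read of 1,000 items across 4 processes, each starting at its own item key:

```python
import math

source_key = "112358/13/21"          # illustrative job key
count, start_index, processes_count = 1_000, 0, 4

batch_size = math.ceil(count / processes_count)                          # 250
start_idxs = list(range(start_index, start_index + count, batch_size))   # [0, 250, 500, 750]
start = [f"{source_key}/{i}" for i in start_idxs]
# ["112358/13/21/0", "112358/13/21/250", "112358/13/21/500", "112358/13/21/750"]
```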

@@ -151,18 +153,16 @@ def get_items(
key: str,
count: int,
start_index: int,
start: str,
filters: Optional[Filters] = None,
p_bar: Union[tqdm, tqdm_notebook] = tqdm_notebook,
desc: Optional[str] = None,
) -> np.ndarray:
source = get_source(key)
items_iter = source.iter(
start=f"{key}/{start_index}", count=count, filter=filters, meta="_key"
)
items_iter = source.iter(start=start, count=count, filter=filters, meta="_key")

if p_bar:
items_iter = p_bar(
items_iter,
desc=f"Fetching {start_index}:{start_index+count} from {key}",
total=count,
unit_scale=1,
)
if not desc:
desc = f"Fetching {start_index}:{start_index+count} from {key}"
items_iter = p_bar(items_iter, desc=desc, total=count, unit_scale=1)
return np.asarray(list(items_iter))
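A hedged sketch of calling the updated `get_items` directly — the job and collection keys are illustrative, and both calls would reach the Scrapy Cloud API:

```python
from arche.tools import api

# Job: pass both the numeric start_index and the derived start key.
job_items = api.get_items("112358/13/21", count=10, start_index=0, start="112358/13/21/0")

# Collection: start is the item key itself; desc overrides the default progress label.
col_items = api.get_items(
    "112358/collections/s/pages",
    count=10,
    start_index=0,
    start="112358/collections/s/pages/some_key",
    desc="Fetching from 'pages'",
)
```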
20 changes: 12 additions & 8 deletions src/arche/tools/schema.py
@@ -39,30 +39,34 @@ def basic_json_schema(data_source: str, items_numbers: List[int] = None) -> Basi


def create_json_schema(
source_key: str, item_numbers: Optional[List[int]] = None
source_key: str, items_numbers: Optional[List[int]] = None
) -> Schema:
if helpers.is_collection_key(source_key):
store = api.get_collection(source_key)
items_count = store.count()
start_mask = ""
elif helpers.is_job_key(source_key):
items_count = api.get_items_count(api.get_job(source_key))
start_mask = f"{source_key}/"
else:
raise ValueError(f"'{source_key}' is not a valid job or collection key")

if items_count == 0:
raise ValueError(f"'{source_key}' does not have any items")

item_n_err = "{} is a bad item number, choose numbers between 0 and {}"
if item_numbers:
item_numbers.sort()
if item_numbers[-1] >= items_count or item_numbers[0] < 0:
raise ValueError(item_n_err.format(item_numbers[-1], items_count - 1))
if items_numbers:
items_numbers.sort()
if items_numbers[-1] >= items_count or items_numbers[0] < 0:
raise ValueError(item_n_err.format(items_numbers[-1], items_count - 1))
else:
item_numbers = set_item_no(items_count)
items_numbers = set_item_no(items_count)

samples = []
for n in item_numbers:
item = api.get_items(source_key, start_index=n, count=1, p_bar=None)[0]
for n in items_numbers:
item = api.get_items(
source_key, count=1, start_index=n, start=f"{start_mask}{n}", p_bar=None
)[0]
item.pop("_type", None)
item.pop("_key", None)
samples.append(item)
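To make the `start_mask` logic above concrete, a small illustration of how each sampled item number is turned into the `start` argument (the job key is made up):

```python
# Job source: the mask is the job key prefix, so item n maps to "<job key>/<n>".
start_mask = "112358/13/21/"
n = 7
start = f"{start_mask}{n}"   # "112358/13/21/7"

# Collection source: the mask is empty, so the raw item number is passed as `start`.
start_mask = ""
start = f"{start_mask}{n}"   # "7"
```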
36 changes: 18 additions & 18 deletions tests/readers/test_items.py
@@ -46,23 +46,23 @@ def test_items_from_array(raw):

collection_items = np.array(
[
{"_key": "0", "name": "Book", "_type": "Book"},
{"_key": "10", "name": "Book", "_type": "Book"},
{"_key": "1", "name": "Movie", "_type": "Book"},
{"_key": "2", "name": "Guitar", "_type": "Book"},
{"_key": "3", "name": "Dog", "_type": "Book"},
]
)
expected_col_df = pd.DataFrame(
{"name": ["Book", "Movie", "Guitar", "Dog"]},
index=[f"{SH_URL}/key/{i}" for i in range(4)],
index=[f"{SH_URL}/key/{i}" for i in [10, 1, 2, 3]],
)


@pytest.mark.parametrize(
"count, filters, expand, expected_count",
[(1, None, False, 1), (None, None, True, 4)],
"count, start, filters, expand, expected_count",
[(1, "1", None, False, 1), (None, None, None, True, 4)],
)
def test_collection_items(mocker, count, filters, expand, expected_count):
def test_collection_items(mocker, count, start, filters, expand, expected_count):
mocker.patch(
"arche.tools.api.get_collection",
return_value=Collection(len(collection_items)),
@@ -73,18 +73,20 @@ def test_collection_items(mocker, count, filters, expand, expected_count):
return_value=collection_items[:expected_count],
autospec=True,
)
items = CollectionItems("key", count, filters, expand)
items = CollectionItems("key", count, start, filters, expand)
assert items.key == "key"
assert items.start == start
assert items.filters == filters
assert items.expand == expand
np.testing.assert_array_equal(items.raw, collection_items[:expected_count])
pd.testing.assert_frame_equal(items.df, expected_col_df.iloc[:expected_count])
pd.testing.assert_frame_equal(items.flat_df, items.df)

assert len(items) == len(expected_col_df.iloc[:expected_count])
assert len(items) == expected_count
assert items.limit == len(collection_items)
assert items.count == expected_count
get_items_mock.assert_called_once_with("key", expected_count, 0, filters)
get_items_mock.assert_called_once_with(
"key", expected_count, 0, start, filters, desc="Fetching from 'key'"
)


job_items = np.array(
@@ -101,20 +103,18 @@ def test_collection_items(mocker, count, filters, expand, expected_count):
)


@pytest.mark.parametrize("start, count, expected_count", [(1, 2, 2)])
def test_job_items(mocker, start, count, expected_count):
def test_job_items(mocker):
mocker.patch("arche.readers.items.JobItems.job", return_value=Job(), autospec=True)
mocker.patch(
"arche.tools.api.get_items",
return_value=job_items[start:expected_count],
autospec=True,
"arche.tools.api.get_items", return_value=job_items[1:3], autospec=True
)
items = JobItems(
key="112358/13/21", start=start, count=count, filters=None, expand=False
key="112358/13/21", count=2, start_index=1, filters=None, expand=False
)
np.testing.assert_array_equal(items.raw, job_items[start:count])
pd.testing.assert_frame_equal(items.df, expected_job_df.iloc[start:count])
assert items.count == count
np.testing.assert_array_equal(items.raw, job_items[1:3])
pd.testing.assert_frame_equal(items.df, expected_job_df.iloc[1:3])
assert items.count == 2
assert items.start == "112358/13/21/1"


def test_process_df():
12 changes: 0 additions & 12 deletions tests/test_arche.py
@@ -105,18 +105,6 @@ def test_get_items_from_collection(
assert items.expand == expand


def test_get_items_start():
with pytest.raises(ValueError) as excinfo:
Arche.get_items(
source="112358/collections/s/pages",
count=1,
start=1,
filters=None,
expand=None,
)
assert str(excinfo.value) == "Collections API does not support 'start' parameter"


def test_get_items_from_bad_source():
with pytest.raises(ValueError) as excinfo:
Arche.get_items(source="bad_key", count=1, start=1, filters=None, expand=None)
