diff --git a/scrapinghub/client.py b/scrapinghub/client.py deleted file mode 100644 index 7400cb4b..00000000 --- a/scrapinghub/client.py +++ /dev/null @@ -1,1478 +0,0 @@ -import json -import collections -from functools import partial -from collections import defaultdict - -from six import string_types -from requests.compat import urljoin - -from scrapinghub import Connection as _Connection -from scrapinghub import HubstorageClient as _HubstorageClient - -from .hubstorage.resourcetype import DownloadableResource -from .hubstorage.resourcetype import ItemsResourceType -from .hubstorage.utils import urlpathjoin - -# scrapinghub.hubstorage classes to use as-is -from .hubstorage.job import JobMeta -from .hubstorage.project import Settings - -# scrapinghub.hubstorage proxied classes -from .hubstorage.activity import Activity as _Activity -from .hubstorage.collectionsrt import Collections as _Collections -from .hubstorage.collectionsrt import Collection as _Collection -from .hubstorage.frontier import Frontier as _Frontier -from .hubstorage.job import Items as _Items -from .hubstorage.job import Logs as _Logs -from .hubstorage.job import Samples as _Samples -from .hubstorage.job import Requests as _Requests - -from .exceptions import NotFound, InvalidUsage, DuplicateJobError -from .exceptions import wrap_http_errors, wrap_value_too_large - -from .utils import LogLevel -from .utils import get_tags_for_update -from .utils import parse_auth -from .utils import parse_project_id, parse_job_key -from .utils import proxy_methods -from .utils import wrap_kwargs -from .utils import format_iter_filters - - -class Connection(_Connection): - - @wrap_http_errors - def _request(self, *args, **kwargs): - return super(Connection, self)._request(*args, **kwargs) - - -class HubstorageClient(_HubstorageClient): - - @wrap_http_errors - def request(self, *args, **kwargs): - return super(HubstorageClient, self).request(*args, **kwargs) - - -class ScrapinghubClient(object): - """Main class to work with Scrapinghub API. - - :param auth: Scrapinghub APIKEY or other SH auth credentials. - :param dash_endpoint: (optional) Scrapinghub Dash panel url. - :param \*\*kwargs: (optional) Additional arguments for - :class:`scrapinghub.hubstorage.HubstorageClient` constructor. - - :ivar projects: projects collection, :class:`Projects` instance. - - Usage:: - - >>> from scrapinghub import ScrapinghubClient - >>> client = ScrapinghubClient('APIKEY') - >>> client - - """ - - def __init__(self, auth=None, dash_endpoint=None, **kwargs): - self.projects = Projects(self) - login, password = parse_auth(auth) - self._connection = Connection(apikey=login, - password=password, - url=dash_endpoint) - self._hsclient = HubstorageClient(auth=(login, password), **kwargs) - - def get_project(self, projectid): - """Get :class:`Project` instance with a given project id. - - The method is a shortcut for client.projects.get(). - - :param projectid: integer or string numeric project id. - :return: :class:`Project` object. - :rtype: scrapinghub.client.Project. - - Usage:: - - >>> project = client.get_project(123) - >>> project - - """ - return self.projects.get(parse_project_id(projectid)) - - def get_job(self, jobkey): - """Get Job with a given jobkey. - - :param jobkey: job key string in format 'project/spider/job', - where all the components are integers. - :return: :class:`Job` object. - :rtype: scrapinghub.client.Job. 
- - Usage:: - - >>> job = client.get_job('123/1/1') - >>> job - - """ - projectid = parse_job_key(jobkey).projectid - return self.projects.get(projectid).jobs.get(jobkey) - - def close(self, timeout=None): - """Close client instance. - - :param timeout: (optional) float timeout secs to stop everything - gracefully. - """ - self._hsclient.close(timeout=timeout) - - -class Projects(object): - """Collection of projects available to current user. - - Not a public constructor: use :class:`Scrapinghub` client instance to get - a :class:`Projects` instance. See :attr:`Scrapinghub.projects` attribute. - - Usage:: - - >>> client.projects - - """ - - def __init__(self, client): - self._client = client - - def get(self, projectid): - """Get project for a given project id. - - :param projectid: integer or string numeric project id. - :return: :class:`Project` object. - :rtype: scrapinghub.client.Project. - - Usage:: - - >>> project = client.projects.get(123) - >>> project - - """ - return Project(self._client, parse_project_id(projectid)) - - def list(self): - """Get list of projects available to current user. - - :return: a list of integer project ids. - - Usage:: - - >>> client.projects.list() - [123, 456] - """ - return self._client._connection.project_ids() - - def iter(self): - """Iterate through list of projects available to current user. - - Provided for the sake of API consistency. - """ - return iter(self.list()) - - def summary(self, **params): - """Get short summaries for all available user projects. - - :return: a list of dictionaries: each dictionary represents a project - summary (amount of pending/running/finished jobs and a flag if it - has a capacity to schedule new jobs). - - Usage:: - - >>> client.projects.summary() - [{'finished': 674, - 'has_capacity': True, - 'pending': 0, - 'project': 123, - 'running': 1}, - {'finished': 33079, - 'has_capacity': True, - 'pending': 0, - 'project': 456, - 'running': 2}] - """ - return self._client._hsclient.projects.jobsummaries(**params) - - -class Project(object): - """Class representing a project object and its resources. - - Not a public constructor: use :class:`ScrapinghubClient` instance or - :class:`Projects` instance to get a :class:`Project` instance. See - :meth:`Scrapinghub.get_project` or :meth:`Projects.get_project` methods. - - :ivar id: integer project id. - :ivar activity: :class:`Activity` resource object. - :ivar collections: :class:`Collections` resource object. - :ivar frontier: :class:`Frontier` resource object. - :ivar jobs: :class:`Jobs` resource object. - :ivar settings: :class:`Settings` resource object. - :ivar spiders: :class:`Spiders` resource object. - - Usage:: - - >>> project = client.get_project(123) - >>> project - - >>> project.key - '123' - """ - - def __init__(self, client, projectid): - self.key = str(projectid) - self._client = client - - # sub-resources - self.jobs = Jobs(client, projectid) - self.spiders = Spiders(client, projectid) - - # proxied sub-resources - self.activity = Activity(_Activity, client, projectid) - self.collections = Collections(_Collections, client, projectid) - self.frontiers = Frontiers(_HSFrontier, client, projectid) - self.settings = Settings(client._hsclient, projectid) - - -class Spiders(object): - """Class to work with a collection of project spiders. - - Not a public constructor: use :class:`Project` instance to get - a :class:`Spiders` instance. See :attr:`Project.spiders` attribute. - - :ivar projectid: integer project id. 
- - Usage:: - - >>> project.spiders - - """ - - def __init__(self, client, projectid): - self.projectid = projectid - self._client = client - - def get(self, spidername, **params): - """Get a spider object for a given spider name. - - The method gets/sets spider id (and checks if spider exists). - - :param spidername: a string spider name. - :return: :class:`Spider` object. - :rtype: scrapinghub.client.Spider. - - Usage:: - - >>> project.spiders.get('spider2') - - >>> project.spiders.get('non-existing') - NotFound: Spider non-existing doesn't exist. - """ - project = self._client._hsclient.get_project(self.projectid) - spiderid = project.ids.spider(spidername, **params) - if spiderid is None: - raise NotFound("Spider {} doesn't exist.".format(spidername)) - return Spider(self._client, self.projectid, spiderid, spidername) - - def list(self): - """Get a list of spiders for a project. - - :return: a list of dictionaries with spiders metadata. - - Usage:: # noqa - - >>> project.spiders.list() - [{'id': 'spider1', 'tags': [], 'type': 'manual', 'version': '123'}, - {'id': 'spider2', 'tags': [], 'type': 'manual', 'version': '123'}] - """ - project = self._client._connection[self.projectid] - return project.spiders() - - def iter(self): - """Iterate through a list of spiders for a project. - - Provided for the sake of API consistency. - """ - return iter(self.list()) - - -class Spider(object): - """Class representing a Spider object. - - Not a public constructor: use :class:`Spiders` instance to get - a :class:`Spider` instance. See :meth:`Spiders.get` method. - - :ivar projectid: integer project id. - :ivar name: a spider name string. - :ivar jobs: a collection of jobs, :class:`Jobs` object. - - Usage:: - - >>> spider = project.spiders.get('spider1') - >>> spider.key - '123/1' - >>> spider.name - 'spider1' - """ - - def __init__(self, client, projectid, spiderid, spidername): - self.projectid = projectid - self.key = '{}/{}'.format(str(projectid), str(spiderid)) - self._id = str(spiderid) - self.name = spidername - self.jobs = Jobs(client, projectid, self) - self._client = client - - @wrap_http_errors - def update_tags(self, add=None, remove=None): - params = get_tags_for_update(add=add, remove=remove) - path = 'v2/projects/{}/spiders/{}/tags'.format(self.projectid, - self._id) - url = urljoin(self._client._connection.url, path) - response = self._client._connection._session.patch(url, json=params) - response.raise_for_status() - - @wrap_http_errors - def list_tags(self): - path = 'v2/projects/{}/spiders/{}'.format(self.projectid, self._id) - url = urljoin(self._client._connection.url, path) - response = self._client._connection._session.get(url) - response.raise_for_status() - return response.json().get('tags', []) - - -class Jobs(object): - """Class representing a collection of jobs for a project/spider. - - Not a public constructor: use :class:`Project` instance or :class:`Spider` - instance to get a :class:`Jobs` instance. See :attr:`Project.jobs` and - :attr:`Spider.jobs` attributes. - - :ivar projectid: an integer project id. - :ivar spider: :class:`Spider` object if defined. - - Usage:: - - >>> project.jobs - - >>> spider = project.spiders.get('spider1') - >>> spider.jobs - - """ - - def __init__(self, client, projectid, spider=None): - self.projectid = projectid - self.spider = spider - self._client = client - self._project = client._hsclient.get_project(projectid) - - def count(self, **params): - """Count jobs for a given set of parameters. 
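The update_tags() and list_tags() helpers on Spider above ship without usage examples; a minimal sketch of combining them, assuming the tag names are placeholders and the returned list reflects whatever the server currently stores::

    >>> spider = project.spiders.get('spider1')
    >>> spider.update_tags(add=['new-tag'], remove=['old-tag'])
    >>> spider.list_tags()
    ['new-tag']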
- - :param \*\*params: (optional) a set of filters to apply when counting - jobs (e.g. spider, state, has_tag, lacks_tag, startts and endts). - :return: jobs count. - - Usage:: - - >>> spider = project.spiders.get('spider1') - >>> spider.jobs.count() - 5 - >>> project.jobs.count(spider='spider2', state='finished') - 2 - """ - if self.spider: - params['spider'] = self.spider.name - return next(self._project.jobq.apiget(('count',), params=params)) - - def iter(self, **params): - """Iterate over jobs collection for a given set of params. - - :param \*\*params: (optional) a set of filters to apply when counting - jobs (e.g. spider, state, has_tag, lacks_tag, startts and endts). - :return: a generator object over a list of dictionaries of jobs summary - for a given filter params. - - Usage: - - - retrieve all jobs for a spider:: - - >>> spider.jobs.iter() - - - - get all job keys for a spider:: - - >>> jobs_summary = spider.jobs.iter() - >>> [job['key'] for job in jobs_summary] - ['123/1/3', '123/1/2', '123/1/1'] - - - job summary fieldset is less detailed than job.metadata but contains - few new fields as well. Additional fields can be requested using - ``jobmeta`` parameter. If it's used, then it's up to the user to list - all the required fields, so only few default fields would be added - except requested ones:: - - >>> jobs_summary = project.jobs.iter(jobmeta=['scheduled_by', ]) - - - by default :meth:`Jobs.iter` returns maximum last 1000 results. - Pagination is available using start parameter:: - - >>> jobs_summary = spider.jobs.iter(start=1000) - - - get jobs filtered by tags (list of tags has ``OR`` power):: - - >>> jobs_summary = project.jobs.iter( - ... has_tag=['new', 'verified'], lacks_tag='obsolete') - - - get certain number of last finished jobs per some spider:: - - >>> jobs_summary = project.jobs.iter( - ... spider='spider2', state='finished', count=3) - """ - if self.spider: - params['spider'] = self.spider.name - return self._project.jobq.list(**params) - - def list(self, **params): - """Convenient shortcut to list iter results. - - Please note that list() method can use a lot of memory and for a large - amount of jobs it's recommended to iterate through it via iter() - method (all params and available filters are same for both methods). - - """ - return list(self.iter(**params)) - - def schedule(self, spidername=None, **params): - """Schedule a new job and returns its jobkey. - - :param spidername: a spider name string - (not needed if job is scheduled via :attr:`Spider.jobs`). - :param \*\*params: (optional) additional keyword args. - :return: a jobkey string pointing to the new job. 
- - Usage:: - - >>> project.schedule('spider1', arg1='val1') - '123/1/1' - """ - if not spidername and not self.spider: - raise ValueError('Please provide spidername') - params['project'] = self.projectid - params['spider'] = spidername or self.spider.name - spider_args = params.pop('spider_args', None) - if spider_args: - if not isinstance(spider_args, dict): - raise ValueError("spider_args should be a dictionary") - cleaned_args = {k: v for k, v in spider_args.items() - if k not in params} - params.update(cleaned_args) - if 'job_settings' in params: - params['job_settings'] = json.dumps(params['job_settings']) - if 'meta' in params: - params['meta'] = json.dumps(params['meta']) - # FIXME improve to schedule multiple jobs - try: - response = self._client._connection._post( - 'schedule', 'json', params) - except InvalidUsage as exc: - if 'already scheduled' in str(exc): - raise DuplicateJobError(exc) - raise - return Job(self._client, response['jobid']) - - def get(self, jobkey): - """Get a Job with a given jobkey. - - :param jobkey: a string job key. - - jobkey's project component should match the project used to get - :class:`Jobs` instance, and jobkey's spider component should match - the spider (if :attr:`Spider.jobs` was used). - - :return: :class:`Job` object. - :rtype: scrapinghub.client.Job. - - Usage:: - - >>> job = project.jobs.get('123/1/2') - >>> job.key - '123/1/2' - """ - jobkey = parse_job_key(jobkey) - if jobkey.projectid != self.projectid: - raise ValueError('Please use same project id') - if self.spider and jobkey.spiderid != self.spider._id: - raise ValueError('Please use same spider id') - return Job(self._client, str(jobkey)) - - def summary(self, _queuename=None, **params): - """Get jobs summary (optionally by state). - - :param _queuename: (optional) a string state to filter jobs. - :param \*\*params: (optional) additional keyword args. - :return: a generator object over a list of dictionaries of jobs summary - for a given filter params grouped by job state. - - Usage:: - - >>> spider.jobs.summary() - [{'count': 0, 'name': 'pending', 'summary': []}, - {'count': 0, 'name': 'running', 'summary': []}, - {'count': 5, 'name': 'finished', 'summary': [...]} - - >>> project.jobs.summary('pending') - {'count': 0, 'name': 'pending', 'summary': []} - """ - spiderid = self._extract_spider_id(params) - return self._project.jobq.summary( - _queuename, spiderid=spiderid, **params) - - def iter_last(self, **params): - """Iterate through last jobs for each spider. - - :param \*\*params: (optional) keyword arguments to filter jobs. - :return: a generator object over a list of dictionaries of jobs summary - for a given filter params. 
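The schedule() body above JSON-encodes job_settings and meta and merges spider_args into the request parameters; a hedged sketch of passing them together (the setting name and argument values are illustrative), noting that the method returns a Job object whose key points at the new job::

    >>> job = project.jobs.schedule(
    ...     'spider1',
    ...     spider_args={'arg1': 'val1'},
    ...     job_settings={'DOWNLOAD_DELAY': 1})
    >>> job.key
    '123/1/2'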
- - Usage: - - - get all last job summaries for a project:: - - >>> project.jobs.iter_last() - - - - get last job summary for a a spider:: - - >>> list(spider.jobs.iter_last()) - [{'close_reason': 'success', - 'elapsed': 3062444, - 'errors': 1, - 'finished_time': 1482911633089, - 'key': '123/1/3', - 'logs': 8, - 'pending_time': 1482911596566, - 'running_time': 1482911598909, - 'spider': 'spider1', - 'state': 'finished', - 'ts': 1482911615830, - 'version': 'some-version'}] - """ - spiderid = self._extract_spider_id(params) - return self._project.spiders.lastjobsummary(spiderid, **params) - - def _extract_spider_id(self, params): - spiderid = params.pop('spiderid', None) - if not spiderid and self.spider: - return self.spider._id - elif spiderid and self.spider and str(spiderid) != self.spider._id: - raise ValueError('Please use same spider id') - return str(spiderid) if spiderid else None - - def update_tags(self, add=None, remove=None, spidername=None): - """Update tags for all existing spider jobs. - - :param add: (optional) list of tags to add to selected jobs. - :param remove: (optional) list of tags to remove from selected jobs. - :param spidername: spider name, must if used with :attr:`Project.jobs`. - - It's not allowed to update tags for all project jobs, so spider must be - specified (it's done implicitly when using :attr:`Spider.jobs`, or you - have to specify ``spidername`` param when using :attr:`Project.jobs`). - - :return: amount of jobs that were updated. - - Usage: - - - mark all spider jobs with tag ``consumed``:: - - >>> spider = project.spiders.get('spider1') - >>> spider.jobs.update_tags(add=['consumed']) - 5 - - - remove existing tag ``existing`` for all spider jobs:: - - >>> project.jobs.update_tags( - ... remove=['existing'], spidername='spider2') - 2 - """ - spidername = spidername or (self.spider.name if self.spider else None) - if not spidername: - raise ValueError('Please provide spidername') - params = get_tags_for_update(add_tag=add, remove_tag=remove) - if not params: - return - params.update({'project': self.projectid, 'spider': spidername}) - result = self._client._connection._post('jobs_update', 'json', params) - return result['count'] - - -class Job(object): - """Class representing a job object. - - Not a public constructor: use :class:`ScrapinghubClient` instance or - :class:`Jobs` instance to get a :class:`Job` instance. See - :meth:`ScrapinghubClient.get_job` and :meth:`Jobs.get` methods. - - :ivar projectid: in integer project id. - :ivar key: a job key. - :ivar items: :class:`Items` resource object. - :ivar logs: :class:`Logs` resource object. - :ivar requests: :class:`Requests` resource object. - :ivar samples: :class:`Samples` resource object. - :ivar metadata: :class:`Metadata` resource. - - Usage:: - - >>> job = project.job('123/1/2') - >>> job.key - '123/1/2' - >>> job.metadata['state'] - 'finished' - """ - def __init__(self, client, jobkey, metadata=None): - self.projectid = parse_job_key(jobkey).projectid - self.key = jobkey - - self._client = client - self._project = client._hsclient.get_project(self.projectid) - self._job = client._hsclient.get_job(jobkey) - - # proxied sub-resources - self.items = Items(_Items, client, jobkey) - self.logs = Logs(_Logs, client, jobkey) - self.requests = Requests(_Requests, client, jobkey) - self.samples = Samples(_Samples, client, jobkey) - - self.metadata = JobMeta(client._hsclient, jobkey, cached=metadata) - - def update_metadata(self, *args, **kwargs): - """Update job metadata. 
- - :param \*\*kwargs: keyword arguments representing job metadata - - Usage: - - - update job outcome:: - - >>> job.update_metadata(close_reason='custom reason') - - - change job tags:: - - >>> job.update_metadata({'tags': 'obsolete'}) - """ - self._job.update_metadata(*args, **kwargs) - - def update_tags(self, add=None, remove=None): - """Partially update job tags. - - It provides a convenient way to mark specific jobs (for better search, - postprocessing etc). - - :param add: (optional) list of tags to add - :param remove: (optional) list of tags to remove - :return: amount of jobs that were updated - - Usage: to mark a job with tag ``consumed``:: - - >>> job.update_tags(add=['consumed']) - """ - params = get_tags_for_update(add_tag=add, remove_tag=remove) - params.update({'project': self.projectid, 'job': self.key}) - result = self._client._connection._post('jobs_update', 'json', params) - return result['count'] - - def close_writers(self): - """Stop job batch writers threads gracefully. - - Called on :meth:`ScrapinghubClient.close` method. - """ - self._job.close_writers() - - def start(self, **params): - """Move job to running state. - - :param \*\*params: (optional) keyword meta parameters to update - :return: a previous string job state - - Usage:: - - >>> job.start() - 'pending' - """ - return self.update(state='running', **params) - - def finish(self, **params): - """Move running job to finished state. - - :param \*\*params: (optional) keyword meta parameters to update - :return: a previous string job state - - Usage:: - - >>> job.finish() - 'running' - """ - return self.update(state='finished', **params) - - def delete(self, **params): - """Mark finished job for deletion. - - :param \*\*params: (optional) keyword meta parameters to update - :return: a previous string job state - - Usage:: - - >>> job.delete() - 'finished' - """ - return self.update(state='deleted', **params) - - def update(self, **params): - """Update job state. - - :param \*\*params: (optional) keyword meta parameters to update - :return: a previous string job state - - Usage:: - - >>> job.update(state='finished') - 'running' - """ - try: - job = next(self._project.jobq.update(self, **params)) - return job['prevstate'] - except StopIteration: - raise NotFound("Job {} doesn't exist".format(self.key)) - - def cancel(self): - """Schedule a running job for cancellation. - - Usage:: - - >>> job.cancel() - >>> job.metadata['cancelled_by'] - 'John' - """ - self._project.jobq.request_cancel(self) - - def purge(self): - """Delete job and expire its local metadata. - - Usage:: - - >>> job.purge() - >>> job.metadata['state'] - 'deleted' - """ - self.delete() - self.metadata.expire() - - -class _Proxy(object): - """A helper to create a class instance and proxy its methods to origin. - - The internal proxy class is useful to link class attributes from its - origin depending on the origin base class as a part of init logic: - - - :class:`ItemsResourceType` provides items-based attributes to access - items in an arbitrary collection with get/write/flush/close/stats/iter - methods. - - - :class:`DownloadableResource` provides download-based attributes to - iter through collection with or without msgpack support. 
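The state-transition helpers above (start, finish, delete and the underlying update) each return the previous job state; a sketch of a full lifecycle for a freshly scheduled job, with the returned states taken from the docstrings above::

    >>> job.start()
    'pending'
    >>> job.finish()
    'running'
    >>> job.delete()
    'finished'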
- """ - - def __init__(self, cls, client, key): - self.key = key - self._client = client - self._origin = cls(client._hsclient, key) - - if issubclass(cls, ItemsResourceType): - self._proxy_methods(['get', 'write', 'flush', 'close', - 'stats', ('iter', 'list')]) - # redefine write method to wrap hubstorage.ValueTooLarge error - origin_method = getattr(self, 'write') - setattr(self, 'write', wrap_value_too_large(origin_method)) - - # DType iter_values() has more priority than IType list() - # plus Collections interface doesn't need the iter methods - if issubclass(cls, DownloadableResource) and cls is not Collections: - methods = [('iter', 'iter_values'), - ('iter_raw_msgpack', 'iter_msgpack'), - ('iter_raw_json', 'iter_json')] - self._proxy_methods(methods) - self._wrap_iter_methods([method[0] for method in methods]) - - def _proxy_methods(self, methods): - """A little helper for cleaner interface.""" - proxy_methods(self._origin, self, methods) - - def _wrap_iter_methods(self, methods): - """Modify kwargs for all passed self.iter* methods.""" - for method in methods: - wrapped = wrap_kwargs(getattr(self, method), - self._modify_iter_params) - setattr(self, method, wrapped) - - def _modify_iter_params(self, params): - """Modify iter() params on-the-fly.""" - return format_iter_filters(params) - - def list(self, *args, **kwargs): - return list(self.iter(*args, **kwargs)) - - -class Logs(_Proxy): - """Representation of collection of job logs. - - Not a public constructor: use :class:`Job` instance to get a :class:`Logs` - instance. See :attr:`Job.logs` attribute. - - Please note that list() method can use a lot of memory and for a large - amount of logs it's recommended to iterate through it via iter() method - (all params and available filters are same for both methods). - - Usage: - - - retrieve all logs from a job:: - - >>> job.logs.iter() - - - - iterate through first 100 log entries and print them:: - - >>> for log in job.logs.iter(count=100): - >>> ... print(log) - - - retrieve a single log entry from a job:: - - >>> job.logs.list(count=1) - [{ - 'level': 20, - 'message': '[scrapy.core.engine] Closing spider (finished)', - 'time': 1482233733976, - }] - - - retrive logs with a given log level and filter by a word - - >>> filters = [("message", "contains", ["logger"])] - >>> job.logs.list(level='WARNING', filter=filters) - [{ - 'level': 30, - 'message': 'Some warning message', - 'time': 1486375511188, - }] - """ - - def __init__(self, *args, **kwargs): - super(Logs, self).__init__(*args, **kwargs) - self._proxy_methods(['log', 'debug', 'info', 'warning', 'warn', - 'error', 'batch_write_start']) - - def _modify_iter_params(self, params): - """Modify iter() filters on-the-fly. - - - convert offset to start parameter - - check log level and create a corresponding meta filter - - :param params: an original dictionary with params - :return: a modified dictionary with params - """ - params = super(Logs, self)._modify_iter_params(params) - offset = params.pop('offset', None) - if offset: - params['start'] = '{}/{}'.format(self.key, offset) - level = params.pop('level', None) - if level: - minlevel = getattr(LogLevel, level, None) - if minlevel is None: - raise ValueError("Unknown log level: {}".format(level)) - level_filter = json.dumps(['level', '>=', [minlevel]]) - # there can already be some filters handled by super class method - params['filter'] = params.get('filter', []) + [level_filter] - return params - - -class Items(_Proxy): - """Representation of collection of job items. 
- - Not a public constructor: use :class:`Job` instance to get a :class:`Items` - instance. See :attr:`Job.items` attribute. - - Please note that list() method can use a lot of memory and for a large - amount of items it's recommended to iterate through it via iter() method - (all params and available filters are same for both methods). - - Usage: - - - retrieve all scraped items from a job:: - - >>> job.items.iter() - - - - iterate through first 100 items and print them:: - - >>> for log in job.logs.iter(count=100): - >>> ... print(log) - - - retrieve items with timestamp greater or equal to given timestamp - (item here is an arbitrary dictionary depending on your code):: - - >>> job.items.list(startts=1447221694537) - [{ - 'name': ['Some custom item'], - 'url': 'http://some-url/item.html', - 'size': 100000, - }] - - - retrieve 1 item with multiple filters: - >>> filters = [("size", ">", [30000]), ("size", "<", [40000])] - >>> job.items.list(count=1, filter=filters) - [{ - 'name': ['Some other item'], - 'url': 'http://some-url/other-item.html', - 'size': 50000, - }] - """ - - def _modify_iter_params(self, params): - """Modify iter filter to convert offset to start parameter. - - Returns: - dict: updated set of params - """ - params = super(Items, self)._modify_iter_params(params) - offset = params.pop('offset', None) - if offset: - params['start'] = '{}/{}'.format(self.key, offset) - return params - - -class Requests(_Proxy): - """Representation of collection of job requests. - - Not a public constructor: use :class:`Job` instance to get a - :class:`Requests` instance. See :attr:`Job.requests` attribute. - - Please note that list() method can use a lot of memory and for a large - amount of requests it's recommended to iterate through it via iter() - method (all params and available filters are same for both methods). - - Usage: - - - retrieve all requests from a job:: - - >>> job.requests.iter() - - - - iterate through the requests:: - - >>> for reqitem in job.requests.iter(count=1): - ... print(reqitem['time']) - 1482233733870 - - - retrieve single request from a job:: - - >>> job.requests.list(count=1) - [{ - 'duration': 354, - 'fp': '6d748741a927b10454c83ac285b002cd239964ea', - 'method': 'GET', - 'rs': 1270, - 'status': 200,a - 'time': 1482233733870, - 'url': 'https://example.com' - }] - """ - def __init__(self, *args, **kwargs): - super(Requests, self).__init__(*args, **kwargs) - self._proxy_methods(['add']) - - -class Samples(_Proxy): - """Representation of collection of job samples. - - Not a public constructor: use :class:`Job` instance to get a - :class:`Samples` instance. See :attr:`Job.samples` attribute. - - Please note that list() method can use a lot of memory and for a large - amount of samples it's recommended to iterate through it via iter() - method (all params and available filters are same for both methods). - - Usage: - - - retrieve all samples from a job:: - - >>> job.samples.iter() - - - - retrieve samples with timestamp greater or equal to given timestamp:: - - >>> job.samples.list(startts=1484570043851) - [[1484570043851, 554, 576, 1777, 821, 0], - [1484570046673, 561, 583, 1782, 821, 0]] - """ - - -class Activity(_Proxy): - """Representation of collection of job activity events. - - Not a public constructor: use :class:`Project` instance to get a - :class:`Activity` instance. See :attr:`Project.activity` attribute. 
- - Please note that list() method can use a lot of memory and for a large - amount of activities it's recommended to iterate through it via iter() - method (all params and available filters are same for both methods). - - Usage: - - - get all activity from a project:: - - >>> project.activity.iter() - - - - get only last 2 events from a project:: - - >>> project.activity.list(count=2) - [{'event': 'job:completed', 'job': '123/2/3', 'user': 'jobrunner'}, - {'event': 'job:started', 'job': '123/2/3', 'user': 'john'}] - - - post a new event:: - - >>> event = {'event': 'job:completed', - 'job': '123/2/4', - 'user': 'jobrunner'} - >>> project.activity.add(event) - - - post multiple events at once:: - - >>> events = [ - {'event': 'job:completed', 'job': '123/2/5', 'user': 'jobrunner'}, - {'event': 'job:cancelled', 'job': '123/2/6', 'user': 'john'}, - ] - >>> project.activity.add(events) - - """ - def __init__(self, *args, **kwargs): - super(Activity, self).__init__(*args, **kwargs) - self._proxy_methods([('iter', 'list')]) - self._wrap_iter_methods(['iter']) - - def add(self, values, **kwargs): - if not isinstance(values, list): - values = list(values) - for activity in values: - if not isinstance(activity, dict): - raise ValueError("Please pass events as dictionaries") - jobkey = activity.get('job') - if jobkey and parse_job_key(jobkey).projectid != self.key: - raise ValueError('Please use same project id') - self._origin.post(values, **kwargs) - - -class _HSFrontier(_Frontier): - """Modified hubstorage Frontier with newcount per slot.""" - - def __init__(self, *args, **kwargs): - super(_HSFrontier, self).__init__(*args, **kwargs) - self.newcount = defaultdict(int) - - def _get_writer(self, frontier, slot): - key = (frontier, slot) - writer = self._writers.get(key) - if not writer: - writer = self.client.batchuploader.create_writer( - url=urlpathjoin(self.url, frontier, 's', slot), - auth=self.auth, - size=self.batch_size, - start=self.batch_start, - interval=self.batch_interval, - qsize=self.batch_qsize, - content_encoding=self.batch_content_encoding, - callback=partial(self._writer_callback, key), - ) - self._writers[key] = writer - return writer - - def _writer_callback(self, key, response): - self.newcount[key] += response.json()["newcount"] - - -class Frontiers(_Proxy): - """Frontiers collection for a project. - - Not a public constructor: use :class:`Project` instance to get a - :class:`Frontiers` instance. See :attr:`Project.frontiers` attribute. - - Usage: - - - get all frontiers from a project:: - >>> project.frontiers.iter() - - - - list all frontiers - >>> project.frontiers.list() - ['test', 'test1', 'test2'] - - - get a frontier by name - >>> project.frontiers.get('test') - - - - flush data of all frontiers of a project - >>> project.frontiers.flush() - - - show amount of new requests added for all frontiers - >>> project.frontiers.newcount - 3 - - - close batch writers of all frontiers of a project - >>> project.frontiers.close() - """ - def __init__(self, *args, **kwargs): - super(Frontiers, self).__init__(*args, **kwargs) - self._proxy_methods(['close', 'flush']) - - def get(self, name): - """Get a frontier by name.""" - return Frontier(self._client, self, name) - - def iter(self): - """Iterate through frontiers.""" - return iter(self.list()) - - def list(self): - """List frontiers.""" - return next(self._origin.apiget('list')) - - @property - def newcount(self): - return sum(self._origin.newcount.values()) - - -class Frontier(object): - """Representation of a frontier object. 
- - Not a public constructor: use :class:`Frontiers` instance to get a - :class:`Frontier` instance. See :meth:`Frontiers.get` method. - - Usage: - - - get iterator with all slots - >>> frontier.iter() - - - - list all slots - >>> frontier.list() - ['example.com', 'example.com2'] - - - get a slot by name - >>> frontier.get('example.com') - - - - flush frontier data - >>> frontier.flush() - - - show amount of new requests added to frontier - >>> frontier.newcount - 3 - """ - def __init__(self, client, frontiers, name): - self.key = name - self._client = client - self._frontiers = frontiers - - def get(self, slot): - """Get a slot by name.""" - return FrontierSlot(self._client, self, slot) - - def iter(self): - """Iterate through slots.""" - return iter(self.list()) - - def list(self): - """List all slots.""" - return next(self._frontiers._origin.apiget((self.key, 'list'))) - - def flush(self): - """Flush data for a whole frontier.""" - writers = self._frontiers._origin._writers - for (fname, _), writer in writers.items(): - if fname == self.key: - writer.flush() - - @property - def newcount(self): - newcount_values = self._frontiers._origin.newcount - return sum(v for (frontier, _), v in newcount_values.items() - if frontier == self.key) - - -class FrontierSlot(object): - """Representation of a frontier slot object. - - Not a public constructor: use :class:`Frontier` instance to get a - :class:`FrontierSlot` instance. See :meth:`Frontier.get` method. - - Usage: - - - add request to a queue - >>> data = [{'fp': 'page1.html', 'p': 1, 'qdata': {'depth': 1}}] - >>> slot.q.add('example.com', data) - - - add fingerprints to a slot - >>> slot.f.add(['fp1', 'fp2']) - - - flush data for a slot - >>> slot.flush() - - - show amount of new requests added to a slot - >>> slot.newcount - 2 - - - read requests from a slot - >>> slot.q.iter() - - >>> slot.q.list() - [{'id': '0115a8579633600006', - 'requests': [['page1.html', {'depth': 1}]]}] - - - read fingerprints from a slot - >>> slot.f.iter() - - >>> slot.f.list() - ['page1.html'] - - - delete a batch with requests from a slot - >>> slot.q.delete('0115a8579633600006') - - - delete a whole slot - >>> slot.delete() - - """ - def __init__(self, client, frontier, slot): - self.key = slot - self._client = client - self._frontier = frontier - self.fingerprints = FrontierSlotFingerprints(self) - self.queue = FrontierSlotQueue(self) - - @property - def f(self): - return self.fingerprints - - @property - def q(self): - return self.queue - - def delete(self): - """Delete the slot.""" - origin = self._frontier._frontiers._origin - origin.delete_slot(self._frontier.key, self.key) - origin.newcount.pop((self._frontier.key, self.key), None) - - def flush(self): - """Flush data for the slot.""" - writers = self._frontier._frontiers._origin._writers - writer = writers.get((self._frontier.key, self.key)) - if writer: - writer.flush() - - @property - def newcount(self): - newcount_values = self._frontier._frontiers._origin.newcount - return newcount_values.get((self._frontier.key, self.key), 0) - - -class FrontierSlotFingerprints(object): - - def __init__(self, slot): - self.key = slot.key - self._frontier = slot._frontier - self._slot = slot - - def add(self, fps): - origin = self._frontier._frontiers._origin - writer = origin._get_writer(self._frontier.key, self.key) - fps = list(fps) if not isinstance(fps, list) else fps - if not all(isinstance(fp, string_types) for fp in fps): - raise ValueError('Fingerprint should be of a string type') - for fp in fps: - 
writer.write({'fp': fp}) - - def iter(self, **kwargs): - """Iterate through fingerprints in the slot.""" - origin = self._frontier._frontiers._origin - path = (self._frontier.key, 's', self.key, 'f') - for fp in origin.apiget(path, params=kwargs): - yield fp.get('fp') - - def list(self, **kwargs): - """List fingerprints in the slot.""" - return list(self.iter(**kwargs)) - - -class FrontierSlotQueue(object): - - def __init__(self, slot): - self.key = slot.key - self._frontier = slot._frontier - self._slot = slot - - def add(self, fps): - """Add requests to the queue.""" - origin = self._frontier._frontiers._origin - return origin.add(self._frontier.key, self.key, fps) - - def iter(self, **kwargs): - """Iterate through batches in the queue.""" - origin = self._frontier._frontiers._origin - path = (self._frontier.key, 's', self.key, 'q') - return origin.apiget(path, params=kwargs) - - def list(self, **kwargs): - """List request batches in the queue.""" - return list(self.iter(**kwargs)) - - def delete(self, ids): - """Delete request batches from the queue.""" - origin = self._frontier._frontiers._origin - return origin.delete(self._frontier.key, self.key, ids) - - -class Collections(_Proxy): - """Access to project collections. - - Not a public constructor: use :class:`Project` instance to get a - :class:`Collections` instance. See :attr:`Project.collections` attribute. - - Usage:: - - >>> collections = project.collections - >>> collections.list() - [{'name': 'Pages', 'type': 's'}] - >>> foo_store = collections.get_store('foo_store') - """ - - def get(self, coltype, colname): - """Base method to get a collection with a given type and name.""" - self._origin._validate_collection(coltype, colname) - return Collection(self._client, self, coltype, colname) - - def get_store(self, colname): - return self.get('s', colname) - - def get_cached_store(self, colname): - return self.get('cs', colname) - - def get_versioned_store(self, colname): - return self.get('vs', colname) - - def get_versioned_cached_store(self, colname): - return self.get('vcs', colname) - - def iter(self): - """Iterate through collections of a project.""" - return self._origin.apiget('list') - - def list(self): - """List collections of a project.""" - return list(self.iter()) - - -class Collection(object): - """Representation of a project collection object. - - Not a public constructor: use :class:`Collections` instance to get a - :class:`Collection` instance. See :meth:`Collections.get_store` and - similar methods. # noqa - - Usage: - - - add a new item to collection:: - - >>> foo_store.set({'_key': '002d050ee3ff6192dcbecc4e4b4457d7', - 'value': '1447221694537'}) - - - count items in collection:: - - >>> foo_store.count() - 1 - - - get an item from collection:: - - >>> foo_store.get('002d050ee3ff6192dcbecc4e4b4457d7') - {'value': '1447221694537'} - - - get all items from collection:: - - >>> foo_store.iter() - - - - iterate iterate over _key & value pair:: - - >>> for elem in foo_store.iter(count=1)): - >>> ... 
print(elem) - [{'_key': '002d050ee3ff6192dcbecc4e4b4457d7', - 'value': '1447221694537'}] - - - filter by multiple keys, only values for keys that exist will be returned:: - - >>> foo_store.list(key=['002d050ee3ff6192dcbecc4e4b4457d7', 'blah']) - [{'_key': '002d050ee3ff6192dcbecc4e4b4457d7', 'value': '1447221694537'}] - - - delete an item by key:: - - >>> foo_store.delete('002d050ee3ff6192dcbecc4e4b4457d7') - """ - - def __init__(self, client, collections, coltype, colname): - self._client = client - self._origin = _Collection(coltype, colname, collections._origin) - proxy_methods(self._origin, self, [ - 'create_writer', 'count', - ('iter', 'iter_values'), - ('iter_raw_json', 'iter_json'), - ]) - # simplified version of _Proxy._wrap_iter_methods logic - # to provide better support for filter param in iter methods - for method in ['iter', 'iter_raw_json']: - wrapped = wrap_kwargs(getattr(self, method), format_iter_filters) - setattr(self, method, wrapped) - - def list(self, *args, **kwargs): - """Convenient shortcut to list iter results. - - Please note that list() method can use a lot of memory and for a large - amount of elements it's recommended to iterate through it via iter() - method (all params and available filters are same for both methods). - """ - return list(self.iter(*args, **kwargs)) - - def get(self, key, *args, **kwargs): - """Get item from collection by key. - - :param key: string item key - :return: an item dictionary if exists - """ - if key is None: - raise ValueError("key cannot be None") - return self._origin.get(key, *args, **kwargs) - - def set(self, *args, **kwargs): - """Set item to collection by key. - - The method returns None (original method returns an empty generator). - """ - self._origin.set(*args, **kwargs) - - def delete(self, keys): - """Delete item(s) from collection by key(s). - - The method returns None (original method returns an empty generator). - """ - if (not isinstance(keys, string_types) and - not isinstance(keys, collections.Iterable)): - raise ValueError("You should provide string key or iterable " - "object providing string keys") - self._origin.delete(keys) diff --git a/scrapinghub/client/__init__.py b/scrapinghub/client/__init__.py new file mode 100644 index 00000000..a0201aad --- /dev/null +++ b/scrapinghub/client/__init__.py @@ -0,0 +1,94 @@ +from scrapinghub import Connection as _Connection +from scrapinghub import HubstorageClient as _HubstorageClient + +from .projects import Projects +from .exceptions import wrap_http_errors + +from .utils import parse_auth +from .utils import parse_project_id, parse_job_key + + +__all__ = ['ScrapinghubClient'] + + +class Connection(_Connection): + + @wrap_http_errors + def _request(self, *args, **kwargs): + return super(Connection, self)._request(*args, **kwargs) + + +class HubstorageClient(_HubstorageClient): + + @wrap_http_errors + def request(self, *args, **kwargs): + return super(HubstorageClient, self).request(*args, **kwargs) + + +class ScrapinghubClient(object): + """Main class to work with Scrapinghub API. + + :param auth: Scrapinghub APIKEY or other SH auth credentials. + :param dash_endpoint: (optional) Scrapinghub Dash panel url. + :param \*\*kwargs: (optional) Additional arguments for + :class:`scrapinghub.hubstorage.HubstorageClient` constructor. + + :ivar projects: projects collection, :class:`Projects` instance. 
+ + Usage:: + + >>> from scrapinghub import ScrapinghubClient + >>> client = ScrapinghubClient('APIKEY') + >>> client + + """ + + def __init__(self, auth=None, dash_endpoint=None, **kwargs): + self.projects = Projects(self) + login, password = parse_auth(auth) + self._connection = Connection(apikey=login, + password=password, + url=dash_endpoint) + self._hsclient = HubstorageClient(auth=(login, password), **kwargs) + + def get_project(self, projectid): + """Get :class:`Project` instance with a given project id. + + The method is a shortcut for client.projects.get(). + + :param projectid: integer or string numeric project id. + :return: :class:`Project` object. + :rtype: scrapinghub.client.Project. + + Usage:: + + >>> project = client.get_project(123) + >>> project + + """ + return self.projects.get(parse_project_id(projectid)) + + def get_job(self, jobkey): + """Get Job with a given jobkey. + + :param jobkey: job key string in format 'project/spider/job', + where all the components are integers. + :return: :class:`Job` object. + :rtype: scrapinghub.client.Job. + + Usage:: + + >>> job = client.get_job('123/1/1') + >>> job + + """ + projectid = parse_job_key(jobkey).projectid + return self.projects.get(projectid).jobs.get(jobkey) + + def close(self, timeout=None): + """Close client instance. + + :param timeout: (optional) float timeout secs to stop everything + gracefully. + """ + self._hsclient.close(timeout=timeout) diff --git a/scrapinghub/client/activity.py b/scrapinghub/client/activity.py new file mode 100644 index 00000000..e561b23f --- /dev/null +++ b/scrapinghub/client/activity.py @@ -0,0 +1,60 @@ +from __future__ import absolute_import + +from .utils import _Proxy +from .utils import parse_job_key + + +class Activity(_Proxy): + """Representation of collection of job activity events. + + Not a public constructor: use :class:`Project` instance to get a + :class:`Activity` instance. See :attr:`Project.activity` attribute. + + Please note that list() method can use a lot of memory and for a large + amount of activities it's recommended to iterate through it via iter() + method (all params and available filters are same for both methods). 
+ + Usage: + + - get all activity from a project:: + + >>> project.activity.iter() + + + - get only last 2 events from a project:: + + >>> project.activity.list(count=2) + [{'event': 'job:completed', 'job': '123/2/3', 'user': 'jobrunner'}, + {'event': 'job:started', 'job': '123/2/3', 'user': 'john'}] + + - post a new event:: + + >>> event = {'event': 'job:completed', + 'job': '123/2/4', + 'user': 'jobrunner'} + >>> project.activity.add(event) + + - post multiple events at once:: + + >>> events = [ + {'event': 'job:completed', 'job': '123/2/5', 'user': 'jobrunner'}, + {'event': 'job:cancelled', 'job': '123/2/6', 'user': 'john'}, + ] + >>> project.activity.add(events) + + """ + def __init__(self, *args, **kwargs): + super(Activity, self).__init__(*args, **kwargs) + self._proxy_methods([('iter', 'list')]) + self._wrap_iter_methods(['iter']) + + def add(self, values, **kwargs): + if not isinstance(values, list): + values = list(values) + for activity in values: + if not isinstance(activity, dict): + raise ValueError("Please pass events as dictionaries") + jobkey = activity.get('job') + if jobkey and parse_job_key(jobkey).projectid != self.key: + raise ValueError('Please use same project id') + self._origin.post(values, **kwargs) diff --git a/scrapinghub/client/collections.py b/scrapinghub/client/collections.py new file mode 100644 index 00000000..aa56f0a0 --- /dev/null +++ b/scrapinghub/client/collections.py @@ -0,0 +1,154 @@ +from __future__ import absolute_import +import collections + +from six import string_types + +from ..hubstorage.collectionsrt import Collection as _Collection + +from .utils import _Proxy +from .utils import format_iter_filters +from .utils import proxy_methods +from .utils import wrap_kwargs + + +class Collections(_Proxy): + """Access to project collections. + + Not a public constructor: use :class:`Project` instance to get a + :class:`Collections` instance. See :attr:`Project.collections` attribute. + + Usage:: + + >>> collections = project.collections + >>> collections.list() + [{'name': 'Pages', 'type': 's'}] + >>> foo_store = collections.get_store('foo_store') + """ + + def get(self, coltype, colname): + """Base method to get a collection with a given type and name.""" + self._origin._validate_collection(coltype, colname) + return Collection(self._client, self, coltype, colname) + + def get_store(self, colname): + return self.get('s', colname) + + def get_cached_store(self, colname): + return self.get('cs', colname) + + def get_versioned_store(self, colname): + return self.get('vs', colname) + + def get_versioned_cached_store(self, colname): + return self.get('vcs', colname) + + def iter(self): + """Iterate through collections of a project.""" + return self._origin.apiget('list') + + def list(self): + """List collections of a project.""" + return list(self.iter()) + + +class Collection(object): + """Representation of a project collection object. + + Not a public constructor: use :class:`Collections` instance to get a + :class:`Collection` instance. See :meth:`Collections.get_store` and + similar methods. 
# noqa + + Usage: + + - add a new item to collection:: + + >>> foo_store.set({'_key': '002d050ee3ff6192dcbecc4e4b4457d7', + 'value': '1447221694537'}) + + - count items in collection:: + + >>> foo_store.count() + 1 + + - get an item from collection:: + + >>> foo_store.get('002d050ee3ff6192dcbecc4e4b4457d7') + {'value': '1447221694537'} + + - get all items from collection:: + + >>> foo_store.iter() + + + - iterate iterate over _key & value pair:: + + >>> for elem in foo_store.iter(count=1)): + >>> ... print(elem) + [{'_key': '002d050ee3ff6192dcbecc4e4b4457d7', + 'value': '1447221694537'}] + + - filter by multiple keys, only values for keys that exist will be returned:: + + >>> foo_store.list(key=['002d050ee3ff6192dcbecc4e4b4457d7', 'blah']) + [{'_key': '002d050ee3ff6192dcbecc4e4b4457d7', 'value': '1447221694537'}] + + - delete an item by key:: + + >>> foo_store.delete('002d050ee3ff6192dcbecc4e4b4457d7') + """ + + def __init__(self, client, collections, coltype, colname): + self._client = client + self._origin = _Collection(coltype, colname, collections._origin) + proxy_methods(self._origin, self, [ + 'create_writer', 'count', + ('iter', 'iter_values'), + ('iter_raw_json', 'iter_json'), + ]) + # simplified version of _Proxy._wrap_iter_methods logic + # to provide better support for filter param in iter methods + for method in ['iter', 'iter_raw_json']: + wrapped = wrap_kwargs(getattr(self, method), format_iter_filters) + setattr(self, method, wrapped) + + def list(self, *args, **kwargs): + """Convenient shortcut to list iter results. + + Please note that list() method can use a lot of memory and for a large + amount of elements it's recommended to iterate through it via iter() + method (all params and available filters are same for both methods). + """ + return list(self.iter(*args, **kwargs)) + + def get(self, key, *args, **kwargs): + """Get item from collection by key. + + :param key: string item key + :return: an item dictionary if exists + """ + if key is None: + raise ValueError("key cannot be None") + return self._origin.get(key, *args, **kwargs) + + def set(self, *args, **kwargs): + """Set item to collection by key. + + The method returns None (original method returns an empty generator). + """ + self._origin.set(*args, **kwargs) + + def delete(self, keys): + """Delete item(s) from collection by key(s). + + The method returns None (original method returns an empty generator). 
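A round-trip sketch combining the get/set/delete methods above; the keys and values are placeholders, and delete() also accepts an iterable of string keys::

    >>> foo_store = project.collections.get_store('foo_store')
    >>> foo_store.set({'_key': 'key1', 'value': 'some value'})
    >>> foo_store.get('key1')
    {'value': 'some value'}
    >>> foo_store.delete(['key1'])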
+ """ + if (not isinstance(keys, string_types) and + not isinstance(keys, collections.Iterable)): + raise ValueError("You should provide string key or iterable " + "object providing string keys") + self._origin.delete(keys) + + def iter_raw_msgpack(self, requests_params=None, **apiparams): + return self._origin._collections.iter_msgpack( + self._origin.coltype, self._origin.colname, + requests_params=requests_params, **apiparams) diff --git a/scrapinghub/exceptions.py b/scrapinghub/client/exceptions.py similarity index 95% rename from scrapinghub/exceptions.py rename to scrapinghub/client/exceptions.py index c0de28d2..6aadffbb 100644 --- a/scrapinghub/exceptions.py +++ b/scrapinghub/client/exceptions.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- +from __future__ import absolute_import from functools import wraps from requests import HTTPError -from .legacy import APIError -from .hubstorage import ValueTooLarge as _ValueTooLarge +from ..legacy import APIError +from ..hubstorage import ValueTooLarge as _ValueTooLarge def _get_http_error_msg(exc): diff --git a/scrapinghub/client/frontiers.py b/scrapinghub/client/frontiers.py new file mode 100644 index 00000000..684e1743 --- /dev/null +++ b/scrapinghub/client/frontiers.py @@ -0,0 +1,279 @@ +from __future__ import absolute_import +from functools import partial +from collections import defaultdict + +from six import string_types + +from ..hubstorage.frontier import Frontier as _Frontier +from ..hubstorage.utils import urlpathjoin + +from .utils import _Proxy + + +class _HSFrontier(_Frontier): + """Modified hubstorage Frontier with newcount per slot.""" + + def __init__(self, *args, **kwargs): + super(_HSFrontier, self).__init__(*args, **kwargs) + self.newcount = defaultdict(int) + + def _get_writer(self, frontier, slot): + key = (frontier, slot) + writer = self._writers.get(key) + if not writer: + writer = self.client.batchuploader.create_writer( + url=urlpathjoin(self.url, frontier, 's', slot), + auth=self.auth, + size=self.batch_size, + start=self.batch_start, + interval=self.batch_interval, + qsize=self.batch_qsize, + content_encoding=self.batch_content_encoding, + callback=partial(self._writer_callback, key), + ) + self._writers[key] = writer + return writer + + def _writer_callback(self, key, response): + self.newcount[key] += response.json()["newcount"] + + +class Frontiers(_Proxy): + """Frontiers collection for a project. + + Not a public constructor: use :class:`Project` instance to get a + :class:`Frontiers` instance. See :attr:`Project.frontiers` attribute. 
+ + Usage: + + - get all frontiers from a project:: + >>> project.frontiers.iter() + + + - list all frontiers + >>> project.frontiers.list() + ['test', 'test1', 'test2'] + + - get a frontier by name + >>> project.frontiers.get('test') + + + - flush data of all frontiers of a project + >>> project.frontiers.flush() + + - show amount of new requests added for all frontiers + >>> project.frontiers.newcount + 3 + + - close batch writers of all frontiers of a project + >>> project.frontiers.close() + """ + def __init__(self, *args, **kwargs): + super(Frontiers, self).__init__(*args, **kwargs) + self._proxy_methods(['close', 'flush']) + + def get(self, name): + """Get a frontier by name.""" + return Frontier(self._client, self, name) + + def iter(self): + """Iterate through frontiers.""" + return iter(self.list()) + + def list(self): + """List frontiers.""" + return next(self._origin.apiget('list')) + + @property + def newcount(self): + return sum(self._origin.newcount.values()) + + +class Frontier(object): + """Representation of a frontier object. + + Not a public constructor: use :class:`Frontiers` instance to get a + :class:`Frontier` instance. See :meth:`Frontiers.get` method. + + Usage: + + - get iterator with all slots + >>> frontier.iter() + + + - list all slots + >>> frontier.list() + ['example.com', 'example.com2'] + + - get a slot by name + >>> frontier.get('example.com') + + + - flush frontier data + >>> frontier.flush() + + - show amount of new requests added to frontier + >>> frontier.newcount + 3 + """ + def __init__(self, client, frontiers, name): + self.key = name + self._client = client + self._frontiers = frontiers + + def get(self, slot): + """Get a slot by name.""" + return FrontierSlot(self._client, self, slot) + + def iter(self): + """Iterate through slots.""" + return iter(self.list()) + + def list(self): + """List all slots.""" + return next(self._frontiers._origin.apiget((self.key, 'list'))) + + def flush(self): + """Flush data for a whole frontier.""" + writers = self._frontiers._origin._writers + for (fname, _), writer in writers.items(): + if fname == self.key: + writer.flush() + + @property + def newcount(self): + newcount_values = self._frontiers._origin.newcount + return sum(v for (frontier, _), v in newcount_values.items() + if frontier == self.key) + + +class FrontierSlot(object): + """Representation of a frontier slot object. + + Not a public constructor: use :class:`Frontier` instance to get a + :class:`FrontierSlot` instance. See :meth:`Frontier.get` method. 
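Pulling the calls above together with the FrontierSlot helpers documented just below, a sketch of a typical frontier write path; the frontier and slot names match the surrounding docstrings and the counts are illustrative::

    >>> frontier = project.frontiers.get('test')
    >>> slot = frontier.get('example.com')
    >>> slot.f.add(['page1.html', 'page2.html'])
    >>> slot.flush()
    >>> slot.newcount
    2
    >>> slot.f.list()
    ['page1.html', 'page2.html']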
+ + Usage: + + - add request to a queue + >>> data = [{'fp': 'page1.html', 'p': 1, 'qdata': {'depth': 1}}] + >>> slot.q.add('example.com', data) + + - add fingerprints to a slot + >>> slot.f.add(['fp1', 'fp2']) + + - flush data for a slot + >>> slot.flush() + + - show amount of new requests added to a slot + >>> slot.newcount + 2 + + - read requests from a slot + >>> slot.q.iter() + + >>> slot.q.list() + [{'id': '0115a8579633600006', + 'requests': [['page1.html', {'depth': 1}]]}] + + - read fingerprints from a slot + >>> slot.f.iter() + + >>> slot.f.list() + ['page1.html'] + + - delete a batch with requests from a slot + >>> slot.q.delete('0115a8579633600006') + + - delete a whole slot + >>> slot.delete() + + """ + def __init__(self, client, frontier, slot): + self.key = slot + self._client = client + self._frontier = frontier + self.fingerprints = FrontierSlotFingerprints(self) + self.queue = FrontierSlotQueue(self) + + @property + def f(self): + return self.fingerprints + + @property + def q(self): + return self.queue + + def delete(self): + """Delete the slot.""" + origin = self._frontier._frontiers._origin + origin.delete_slot(self._frontier.key, self.key) + origin.newcount.pop((self._frontier.key, self.key), None) + + def flush(self): + """Flush data for the slot.""" + writers = self._frontier._frontiers._origin._writers + writer = writers.get((self._frontier.key, self.key)) + if writer: + writer.flush() + + @property + def newcount(self): + newcount_values = self._frontier._frontiers._origin.newcount + return newcount_values.get((self._frontier.key, self.key), 0) + + +class FrontierSlotFingerprints(object): + + def __init__(self, slot): + self.key = slot.key + self._frontier = slot._frontier + self._slot = slot + + def add(self, fps): + origin = self._frontier._frontiers._origin + writer = origin._get_writer(self._frontier.key, self.key) + fps = list(fps) if not isinstance(fps, list) else fps + if not all(isinstance(fp, string_types) for fp in fps): + raise ValueError('Fingerprint should be of a string type') + for fp in fps: + writer.write({'fp': fp}) + + def iter(self, **kwargs): + """Iterate through fingerprints in the slot.""" + origin = self._frontier._frontiers._origin + path = (self._frontier.key, 's', self.key, 'f') + for fp in origin.apiget(path, params=kwargs): + yield fp.get('fp') + + def list(self, **kwargs): + """List fingerprints in the slot.""" + return list(self.iter(**kwargs)) + + +class FrontierSlotQueue(object): + + def __init__(self, slot): + self.key = slot.key + self._frontier = slot._frontier + self._slot = slot + + def add(self, fps): + """Add requests to the queue.""" + origin = self._frontier._frontiers._origin + return origin.add(self._frontier.key, self.key, fps) + + def iter(self, **kwargs): + """Iterate through batches in the queue.""" + origin = self._frontier._frontiers._origin + path = (self._frontier.key, 's', self.key, 'q') + return origin.apiget(path, params=kwargs) + + def list(self, **kwargs): + """List request batches in the queue.""" + return list(self.iter(**kwargs)) + + def delete(self, ids): + """Delete request batches from the queue.""" + origin = self._frontier._frontiers._origin + return origin.delete(self._frontier.key, self.key, ids) diff --git a/scrapinghub/client/items.py b/scrapinghub/client/items.py new file mode 100644 index 00000000..3a2dea5c --- /dev/null +++ b/scrapinghub/client/items.py @@ -0,0 +1,58 @@ +from __future__ import absolute_import + +from .utils import _Proxy + + +class Items(_Proxy): + """Representation of 
a collection of job items.
+
+    Not a public constructor: use :class:`Job` instance to get a :class:`Items`
+    instance. See :attr:`Job.items` attribute.
+
+    Please note that the list() method can use a lot of memory; for a large
+    number of items it's recommended to iterate through them via the iter()
+    method (all params and available filters are the same for both methods).
+
+    Usage:
+
+    - retrieve all scraped items from a job::
+
+        >>> job.items.iter()
+
+
+    - iterate through the first 100 items and print them::
+
+        >>> for item in job.items.iter(count=100):
+        ...     print(item)
+
+    - retrieve items with timestamp greater than or equal to a given timestamp
+      (an item here is an arbitrary dictionary depending on your code)::
+
+        >>> job.items.list(startts=1447221694537)
+        [{
+            'name': ['Some custom item'],
+            'url': 'http://some-url/item.html',
+            'size': 100000,
+        }]
+
+    - retrieve 1 item with multiple filters::
+
+        >>> filters = [("size", ">", [30000]), ("size", "<", [40000])]
+        >>> job.items.list(count=1, filter=filters)
+        [{
+            'name': ['Some other item'],
+            'url': 'http://some-url/other-item.html',
+            'size': 50000,
+        }]
+    """
+
+    def _modify_iter_params(self, params):
+        """Modify iter() filters to convert the offset param to start.
+
+        Returns:
+            dict: updated set of params
+        """
+        params = super(Items, self)._modify_iter_params(params)
+        offset = params.pop('offset', None)
+        if offset:
+            params['start'] = '{}/{}'.format(self.key, offset)
+        return params
diff --git a/scrapinghub/client/jobs.py b/scrapinghub/client/jobs.py
new file mode 100644
index 00000000..9bf9b12f
--- /dev/null
+++ b/scrapinghub/client/jobs.py
@@ -0,0 +1,444 @@
+from __future__ import absolute_import
+import json
+
+from ..hubstorage.job import JobMeta
+from ..hubstorage.job import Items as _Items
+from ..hubstorage.job import Logs as _Logs
+from ..hubstorage.job import Samples as _Samples
+from ..hubstorage.job import Requests as _Requests
+
+from .items import Items
+from .logs import Logs
+from .requests import Requests
+from .samples import Samples
+from .exceptions import NotFound, InvalidUsage, DuplicateJobError
+from .utils import get_tags_for_update
+from .utils import parse_job_key
+
+
+class Jobs(object):
+    """Class representing a collection of jobs for a project/spider.
+
+    Not a public constructor: use :class:`Project` instance or :class:`Spider`
+    instance to get a :class:`Jobs` instance. See :attr:`Project.jobs` and
+    :attr:`Spider.jobs` attributes.
+
+    :ivar projectid: an integer project id.
+    :ivar spider: :class:`Spider` object if defined.
+
+    Usage::
+
+        >>> project.jobs
+
+        >>> spider = project.spiders.get('spider1')
+        >>> spider.jobs
+
+    """
+
+    def __init__(self, client, projectid, spider=None):
+        self.projectid = projectid
+        self.spider = spider
+        self._client = client
+        self._project = client._hsclient.get_project(projectid)
+
+    def count(self, **params):
+        """Count jobs for a given set of parameters.
+
+        :param \*\*params: (optional) a set of filters to apply when counting
+            jobs (e.g. spider, state, has_tag, lacks_tag, startts and endts).
+        :return: jobs count.
+
+        Usage::
+
+            >>> spider = project.spiders.get('spider1')
+            >>> spider.jobs.count()
+            5
+            >>> project.jobs.count(spider='spider2', state='finished')
+            2
+        """
+        if self.spider:
+            params['spider'] = self.spider.name
+        return next(self._project.jobq.apiget(('count',), params=params))
+
+    def iter(self, **params):
+        """Iterate over jobs collection for a given set of params.
+
+        :param \*\*params: (optional) a set of filters to apply when iterating
+            over jobs (e.g. spider, state, has_tag, lacks_tag, startts and endts).
+        :return: a generator object over a list of dictionaries of jobs summary
+            for the given filter params.
+
+        Usage:
+
+        - retrieve all jobs for a spider::
+
+            >>> spider.jobs.iter()
+
+
+        - get all job keys for a spider::
+
+            >>> jobs_summary = spider.jobs.iter()
+            >>> [job['key'] for job in jobs_summary]
+            ['123/1/3', '123/1/2', '123/1/1']
+
+        - the job summary fieldset is less detailed than job.metadata but
+          contains a few new fields as well. Additional fields can be requested
+          using the ``jobmeta`` parameter. If it's used, it's up to the user to
+          list all the required fields, so only a few default fields are added
+          besides the requested ones::
+
+            >>> jobs_summary = project.jobs.iter(jobmeta=['scheduled_by', ])
+
+        - by default :meth:`Jobs.iter` returns at most the last 1000 results.
+          Pagination is available using the ``start`` parameter::
+
+            >>> jobs_summary = spider.jobs.iter(start=1000)
+
+        - get jobs filtered by tags (the list of tags is combined with ``OR``)::
+
+            >>> jobs_summary = project.jobs.iter(
+            ...     has_tag=['new', 'verified'], lacks_tag='obsolete')
+
+        - get a certain number of last finished jobs for a spider::
+
+            >>> jobs_summary = project.jobs.iter(
+            ...     spider='spider2', state='finished', count=3)
+        """
+        if self.spider:
+            params['spider'] = self.spider.name
+        return self._project.jobq.list(**params)
+
+    def list(self, **params):
+        """Convenient shortcut that returns iter() results as a list.
+
+        Please note that the list() method can use a lot of memory; for a large
+        number of jobs it's recommended to iterate through them via the iter()
+        method (all params and available filters are the same for both methods).
+
+        """
+        return list(self.iter(**params))
+
+    def schedule(self, spidername=None, **params):
+        """Schedule a new job and return a :class:`Job` object for it.
+
+        :param spidername: a spider name string
+            (not needed if job is scheduled via :attr:`Spider.jobs`).
+        :param \*\*params: (optional) additional keyword args.
+        :return: a :class:`Job` object pointing to the new job.
+
+        Usage::
+
+            >>> job = project.jobs.schedule('spider1', arg1='val1')
+            >>> job.key
+            '123/1/1'
+        """
+        if not spidername and not self.spider:
+            raise ValueError('Please provide spidername')
+        params['project'] = self.projectid
+        params['spider'] = spidername or self.spider.name
+        spider_args = params.pop('spider_args', None)
+        if spider_args:
+            if not isinstance(spider_args, dict):
+                raise ValueError("spider_args should be a dictionary")
+            cleaned_args = {k: v for k, v in spider_args.items()
+                            if k not in params}
+            params.update(cleaned_args)
+        if 'job_settings' in params:
+            params['job_settings'] = json.dumps(params['job_settings'])
+        if 'meta' in params:
+            params['meta'] = json.dumps(params['meta'])
+        # FIXME improve to schedule multiple jobs
+        try:
+            response = self._client._connection._post(
+                'schedule', 'json', params)
+        except InvalidUsage as exc:
+            if 'already scheduled' in str(exc):
+                raise DuplicateJobError(exc)
+            raise
+        return Job(self._client, response['jobid'])
+
+    def get(self, jobkey):
+        """Get a Job with a given jobkey.
+
+        :param jobkey: a string job key.
+
+        jobkey's project component should match the project used to get
+        :class:`Jobs` instance, and jobkey's spider component should match
+        the spider (if :attr:`Spider.jobs` was used).
+
+        :return: :class:`Job` object.
+        :rtype: scrapinghub.client.Job.
+
+        Usage::
+
+            >>> job = project.jobs.get('123/1/2')
+            >>> job.key
+            '123/1/2'
+        """
+        jobkey = parse_job_key(jobkey)
+        if jobkey.projectid != self.projectid:
+            raise ValueError('Please use same project id')
+        if self.spider and jobkey.spiderid != self.spider._id:
+            raise ValueError('Please use same spider id')
+        return Job(self._client, str(jobkey))
+
+    def summary(self, _queuename=None, **params):
+        """Get jobs summary (optionally by state).
+
+        :param _queuename: (optional) a string state to filter jobs.
+        :param \*\*params: (optional) additional keyword args.
+        :return: a generator object over a list of dictionaries of jobs summary
+            for the given filter params, grouped by job state.
+
+        Usage::
+
+            >>> spider.jobs.summary()
+            [{'count': 0, 'name': 'pending', 'summary': []},
+             {'count': 0, 'name': 'running', 'summary': []},
+             {'count': 5, 'name': 'finished', 'summary': [...]}]
+
+            >>> project.jobs.summary('pending')
+            {'count': 0, 'name': 'pending', 'summary': []}
+        """
+        spiderid = self._extract_spider_id(params)
+        return self._project.jobq.summary(
+            _queuename, spiderid=spiderid, **params)
+
+    def iter_last(self, **params):
+        """Iterate through last jobs for each spider.
+
+        :param \*\*params: (optional) keyword arguments to filter jobs.
+        :return: a generator object over a list of dictionaries of jobs summary
+            for the given filter params.
+
+        Usage:
+
+        - get all last job summaries for a project::
+
+            >>> project.jobs.iter_last()
+
+
+        - get the last job summary for a spider::
+
+            >>> list(spider.jobs.iter_last())
+            [{'close_reason': 'success',
+              'elapsed': 3062444,
+              'errors': 1,
+              'finished_time': 1482911633089,
+              'key': '123/1/3',
+              'logs': 8,
+              'pending_time': 1482911596566,
+              'running_time': 1482911598909,
+              'spider': 'spider1',
+              'state': 'finished',
+              'ts': 1482911615830,
+              'version': 'some-version'}]
+        """
+        spiderid = self._extract_spider_id(params)
+        return self._project.spiders.lastjobsummary(spiderid, **params)
+
+    def _extract_spider_id(self, params):
+        spiderid = params.pop('spiderid', None)
+        if not spiderid and self.spider:
+            return self.spider._id
+        elif spiderid and self.spider and str(spiderid) != self.spider._id:
+            raise ValueError('Please use same spider id')
+        return str(spiderid) if spiderid else None
+
+    def update_tags(self, add=None, remove=None, spidername=None):
+        """Update tags for all existing spider jobs.
+
+        :param add: (optional) list of tags to add to selected jobs.
+        :param remove: (optional) list of tags to remove from selected jobs.
+        :param spidername: spider name, required when used with
+            :attr:`Project.jobs`.
+
+        It's not allowed to update tags for all project jobs, so a spider must
+        be specified (it's done implicitly when using :attr:`Spider.jobs`, or
+        you have to specify the ``spidername`` param when using
+        :attr:`Project.jobs`).
+
+        :return: number of jobs that were updated.
+
+        Usage:
+
+        - mark all spider jobs with tag ``consumed``::
+
+            >>> spider = project.spiders.get('spider1')
+            >>> spider.jobs.update_tags(add=['consumed'])
+            5
+
+        - remove existing tag ``existing`` for all spider jobs::
+
+            >>> project.jobs.update_tags(
+            ...     remove=['existing'], spidername='spider2')
+            2
+        """
+        spidername = spidername or (self.spider.name if self.spider else None)
+        if not spidername:
+            raise ValueError('Please provide spidername')
+        params = get_tags_for_update(add_tag=add, remove_tag=remove)
+        if not params:
+            return
+        params.update({'project': self.projectid, 'spider': spidername})
+        result = self._client._connection._post('jobs_update', 'json', params)
+        return result['count']
+
+
+class Job(object):
+    """Class representing a job object.
+
+    Not a public constructor: use :class:`ScrapinghubClient` instance or
+    :class:`Jobs` instance to get a :class:`Job` instance. See
+    :meth:`ScrapinghubClient.get_job` and :meth:`Jobs.get` methods.
+
+    :ivar projectid: an integer project id.
+    :ivar key: a job key.
+    :ivar items: :class:`Items` resource object.
+    :ivar logs: :class:`Logs` resource object.
+    :ivar requests: :class:`Requests` resource object.
+    :ivar samples: :class:`Samples` resource object.
+    :ivar metadata: :class:`JobMeta` resource object.
+
+    Usage::
+
+        >>> job = project.jobs.get('123/1/2')
+        >>> job.key
+        '123/1/2'
+        >>> job.metadata['state']
+        'finished'
+    """
+    def __init__(self, client, jobkey, metadata=None):
+        self.projectid = parse_job_key(jobkey).projectid
+        self.key = jobkey
+
+        self._client = client
+        self._project = client._hsclient.get_project(self.projectid)
+        self._job = client._hsclient.get_job(jobkey)
+
+        # proxied sub-resources
+        self.items = Items(_Items, client, jobkey)
+        self.logs = Logs(_Logs, client, jobkey)
+        self.requests = Requests(_Requests, client, jobkey)
+        self.samples = Samples(_Samples, client, jobkey)
+
+        self.metadata = JobMeta(client._hsclient, jobkey, cached=metadata)
+
+    def update_metadata(self, *args, **kwargs):
+        """Update job metadata.
+
+        :param \*\*kwargs: keyword arguments representing job metadata
+
+        Usage:
+
+        - update job outcome::
+
+            >>> job.update_metadata(close_reason='custom reason')
+
+        - change job tags::
+
+            >>> job.update_metadata({'tags': 'obsolete'})
+        """
+        self._job.update_metadata(*args, **kwargs)
+
+    def update_tags(self, add=None, remove=None):
+        """Partially update job tags.
+
+        It provides a convenient way to mark specific jobs (for better search,
+        postprocessing etc.).
+
+        :param add: (optional) list of tags to add
+        :param remove: (optional) list of tags to remove
+        :return: number of jobs that were updated
+
+        Usage: to mark a job with tag ``consumed``::
+
+            >>> job.update_tags(add=['consumed'])
+        """
+        params = get_tags_for_update(add_tag=add, remove_tag=remove)
+        params.update({'project': self.projectid, 'job': self.key})
+        result = self._client._connection._post('jobs_update', 'json', params)
+        return result['count']
+
+    def close_writers(self):
+        """Stop job batch writer threads gracefully.
+
+        Called on :meth:`ScrapinghubClient.close` method.
+        """
+        self._job.close_writers()
+
+    def start(self, **params):
+        """Move job to running state.
+
+        :param \*\*params: (optional) keyword meta parameters to update
+        :return: the previous job state as a string
+
+        Usage::
+
+            >>> job.start()
+            'pending'
+        """
+        return self.update(state='running', **params)
+
+    def finish(self, **params):
+        """Move running job to finished state.
+
+        :param \*\*params: (optional) keyword meta parameters to update
+        :return: the previous job state as a string
+
+        Usage::
+
+            >>> job.finish()
+            'running'
+        """
+        return self.update(state='finished', **params)
+
+    def delete(self, **params):
+        """Mark finished job for deletion.
+
+        :param \*\*params: (optional) keyword meta parameters to update
+        :return: the previous job state as a string
+
+        Usage::
+
+            >>> job.delete()
+            'finished'
+        """
+        return self.update(state='deleted', **params)
+
+    def update(self, **params):
+        """Update job state.
+
+        :param \*\*params: (optional) keyword meta parameters to update
+        :return: the previous job state as a string
+
+        Usage::
+
+            >>> job.update(state='finished')
+            'running'
+        """
+        try:
+            job = next(self._project.jobq.update(self, **params))
+            return job['prevstate']
+        except StopIteration:
+            raise NotFound("Job {} doesn't exist".format(self.key))
+
+    def cancel(self):
+        """Schedule a running job for cancellation.
+
+        Usage::
+
+            >>> job.cancel()
+            >>> job.metadata['cancelled_by']
+            'John'
+        """
+        self._project.jobq.request_cancel(self)
+
+    def purge(self):
+        """Delete job and expire its local metadata.
+
+        Usage::
+
+            >>> job.purge()
+            >>> job.metadata['state']
+            'deleted'
+        """
+        self.delete()
+        self.metadata.expire()
diff --git a/scrapinghub/client/logs.py b/scrapinghub/client/logs.py
new file mode 100644
index 00000000..f57b4b7e
--- /dev/null
+++ b/scrapinghub/client/logs.py
@@ -0,0 +1,76 @@
+from __future__ import absolute_import
+import json
+
+from .utils import _Proxy
+from .utils import LogLevel
+
+
+class Logs(_Proxy):
+    """Representation of a collection of job logs.
+
+    Not a public constructor: use :class:`Job` instance to get a :class:`Logs`
+    instance. See :attr:`Job.logs` attribute.
+
+    Please note that the list() method can use a lot of memory; for a large
+    number of logs it's recommended to iterate through them via the iter()
+    method (all params and available filters are the same for both methods).
+
+    Usage:
+
+    - retrieve all logs from a job::
+
+        >>> job.logs.iter()
+
+
+    - iterate through the first 100 log entries and print them::
+
+        >>> for log in job.logs.iter(count=100):
+        ...     print(log)
+
+    - retrieve a single log entry from a job::
+
+        >>> job.logs.list(count=1)
+        [{
+            'level': 20,
+            'message': '[scrapy.core.engine] Closing spider (finished)',
+            'time': 1482233733976,
+        }]
+
+    - retrieve logs with a given log level, filtered by a word::
+
+        >>> filters = [("message", "contains", ["logger"])]
+        >>> job.logs.list(level='WARNING', filter=filters)
+        [{
+            'level': 30,
+            'message': 'Some warning message',
+            'time': 1486375511188,
+        }]
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(Logs, self).__init__(*args, **kwargs)
+        self._proxy_methods(['log', 'debug', 'info', 'warning', 'warn',
+                             'error', 'batch_write_start'])
+
+    def _modify_iter_params(self, params):
+        """Modify iter() filters on-the-fly.
+ + - convert offset to start parameter + - check log level and create a corresponding meta filter + + :param params: an original dictionary with params + :return: a modified dictionary with params + """ + params = super(Logs, self)._modify_iter_params(params) + offset = params.pop('offset', None) + if offset: + params['start'] = '{}/{}'.format(self.key, offset) + level = params.pop('level', None) + if level: + minlevel = getattr(LogLevel, level, None) + if minlevel is None: + raise ValueError("Unknown log level: {}".format(level)) + level_filter = json.dumps(['level', '>=', [minlevel]]) + # there can already be some filters handled by super class method + params['filter'] = params.get('filter', []) + [level_filter] + return params diff --git a/scrapinghub/client/projects.py b/scrapinghub/client/projects.py new file mode 100644 index 00000000..614d9e03 --- /dev/null +++ b/scrapinghub/client/projects.py @@ -0,0 +1,124 @@ +from __future__ import absolute_import + +from ..hubstorage.activity import Activity as _Activity +from ..hubstorage.collectionsrt import Collections as _Collections +from ..hubstorage.project import Settings + +from .activity import Activity +from .collections import Collections +from .frontiers import _HSFrontier, Frontiers +from .jobs import Jobs +from .spiders import Spiders +from .utils import parse_project_id + + +class Projects(object): + """Collection of projects available to current user. + + Not a public constructor: use :class:`Scrapinghub` client instance to get + a :class:`Projects` instance. See :attr:`Scrapinghub.projects` attribute. + + Usage:: + + >>> client.projects + + """ + + def __init__(self, client): + self._client = client + + def get(self, projectid): + """Get project for a given project id. + + :param projectid: integer or string numeric project id. + :return: :class:`Project` object. + :rtype: scrapinghub.client.Project. + + Usage:: + + >>> project = client.projects.get(123) + >>> project + + """ + return Project(self._client, parse_project_id(projectid)) + + def list(self): + """Get list of projects available to current user. + + :return: a list of integer project ids. + + Usage:: + + >>> client.projects.list() + [123, 456] + """ + return self._client._connection.project_ids() + + def iter(self): + """Iterate through list of projects available to current user. + + Provided for the sake of API consistency. + """ + return iter(self.list()) + + def summary(self, **params): + """Get short summaries for all available user projects. + + :return: a list of dictionaries: each dictionary represents a project + summary (amount of pending/running/finished jobs and a flag if it + has a capacity to schedule new jobs). + + Usage:: + + >>> client.projects.summary() + [{'finished': 674, + 'has_capacity': True, + 'pending': 0, + 'project': 123, + 'running': 1}, + {'finished': 33079, + 'has_capacity': True, + 'pending': 0, + 'project': 456, + 'running': 2}] + """ + return self._client._hsclient.projects.jobsummaries(**params) + + +class Project(object): + """Class representing a project object and its resources. + + Not a public constructor: use :class:`ScrapinghubClient` instance or + :class:`Projects` instance to get a :class:`Project` instance. See + :meth:`Scrapinghub.get_project` or :meth:`Projects.get_project` methods. + + :ivar id: integer project id. + :ivar activity: :class:`Activity` resource object. + :ivar collections: :class:`Collections` resource object. + :ivar frontier: :class:`Frontier` resource object. 
+    :ivar jobs: :class:`Jobs` resource object.
+    :ivar settings: :class:`Settings` resource object.
+    :ivar spiders: :class:`Spiders` resource object.
+
+    Usage::
+
+        >>> project = client.get_project(123)
+        >>> project
+
+        >>> project.key
+        '123'
+    """
+
+    def __init__(self, client, projectid):
+        self.key = str(projectid)
+        self._client = client
+
+        # sub-resources
+        self.jobs = Jobs(client, projectid)
+        self.spiders = Spiders(client, projectid)
+
+        # proxied sub-resources
+        self.activity = Activity(_Activity, client, projectid)
+        self.collections = Collections(_Collections, client, projectid)
+        self.frontiers = Frontiers(_HSFrontier, client, projectid)
+        self.settings = Settings(client._hsclient, projectid)
diff --git a/scrapinghub/client/requests.py b/scrapinghub/client/requests.py
new file mode 100644
index 00000000..61dcf8e3
--- /dev/null
+++ b/scrapinghub/client/requests.py
@@ -0,0 +1,44 @@
+from __future__ import absolute_import
+
+from .utils import _Proxy
+
+
+class Requests(_Proxy):
+    """Representation of a collection of job requests.
+
+    Not a public constructor: use :class:`Job` instance to get a
+    :class:`Requests` instance. See :attr:`Job.requests` attribute.
+
+    Please note that the list() method can use a lot of memory; for a large
+    number of requests it's recommended to iterate through them via the iter()
+    method (all params and available filters are the same for both methods).
+
+    Usage:
+
+    - retrieve all requests from a job::
+
+        >>> job.requests.iter()
+
+
+    - iterate through the requests::
+
+        >>> for reqitem in job.requests.iter(count=1):
+        ...     print(reqitem['time'])
+        1482233733870
+
+    - retrieve a single request from a job::
+
+        >>> job.requests.list(count=1)
+        [{
+            'duration': 354,
+            'fp': '6d748741a927b10454c83ac285b002cd239964ea',
+            'method': 'GET',
+            'rs': 1270,
+            'status': 200,
+            'time': 1482233733870,
+            'url': 'https://example.com'
+        }]
+    """
+    def __init__(self, *args, **kwargs):
+        super(Requests, self).__init__(*args, **kwargs)
+        self._proxy_methods(['add'])
diff --git a/scrapinghub/client/samples.py b/scrapinghub/client/samples.py
new file mode 100644
index 00000000..581d0fd7
--- /dev/null
+++ b/scrapinghub/client/samples.py
@@ -0,0 +1,28 @@
+from __future__ import absolute_import
+
+from .utils import _Proxy
+
+
+class Samples(_Proxy):
+    """Representation of a collection of job samples.
+
+    Not a public constructor: use :class:`Job` instance to get a
+    :class:`Samples` instance. See :attr:`Job.samples` attribute.
+
+    Please note that the list() method can use a lot of memory; for a large
+    number of samples it's recommended to iterate through them via the iter()
+    method (all params and available filters are the same for both methods).
+
+    Usage:
+
+    - retrieve all samples from a job::
+
+        >>> job.samples.iter()
+
+
+    - retrieve samples with timestamp greater than or equal to a given
+      timestamp::
+
+        >>> job.samples.list(startts=1484570043851)
+        [[1484570043851, 554, 576, 1777, 821, 0],
+         [1484570046673, 561, 583, 1782, 821, 0]]
+    """
diff --git a/scrapinghub/client/spiders.py b/scrapinghub/client/spiders.py
new file mode 100644
index 00000000..93a89e0a
--- /dev/null
+++ b/scrapinghub/client/spiders.py
@@ -0,0 +1,115 @@
+from __future__ import absolute_import
+
+from requests.compat import urljoin
+
+from .jobs import Jobs
+from .exceptions import NotFound
+from .exceptions import wrap_http_errors
+from .utils import get_tags_for_update
+
+
+class Spiders(object):
+    """Class to work with a collection of project spiders.
+ + Not a public constructor: use :class:`Project` instance to get + a :class:`Spiders` instance. See :attr:`Project.spiders` attribute. + + :ivar projectid: integer project id. + + Usage:: + + >>> project.spiders + + """ + + def __init__(self, client, projectid): + self.projectid = projectid + self._client = client + + def get(self, spidername, **params): + """Get a spider object for a given spider name. + + The method gets/sets spider id (and checks if spider exists). + + :param spidername: a string spider name. + :return: :class:`Spider` object. + :rtype: scrapinghub.client.Spider. + + Usage:: + + >>> project.spiders.get('spider2') + + >>> project.spiders.get('non-existing') + NotFound: Spider non-existing doesn't exist. + """ + project = self._client._hsclient.get_project(self.projectid) + spiderid = project.ids.spider(spidername, **params) + if spiderid is None: + raise NotFound("Spider {} doesn't exist.".format(spidername)) + return Spider(self._client, self.projectid, spiderid, spidername) + + def list(self): + """Get a list of spiders for a project. + + :return: a list of dictionaries with spiders metadata. + + Usage:: # noqa + + >>> project.spiders.list() + [{'id': 'spider1', 'tags': [], 'type': 'manual', 'version': '123'}, + {'id': 'spider2', 'tags': [], 'type': 'manual', 'version': '123'}] + """ + project = self._client._connection[self.projectid] + return project.spiders() + + def iter(self): + """Iterate through a list of spiders for a project. + + Provided for the sake of API consistency. + """ + return iter(self.list()) + + +class Spider(object): + """Class representing a Spider object. + + Not a public constructor: use :class:`Spiders` instance to get + a :class:`Spider` instance. See :meth:`Spiders.get` method. + + :ivar projectid: integer project id. + :ivar name: a spider name string. + :ivar jobs: a collection of jobs, :class:`Jobs` object. 
+ + Usage:: + + >>> spider = project.spiders.get('spider1') + >>> spider.key + '123/1' + >>> spider.name + 'spider1' + """ + + def __init__(self, client, projectid, spiderid, spidername): + self.projectid = projectid + self.key = '{}/{}'.format(str(projectid), str(spiderid)) + self._id = str(spiderid) + self.name = spidername + self.jobs = Jobs(client, projectid, self) + self._client = client + + @wrap_http_errors + def update_tags(self, add=None, remove=None): + params = get_tags_for_update(add=add, remove=remove) + path = 'v2/projects/{}/spiders/{}/tags'.format(self.projectid, + self._id) + url = urljoin(self._client._connection.url, path) + response = self._client._connection._session.patch(url, json=params) + response.raise_for_status() + + @wrap_http_errors + def list_tags(self): + path = 'v2/projects/{}/spiders/{}'.format(self.projectid, self._id) + url = urljoin(self._client._connection.url, path) + response = self._client._connection._session.get(url) + response.raise_for_status() + return response.json().get('tags', []) diff --git a/scrapinghub/utils.py b/scrapinghub/client/utils.py similarity index 65% rename from scrapinghub/utils.py rename to scrapinghub/client/utils.py index 9c60f3f1..cca35fb5 100644 --- a/scrapinghub/utils.py +++ b/scrapinghub/client/utils.py @@ -1,10 +1,18 @@ +from __future__ import absolute_import + import os import json import logging import binascii from codecs import decode -from six import string_types, binary_type +from six import string_types + +from ..hubstorage.resourcetype import DownloadableResource +from ..hubstorage.resourcetype import ItemsResourceType +from ..hubstorage.collectionsrt import Collections + +from .exceptions import wrap_value_too_large class LogLevel(object): @@ -63,6 +71,60 @@ def get_tags_for_update(**kwargs): return params +class _Proxy(object): + """A helper to create a class instance and proxy its methods to origin. + + The internal proxy class is useful to link class attributes from its + origin depending on the origin base class as a part of init logic: + + - :class:`ItemsResourceType` provides items-based attributes to access + items in an arbitrary collection with get/write/flush/close/stats/iter + methods. + + - :class:`DownloadableResource` provides download-based attributes to + iter through collection with or without msgpack support. 
+ """ + + def __init__(self, cls, client, key): + self.key = key + self._client = client + self._origin = cls(client._hsclient, key) + + if issubclass(cls, ItemsResourceType): + self._proxy_methods(['get', 'write', 'flush', 'close', + 'stats', ('iter', 'list')]) + # redefine write method to wrap hubstorage.ValueTooLarge error + origin_method = getattr(self, 'write') + setattr(self, 'write', wrap_value_too_large(origin_method)) + + # DType iter_values() has more priority than IType list() + # plus Collections interface doesn't need the iter methods + if issubclass(cls, DownloadableResource) and cls is not Collections: + methods = [('iter', 'iter_values'), + ('iter_raw_msgpack', 'iter_msgpack'), + ('iter_raw_json', 'iter_json')] + self._proxy_methods(methods) + self._wrap_iter_methods([method[0] for method in methods]) + + def _proxy_methods(self, methods): + """A little helper for cleaner interface.""" + proxy_methods(self._origin, self, methods) + + def _wrap_iter_methods(self, methods): + """Modify kwargs for all passed self.iter* methods.""" + for method in methods: + wrapped = wrap_kwargs(getattr(self, method), + self._modify_iter_params) + setattr(self, method, wrapped) + + def _modify_iter_params(self, params): + """Modify iter() params on-the-fly.""" + return format_iter_filters(params) + + def list(self, *args, **kwargs): + return list(self.iter(*args, **kwargs)) + + def wrap_kwargs(fn, kwargs_fn): """Tiny wrapper to prepare modified version of function kwargs""" def wrapped(*args, **kwargs): diff --git a/tests/client/conftest.py b/tests/client/conftest.py index a05781e0..16297f91 100644 --- a/tests/client/conftest.py +++ b/tests/client/conftest.py @@ -8,7 +8,7 @@ import shutil from scrapinghub import ScrapinghubClient -from scrapinghub.exceptions import NotFound +from scrapinghub.client.exceptions import NotFound TEST_PROJECT_ID = "2222222" TEST_SPIDER_NAME = 'hs-test-spider' diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 711a0acf..784af111 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -2,9 +2,11 @@ from scrapinghub import HubstorageClient from scrapinghub import ScrapinghubClient -from scrapinghub.client import Projects, Project, Job +from scrapinghub.client.jobs import Job +from scrapinghub.client.projects import Projects, Project from scrapinghub.hubstorage.utils import apipoll + from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME from .conftest import TEST_USER_AUTH, TEST_DASH_ENDPOINT diff --git a/tests/client/test_collections.py b/tests/client/test_collections.py index d33083dc..5765ff4f 100644 --- a/tests/client/test_collections.py +++ b/tests/client/test_collections.py @@ -3,7 +3,10 @@ import pytest from six.moves import range -from scrapinghub.exceptions import InvalidUsage, NotFound, ValueTooLarge +from scrapinghub.client.exceptions import InvalidUsage +from scrapinghub.client.exceptions import NotFound +from scrapinghub.client.exceptions import ValueTooLarge + from .conftest import TEST_COLLECTION_NAME diff --git a/tests/client/test_frontier.py b/tests/client/test_frontier.py index 773d56b6..0b8e806c 100644 --- a/tests/client/test_frontier.py +++ b/tests/client/test_frontier.py @@ -4,7 +4,7 @@ from six import string_types -from scrapinghub.client import Frontiers, Frontier, FrontierSlot +from scrapinghub.client.frontiers import Frontiers, Frontier, FrontierSlot from .conftest import TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT diff --git a/tests/client/test_job.py b/tests/client/test_job.py index 
660dd6c7..7f1c091c 100644 --- a/tests/client/test_job.py +++ b/tests/client/test_job.py @@ -1,6 +1,9 @@ -from scrapinghub.client import Job -from scrapinghub.client import Items, Logs, Requests -from scrapinghub.client import Samples, JobMeta +from scrapinghub.client.items import Items +from scrapinghub.client.jobs import Job +from scrapinghub.client.jobs import JobMeta +from scrapinghub.client.logs import Logs +from scrapinghub.client.requests import Requests +from scrapinghub.client.samples import Samples from .conftest import TEST_PROJECT_ID from .conftest import TEST_SPIDER_NAME diff --git a/tests/client/test_project.py b/tests/client/test_project.py index 7942e986..b56dacdd 100644 --- a/tests/client/test_project.py +++ b/tests/client/test_project.py @@ -4,10 +4,13 @@ import pytest from six.moves import range -from scrapinghub.client import Jobs, Job -from scrapinghub.exceptions import DuplicateJobError -from scrapinghub.client import Activity, Collections, Spiders -from scrapinghub.client import Frontiers, Settings +from scrapinghub.client.activity import Activity +from scrapinghub.client.collections import Collections +from scrapinghub.client.exceptions import DuplicateJobError +from scrapinghub.client.frontiers import Frontiers +from scrapinghub.client.jobs import Jobs, Job +from scrapinghub.client.projects import Settings +from scrapinghub.client.spiders import Spiders from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME from .utils import validate_default_meta @@ -123,7 +126,7 @@ def test_project_jobs_schedule(project): project.jobs.schedule(TEST_SPIDER_NAME) job1 = project.jobs.schedule(TEST_SPIDER_NAME, - spider_args={'arg1':'val1', 'arg2': 'val2'}, + spider_args={'arg1': 'val1', 'arg2': 'val2'}, priority=3, units=3, add_tag=['tagA', 'tagB'], meta={'state': 'running', 'meta1': 'val1'}) diff --git a/tests/client/test_proxy.py b/tests/client/test_proxy.py index 023c8eae..498a2f18 100644 --- a/tests/client/test_proxy.py +++ b/tests/client/test_proxy.py @@ -5,7 +5,7 @@ import pytest from six.moves import range -from scrapinghub.client import LogLevel +from scrapinghub.client.utils import LogLevel from scrapinghub.hubstorage.serialization import mpdecode from .conftest import TEST_PROJECT_ID diff --git a/tests/client/test_spider.py b/tests/client/test_spider.py index c48af592..a0198c25 100644 --- a/tests/client/test_spider.py +++ b/tests/client/test_spider.py @@ -5,10 +5,12 @@ from six import string_types from six.moves import range -from scrapinghub.exceptions import NotFound, DuplicateJobError, InvalidUsage -from scrapinghub.client import Jobs, Job -from scrapinghub.client import Spider -from scrapinghub.utils import JobKey +from scrapinghub.client.exceptions import DuplicateJobError +from scrapinghub.client.exceptions import InvalidUsage +from scrapinghub.client.exceptions import NotFound +from scrapinghub.client.jobs import Jobs, Job +from scrapinghub.client.spiders import Spider +from scrapinghub.client.utils import JobKey from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME from .utils import validate_default_meta diff --git a/tests/client/test_utils.py b/tests/client/test_utils.py index 475e317a..03e4362c 100644 --- a/tests/client/test_utils.py +++ b/tests/client/test_utils.py @@ -4,8 +4,8 @@ import mock -from scrapinghub.utils import parse_auth -from scrapinghub.utils import format_iter_filters +from scrapinghub.client.utils import parse_auth +from scrapinghub.client.utils import format_iter_filters def test_format_iter_filters():
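
A minimal usage sketch of the refactored ``scrapinghub.client`` package layout introduced by this diff; the API key, project id, spider name and job key below are placeholders, and only calls defined in the modules above are used::

    >>> from scrapinghub import ScrapinghubClient
    >>> from scrapinghub.client.exceptions import DuplicateJobError
    >>> client = ScrapinghubClient('APIKEY')          # placeholder API key
    >>> project = client.get_project(123)             # placeholder project id
    >>> try:
    ...     job = project.jobs.schedule('spider1', arg1='val1')
    ... except DuplicateJobError:
    ...     job = client.get_job('123/1/1')           # fall back to an existing job key
    >>> for item in job.items.iter(count=100):        # iter() instead of list() to limit memory use
    ...     print(item)
    >>> client.close()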