Skip to content

Commit

Permalink
Merge pull request #5171 from elacuesta/request-types
Browse files Browse the repository at this point in the history
Type hints for Request and subclasses
  • Loading branch information
wRAR committed Jun 11, 2021
2 parents 2885857 + ce64477 commit 9f81de2
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 63 deletions.
2 changes: 1 addition & 1 deletion scrapy/downloadermiddlewares/retry.py
Expand Up @@ -98,7 +98,7 @@ def parse(self, response):
{'request': request, 'retry_times': retry_times, 'reason': reason},
extra={'spider': spider}
)
new_request = request.copy()
new_request: Request = request.copy()
new_request.meta['retry_times'] = retry_times
new_request.dont_filter = True
if priority_adjust is None:
Expand Down
57 changes: 35 additions & 22 deletions scrapy/http/request/__init__.py
Expand Up @@ -5,7 +5,7 @@
See documentation in docs/topics/request-response.rst
"""
import inspect
from typing import Optional, Tuple
from typing import Callable, List, Optional, Tuple, Type, TypeVar, Union

from w3lib.url import safe_url_string

Expand All @@ -18,6 +18,9 @@
from scrapy.utils.url import escape_ajax


RequestTypeVar = TypeVar("RequestTypeVar", bound="Request")


class Request(object_ref):
"""Represents an HTTP request, which is usually generated in a Spider and
executed by the Downloader, thus generating a :class:`Response`.
Expand All @@ -36,10 +39,22 @@ class Request(object_ref):
:func:`~scrapy.utils.request.request_from_dict`.
"""

def __init__(self, url, callback=None, method='GET', headers=None, body=None,
cookies=None, meta=None, encoding='utf-8', priority=0,
dont_filter=False, errback=None, flags=None, cb_kwargs=None):

def __init__(
self,
url: str,
callback: Optional[Callable] = None,
method: str = "GET",
headers: Optional[dict] = None,
body: Optional[Union[bytes, str]] = None,
cookies: Optional[Union[dict, List[dict]]] = None,
meta: Optional[dict] = None,
encoding: str = "utf-8",
priority: int = 0,
dont_filter: bool = False,
errback: Optional[Callable] = None,
flags: Optional[List[str]] = None,
cb_kwargs: Optional[dict] = None,
) -> None:
self._encoding = encoding # this one has to be set first
self.method = str(method).upper()
self._set_url(url)
Expand All @@ -64,23 +79,23 @@ def __init__(self, url, callback=None, method='GET', headers=None, body=None,
self.flags = [] if flags is None else list(flags)

@property
def cb_kwargs(self):
def cb_kwargs(self) -> dict:
if self._cb_kwargs is None:
self._cb_kwargs = {}
return self._cb_kwargs

@property
def meta(self):
def meta(self) -> dict:
if self._meta is None:
self._meta = {}
return self._meta

def _get_url(self):
def _get_url(self) -> str:
return self._url

def _set_url(self, url):
def _set_url(self, url: str) -> None:
if not isinstance(url, str):
raise TypeError(f'Request url must be str or unicode, got {type(url).__name__}')
raise TypeError(f"Request url must be str, got {type(url).__name__}")

s = safe_url_string(url, self.encoding)
self._url = escape_ajax(s)
Expand All @@ -94,39 +109,37 @@ def _set_url(self, url):

url = property(_get_url, obsolete_setter(_set_url, 'url'))

def _get_body(self):
def _get_body(self) -> bytes:
return self._body

def _set_body(self, body):
if body is None:
self._body = b''
else:
self._body = to_bytes(body, self.encoding)
def _set_body(self, body: Optional[Union[str, bytes]]) -> None:
self._body = b"" if body is None else to_bytes(body, self.encoding)

body = property(_get_body, obsolete_setter(_set_body, 'body'))

@property
def encoding(self):
def encoding(self) -> str:
return self._encoding

def __str__(self):
def __str__(self) -> str:
return f"<{self.method} {self.url}>"

__repr__ = __str__

def copy(self):
"""Return a copy of this Request"""
def copy(self) -> "Request":
return self.replace()

def replace(self, *args, **kwargs):
def replace(self, *args, **kwargs) -> "Request":
"""Create a new Request with the same attributes except for those given new values"""
for x in self.attributes:
kwargs.setdefault(x, getattr(self, x))
cls = kwargs.pop('cls', self.__class__)
return cls(*args, **kwargs)

@classmethod
def from_curl(cls, curl_command, ignore_unknown_options=True, **kwargs):
def from_curl(
cls: Type[RequestTypeVar], curl_command: str, ignore_unknown_options: bool = True, **kwargs
) -> RequestTypeVar:
"""Create a Request object from a string containing a `cURL
<https://curl.haxx.se/>`_ command. It populates the HTTP method, the
URL, the headers, the cookies and the body. It accepts the same
Expand Down
84 changes: 56 additions & 28 deletions scrapy/http/request/form.py
Expand Up @@ -5,22 +5,28 @@
See documentation in docs/topics/request-response.rst
"""

from typing import Iterable, List, Optional, Tuple, Type, TypeVar, Union
from urllib.parse import urljoin, urlencode

import lxml.html
from lxml.html import FormElement, HtmlElement, HTMLParser, SelectElement
from parsel.selector import create_root_node
from w3lib.html import strip_html5_whitespace

from scrapy.http.request import Request
from scrapy.http.response.text import TextResponse
from scrapy.utils.python import to_bytes, is_listlike
from scrapy.utils.response import get_base_url


FormRequestTypeVar = TypeVar("FormRequestTypeVar", bound="FormRequest")

FormdataType = Optional[Union[dict, List[Tuple[str, str]]]]


class FormRequest(Request):
valid_form_methods = ['GET', 'POST']

def __init__(self, *args, **kwargs):
formdata = kwargs.pop('formdata', None)
def __init__(self, *args, formdata: FormdataType = None, **kwargs) -> None:
if formdata and kwargs.get('method') is None:
kwargs['method'] = 'POST'

Expand All @@ -36,17 +42,27 @@ def __init__(self, *args, **kwargs):
self._set_url(self.url + ('&' if '?' in self.url else '?') + querystr)

@classmethod
def from_response(cls, response, formname=None, formid=None, formnumber=0, formdata=None,
clickdata=None, dont_click=False, formxpath=None, formcss=None, **kwargs):

def from_response(
cls: Type[FormRequestTypeVar],
response: TextResponse,
formname: Optional[str] = None,
formid: Optional[str] = None,
formnumber: Optional[int] = 0,
formdata: FormdataType = None,
clickdata: Optional[dict] = None,
dont_click: bool = False,
formxpath: Optional[str] = None,
formcss: Optional[str] = None,
**kwargs,
) -> FormRequestTypeVar:
kwargs.setdefault('encoding', response.encoding)

if formcss is not None:
from parsel.csstranslator import HTMLTranslator
formxpath = HTMLTranslator().css_to_xpath(formcss)

form = _get_form(response, formname, formid, formnumber, formxpath)
formdata = _get_inputs(form, formdata, dont_click, clickdata, response)
formdata = _get_inputs(form, formdata, dont_click, clickdata)
url = _get_form_url(form, kwargs.pop('url', None))

method = kwargs.pop('method', form.method)
Expand All @@ -58,7 +74,7 @@ def from_response(cls, response, formname=None, formid=None, formnumber=0, formd
return cls(url=url, method=method, formdata=formdata, **kwargs)


def _get_form_url(form, url):
def _get_form_url(form: FormElement, url: Optional[str]) -> str:
if url is None:
action = form.get('action')
if action is None:
Expand All @@ -67,17 +83,22 @@ def _get_form_url(form, url):
return urljoin(form.base_url, url)


def _urlencode(seq, enc):
def _urlencode(seq: Iterable, enc: str) -> str:
values = [(to_bytes(k, enc), to_bytes(v, enc))
for k, vs in seq
for v in (vs if is_listlike(vs) else [vs])]
return urlencode(values, doseq=True)


def _get_form(response, formname, formid, formnumber, formxpath):
"""Find the form element """
root = create_root_node(response.text, lxml.html.HTMLParser,
base_url=get_base_url(response))
def _get_form(
response: TextResponse,
formname: Optional[str],
formid: Optional[str],
formnumber: Optional[int],
formxpath: Optional[str],
) -> FormElement:
"""Find the wanted form element within the given response."""
root = create_root_node(response.text, HTMLParser, base_url=get_base_url(response))
forms = root.xpath('//form')
if not forms:
raise ValueError(f"No <form> element found in {response}")
Expand Down Expand Up @@ -105,8 +126,7 @@ def _get_form(response, formname, formid, formnumber, formxpath):
break
raise ValueError(f'No <form> element found with {formxpath}')

# If we get here, it means that either formname was None
# or invalid
# If we get here, it means that either formname was None or invalid
if formnumber is not None:
try:
form = forms[formnumber]
Expand All @@ -116,47 +136,54 @@ def _get_form(response, formname, formid, formnumber, formxpath):
return form


def _get_inputs(form, formdata, dont_click, clickdata, response):
def _get_inputs(
form: FormElement,
formdata: FormdataType,
dont_click: bool,
clickdata: Optional[dict],
) -> List[Tuple[str, str]]:
"""Return a list of key-value pairs for the inputs found in the given form."""
try:
formdata_keys = dict(formdata or ()).keys()
except (ValueError, TypeError):
raise ValueError('formdata should be a dict or iterable of tuples')

if not formdata:
formdata = ()
formdata = []
inputs = form.xpath('descendant::textarea'
'|descendant::select'
'|descendant::input[not(@type) or @type['
' not(re:test(., "^(?:submit|image|reset)$", "i"))'
' and (../@checked or'
' not(re:test(., "^(?:checkbox|radio)$", "i")))]]',
namespaces={
"re": "http://exslt.org/regular-expressions"})
values = [(k, '' if v is None else v)
for k, v in (_value(e) for e in inputs)
if k and k not in formdata_keys]
namespaces={"re": "http://exslt.org/regular-expressions"})
values = [
(k, '' if v is None else v)
for k, v in (_value(e) for e in inputs)
if k and k not in formdata_keys
]

if not dont_click:
clickable = _get_clickable(clickdata, form)
if clickable and clickable[0] not in formdata and not clickable[0] is None:
values.append(clickable)

if isinstance(formdata, dict):
formdata = formdata.items()
formdata = formdata.items() # type: ignore[assignment]

values.extend((k, v) for k, v in formdata if v is not None)
return values


def _value(ele):
def _value(ele: HtmlElement):
n = ele.name
v = ele.value
if ele.tag == 'select':
return _select_value(ele, n, v)
return n, v


def _select_value(ele, n, v):
def _select_value(ele: SelectElement, n: str, v: str):
multiple = ele.multiple
if v is None and not multiple:
# Match browser behaviour on simple select tag without options selected
Expand All @@ -167,11 +194,12 @@ def _select_value(ele, n, v):
# This is a workaround for a bug in lxml, fixed in 2.3.1
# fix https://github.com/lxml/lxml/commit/57f49eed82068a20da3db8f1b18ae00c1bab8b12#L1L1139
selected_options = ele.xpath('.//option[@selected]')
v = [(o.get('value') or o.text or '').strip() for o in selected_options]
values = [(o.get('value') or o.text or '').strip() for o in selected_options]
return n, values
return n, v


def _get_clickable(clickdata, form):
def _get_clickable(clickdata: Optional[dict], form: FormElement) -> Optional[Tuple[str, str]]:
"""
Returns the clickable element specified in clickdata,
if the latter is given. If not, it returns the first
Expand All @@ -183,7 +211,7 @@ def _get_clickable(clickdata, form):
namespaces={"re": "http://exslt.org/regular-expressions"}
))
if not clickables:
return
return None

# If we don't have clickdata, we just use the first clickable element
if clickdata is None:
Expand Down
15 changes: 6 additions & 9 deletions scrapy/http/request/json_request.py
Expand Up @@ -8,7 +8,7 @@
import copy
import json
import warnings
from typing import Tuple
from typing import Optional, Tuple

from scrapy.http.request import Request
from scrapy.utils.deprecate import create_deprecated_class
Expand All @@ -18,8 +18,8 @@ class JsonRequest(Request):

attributes: Tuple[str, ...] = Request.attributes + ("dumps_kwargs",)

def __init__(self, *args, **kwargs):
dumps_kwargs = copy.deepcopy(kwargs.pop('dumps_kwargs', {}))
def __init__(self, *args, dumps_kwargs: Optional[dict] = None, **kwargs) -> None:
dumps_kwargs = copy.deepcopy(dumps_kwargs) if dumps_kwargs is not None else {}
dumps_kwargs.setdefault('sort_keys', True)
self._dumps_kwargs = dumps_kwargs

Expand All @@ -29,10 +29,8 @@ def __init__(self, *args, **kwargs):

if body_passed and data_passed:
warnings.warn('Both body and data passed. data will be ignored')

elif not body_passed and data_passed:
kwargs['body'] = self._dumps(data)

if 'method' not in kwargs:
kwargs['method'] = 'POST'

Expand All @@ -41,23 +39,22 @@ def __init__(self, *args, **kwargs):
self.headers.setdefault('Accept', 'application/json, text/javascript, */*; q=0.01')

@property
def dumps_kwargs(self):
def dumps_kwargs(self) -> dict:
return self._dumps_kwargs

def replace(self, *args, **kwargs):
def replace(self, *args, **kwargs) -> Request:
body_passed = kwargs.get('body', None) is not None
data = kwargs.pop('data', None)
data_passed = data is not None

if body_passed and data_passed:
warnings.warn('Both body and data passed. data will be ignored')

elif not body_passed and data_passed:
kwargs['body'] = self._dumps(data)

return super().replace(*args, **kwargs)

def _dumps(self, data):
def _dumps(self, data: dict) -> str:
"""Convert to JSON """
return json.dumps(data, **self._dumps_kwargs)

Expand Down

0 comments on commit 9f81de2

Please sign in to comment.