Skip to content

Commit

Permalink
Full typing for scrapy/exporters.py. (#6275)
Browse files Browse the repository at this point in the history
  • Loading branch information
wRAR committed Mar 8, 2024
1 parent 861646f commit 8985a04
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 72 deletions.
150 changes: 83 additions & 67 deletions scrapy/exporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
"""

import csv
import io
import marshal
import pickle # nosec
import pprint
from collections.abc import Mapping
from io import BytesIO, TextIOWrapper
from json import JSONEncoder
from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
from xml.sax.saxutils import XMLGenerator # nosec
from xml.sax.xmlreader import AttributesImpl # nosec

from itemadapter import ItemAdapter, is_item

from scrapy.item import Item
from scrapy.item import Field, Item
from scrapy.utils.python import is_listlike, to_bytes, to_unicode
from scrapy.utils.serialize import ScrapyJSONEncoder

Expand All @@ -29,36 +31,42 @@


class BaseItemExporter:
def __init__(self, *, dont_fail: bool = False, **kwargs: Any):
    """Keep the raw option dict and configure the exporter from it.

    :param dont_fail: when true, unknown options are tolerated instead of
        raising ``TypeError`` (lets subclasses forward their own kwargs).
    """
    self._kwargs: Dict[str, Any] = kwargs
    self._configure(kwargs, dont_fail=dont_fail)

def _configure(self, options: Dict[str, Any], dont_fail: bool = False) -> None:
    """Pop known settings out of the ``options`` dict onto attributes.

    Any keys left over are an error unless ``dont_fail`` is set, which is
    useful for keyword arguments in subclass ``__init__`` methods.
    """
    pop = options.pop
    self.encoding: Optional[str] = pop("encoding", None)
    self.fields_to_export: Union[Mapping[str, str], Iterable[str], None] = pop(
        "fields_to_export", None
    )
    self.export_empty_fields: bool = pop("export_empty_fields", False)
    self.indent: Optional[int] = pop("indent", None)
    if options and not dont_fail:
        raise TypeError(f"Unexpected options: {', '.join(options.keys())}")

def export_item(self, item):
def export_item(self, item: Any) -> None:
raise NotImplementedError

def serialize_field(
    self, field: "Union[Mapping[str, Any], Field]", name: str, value: Any
) -> Any:
    """Return ``value`` passed through the field's ``serializer``.

    When the field metadata carries no ``serializer`` callable, the value
    is returned unchanged.
    """
    identity: Callable[[Any], Any] = lambda x: x
    serializer: Callable[[Any], Any] = field.get("serializer", identity)
    return serializer(value)

def start_exporting(self) -> None:
    """Hook called once before the first item; no-op by default."""
    return None

def finish_exporting(self) -> None:
    """Hook called once after the last item; no-op by default."""
    return None

def _get_serialized_fields(self, item, default_value=None, include_empty=None):
def _get_serialized_fields(
self, item: Any, default_value: Any = None, include_empty: Optional[bool] = None
) -> Iterable[Tuple[str, Any]]:
"""Return the fields to export as an iterable of tuples
(name, serialized_value)
"""
Expand Down Expand Up @@ -100,22 +108,22 @@ def _get_serialized_fields(self, item, default_value=None, include_empty=None):


class JsonLinesItemExporter(BaseItemExporter):
    """Export items as newline-delimited JSON: one object per line."""

    def __init__(self, file: BytesIO, **kwargs: Any):
        super().__init__(dont_fail=True, **kwargs)
        self.file: BytesIO = file
        # Escape to ASCII only when no explicit output encoding was given.
        self._kwargs.setdefault("ensure_ascii", not self.encoding)
        self.encoder: JSONEncoder = ScrapyJSONEncoder(**self._kwargs)

    def export_item(self, item: Any) -> None:
        # One JSON object followed by a newline, written as encoded bytes.
        fields = dict(self._get_serialized_fields(item))
        line = self.encoder.encode(fields) + "\n"
        self.file.write(to_bytes(line, self.encoding))


class JsonItemExporter(BaseItemExporter):
def __init__(self, file, **kwargs):
def __init__(self, file: BytesIO, **kwargs: Any):
super().__init__(dont_fail=True, **kwargs)
self.file = file
self.file: BytesIO = file
# there is a small difference between the behaviour of JsonItemExporter.indent
# and ScrapyJSONEncoder.indent. ScrapyJSONEncoder.indent=None is needed to prevent
# the addition of newlines everywhere
Expand All @@ -127,71 +135,71 @@ def __init__(self, file, **kwargs):
self.encoder = ScrapyJSONEncoder(**self._kwargs)
self.first_item = True

def _beautify_newline(self) -> None:
    """Write a newline separator when indented output was requested."""
    if self.indent is None:
        return
    self.file.write(b"\n")

def _add_comma_after_first(self) -> None:
    """Emit the ``,`` separator before every item except the first."""
    if not self.first_item:
        self.file.write(b",")
        self._beautify_newline()
    else:
        self.first_item = False

def start_exporting(self) -> None:
    """Open the top-level JSON array."""
    self.file.write(b"[")
    self._beautify_newline()

def finish_exporting(self) -> None:
    """Close the top-level JSON array."""
    self._beautify_newline()
    self.file.write(b"]")

def export_item(self, item: Any) -> None:
    """Append one item (as a JSON object) to the array being written."""
    fields = dict(self._get_serialized_fields(item))
    encoded = to_bytes(self.encoder.encode(fields), self.encoding)
    # The separator goes before the item, so the array never ends in ",".
    self._add_comma_after_first()
    self.file.write(encoded)


class XmlItemExporter(BaseItemExporter):
def __init__(self, file: BytesIO, **kwargs: Any):
    """Create an XML exporter writing ``<items><item>...</item></items>``.

    ``item_element`` and ``root_element`` override the default tag names.
    """
    self.item_element = kwargs.pop("item_element", "item")
    self.root_element = kwargs.pop("root_element", "items")
    super().__init__(**kwargs)
    # XMLGenerator needs a concrete encoding; default to UTF-8.
    self.encoding = self.encoding or "utf-8"
    self.xg = XMLGenerator(file, encoding=self.encoding)

def _beautify_newline(self, new_item: bool = False) -> None:
    """Emit a newline when indenting; item boundaries always get one."""
    if self.indent is None:
        return
    if new_item or self.indent > 0:
        self.xg.characters("\n")

def _beautify_indent(self, depth: int = 1) -> None:
    """Write ``depth`` levels of indentation (no-op when indent is falsy)."""
    if not self.indent:
        return
    self.xg.characters(" " * self.indent * depth)

def start_exporting(self) -> None:
    """Emit the XML declaration and open the root element."""
    self.xg.startDocument()
    self.xg.startElement(self.root_element, AttributesImpl({}))
    self._beautify_newline(new_item=True)

def export_item(self, item: Any) -> None:
    """Write one ``<item>`` element with one child element per field."""
    self._beautify_indent(depth=1)
    self.xg.startElement(self.item_element, AttributesImpl({}))
    self._beautify_newline()
    # Missing fields are exported as empty strings.
    for field_name, field_value in self._get_serialized_fields(
        item, default_value=""
    ):
        self._export_xml_field(field_name, field_value, depth=2)
    self._beautify_indent(depth=1)
    self.xg.endElement(self.item_element)
    self._beautify_newline(new_item=True)

def finish_exporting(self) -> None:
    """Close the root element and finalize the XML document."""
    self.xg.endElement(self.root_element)
    self.xg.endDocument()

def _export_xml_field(self, name, serialized_value, depth):
def _export_xml_field(self, name: str, serialized_value: Any, depth: int) -> None:
self._beautify_indent(depth=depth)
self.xg.startElement(name, {})
self.xg.startElement(name, AttributesImpl({}))
if hasattr(serialized_value, "items"):
self._beautify_newline()
for subname, value in serialized_value.items():
Expand All @@ -213,17 +221,17 @@ def _export_xml_field(self, name, serialized_value, depth):
class CsvItemExporter(BaseItemExporter):
def __init__(
self,
file,
include_headers_line=True,
join_multivalued=",",
errors=None,
**kwargs,
file: BytesIO,
include_headers_line: bool = True,
join_multivalued: str = ",",
errors: Optional[str] = None,
**kwargs: Any,
):
super().__init__(dont_fail=True, **kwargs)
if not self.encoding:
self.encoding = "utf-8"
self.include_headers_line = include_headers_line
self.stream = io.TextIOWrapper(
self.stream = TextIOWrapper(
file,
line_buffering=False,
write_through=True,
Expand All @@ -235,19 +243,21 @@ def __init__(
self._headers_not_written = True
self._join_multivalued = join_multivalued

def serialize_field(
    self, field: "Union[Mapping[str, Any], Field]", name: str, value: Any
) -> Any:
    """Serialize ``value``; without an explicit ``serializer`` the default
    joins multivalued fields into one delimited string."""
    serializer: Callable[[Any], Any] = field.get(
        "serializer", self._join_if_needed
    )
    return serializer(value)

def _join_if_needed(self, value: Any) -> Any:
    """Join a list/tuple of strings with the configured delimiter.

    Non-sequences, and sequences whose members are not all strings,
    are returned unchanged.
    """
    if not isinstance(value, (list, tuple)):
        return value
    try:
        return self._join_multivalued.join(value)
    except TypeError:  # list in value may not contain strings
        return value

def export_item(self, item):
def export_item(self, item: Any) -> None:
if self._headers_not_written:
self._headers_not_written = False
self._write_headers_and_set_fields_to_export(item)
Expand All @@ -256,36 +266,38 @@ def export_item(self, item):
values = list(self._build_row(x for _, x in fields))
self.csv_writer.writerow(values)

def finish_exporting(self) -> None:
    """Detach the text wrapper so the underlying file is not closed."""
    self.stream.detach()  # Avoid closing the wrapped file.

def _build_row(self, values: Iterable[Any]) -> Iterable[Any]:
    """Yield each value decoded to text; non-decodable values pass through."""
    for raw in values:
        try:
            yield to_unicode(raw, self.encoding)
        except TypeError:
            yield raw

def _write_headers_and_set_fields_to_export(self, item: Any) -> None:
    """Emit the CSV header row once and pin down ``fields_to_export``.

    When ``fields_to_export`` is unset, the first item's declared field
    names (or dict keys) define both the header labels and column order.
    """
    if not self.include_headers_line:
        return
    if not self.fields_to_export:
        # use declared field names, or keys if the item is a dict
        self.fields_to_export = ItemAdapter(item).field_names()
    fields: Iterable[str]
    if isinstance(self.fields_to_export, Mapping):
        # A mapping renames columns: its values are the header labels.
        fields = self.fields_to_export.values()
    else:
        assert self.fields_to_export
        fields = self.fields_to_export
    self.csv_writer.writerow(list(self._build_row(fields)))


class PickleItemExporter(BaseItemExporter):
    """Export items as consecutive pickled dicts written to ``file``."""

    def __init__(self, file: BytesIO, protocol: int = 4, **kwargs: Any):
        super().__init__(**kwargs)
        self.file: BytesIO = file
        self.protocol: int = protocol

    def export_item(self, item: Any) -> None:
        serialized = dict(self._get_serialized_fields(item))
        pickle.dump(serialized, self.file, self.protocol)

Expand All @@ -299,20 +311,20 @@ class MarshalItemExporter(BaseItemExporter):
opened in binary mode, a :class:`~io.BytesIO` object, etc)
"""

def __init__(self, file: BytesIO, **kwargs: Any):
    """Store the (binary, writable) target file."""
    super().__init__(**kwargs)
    self.file: BytesIO = file

def export_item(self, item: Any) -> None:
    """Serialize the item's fields with :mod:`marshal` into ``self.file``."""
    fields = dict(self._get_serialized_fields(item))
    marshal.dump(fields, self.file)


class PprintItemExporter(BaseItemExporter):
    """Export items as pretty-printed Python dicts, one per item."""

    def __init__(self, file: BytesIO, **kwargs: Any):
        super().__init__(**kwargs)
        self.file: BytesIO = file

    def export_item(self, item: Any) -> None:
        fields = dict(self._get_serialized_fields(item))
        self.file.write(to_bytes(pprint.pformat(fields) + "\n"))

Expand All @@ -327,16 +339,20 @@ class PythonItemExporter(BaseItemExporter):
.. _msgpack: https://pypi.org/project/msgpack/
"""

def _configure(self, options: Dict[str, Any], dont_fail: bool = False) -> None:
    """Configure as the base class does, forcing a UTF-8 default encoding."""
    super()._configure(options, dont_fail)
    self.encoding = self.encoding or "utf-8"

def serialize_field(
    self, field: "Union[Mapping[str, Any], Field]", name: str, value: Any
) -> Any:
    """Serialize ``value``; the default serializer recursively converts
    nested items, mappings and sequences via ``_serialize_value``."""
    serializer: Callable[[Any], Any] = field.get(
        "serializer", self._serialize_value
    )
    return serializer(value)

def _serialize_value(self, value):
def _serialize_value(self, value: Any) -> Any:
if isinstance(value, Item):
return self.export_item(value)
if is_item(value):
Expand All @@ -347,10 +363,10 @@ def _serialize_value(self, value):
return to_unicode(value, encoding=self.encoding)
return value

def _serialize_item(self, item: Any) -> Iterable[Tuple[Union[str, bytes], Any]]:
    """Yield ``(key, serialized_value)`` pairs for every field of ``item``."""
    adapter = ItemAdapter(item)
    for key, value in adapter.items():
        yield key, self._serialize_value(value)

def export_item(self, item: Any) -> Dict[Union[str, bytes], Any]:  # type: ignore[override]
    """Return the item as a plain dict of serialized fields.

    Unlike other exporters this returns the result instead of writing it,
    hence the override-signature ignore.
    """
    return dict(self._get_serialized_fields(item))

0 comments on commit 8985a04

Please sign in to comment.