
Postprocessing feeds do not work for S3 feed storage #5500

Closed
Nykakin opened this issue May 14, 2022 · 1 comment · Fixed by #5581

Nykakin commented May 14, 2022

Description

Example settings:

from scrapy.extensions.postprocessing import GzipPlugin

FEEDS = {
    "s3://base/file_%(batch_id)05d.gz": {
        "format": "csv",
        "postprocessing": [GzipPlugin],
        "gzip_compresslevel": 5,
    },
}
FEED_EXPORT_BATCH_ITEM_COUNT = 50000

This causes an exception:

2022-05-14 01:02:12 [scrapy.extensions.feedexport] ERROR: Error storing csv feed (15 items) in: s3://base/file_00001.gz
Traceback (most recent call last):
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/threadpool.py", line 244, in inContext
    result = inContext.theWork()  # type: ignore[attr-defined]
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/threadpool.py", line 260, in <lambda>
    inContext.theWork = lambda: context.call(  # type: ignore[attr-defined]
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/scrapy/extensions/feedexport.py", line 194, in _store_in_thread
    file.seek(0)
io.UnsupportedOperation: seek

Apparently scrapy.extensions.postprocessing.PostProcessingManager doesn't fully implement the file protocol. Adding this method to the class:

    # (needs: from io import SEEK_SET)
    def seek(self, offset, whence=SEEK_SET):
        return self.file.seek(offset, whence)

This causes an exception in a different place:

Traceback (most recent call last):
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/threadpool.py", line 244, in inContext
    result = inContext.theWork()  # type: ignore[attr-defined]
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/threadpool.py", line 260, in <lambda>
    inContext.theWork = lambda: context.call(  # type: ignore[attr-defined]
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/scrapy/extensions/feedexport.py", line 196, in _store_in_thread
    self.s3_client.put_object(
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/botocore/client.py", line 395, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/botocore/client.py", line 695, in _make_api_call
    request_dict = self._convert_to_request_dict(
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/botocore/client.py", line 745, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
  File "/home/mariusz/Documents/Praca/venv_wgsn/lib/python3.10/site-packages/botocore/validate.py", line 360, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Body, value: <scrapy.extensions.postprocessing.PostProcessingManager object at 0x7f1f245c6920>, type: <class 'scrapy.extensions.postprocessing.PostProcessingManager'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

Apparently boto expects a read() method to be present as well (here).
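
For reference, botocore's blob validation appears to treat Body as "file-like" when the object merely exposes a read() method, so even a thin duck-typed wrapper like this hypothetical ReadableWrapper would get past parameter validation:

class ReadableWrapper:
    # Hypothetical illustration: a read() method on the Body object is
    # roughly all that botocore's parameter validation checks for.
    def __init__(self, f):
        self._f = f

    def read(self, *args, **kwargs):
        return self._f.read(*args, **kwargs)

Passing validation is not enough on its own, though, as the next paragraph shows.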

I tried to add a read() method to scrapy.extensions.postprocessing.PostProcessingManager as well, but I only received an incomplete file. I think this happens because gzip.GzipFile uses some buffering, so it only saves the full file when close() is called on it. Since S3FeedStorage internally uses tempfile.NamedTemporaryFile, which is deleted on close, this causes the file to disappear right after it is finalized.
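
A minimal standalone sketch of that buffering behavior (pure stdlib, no Scrapy): bytes flushed through gzip.GzipFile do not form a complete gzip stream until close() writes the trailer, so reading the underlying file early yields an unusable archive.

import gzip
import io

buf = io.BytesIO()
gz = gzip.GzipFile(fileobj=buf, mode="wb")
gz.write(b"hello world")
gz.flush()  # compressed bytes reach buf, but the trailer (CRC + size) is still missing

try:
    gzip.decompress(buf.getvalue())
except EOFError as error:
    print("incomplete stream before close():", error)

gz.close()  # writes the final deflate block and the trailer; buf stays open
print(gzip.decompress(buf.getvalue()))  # b'hello world'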

PostProcessingManager needs to be refactored so it can handle BlockingFeedStorage correctly.

Versions

$ scrapy version --verbose
Scrapy       : 2.6.1
lxml         : 4.8.0.0
libxml2      : 2.9.12
cssselect    : 1.1.0
parsel       : 1.6.0
w3lib        : 1.22.0
Twisted      : 22.2.0
Python       : 3.10.4 (main, May 11 2022, 11:41:05) [GCC 11.0.1 20210417 (experimental) [master revision c1c86ab96c2:b6fb0ccbb48:8ae
pyOpenSSL    : 22.0.0 (OpenSSL 1.1.1m  14 Dec 2021)
cryptography : 36.0.1
Platform     : Linux-5.11.0-16-generic-x86_64-with-glibc2.33

Additional context

@Nykakin Nykakin changed the title Postprocessing pipelines do not work for S3 feed storage Postprocessing feeds do not work for S3 feed storage May 14, 2022
@Gallaecio Gallaecio added the bug label Jun 21, 2022
VMRuiz (Contributor) commented Jun 22, 2022

I was able to fix this ad hoc for my project. This is what I did:

  1. scrapy.extensions.postprocessing.PostProcessingManager must forward its seek and read calls to the same methods on its file attribute.
  2. In GzipPlugin, the gzipfile attribute must be closed before trying to read from it.

I was able to monkey-patch everything with the following code:

from scrapy.extensions.postprocessing import PostProcessingManager, GzipPlugin


def read(self, *args, **kwargs):
    return self.file.read(*args, **kwargs)


def seek(self, *args, **kwargs):
    # Only time seek is executed is when uploading the finished file
    if hasattr(self.head_plugin, "gzipfile") and not self.head_plugin.gzipfile.closed:
        self.head_plugin.gzipfile.flush()
        # It should be safe to close at this point
        self.head_plugin.gzipfile.close()

    return self.file.seek(*args, **kwargs)


def close(self):
    # Gzip is already closed by PostProcessingManager.seek
    self.file.close()


PostProcessingManager.read = read
PostProcessingManager.seek = seek
GzipPlugin.close = close

However, this code assumes that only GzipPlugin will be used and that seek will only be called right before writing the file to S3.
This is not a good solution overall. A more generic solution should be designed with every possible combination of plugins in mind.
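
As a rough, untested sketch of what such a generic design could look like, assuming a hypothetical per-plugin finish() hook that flushes buffered data through to the target file without closing it (the current plugins provide no such hook):

from io import SEEK_SET


class GenericPostProcessingManager:
    """Untested sketch, not Scrapy's actual fix: lazily finalize the whole
    plugin chain the first time the wrapped file is read back, so every
    buffering plugin (gzip, lzma, bz2, ...) writes its trailer before the
    upload starts."""

    def __init__(self, head_plugin, file):
        self.head_plugin = head_plugin  # outermost plugin wrapping `file`
        self.file = file                # the real (temporary) target file
        self._finished = False

    def write(self, data):
        return self.head_plugin.write(data)

    def _finish(self):
        # Flush the trailers/buffers of every plugin exactly once so the
        # bytes in self.file form a complete stream, keeping self.file open.
        if not self._finished:
            self.head_plugin.finish()  # hypothetical hook, see above
            self._finished = True

    def seek(self, offset, whence=SEEK_SET):
        self._finish()
        return self.file.seek(offset, whence)

    def read(self, *args, **kwargs):
        self._finish()
        return self.file.read(*args, **kwargs)

    def close(self):
        self._finish()
        self.file.close()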
