upload file with boto, download it with boto3: file gets corrupted (wrong md5 sum) #816

Closed
dtenenba opened this issue Jan 24, 2017 · 16 comments

@dtenenba
Contributor

Hi,

The following code uploads a file to a mock S3 bucket using boto, and downloads the same file to the local disk using boto3. I apologize for bringing both libraries into this, but the code I am testing in real life still uses both (I'm definitely trying to get rid of all the boto code and fully migrate to boto3, but that isn't going to happen right away).

What happens is that the resulting file does not have the same md5 sum as the original file, so it has been corrupted at some point (I'm not sure whether it was during the boto upload or the boto3 download).

This seems to be an issue with moto, because if I comment out the @moto.mock_s3 line (i.e. use 'real' S3) the script works fine (I also need to change the bucket name to a unique one to avoid collisions).

The script keeps looping (doing the upload/download/md5sum comparison) until it fails, because in my real project the corruption does not happen every time; this test script, however, seems to fail (for me, anyway) on the first attempt every time.

The test file that it uploads/downloads is available at the URL below.

You can download it with:

curl -O  https://s3-us-west-2.amazonaws.com/demonstrate-moto-problem/K158154-Mi001716_S1_L001_R1_001.fastq.gz

At this point if you run md5sum on it you should get 6083801a29ef4ebf78fbbed806e6ab2c:

$ md5sum K158154-Mi001716_S1_L001_R1_001.fastq.gz
6083801a29ef4ebf78fbbed806e6ab2c  K158154-Mi001716_S1_L001_R1_001.fastq.gz

Here is the test script (motoprob.py):

import sys
import os
import hashlib
import moto
import boto
import boto3

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()



@moto.mock_s3
def doit():
    # upload file to s3
    conn = boto.connect_s3()
    bkt = conn.create_bucket("mybucket")
    key = boto.s3.key.Key(bkt)
    key.key = "foo/bar.fastq.gz"
    print("Uploading...")

    # You can get this file from:
    #  https://s3-us-west-2.amazonaws.com/demonstrate-moto-problem/K158154-Mi001716_S1_L001_R1_001.fastq.gz
    key.set_contents_from_filename("K158154-Mi001716_S1_L001_R1_001.fastq.gz")

    # download it again
    dlfile = "bar.fastq.gz"
    if os.path.exists(dlfile):
        os.remove(dlfile)

    print("Downloading...")

    client = boto3.client('s3')
    client.download_file(Bucket="mybucket",
      Key="foo/bar.fastq.gz", Filename="bar.fastq.gz")


    md5sum = md5(dlfile)
    if md5sum != "6083801a29ef4ebf78fbbed806e6ab2c":
        print("Incorrect md5sum! {}".format(md5sum))
        sys.exit(1)


while True:
    doit()

Version info:

$ pip freeze |grep oto
boto==2.42.0
boto3==1.4.0
botocore==1.4.48
moto==0.4.29

$ python --version
Python 2.7.12

$ uname -a
Linux f51bec2ad3be 4.9.4-moby #1 SMP Wed Jan 18 17:04:43 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux

$ more /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.1 LTS"

Other ways to see that the resulting file is not the same as the original:

$ diff bar.fastq.gz /data/2016-10-27-PT140/K158154-Mi001716_S1_L001_R1_001.fastq.gz
Binary files bar.fastq.gz and /data/2016-10-27-PT140/K158154-Mi001716_S1_L001_R1_001.fastq.gz differ


$ zcat bar.fastq.gz > bar.fastq # this works for the original file

gzip: bar.fastq.gz: invalid compressed data--crc error

gzip: bar.fastq.gz: invalid compressed data--length error
@dtenenba
Contributor Author

A clarification: one reason my project still has both boto and boto3 code in it is that I ran into a previous issue (#703), which, though closed, is still affecting me (or something very like it is).

If I change the sample script above to use boto3 for uploading, changing the upload section to:

    client = boto3.client('s3')
    client.create_bucket(Bucket="mybucket")
    client.upload_file(
        Filename="K158154-Mi001716_S1_L001_R1_001.fastq.gz",
        Bucket="mybucket",
        Key="foo/bar.fastq.gz")

Then I get the following error, which is what happens with #703 as well:

Uploading...
Traceback (most recent call last):
  File "motoprob.py", line 58, in <module>
    doit()
  File "/envs/wenv/local/lib/python2.7/site-packages/moto/core/models.py", line 71, in wrapper
    result = func(*args, **kwargs)
  File "motoprob.py", line 32, in doit
    Key="foo/bar.fastq.gz")
  File "/envs/wenv/local/lib/python2.7/site-packages/boto3/s3/inject.py", line 105, in upload_file
    extra_args=ExtraArgs, callback=Callback)
  File "/envs/wenv/local/lib/python2.7/site-packages/boto3/s3/transfer.py", line 253, in upload_file
    filename, '/'.join([bucket, key]), e))
boto3.exceptions.S3UploadFailedError: Failed to upload K158154-Mi001716_S1_L001_R1_001.fastq.gz to mybucket/foo/bar.fastq.gz: An error occurred (InvalidPart) when calling the CompleteMultipartUpload operation: One or more of the specified parts could not be found. The part might not have been uploaded, or the specified entity tag might not have matched the part's entity tag.

So I tweaked my code to use boto for uploads when running unit tests; otherwise I use boto3.

If I change the uploading section yet again to use a boto3 resource instead of a client:

    resource = boto3.resource("s3")
    resource.create_bucket(Bucket="mybucket")
    bucket = resource.Bucket("mybucket")
    with open("K158154-Mi001716_S1_L001_R1_001.fastq.gz", "rb") as f:
        bucket.put_object(Key="foo/bar.fastq.gz", Body=f)

...I get the same problem as I had originally (incorrect md5 sum).

So I guess I could get rid of the boto code and replace it with boto3 resource code, then I'd only be dealing with one bug instead of two, but I'd still be stuck with this one...

Anyway, for completeness' sake, here is the full version of the script, modified so it uses only boto3 and not boto:

import sys
import os
import hashlib
import moto
import boto3

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()



@moto.mock_s3
def doit():
    print("Uploading...")


    resource = boto3.resource("s3")
    resource.create_bucket(Bucket="mybucket")
    bucket = resource.Bucket("mybucket")
    with open("K158154-Mi001716_S1_L001_R1_001.fastq.gz", "rb") as f:
        bucket.put_object(Key="foo/bar.fastq.gz", Body=f)



    # You can get this file from:
    #  https://s3-us-west-2.amazonaws.com/demonstrate-moto-problem/K158154-Mi001716_S1_L001_R1_001.fastq.gz
    # key.set_contents_from_filename("K158154-Mi001716_S1_L001_R1_001.fastq.gz")

    # download it again
    dlfile = "bar.fastq.gz"
    if os.path.exists(dlfile):
        os.remove(dlfile)

    print("Downloading...")

    client = boto3.client('s3')
    client.download_file(Bucket="mybucket",
      Key="foo/bar.fastq.gz", Filename="bar.fastq.gz")


    md5sum = md5(dlfile)
    if md5sum != "6083801a29ef4ebf78fbbed806e6ab2c":
        print("Incorrect md5sum! {}".format(md5sum))
        sys.exit(1)


while True:
    doit()

@snordhausen
Contributor

Hi!

First, you should update your boto/moto versions. You have:

$ pip freeze |grep oto
boto==2.42.0
boto3==1.4.0
botocore==1.4.48
moto==0.4.29

The current versions are

boto==2.45.0
boto3==1.4.4
botocore==1.5.5
moto==0.4.31

I'm running your first example to reproduce the issue.

Using moto 0.4.23 (forgot to update that one) and boto/boto3 at the current version, it took 4 attempts before an error occurred.

After updating to moto 0.4.31, I have now had 15 successful runs and no errors. I'll keep it running to see whether I eventually get an error, but it seems that updating moto to the current version helped.

All running on Ubuntu 14.04 LTS and Python 2.7.10, so quite similar to your setup.

@snordhausen
Contributor

After a total of 27 runs with your first example code, I got this exception:

Uploading...
Traceback (most recent call last):
  File "moto_bug.py", line 52, in <module>
    doit()
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/moto/core/models.py", line 71, in wrapper
    result = func(*args, **kwargs)
  File "../moto_bug.py", line 28, in doit
    key.set_contents_from_filename("K158154-Mi001716_S1_L001_R1_001.fastq.gz")
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/key.py", line 1362, in set_contents_from_filename
    encrypt_key=encrypt_key)
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/key.py", line 1293, in set_contents_from_file
    chunked_transfer=chunked_transfer, size=size)
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/key.py", line 750, in send_file
    chunked_transfer=chunked_transfer, size=size)
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/key.py", line 951, in _send_file_internal
    query_args=query_args
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/connection.py", line 668, in make_request
    retry_handler=retry_handler
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/connection.py", line 1071, in make_request
    retry_handler=retry_handler)
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/connection.py", line 940, in _mexe
    request.body, request.headers)
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/key.py", line 882, in sender
    if not self.should_retry(response, chunked_transfer):
  File "/home/snordhausen/venv/local/lib/python2.7/site-packages/boto/s3/key.py", line 983, in should_retry
    '%s vs. %s' % (self.etag, self.md5))
boto.exception.S3DataError: BotoClientError: ETag from S3 did not match computed MD5. "6ba32ddb2902929f0e488c2b1c6faf1c" vs. 6083801a29ef4ebf78fbbed806e6ab2c

I'm not sure, though, whether that is a bug in boto (it wouldn't be the first...) or in moto.

I'll retry using your boto3-only example.

@snordhausen
Contributor

Your second example, which uses only boto3, also fails every 10th or 20th run. The error messages were:

Incorrect md5sum! 53c03ecd1e61dc7d1cd01f58a1435e8c
Incorrect md5sum! 55196dafb5899178a3c111f8564ae865

so it downloaded different data each time. The length of the downloaded data is correct, though!

A much faster way to reproduce the bug is to upload only once, then download many times. This also shows that the bug occurs during the download.
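
For reference, here is a minimal sketch of that faster repro (it reuses the md5() helper and the test file from the scripts above; the single upload goes through put_object to sidestep the multipart error mentioned earlier, and the loop bound is arbitrary):

    import moto
    import boto3

    @moto.mock_s3
    def repro_download_only():
        client = boto3.client("s3")
        client.create_bucket(Bucket="mybucket")
        # Upload once; put_object avoids the managed multipart upload path.
        with open("K158154-Mi001716_S1_L001_R1_001.fastq.gz", "rb") as f:
            client.put_object(Bucket="mybucket", Key="foo/bar.fastq.gz", Body=f)
        # Download repeatedly; the corruption shows up on this side.
        for attempt in range(1000):
            client.download_file(Bucket="mybucket", Key="foo/bar.fastq.gz",
                                 Filename="bar.fastq.gz")
            assert md5("bar.fastq.gz") == "6083801a29ef4ebf78fbbed806e6ab2c", attempt

    repro_download_only()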

@snordhausen
Contributor

The documentation says

Threads are used by default in the managed transfer methods.
To ensure no threads are used in the transfer process, set use_threads to False.

I recall reading that the HTTP library that moto uses underneath is not thread-safe. After modifying your code to:

        from boto3.s3.transfer import TransferConfig
        config = TransferConfig(use_threads=False)
        client.download_file(Bucket="mybucket",
          Key="foo/bar.fastq.gz", Filename="bar.fastq.gz", Config=config)

I now have 6,000 downloads without a single error.

@dtenenba
Contributor Author

Thanks so much for all the great info. Did you ever get errors after updating moto/boto/boto3?
If not, then the solution is simple.

Otherwise I will make sure I don't use threads when using moto.
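
For reference, a sketch of what that would look like applied to both managed transfer calls (whether the same TransferConfig also avoids the multipart InvalidPart error from #703 hasn't been verified here; the multipart_threshold value is just illustrative):

    import boto3
    from boto3.s3.transfer import TransferConfig

    client = boto3.client("s3")
    # Single-threaded transfers; the large threshold also keeps small test
    # files off the multipart path entirely.
    config = TransferConfig(use_threads=False,
                            multipart_threshold=1024 * 1024 * 1024)

    client.upload_file(Filename="K158154-Mi001716_S1_L001_R1_001.fastq.gz",
                       Bucket="mybucket", Key="foo/bar.fastq.gz", Config=config)
    client.download_file(Bucket="mybucket", Key="foo/bar.fastq.gz",
                         Filename="bar.fastq.gz", Config=config)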

Thanks again.

@snordhausen
Contributor

Except for the very first test (where I forgot to update moto), all tests were run with the most recent versions. So a version upgrade will not fix this issue.

However, it would be interesting to add the following feature to moto: When @mock_s3 is in effect, enforce that use_threads=False is used in the managed transfer methods.

@snordhausen
Contributor

It turns out that disabling multi-threading in the managed transfer methods is quite easy. With this change in moto, your original code works fine:

diff --git a/moto/core/models.py b/moto/core/models.py
index 60e744f..03d1390 100644
--- a/moto/core/models.py
+++ b/moto/core/models.py
@@ -4,6 +4,7 @@ import functools
 import inspect
 import re
 
+import boto3
 from httpretty import HTTPretty
 from .responses import metadata_response
 from .utils import convert_regex_to_flask_path
@@ -11,6 +12,7 @@ from .utils import convert_regex_to_flask_path
 
 class MockAWS(object):
     nested_count = 0
+    original_create_transfer_manager = None
 
     def __init__(self, backends):
         self.backends = backends
@@ -38,6 +40,15 @@ class MockAWS(object):
         if not HTTPretty.is_enabled():
             HTTPretty.enable()
 
+        if self.__class__.original_create_transfer_manager is None:
+            boto3.client('s3') # Ensure that boto.s3 exists
+            original_create_transfer_manager = boto3.s3.transfer.create_transfer_manager
+            self.__class__.original_create_transfer_manager = original_create_transfer_manager
+            def patched_create_transfer_manager(client, config, *args, **kwargs):
+                config.use_threads = False
+                return original_create_transfer_manager(client, config, *args, **kwargs)
+            boto3.s3.transfer.create_transfer_manager = patched_create_transfer_manager
+
         for method in HTTPretty.METHODS:
             backend = list(self.backends.values())[0]
             for key, value in backend.urls.items():
@@ -63,6 +74,8 @@ class MockAWS(object):
         if self.__class__.nested_count == 0:
             HTTPretty.disable()
             HTTPretty.reset()
+            boto3.s3.transfer.create_transfer_manager = self.__class__.original_create_transfer_manager
+            self.__class__.original_create_transfer_manager = None
 
     def decorate_callable(self, func, reset):
         def wrapper(*args, **kwargs):

@spulec: Should I create a pull request for that or do you see a more elegant way of doing this?

@dtenenba
Contributor Author

@snordhausen does that patch presume that the user is using boto3 and not boto?

@snordhausen
Contributor

Yes, it only fixes the issue for boto3, which will be good enough for most people.

But now that you mention it: the code also assumes that you have boto3 installed, and it will fail with an ImportError if you don't (e.g. because you are only using boto and working in a virtualenv). It's simple to fix, though.
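
For what it's worth, a sketch of one way to make that import optional (hypothetical code, not the actual fix from the branch mentioned below):

    # Tolerate environments where boto3 is not installed.
    try:
        import boto3
        import boto3.s3.transfer
        HAS_BOTO3 = True
    except ImportError:
        HAS_BOTO3 = False

    def maybe_disable_transfer_threads():
        # Only monkey-patch boto3's transfer manager when boto3 is importable.
        if not HAS_BOTO3:
            return
        original = boto3.s3.transfer.create_transfer_manager

        def patched(client, config, *args, **kwargs):
            config.use_threads = False
            return original(client, config, *args, **kwargs)

        boto3.s3.transfer.create_transfer_manager = patched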

I'll create a new version of the file tomorrow and then you can give it a try.

@snordhausen
Contributor

@dtenenba I created a new branch in my fork which has an improved version of the above patch. For testing, could you install moto from that branch, then run your boto3-only example code with the hand-patched moto? This should fix your problem.

@snordhausen
Contributor

@dtenenba Did you have a chance to test the patch?

@spulec
Collaborator

spulec commented Feb 9, 2017

@snordhausen A PR would be very much welcome.

I'm also trying to think about how, in the long term, we can get away from HTTPretty and move toward a system that works with multiple threads.

@aalvrz
Contributor

aalvrz commented Jun 21, 2018

I've been experiencing an issue with file uploads to S3, and after reading through this issue I think it's related. However, I am using multiple processes for parallel uploads (via the multiprocessing module).

If I upload 10 files in parallel and then check that the bucket contains 10 files, the check fails and reports that there are no files in the bucket.

On the other hand, if I do the uploads sequentially, it works as expected.

Is there any way around this while still keeping my application code multi-process?
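
One likely explanation: with the in-process @mock_s3 decorator, each worker process ends up with its own copy of moto's in-memory backend, so objects uploaded in child processes are never visible to the parent. A hedged sketch of a possible workaround is moto's standalone server mode, where every process talks to the same backend over HTTP (this assumes moto_server s3 -p 5000 is already running, and the bucket/key names below are made up):

    import boto3
    from multiprocessing import Pool

    ENDPOINT = "http://127.0.0.1:5000"  # started with: moto_server s3 -p 5000

    def make_client():
        return boto3.client("s3", endpoint_url=ENDPOINT,
                            aws_access_key_id="test",
                            aws_secret_access_key="test",
                            region_name="us-east-1")

    def upload_one(i):
        # Each worker talks to the shared moto server over HTTP.
        make_client().put_object(Bucket="mybucket",
                                 Key="file-{}".format(i), Body=b"hello")

    if __name__ == "__main__":
        client = make_client()
        client.create_bucket(Bucket="mybucket")
        Pool(4).map(upload_one, range(10))
        contents = client.list_objects(Bucket="mybucket").get("Contents", [])
        assert len(contents) == 10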

@davidahern

davidahern commented Dec 17, 2020

Hi, I'm getting a similar issue writing Parquet from Spark to the moto server: a checksum mismatch.

There are workarounds (not great from a Python perspective, better for Scala), described below, but it would be great if this could be fixed:
https://medium.com/@sumitsu/unit-testing-aws-s3-integrated-scala-spark-components-using-local-s3-mocking-tools-8bb90fd58fa2

FYI, it looks like there was a similar issue in localstack which is now fixed.
localstack/localstack#869
localstack/localstack#1571
localstack/localstack#538 (comment)
localstack/localstack#359

And this is how I create my Spark session in Python:

    ss = SparkSession.builder.appName('test').getOrCreate()
    conf = ss._jsc.hadoopConfiguration()
    conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
    conf.set('fs.s3a.access.key', 'test')
    conf.set('fs.s3a.secret.key', 'test')
    conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("fs.s3a.path.style.access", "true")
    conf.set("fs.s3a.endpoint", "http://127.0.0.1:5000")

I am testing using gluepyspark, which has all the AWS dependencies.
If you need any more information, please let me know.
Thanks.

E py4j.protocol.Py4JJavaError: An error occurred while calling o117.parquet.
E : org.apache.hadoop.fs.s3a.AWSClientIOException: innerMkdirs on s3a://dummy-provenance-bucket/prov/_temporary/0: com.amazonaws.SdkClientException: Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 1B2M2Y8AsgTpgAmY7PhCfg== in base 64) didn't match hash (etag: 6c77584be3b5f02ce48d681b13d1244b in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (metadata.contentMD5: null, md5DigestStream: com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream@f225481, bucketName: dummy-provenance-bucket, key: prov/_temporary/0/): Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 1B2M2Y8AsgTpgAmY7PhCfg== in base 64) didn't match hash (etag: 6c77584be3b5f02ce48d681b13d1244b in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (metadata.contentMD5: null, md5DigestStream: com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream@f225481, bucketName: dummy-provenance-bucket, key: prov/_temporary/0/)
E at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:128)
E at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
E at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:1484)
E at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1914)
E at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:343)
E at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
E at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
E at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
E at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
E at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
E at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
E at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
E at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
E at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
E at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
E at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
E at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
E at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
E at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
E at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
E at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
E at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
E at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.GatewayConnection.run(GatewayConnection.java:238)
E at java.lang.Thread.run(Thread.java:748)
E Caused by: com.amazonaws.SdkClientException: Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 1B2M2Y8AsgTpgAmY7PhCfg== in base 64) didn't match hash (etag: 6c77584be3b5f02ce48d681b13d1244b in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (metadata.contentMD5: null, md5DigestStream: com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream@f225481, bucketName: dummy-provenance-bucket, key: prov/_temporary/0/)
E at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1838)
E at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1772)
E at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
E at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
E at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:143)
E at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:48)
E at java.util.concurrent.FutureTask.run(FutureTask.java:266)
E at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
E at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
E ... 1 more

@bblommers
Collaborator

There have been a few improvements in how we handle md5sums/ETags since 2020. Is anyone still running into issues using the latest version of Moto?
