
Spark parquet writes to s3 fail with "InvalidDigest: The Content-MD5 you specified was invalid" #1152

Closed
mvpc opened this issue Feb 27, 2019 · 5 comments

Comments


mvpc commented Feb 27, 2019

Hi, I am using localstack S3 in unit tests for code where pyspark reads and writes parquet to S3. Reads work great, but during writes I'm encountering "InvalidDigest: The Content-MD5 you specified was invalid". The same code works just fine against real S3.

Looks like the errors happen when localstack's s3_listener.ProxyListenerS3.forward_request calls check_content_md5. All the failed requests are PUTs to temporary .snappy.parquet files with empty request data; here is an example request:

method: PUT
path: /test-bucket/test-parquet/_temporary/0/task_20190227132138_0003_m_000000/part-00000-5d4171c2-5b5a-4b7e-be13-133e4a7e1919-c000.snappy.parquet
data: ''
headers:
{
        'X-Forwarded-For': '127.0.0.1, 0.0.0.0:4572',
        'access-control-allow-headers': 'authorization,content-type,content-md5,cache-control,x-amz-content-sha256,x-amz-date,x-amz-security-token,x-amz-user-agent',
        'access-control-allow-methods': 'HEAD,GET,PUT,POST,DELETE,OPTIONS,PATCH',
        'access-control-allow-origin': '*',
        'authorization': 'AWS mock:OOqXr2U4IPl0RbwscBtBnBqwKZE=',
        'connection': 'Keep-Alive',
        'content-length': '0',
        'content-md5': 'wvJ/skIYYk7qYDfLRyk3qQ==',
        'content-type': 'application/octet-stream',
        'date': 'Wed, 27 Feb 2019 18:21:39 GMT',
        'etag': 'c2f27fb24218624eea6037cb472937a9',
        'host': 'localhost',
        'last-modified': 'Wed, 27 Feb 2019 18:21:38 GMT',
        'user-agent': 'aws-sdk-java/1.7.4 Mac_OS_X/10.13.6 Java_HotSpot(TM)_64-Bit_Server_VM/25.192-b12/1.8.0_192 com.amazonaws.services.s3.transfer.TransferManager/1.7.4',
        'x-amz-copy-source': '/test-bucket/test-parquet/_temporary/0/_temporary/attempt_20190227132138_0003_m_000000_0/part-00000-5d4171c2-5b5a-4b7e-be13-133e4a7e1919-c000.snappy.parquet',
        'x-amz-metadata-directive': 'REPLACE'
    }
expected md5: c2f27fb24218624eea6037cb472937a9
actual md5: d41d8cd98f00b204e9800998ecf8427e
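Note that the "actual md5" above is just the MD5 of an empty body, while the content-md5 header decodes to the same value as the etag header in the request (the "expected md5"), presumably the digest of the object named in x-amz-copy-source. So this looks like a server-side copy request whose Content-MD5 header describes the source object rather than the empty request body. A quick check in Python (just recomputing the values shown above):

import base64, binascii, hashlib

# MD5 of the empty request body -- matches the "actual md5" reported by localstack
print(hashlib.md5(b'').hexdigest())
# -> d41d8cd98f00b204e9800998ecf8427e

# the content-md5 header decodes to the same value as the etag header above,
# i.e. the "expected md5"
print(binascii.hexlify(base64.b64decode('wvJ/skIYYk7qYDfLRyk3qQ==')))
# -> c2f27fb24218624eea6037cb472937a9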

I see a similar problem mentioned in #1140, but that fix did not help with my issue (I did try upgrading to localstack 0.9.0), so it looks like this parquet issue is different.

To reproduce

I'm using Python 2.7.12 on macOS.
Install localstack and pyspark into a virtualenv, and add the jars needed for communication with S3:

virtualenv reproduce
source reproduce/bin/activate
pip install localstack==0.9.0
pip install pyspark==2.2.1
cd reproduce/lib/python2.7/site-packages/pyspark/jars/
wget https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
cd ../../../../../../

In one tab, start localstack with S3:

SERVICES=s3 DEBUG=1 localstack start

In another tab, start the pyspark shell:

pyspark

In the pyspark shell, run the following:

# configure spark for S3
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoop_conf.set('fs.s3a.access.key', 'mock')
hadoop_conf.set('fs.s3a.secret.key', 'mock')
hadoop_conf.set('mapreduce.fileoutputcommitter.algorithm.version', '2')
spark.conf.set('spark.sql.shuffle.partitions', '1')

# configure spark for localstack s3
hadoop_conf.set('fs.s3a.endpoint', 'http://127.0.0.1:4572')
hadoop_conf.set('fs.s3a.path.style.access', 'true')

# create s3 bucket
import boto3
boto3.resource('s3', endpoint_url='http://127.0.0.1:4572').Bucket('test-bucket').create()

# create tiny dataframe and write it to S3 as parquet
df = spark.createDataFrame([
    ('column 1 value 1', 'column 2 value 1'),
    ('column 1 value 2', 'column 2 value 2'),
    ('column 1 value 3', 'column 2 value 3'),
    ('column 1 value 4', 'column 2 value 4'),
    ('column 1 value 5', 'column 2 value 5'),
    ('column 1 value 6', 'column 2 value 6'),
    ('column 1 value 7', 'column 2 value 7'),
    ('column 1 value 8', 'column 2 value 8'),
], ['column_1', 'column_2'])

df.write.parquet('s3://test-bucket/test-parquet/', mode='overwrite')

Observe InvalidDigest errors like:

19/02/27 13:21:39 ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: InvalidDigest, AWS Error Message: The Content-MD5 you specified was invalid, S3 Extended Request ID: null
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
	at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
	at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
	at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
	at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
	at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
19/02/27 13:21:39 ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: InvalidDigest, AWS Error Message: The Content-MD5 you specified was invalid, S3 Extended Request ID: null
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	... (etc)
@lyle-nel

@mvpc I am encountering this same issue. Have you found a workaround for this?

mvpc commented May 16, 2019

@lyle-nel I ended up monkey-patching localstack's check_content_md5 to get my tests working, obviously not ideal:

from localstack.services.s3 import s3_listener

# replace localstack's Content-MD5 validation with a no-op
def noop_check_content_md5(*args, **kwargs):
    pass

s3_listener.check_content_md5 = noop_check_content_md5
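In case it helps, a rough sketch of wiring this into pytest (a hypothetical conftest.py; the patch only takes effect if localstack's S3 listener runs in the same Python process as the tests, and does nothing for a separately started localstack):

# conftest.py -- hypothetical sketch: patch localstack's MD5 check for the whole session
import pytest
from localstack.services.s3 import s3_listener


def noop_check_content_md5(*args, **kwargs):
    # skip Content-MD5 validation entirely
    pass


@pytest.fixture(scope='session', autouse=True)
def disable_md5_check():
    # install the no-op before any tests run, restore the original afterwards
    original = s3_listener.check_content_md5
    s3_listener.check_content_md5 = noop_check_content_md5
    yield
    s3_listener.check_content_md5 = original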

@itsukanov

Had the same issue when saving with:
spark 2.4.0
hadoop-aws 3.2.0

Solved the problem by setting fs.s3a.fast.upload.buffer to bytebuffer:

sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.fast.upload.buffer", "bytebuffer")


whummer commented Nov 29, 2019

Thanks for providing solutions and workarounds @mvpc @iatsukanov @lyle-nel. There has also been a recent change in #1814 (avoid modifying uploaded XML documents) which might potentially fix this issue. Please report here if the problem persists. Thanks!

whummer closed this as completed Nov 29, 2019
@zagovorichev

The fs.s3a.fast.upload.buffer workaround from the comment above works, thanks! But one more note: to make it work we also have to set hadoop_conf.set('fs.s3a.fast.upload', 'true').
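Putting both settings together for the pyspark reproduction above (a quick untested sketch, using the same hadoop_conf object as in the repro script rather than the Scala hadoopConfiguration API):

# reuse the Hadoop configuration from the pyspark reproduction steps above
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

# enable the S3A fast upload path and buffer uploads in memory ("bytebuffer")
# instead of on disk
hadoop_conf.set('fs.s3a.fast.upload', 'true')
hadoop_conf.set('fs.s3a.fast.upload.buffer', 'bytebuffer')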
