Spark compatibility in the S3 gateway (#8115)
These are the changes needed to the S3 Gateway to make it work with Spark, and an example of how to use it.

* Handle paths with a trailing slash as if they didn't have one.
* Don't error on trying to read the timestamp on new files in open commits in the S3 Gateway. Spark needs to be able to read them.
* Don't fail GetObject for paths that are simultaneously an imaginary directory and a real file. Instead, read the file. This requires dancing around the fact that InspectFile doesn't work correctly in this case, but ListFile[0] works just fine.

Co-authored-by: Ben Bonenfant <ben.bonenfant@pachyderm.io>
Co-authored-by: Matthew Steffen <msteffen@pachyderm.io>
3 people committed Aug 30, 2022
1 parent 77f598a commit 3e9b4c2
Showing 3 changed files with 161 additions and 17 deletions.
130 changes: 130 additions & 0 deletions examples/spark/s3gateway/README.md
@@ -0,0 +1,130 @@
# Writing to Pachyderm from Spark using the S3 Gateway

In Pachyderm 2.3.2+, Pachyderm's S3 Gateway has been made compatible with Spark's s3a adapter.
This means that you can write to a Pachyderm repo from a distributed Spark cluster.

⚠️ TODO: 2.3.2 hasn't been released with this fix yet. For now, you can use the custom pachd image `pachyderm/pachd:18e1a0dc425fa5df8a68148d85074795666ab640` (based on Pachyderm 2.3.x) to test it!

You can connect Spark to Pachyderm's S3 gateway with options like the following:

```python
conf = SparkConf()
conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:30600")
conf.set('spark.hadoop.fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set('spark.hadoop.fs.s3a.access.key', 'anything_matching')
conf.set('spark.hadoop.fs.s3a.secret.key', 'anything_matching')
conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
```

You may need to customize these for your own Pachyderm installation.

We also need to disable change detection; otherwise ETag mismatches will cause the Spark job to hang:

```python
conf.set("spark.hadoop.fs.s3a.change.detection.mode", 'none')
conf.set("spark.hadoop.fs.s3a.change.detection.version.required", 'false')
```

You also need to keep a commit open for the duration of the Spark job, otherwise you will see errors.
This is because Spark's s3a driver writes temporary files with the same names as directories, so the same path ends up being both a file and a directory. That works in S3, but not in Pachyderm: when the commits that the S3 gateway generates are closed, you get a "file / directory path collision" error. Holding one commit open throughout the Spark job allows it to complete without errors.

You can hold a commit open during a job with `python_pachyderm`:

```python
with client.commit("spark-s3g-demo", "master") as commit:
    print(f"Opening commit {commit} for spark job")
    df.coalesce(1).write.format("parquet").mode("overwrite").save("s3a://master.spark-s3g-demo/example")
    print(f"Closing {commit}")
```

⚠️ Don't use Pachyderm repo names containing an underscore (`_`): s3a fails to connect to them, giving the mysterious error `bucket is null/empty`. Hyphens (`-`), however, work fine!

Another issue is that the Java S3 driver assumes ETags are MD5 hashes of the object contents. Pachyderm's ETags are not, so we have to disable integrity checking on writes.

You can do that with:
```python
sc = SparkContext(conf=conf)
sc.setLogLevel("DEBUG")
sc.setSystemProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")
```

# Complete example

Putting this all together, here is a complete example of writing to Pachyderm from PySpark.
Start by creating the repo and the branch:
```
pachctl create repo spark-s3g-demo
pachctl create branch spark-s3g-demo@master
```
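
Alternatively, you can create the repo and branch from Python with `python_pachyderm` — a minimal sketch, assuming the client's `create_repo` and `create_branch` methods and a default local connection:

```python
import python_pachyderm

# Assumes pachd is reachable on the default localhost:30650 (adjust if your setup differs).
client = python_pachyderm.Client()
client.create_repo("spark-s3g-demo")
client.create_branch("spark-s3g-demo", "master")
```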

Install the dependencies. First, Java:
```
sudo apt install openjdk-11-jdk openjdk-11-jre
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
```
Then PySpark and python-pachyderm:
```
pip3 install pyspark==3.3.0
pip3 install python-pachyderm==7.3.2
```
And the Hadoop S3 jars (the `hadoop-aws` version should match the Hadoop version bundled with your PySpark build):
```
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.3/hadoop-aws-3.3.3.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.264/aws-java-sdk-bundle-1.12.264.jar
```

Now write this `spark.py`:

```python
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.context import SparkContext
from pyspark import SparkConf
import time
import os
import python_pachyderm

conf = SparkConf()
conf.set('spark.hadoop.fs.s3a.endpoint', "http://localhost:30600")
conf.set('spark.hadoop.fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set('spark.hadoop.fs.s3a.access.key', 'anything_matching')
conf.set('spark.hadoop.fs.s3a.secret.key', 'anything_matching')
conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
conf.set("spark.hadoop.fs.s3a.change.detection.mode", 'none')
conf.set("spark.hadoop.fs.s3a.change.detection.version.required", 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel("DEBUG")
sc.setSystemProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")

# confirm config is applied to this session
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().getAll())

# create some example data
df = spark.createDataFrame([ Row(a=1, b=2.,) ])
df.show()

repo = "spark-s3g-demo"
branch = "master"

client = python_pachyderm.Client()

with client.commit(repo, branch) as commit:
    print(f"Opening commit {commit} for spark job")
    df.coalesce(1).write.format("parquet").mode("overwrite").save(f"s3a://{branch}.{repo}/example-data")
    print(f"Closing {commit}")
```

If you are using `localhost` in your endpoint, you may need to run `pachctl port-forward` in another terminal to forward port 30600 to your Pachyderm instance.

And run it with:
```
spark-submit --jars 'hadoop-aws-3.3.3.jar,aws-java-sdk-bundle-1.12.264.jar' spark.py 2>&1 | tee -a spark-log.txt
```
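
If the job succeeds, the commit closes and the data appears in the repo; you can check with, for example, `pachctl list file spark-s3g-demo@master:/example-data`, or read it back through the same s3a configuration. A minimal sketch, assuming the SparkSession from `spark.py` above and that the commit has closed:

```python
# Hypothetical read-back check, reusing the s3a settings configured in spark.py.
df_check = spark.read.parquet("s3a://master.spark-s3g-demo/example-data")
df_check.show()
```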

If you have any issues, please report them to us on Slack, and include the `spark-log.txt` file.
6 changes: 3 additions & 3 deletions src/server/pfs/s3/bucket.go
@@ -19,10 +19,10 @@ import (
"github.com/pachyderm/s2"
)

func newContents(fileInfo *pfsClient.FileInfo) (s2.Contents, error) {
func (c *controller) newContents(fileInfo *pfsClient.FileInfo) (s2.Contents, error) {
t, err := types.TimestampFromProto(fileInfo.Committed)
if err != nil {
return s2.Contents{}, err
c.logger.Debugf("Warning: using nil timestamp (file probably in open commit), on error %s", err)
}

return s2.Contents{
@@ -122,7 +122,7 @@ func (c *controller) ListObjects(r *http.Request, bucketName, prefix, marker, de
return errutil.ErrBreak
}
if fileInfo.FileType == pfsClient.FileType_FILE {
c, err := newContents(fileInfo)
c, err := c.newContents(fileInfo)
if err != nil {
return err
}
42 changes: 28 additions & 14 deletions src/server/pfs/s3/object.go
@@ -11,6 +11,7 @@ import (

"github.com/gogo/protobuf/types"
"github.com/pachyderm/pachyderm/v2/src/internal/errutil"
"github.com/pachyderm/pachyderm/v2/src/pfs"
pfsServer "github.com/pachyderm/pachyderm/v2/src/server/pfs"
"github.com/pachyderm/s2"
)
@@ -19,9 +20,7 @@ func (c *controller) GetObject(r *http.Request, bucketName, file, version string
c.logger.Debugf("GetObject: bucketName=%+v, file=%+v, version=%+v", bucketName, file, version)

pc := c.requestClient(r)
if strings.HasSuffix(file, "/") {
return nil, s2.NoSuchKeyError(r)
}
file = strings.TrimSuffix(file, "/")

bucket, err := c.driver.bucket(pc, r, bucketName)
if err != nil {
@@ -43,14 +42,35 @@ func (c *controller) GetObject(r *http.Request, bucketName, file, version string
commitID = version
}

fileInfo, err := pc.InspectFile(bucket.Commit, file)
// We use listFileResult[0] rather than InspectFile result since InspectFile
// on a path that has both a file and a directory in it returns the
// directory. However, ListFile will show it as a file, if it exists.
var firstFile *pfs.FileInfo
err = pc.ListFile(bucket.Commit, file, func(fi *pfs.FileInfo) (retErr error) {
if firstFile == nil {
firstFile = fi
}
return errutil.ErrBreak
})
if err != nil {
return nil, maybeNotFoundError(r, err)
}
if firstFile == nil {
// we never set it, probably zero results
return nil, s2.NoSuchKeyError(r)
}
fileInfo := firstFile

// the exact object named does not exist, but perhaps is a "directory".
// "directories" do not actually exist, and certainly cannot be read.
// ("seeker can't seek")
if fileInfo.File.Path[1:] != file {
return nil, s2.NoSuchKeyError(r)
}

modTime, err := types.TimestampFromProto(fileInfo.Committed)
if err != nil {
return nil, err
c.logger.Debugf("Warning: using nil timestamp (file probably in open commit), on error %s", err)
}

content, err := pc.GetFileReadSeeker(bucket.Commit, file)
@@ -73,9 +93,7 @@ func (c *controller) CopyObject(r *http.Request, srcBucketName, srcFile string,
c.logger.Tracef("CopyObject: srcBucketName=%+v, srcFile=%+v, srcObj=%+v, destBucketName=%+v, destFile=%+v", srcBucketName, srcFile, srcObj, destBucketName, destFile)

pc := c.requestClient(r)
if strings.HasSuffix(destFile, "/") {
return "", invalidFilePathError(r)
}
destFile = strings.TrimSuffix(destFile, "/")

srcBucket, err := c.driver.bucket(pc, r, srcBucketName)
if err != nil {
@@ -123,9 +141,7 @@ func (c *controller) PutObject(r *http.Request, bucketName, file string, reader
c.logger.Debugf("PutObject: bucketName=%+v, file=%+v", bucketName, file)

pc := c.requestClient(r)
if strings.HasSuffix(file, "/") {
return nil, invalidFilePathError(r)
}
file = strings.TrimSuffix(file, "/")

bucket, err := c.driver.bucket(pc, r, bucketName)
if err != nil {
@@ -169,9 +185,7 @@ func (c *controller) DeleteObject(r *http.Request, bucketName, file, version str
c.logger.Debugf("DeleteObject: bucketName=%+v, file=%+v, version=%+v", bucketName, file, version)

pc := c.requestClient(r)
if strings.HasSuffix(file, "/") {
return nil, invalidFilePathError(r)
}
file = strings.TrimSuffix(file, "/")
if version != "" {
return nil, s2.NotImplementedError(r)
}
