Add s3 support for staging local dependencies in sparkctl #144 #179

Merged: 2 commits merged into kubeflow:master from upstream_s3_support on Aug 9, 2018

Conversation

mrow4a
Contributor

@mrow4a mrow4a commented Jun 7, 2018

Description

Adds support for staging local dependencies in sparkctl (solves #144).

To be done

  • Implement S3 support
  • Adjust GCS
  • Update docs

How I tested it

export AWS_ACCESS_KEY_ID=<redacted>
export AWS_SECRET_ACCESS_KEY=<redacted>
./sparkctl create \
./jobs/spark-pi.yaml \
--upload-to s3a://<s3-bucket>
export GOOGLE_APPLICATION_CREDENTIALS=~/<redacted>.json
./sparkctl create \
./jobs/spark-pi.yaml \
--upload-to gs://<gcs-bucket>

@liyinan926 @prasanthkothuri

@liyinan926 liyinan926 mentioned this pull request Jun 10, 2018
@liyinan926
Collaborator

@kow3ns.

@mrow4a
Contributor Author

mrow4a commented Jun 11, 2018

Do you have any suggestions on the code itself? I am not sure if I should start adding documentation and updating the go-deps definitions yet, since the implementation may still need some adjustments.

@liyinan926
Collaborator

liyinan926 commented Jun 11, 2018

@mrow4a GCS also supports regional access and allows users to specify a region. I plan to use the new region flag in this PR for that. However, the default storage class of GCS is multi-regional, so by default the region needs to be empty for GCS. What's the reason for forcing regional access for S3? Other than that, I think the code looks good in general.

Collaborator

@liyinan926 liyinan926 left a comment

Please run go fmt to format the code.

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"path/filepath"
Collaborator

Go built-in packages should be grouped at the top.
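For example, the grouping being asked for might look like this (a sketch that reuses only the imports quoted above, with a throwaway constructor so they are all used):

```go
package sketch

import (
	// Standard-library packages come first, in their own group.
	"path/filepath"

	// Third-party packages follow in a separate group.
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// newS3Client exists only so the imports above are used; the point of the
// sketch is the import grouping, not the body.
func newS3Client(region, localFilePath string) (*s3.S3, string, error) {
	sess, err := session.NewSession(aws.NewConfig().WithRegion(region))
	if err != nil {
		return nil, "", err
	}
	return s3.New(sess), filepath.Base(localFilePath), nil
}
```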

if err != nil {
return nil, err
}

Collaborator

Unnecessary extra line.

Contributor Author

Done



uploader := &s3Uploader{
client: s3manager.NewUploader(sess),
Collaborator

Does the client need to be closed when done?

Contributor Author

I did not find any note in the AWS SDK that this is necessary.

@mrow4a mrow4a force-pushed the upstream_s3_support branch 2 times, most recently from f401977 to e1bbd0d on June 12, 2018 16:16
```

NOTE: In Spark 2.3.0, where init-containers are used for Spark with Kubernetes as the resource manager,
the dependencies are not mounted in the proper location (e.g. the application file is not mounted in the jars folder).
Contributor Author

This seems to be the problem I had previously with s3a dependencies: the init container incorrectly fetches (mounts) e.g. the main application file. I think I need to investigate the issue more against spark/master @liyinan926

Contributor Author

Spark 2.4.0 (master branch) solves the issue

return ""
}

return fmt.Sprintf("%s://%s.%s/%s",
Contributor Author

I made this fully compatible with AWS, which requires the <endpoint-protocol>://<bucket>.<endpoint-url>/path/to/file scheme.
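For illustration, a minimal sketch of that virtual-hosted-style formatting (the helper name and the example values are hypothetical):

```go
package main

import "fmt"

// buildObjectURL illustrates the <endpoint-protocol>://<bucket>.<endpoint-url>/path/to/file
// scheme described above.
func buildObjectURL(protocol, bucket, endpoint, path string) string {
	return fmt.Sprintf("%s://%s.%s/%s", protocol, bucket, endpoint, path)
}

func main() {
	// Prints: https://spark-cluster-test.s3.amazonaws.com/spark-app-dependencies/spark-pi.jar
	fmt.Println(buildObjectURL("https", "spark-cluster-test", "s3.amazonaws.com", "spark-app-dependencies/spark-pi.jar"))
}
```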

@mrow4a
Contributor Author

mrow4a commented Jun 13, 2018

@liyinan926 I have been debugging the S3 issue with Spark 2.3.0.

I found that the S3 dependencies are fetched incorrectly because of the spark-init container. I have been testing with Spark 2.4.0 (apache/spark:master) and everything works fine. Please see the analysis log below.

Configuration

$ cat /opt/spark/conf/spark-defaults.conf

spark.hadoop.fs.s3a.endpoint      cs3.cern.ch
spark.hadoop.fs.s3a.access.key    <redacted>
spark.hadoop.fs.s3a.secret.key    <redacted>
spark.hadoop.fs.s3a.impl          org.apache.hadoop.fs.s3a.S3AFileSystem
spark.eventLog.enabled            true
spark.eventLog.dir                s3a://spark-cluster-test/spark-events
spark.history.fs.logDirectory     s3a://spark-cluster-test/spark-events
spark.logConf true

LOG with Spark 2.3.0 LOCAL (Docker image "gitlab-registry.cern.ch/db/spark-service/docker-registry/spark-on-k8s:v2.3.0-xrootd-s3"):

$ /opt/spark/bin/spark-submit --master local --class org.sparkservice.sparkrootapplications.examples.SparkPi s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar

2018-06-13 09:41:16 INFO  SparkContext:54 - Added JAR s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar at s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar with timestamp 1528875676885
2018-06-13 09:41:16 INFO  Executor:54 - Starting executor ID driver on host localhost
...
2018-06-13 09:41:18 INFO  Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2018-06-13 09:41:18 INFO  Executor:54 - Fetching s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar with timestamp 1528875676885
2018-06-13 09:41:18 INFO  Utils:54 - Fetching s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar to /tmp/spark-abb7b212-9329-42a4-bcf5-8653794db86b/userFiles-b81721c0-d093-4ba2-a893-2250b2ecb7af/fetchFileTemp8539294104865314818.tmp
2018-06-13 09:41:18 INFO  Executor:54 - Adding file:/tmp/spark-abb7b212-9329-42a4-bcf5-8653794db86b/userFiles-b81721c0-d093-4ba2-a893-2250b2ecb7af/spark-service-examples_2.11-0.0.1.jar to class loader
2018-06-13 09:41:18 INFO  Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 910 bytes result sent to driver

JOB COMPLETED WITH SUCCESS

LOG with Spark 2.3.0 KUBERNETES (Docker image "gitlab-registry.cern.ch/db/spark-service/docker-registry/spark-on-k8s:v2.3.0"):

$ kubectl describe sparkapplication spark-pi
    Name:         spark-pi
    Namespace:    default
    Labels:       <none>
    Annotations:  <none>
    API Version:  sparkoperator.k8s.io/v1alpha1
    Kind:         SparkApplication
    Spec:
        Image:                  gitlab-registry.cern.ch/db/spark-service/docker-registry/spark-on-k8s:v2.3.0-xrootd-s3
        Image Pull Policy:      Always
        Main Application File:  s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar
        Main Class:             org.sparkservice.sparkrootapplications.examples.SparkPi

$ kubectl logs spark-pi-dc261b23e2e730bfb8a883d1a4d67588-driver -c spark-init
...
2018-06-13 09:40:46 INFO  SparkPodInitContainer:54 - Downloading remote jars: Some(s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar,s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar)
2018-06-13 09:40:46 INFO  SparkPodInitContainer:54 - Downloading remote files: Some(s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/root4j-0.1.6.jar)
2018-06-13 09:40:48 INFO  SparkPodInitContainer:54 - Finished downloading application dependencies.
...

$ kubectl logs spark-pi-dc261b23e2e730bfb8a883d1a4d67588-driver
...
exec /sbin/tini -s -- /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/bin/java -Dspark.driver.cores=1.000000 -Dspark.kubernetes.executor.label.version=2.3.0 -Dspark.driver.memory=1024m -Dspark.kubernetes.driver.annotation.sparkoperator.k8s.io/ownerreference=ChBTcGFya0FwcGxpY2F0aW9uGghzcGFyay1waSIkMTA3NWI0YTEtNmVkZC0xMWU4LWIyYjgtZmExNjNlZjc2ZTdkKh1zcGFya29wZXJhdG9yLms4cy5pby92MWFscGhhMQ== -Dspark.app.name=spark-pi -Dspark.kubernetes.container.image.pullPolicy=Always -Dspark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=spark-pi -Dspark.kubernetes.namespace=default -Dspark.driver.blockManager.port=7079 -Dspark.kubernetes.driver.limit.cores=1000m -Dspark.hadoop.fs.s3a.endpoint=<redacted> -Dspark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true -Dspark.kubernetes.submission.waitAppCompletion=false -Dspark.kubernetes.driver.label.version=2.3.0 -Dspark.hadoop.fs.s3a.secret.key=<redacted> -Dspark.master=k8s://https://10.254.0.1:443 -Dspark.executor.cores=2 -Dspark.executor.memory=2048m -Dspark.kubernetes.container.image=gitlab-registry.cern.ch/db/spark-service/docker-registry/spark-on-k8s:v2.3.0-xrootd-s3 -Dspark.jars=s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar,s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar -Dspark.kubernetes.initContainer.configMapKey=spark-init.properties -Dspark.submit.deployMode=cluster -Dspark.hadoop.fs.s3a.access.key=<redacted> -Dspark.driver.host=spark-pi-dc261b23e2e730bfb8a883d1a4d67588-driver-svc.default.svc -Dspark.driver.port=7078 -Dspark.kubernetes.authenticate.driver.serviceAccountName=spark -Dspark.history.fs.update.interval=1s -Dspark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem -Dspark.kubernetes.driver.pod.name=spark-pi-dc261b23e2e730bfb8a883d1a4d67588-driver -Dspark.kubernetes.driver.label.sparkoperator.k8s.io/app-id=spark-pi-2416137625 -Dspark.eventLog.dir=s3a://spark-cluster-test/spark-events -Dspark.eventLog.enabled=true -Dspark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true -Dspark.kubernetes.initContainer.configMapName=spark-pi-dc261b23e2e730bfb8a883d1a4d67588-init-config -Dspark.executor.instances=1 -Dspark.app.id=spark-03326e75bafc4620b300ba8f5adbb67e -Dspark.history.fs.logDirectory=s3a://spark-cluster-test/spark-events -Dspark.kubernetes.executor.podNamePrefix=spark-pi-dc261b23e2e730bfb8a883d1a4d67588 -Dspark.files=s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/root4j-0.1.6.jar -Dspark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=spark-pi -Dspark.kubernetes.executor.label.sparkoperator.k8s.io/app-id=spark-pi-2416137625 -cp ':/opt/spark/jars/*:/var/spark-data/spark-jars/spark-service-examples_2.11-0.0.1.jar:/var/spark-data/spark-jars/spark-service-examples_2.11-0.0.1.jar' -Xms1024m -Xmx1024m -Dspark.driver.bindAddress=10.100.41.3 org.sparkservice.sparkrootapplications.examples.SparkPi
Error: Could not find or load main class org.sparkservice.sparkrootapplications.examples.SparkPi
...
JOB COMPLETED WITH ERROR (Could not find or load main class org.sparkservice.sparkrootapplications.examples.SparkPi)

LOG with Spark 2.4.0 KUBERNETES (Docker image "gitlab-registry.cern.ch/db/spark-service/docker-registry/spark-on-k8s:master"):

$ kubectl logs spark-pi-1528892994323-driver -c spark-init
Error from server (BadRequest): container spark-init is not valid for pod spark-pi-1528892994323-driver

$ kubectl logs spark-pi-1528892994323-driver
2018-06-13 14:30:05 INFO  SparkContext:54 - Added JAR s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar at s3a://spark-cluster-test/spark-app-dependencies/default/spark-pi/spark-service-examples_2.11-0.0.1.jar with timestamp 1528893005323

JOB COMPLETED WITH SUCCESS

@liyinan926
Collaborator

@mrow4a I just came back from vacation. Will take a look soon.

@mrow4a
Contributor Author

mrow4a commented Jul 3, 2018

@liyinan926 You can try with S3 interoperability, but generally I got an error in Spark 2.3 when fetching from S3, and needed to build Spark from the master branch (without init containers).

@liyinan926
Collaborator

@mrow4a This is worth taking a look at: https://github.com/google/go-cloud. It could potentially simplify the code around uploading to GCS/S3.

@mrow4a
Contributor Author

mrow4a commented Jul 28, 2018

@liyinan926 I will check it; so far it looks promising.

@mrow4a
Contributor Author

mrow4a commented Aug 1, 2018

@liyinan926 I spent some time implementing go-cloud, and it is very nice as it lets us cut a lot of the upload code. However, because go-cloud does not support setting ACLs (public read), the code becomes a bit more complicated.

Should I write common code for the upload and dedicated code for setting ACLs? (I guess at some point the package will have ACL support.)

I will add a new commit here to showcase it.

@mrow4a mrow4a changed the title from "Add s3 support for staging local dependencies in sparkctl #144" to "[WIP] Add s3 support for staging local dependencies in sparkctl #144" on Aug 1, 2018
@mrow4a
Contributor Author

mrow4a commented Aug 1, 2018

@liyinan926 could you have a look at the S3 implementation? (For now I skipped GCS.)

@liyinan926
Collaborator

@mrow4a I will take a look soon.

@mrow4a
Contributor Author

mrow4a commented Aug 2, 2018

@liyinan926 I have been playing with the GCS client, and setting ACLs is not trivial (go-cloud does not support it, so two HTTP clients have to be maintained).

@mrow4a mrow4a changed the title from "[WIP] Add s3 support for staging local dependencies in sparkctl #144" to "Add s3 support for staging local dependencies in sparkctl #144" on Aug 2, 2018
@mrow4a mrow4a force-pushed the upstream_s3_support branch 2 times, most recently from 070ca2c to 3730b25 on August 2, 2018 12:30
@mrow4a
Contributor Author

mrow4a commented Aug 2, 2018

@liyinan926 I added and tested support for S3A and GCS using go-cloud (including documentation); please review.

@mrow4a mrow4a force-pushed the upstream_s3_support branch 4 times, most recently from df7a8c1 to 405b290 on August 3, 2018 09:14
@mrow4a
Contributor Author

mrow4a commented Aug 6, 2018

I rebased and updated the PR description at the top.

@@ -31,15 +31,19 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
clientset "k8s.io/client-go/kubernetes"

"context"
Collaborator

These two imports should be in their respective groups.

fileName := filepath.Base(localFilePath)
uploadFilePath := filepath.Join(uploadPath, fileName)
// Check if exists by trying to fetch metadata
_, err := bh.bucket.NewRangeReader(bh.ctx, uploadFilePath, 0, 0)
Collaborator

We need to make sure the reader gets closed.

reader, err := bh.bucket.NewRangeReader(bh.ctx, uploadFilePath, 0, 0)
defer reader.Close()

if err != nil {
return "", fmt.Errorf("Failed to read file: %s", err)
}

Collaborator

Need to add defer data.Close().

Contributor Author

data is not a handle, it is the actual bytes (buf.Bytes()).

return "", fmt.Errorf("Failed to read file: %s", err)
}

w, err := bh.bucket.NewWriter(bh.ctx, uploadFilePath, nil)
Collaborator

Ditto. Need to make sure w gets closed, by adding defer w.Close() after the error check below.
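For illustration, a sketch of the requested writer lifecycle against the go-cloud blob API quoted above (a hypothetical helper; the PR's final code checks the Close error explicitly instead, as discussed further down):

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/google/go-cloud/blob"
)

// uploadBytes creates the writer, checks the error, and defers the Close so
// the writer is always released, which is the shape suggested above.
func uploadBytes(ctx context.Context, bucket *blob.Bucket, key string, data []byte) error {
	w, err := bucket.NewWriter(ctx, key, nil)
	if err != nil {
		return fmt.Errorf("failed to create bucket writer: %s", err)
	}
	defer w.Close()

	if _, err := w.Write(data); err != nil {
		return fmt.Errorf("failed to write to bucket: %s", err)
	}
	return nil
}
```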

return nil, fmt.Errorf(
"unable to upload local dependencies: no upload location specified via --upload-to")
}

uploadLocationUrl, err := url.Parse(UploadTo)
var bh *BlobHandler
Collaborator

Looks like bh and ctx are not needed until the start of the switch block.

func (blob GCSBlob) setPublicACL(
ctx context.Context,
bucket string,
filePath string) error {
client, err := storage.NewClient(ctx)
Collaborator

So we still need to use this gcs-specific client for setting public ACL?

Contributor Author

Yes. This already removes a lot of redundant code, but ACLs are not implemented in go-cloud yet. go-cloud seems to be a very young project.
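For reference, a sketch of what that GCS-specific ACL call can look like with cloud.google.com/go/storage, assuming go-cloud still lacks ACL support (the function shape mirrors the setPublicACL signature quoted above, but details may differ from the PR):

```go
package sketch

import (
	"context"
	"fmt"

	"cloud.google.com/go/storage"
)

// setPublicACL marks an uploaded object as publicly readable using the
// GCS-specific client, since go-cloud does not expose ACLs.
func setPublicACL(ctx context.Context, bucket string, filePath string) error {
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to create GCS client: %v", err)
	}
	defer client.Close()

	// Grant read access to allUsers so the driver and executors can fetch the dependency.
	return client.Bucket(bucket).Object(filePath).ACL().Set(ctx, storage.AllUsers, storage.RoleReader)
}
```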

Collaborator

OK, can we add a TODO to get rid of this GCS-specific code once go-cloud adds support for that?

Contributor Author

Done

@puneetloya

This change would let Spark use S3 as a Hadoop endpoint (checkpoint), yes?

uploadFilePath := filepath.Join(uploadPath, fileName)
// Check if exists by trying to fetch metadata
reader, err := bh.bucket.NewRangeReader(bh.ctx, uploadFilePath, 0, 0)
defer reader.Close()
Collaborator

It looks like you don't even need to use defer here since it's closed immediately after being created.

Contributor Author

Not sure if I understand.

Collaborator

You don't need to use defer as reader is not used anywhere below, it's closed right after it's created.

// Write data to bucket and close bucket writer
_, writeErr := w.Write(data)
if err := w.Close(); err != nil {
return "", fmt.Errorf("Failed to close bucket writer: %s", err)
Collaborator

We use a lower-case letter for the first word of error strings in this package.


// Check if write has been successful
if writeErr != nil {
return "", fmt.Errorf("Failed to write to bucket: %s", err)
Collaborator

Ditto. Failed -> failed.

@mrow4a
Contributor Author

mrow4a commented Aug 7, 2018

@puneetloya It will allow the same as for GCS. Specifically, it uploads your local dependency to AWS S3 or any other S3-compatible storage using the aws-go-sdk via an s3:// path, and adds the uploaded dependency to the job definition with an s3a:// URL. Of course you need to have your S3A access key and secret key specified in hadoopConf or sparkConf. S3 is the object storage endpoint; S3A is the Hadoop S3 connector.

@liyinan926 To avoid confusing people, I think I should accept the scheme as s3 but translate it to s3a.
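For illustration, a minimal sketch of that translation (the helper name is hypothetical; the only point is the s3:// to s3a:// rewrite):

```go
package main

import (
	"fmt"
	"strings"
)

// toHadoopScheme rewrites an s3:// URL into the s3a:// form expected by the
// Hadoop S3A connector.
func toHadoopScheme(uploadedURL string) string {
	return strings.Replace(uploadedURL, "s3://", "s3a://", 1)
}

func main() {
	// Prints: s3a://my-bucket/spark-app-dependencies/spark-pi.jar
	fmt.Println(toHadoopScheme("s3://my-bucket/spark-app-dependencies/spark-pi.jar"))
}
```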

@liyinan926
Collaborator

@liyinan926 To avoid confusing people, I think I should accept the scheme as s3 but translate it to s3a.

That's a good idea.

@mrow4a
Contributor Author

mrow4a commented Aug 8, 2018

Clarified the README and adjusted per the review @liyinan926

Collaborator

@liyinan926 liyinan926 left a comment

LGTM after the latest comments get addressed.

fileName := filepath.Base(localFilePath)
uploadFilePath := filepath.Join(uploadPath, fileName)
// Check if exists by trying to fetch metadata
_, err := uh.b.NewRangeReader(uh.ctx, uploadFilePath, 0, 0)
Collaborator

Sorry if I didn't make it clear. You need to close the returned Reader, but you don't need to use defer reader.Close(). Simply doing the following is fine:

reader, err := uh.b.NewRangeReader(uh.ctx, uploadFilePath, 0, 0)
reader.Close()
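A slightly fuller sketch of that existence probe, with an explicit error guard so a failed open never calls Close on a nil reader (a hypothetical helper; the PR inlines this logic):

```go
package sketch

import (
	"context"

	"github.com/google/go-cloud/blob"
)

// objectExists probes for an object by reading zero bytes of it and closes
// the reader immediately, since it is not used further.
func objectExists(ctx context.Context, b *blob.Bucket, key string) bool {
	reader, err := b.NewRangeReader(ctx, key, 0, 0)
	if err != nil {
		return false
	}
	reader.Close()
	return true
}
```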

Contributor Author

Oh, OK, now I get your point.

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
```

NOTE: In Spark 2.3.0 init-containers are used for Kubernetes as resource manager. In future versions, init-containers are removed.
Collaborator

for Kubernetes as resource manager -> for downloading remote application dependencies. Also, this applies to any Spark 2.3.x version.

Contributor Author

Done

@@ -267,28 +272,118 @@ func isContainerLocalFile(file string) (bool, error) {
return false, nil
}

type Blob interface {
Collaborator

Looks like both Blob and UploadHandler are only used inside the cmd package, so they should both start with lower-case letters.

Add documentation for staging local dependencies in s3a
@liyinan926 liyinan926 merged commit 8d95f53 into kubeflow:master Aug 9, 2018
@mrow4a mrow4a deleted the upstream_s3_support branch August 9, 2018 16:04