
Pachyderm and Kubeflow integration #151

Closed
jlewi opened this issue Jan 26, 2018 · 20 comments

Comments

@jlewi
Contributor

jlewi commented Jan 26, 2018

Opening this issue to begin discussing if and how we want to integrate Pachyderm with Kubeflow.

/cc @jdoliner @JoeyZwicker

@JoeyZwicker

@dwhitena too.

There's definitely interest in exploring this integration: running distributed TF within Pachyderm is a common request, and our tools can likely complement each other very well to support this type of use case, as well as others.

@dwhitena

dwhitena commented Mar 8, 2018

Proposal for KubeFlow + Pachyderm Integration

Motivation

From the KubeFlow Perspective

KubeFlow is providing some great standards for the deployment and management of distributed ML/AI tooling on Kubernetes (distributed TensorFlow, TF serving, JupyterHub, etc.). However, as of now, KubeFlow does not have a built-in solution for orchestrating ML/AI workflows that utilize these tools. That is, what are users to do if they want to string together pre-processing, feature engineering, training, inference, post-processing, etc. into an automated data pipeline that is sustainable, scalable, and tracked over time?

In addition, vanilla KubeFlow leaves the data management solution up to the individual users and relies on users to integrate their own ingress/egress/tracking/versioning of data in TFJobs, notebooks, etc. This is good in that it provides flexibility for users, but many users will want a scalable and unified way of managing and versioning data sets and connecting those to corresponding processing (e.g., with TF).

From the Pachyderm perspective

Pachyderm is already allowing users to build scalable, tracked, and manageable ML/AI pipelines on Kubernetes. However, for the most part (outside of GPU usage and some custom solutions), these pipelines rely on single node (non-distributed) training (although Pachyderm can automatically parallelize map/reduce style processing for these stages).

Pachyderm is managing production model development workflows on GPU instances, but its users frequently ask how to integrate distributed model training with things like distributed TF or Spark. Pachyderm therefore needs a way to run these distributed frameworks as stages of a Pachyderm pipeline, giving its users access to accelerated training paired with scalable data management and pipelining.

Moreover, anything that can be done to ease the burden of deploying these frameworks and tools on Kubernetes will benefit the Pachyderm ecosystem and its users. For example, easily spinning up TF serving would be useful to Pachyderm users, because they could automatically egress models from Pachyderm to TF serving and manage/version that workflow in a unified way.

Pachyderm + KubeFlow

Pachyderm + KubeFlow will satisfy these needs from both the KubeFlow and Pachyderm perspective. Pachyderm can provide the pipeline/orchestration/data management pieces missing from KubeFlow, and KubeFlow can provide the ease of deployment and distributed framework integrations that are missing in Pachyderm.

Use Cases

  • I have tested out distributed TF training in a Jupyter notebook, but now I want to hook that training into my ETL pipeline and/or integrate large scale data with its corresponding pre-processing stages.
  • I have my TF model development workflow running as a pipeline in Pachyderm, but I need to scale my training using TF distributed.
  • I want to unify my management of model training and deployment/inference in a way that maintains versions of models and provides data provenance for my results.
  • I want to both easily deploy the various data science, ML, AI frameworks I need to use on k8s and stitch those together in a data pipeline.
  • I need to automatically trigger model training and/or inference based on new data.
  • I want to utilize the framework deployed with KubeFlow to also manage large scale data on the same cluster, such that I can ingress/egress data to/from KubeFlow components.

Goals

  • Trigger TFJobs from Pachyderm
    • Allow users to trigger these jobs without modifying their TF code
    • Support both distributed and non-distributed training
  • Implement a mechanism to transfer data to/from TFJobs that are launched by Pachyderm.
  • Implement a mechanism to egress models trained and versioned in Pachyderm to TF serving, as deployed by KubeFlow.
  • Deploy Pachyderm in a manner consistent with the rest of the KubeFlow offerings.

Workflow

[Image: kubeflow_pachyderm_proposal]

Fig. 1 Distributed training as part of a Pachyderm pipeline

As shown in Fig. 1, users can manage their data sets in the Pachyderm distributed file system and have various stages of processing (e.g., pre-processing) that run as normal Pachyderm pipeline stages. However, the model training stage of the Pachyderm pipeline would trigger a TFJob via KubeFlow that would run distributed (or not) training via TF. The Pachyderm pipeline stage would wait for this job to complete and then version any output data from the TFJob (e.g., the serialized *.pb file from TF) as output in Pachyderm. This back and forth of data and the corresponding mechanisms are discussed below. In addition, users could pull other external data (e.g., from databases, HDFS, etc.) into the TFJob if they wanted (similar to how this could be done in any other Pachyderm pipeline stage).

[Image: kubeflow_pachyderm_proposal 1]

Fig. 2 Training + Model versioning export

As shown in Fig. 2, users could train a model using the mechanism discussed with respect to Fig. 1 (or another mechanism). After a model is trained and versioned, a Pachyderm pipeline stage could be utilized to export this newly trained model to TF serving. This has some huge benefits around automation, as models will be kept up to date in TF serving, and around tracking, as models can be automatically versioned in Pachyderm. Models utilized in TF serving can also be traced back to the exact state of pre-processing, raw data, etc. that produced them.

Components

[Image: pachyderm_kubeflow 1]

Fig. 3 Pachyderm + KubeFlow Integration Components (proposed components shown in green)

The design has several components (some of which are pictured above, and some of which are configuration/templates):

  • A Pachyderm KubeFlow worker image, utilized as the image for Pachyderm pipelines that utilize KubeFlow components
  • A Pachyderm TFJob Sidecar
  • A Pachyderm ksonnet package
  • Pachyderm template pipeline specs
    • A template spec for utilizing the KubeFlow worker image to do training
    • A template spec for utilizing the KubeFlow worker image to export a model for serving

KubeFlow Worker Image

The KubeFlow Worker Image will be a Docker image including a binary for launching TFJobs and exporting models to TF serving. This binary will be included in Pachyderm’s source repo and the image will be integrated into Pachyderm CI/CD.

The purpose of the KubeFlow Worker Image, as enabled by the included binary (likely a Go program), is to (a minimal sketch follows this list):

  • Launch and manage TFJobs via the Kubernetes API
    • Create a TFJob based on a user supplied manifest
    • Wait for the TFJob to finish or fail
    • Gather results and logs from the TFJob, which will be made available and will be versioned in Pachyderm.
  • Export TF models to TF Serving
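
A minimal Go sketch of the create-and-wait piece is shown below. It assumes the client-go dynamic client with an in-cluster config and a TFJob CRD under the kubeflow.org group; the exact group/version, namespace handling, and status-condition layout are assumptions for illustration, not settled details of this proposal.

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "log"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
    "sigs.k8s.io/yaml"
)

// tfJobGVR is an assumption: the TFJob group/version has changed across
// Kubeflow releases (v1alpha1, v1alpha2, v1beta1, v1).
var tfJobGVR = schema.GroupVersionResource{
    Group: "kubeflow.org", Version: "v1", Resource: "tfjobs",
}

func main() {
    // Run in-cluster, as a Pachyderm pipeline worker would.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }
    dyn, err := dynamic.NewForConfig(cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Load the user-supplied manifest from the pipeline's input repo mount.
    raw, err := ioutil.ReadFile("/pfs/tfjob/tfjob.yaml")
    if err != nil {
        log.Fatal(err)
    }
    obj := &unstructured.Unstructured{}
    if err := yaml.Unmarshal(raw, &obj.Object); err != nil {
        log.Fatal(err)
    }

    ns := "kubeflow" // assumption: the namespace would come from config or env
    created, err := dyn.Resource(tfJobGVR).Namespace(ns).Create(context.TODO(), obj, metav1.CreateOptions{})
    if err != nil {
        log.Fatal(err)
    }

    // Poll until the TFJob reports a terminal condition.
    for {
        cur, err := dyn.Resource(tfJobGVR).Namespace(ns).Get(context.TODO(), created.GetName(), metav1.GetOptions{})
        if err != nil {
            log.Fatal(err)
        }
        conds, _, _ := unstructured.NestedSlice(cur.Object, "status", "conditions")
        for _, c := range conds {
            cond, _ := c.(map[string]interface{})
            if cond["status"] == "True" && (cond["type"] == "Succeeded" || cond["type"] == "Failed") {
                fmt.Printf("TFJob %s finished with condition %v\n", created.GetName(), cond["type"])
                return
            }
        }
        time.Sleep(10 * time.Second)
    }
}

In practice this create/wait/gather loop would hang off the pachkf binary described in the pipeline templates below, and a watch would be preferable to polling; the sketch only shows the shape of the interaction.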

By having this officially supported KubeFlow worker, users of Pachyderm will be able to:

  • Develop TFJobs as documented in KubeFlow documentation
  • Trigger those TFJobs from Pachyderm, without having to modify the TF code (e.g., by injecting references to Pachyderm)
  • Version and manage the output of TFJobs via Pachyderm data versioning
  • Both version models and easily export them to TF serving, without having to write extra code to enable that export step

An example of the functionality required in this worker has been created here. In addition to that create/wait/gather functionality, this worker image will, for training jobs, likely also need to read and slightly modify the user-supplied TFJob manifest (a rough sketch of this rewrite follows the list) to include:

  • The Pachyderm TFJob Sidecar Container (further discussed below) in each of the TF master, worker, and ps pods.
  • Environment variables for the sidecar containers that reference the input data repository being processed. That way the sidecar containers know what data to shim into the TFJob pods. These environment variables will include the input repo name and branch at a minimum.
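
As a rough companion to the sketch above, the function below appends a sidecar container and the two environment variables to every replica spec using the unstructured helpers. The field path matches the kubeflow.org/v1 layout (spec.tfReplicaSpecs.<ROLE>.template.spec.containers), and the sidecar name, image, and variable names (PACH_INPUT_REPO, PACH_INPUT_BRANCH) are hypothetical placeholders.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// injectPachSidecar adds a hypothetical Pachyderm sidecar container plus the
// input repo/branch environment variables to each TFJob replica spec. Older
// TFJob API versions lay their replicas out differently.
func injectPachSidecar(job *unstructured.Unstructured, repo, branch string) error {
    replicas, found, err := unstructured.NestedMap(job.Object, "spec", "tfReplicaSpecs")
    if err != nil || !found {
        return fmt.Errorf("no tfReplicaSpecs in manifest: %v", err)
    }
    for role, spec := range replicas { // e.g. "Master"/"Chief", "Worker", "PS"
        podSpec, ok := spec.(map[string]interface{})
        if !ok {
            continue
        }
        containers, _, _ := unstructured.NestedSlice(podSpec, "template", "spec", "containers")
        containers = append(containers, map[string]interface{}{
            "name":  "pachyderm-sidecar",           // hypothetical name
            "image": "pachyderm/tfjob-sidecar:dev", // hypothetical image
            "env": []interface{}{
                map[string]interface{}{"name": "PACH_INPUT_REPO", "value": repo},
                map[string]interface{}{"name": "PACH_INPUT_BRANCH", "value": branch},
            },
        })
        if err := unstructured.SetNestedSlice(podSpec, containers, "template", "spec", "containers"); err != nil {
            return err
        }
        replicas[role] = podSpec
    }
    return unstructured.SetNestedMap(job.Object, replicas, "spec", "tfReplicaSpecs")
}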

Pachyderm TFJob Sidecar Container

There are a couple of use cases in terms of the data that would be required to run a TFJob:

  1. The training data comes from a Pachyderm data repository that is input to the KubeFlow worker pipeline stage.
  2. The training data or other data is pulled from data stores outside of Pachyderm (HDFS, databases, object store buckets, etc.)

In either case, the TFJob can be triggered the same way. The KubeFlow worker image will create the TFJob using a modified version of the user manifest that includes the Pachyderm TFJob sidecar containers in each of the master, worker, and ps pods. The Pachyderm TFJob sidecar will then read the environment variables set by the KubeFlow worker (Pachyderm input repo name and branch) and shim the data from the input repo into the pod via a Pachyderm language client.

In the first case above (1), this will allow the TFJob to have access to input data from Pachyderm, without having to use any Pachyderm components explicitly. In the second case above (2), the data that is shimmed in is likely irrelevant and the user can just ignore it. This is consistent with the general Pachyderm philosophy of data in/out. It’s up to the user whether they want to pull data from the Pachyderm input or not.

If the node is the master TF node, the Pachyderm sidecar will also take care of collecting output from the TFJob that is meant to go back to Pachyderm. This will be done by writing to a specified directory (e.g., /pfs/out as in a normal Pachyderm pipeline). Once the TF code completes and the user container exits, the Pachyderm sidecar can gather whatever is in that directory (see the sketch after the note below) and:

  • use the Pachyderm client to commit this output back into Pachyderm as a “partial result” in the Kubeflow worker pipeline’s output repo.
  • the Kubeflow worker would then, by default, gather all partial results in the output repo, including the ones from the sidecar, as the pipeline’s final output (which is what happens now when Pachyderm is running multiple workers in a job).

(Note an alternative or future direction here might be to exploit k8s volumes to facilitate this interaction)
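
A sketch of the sidecar itself follows, under several stated assumptions: pachctl is shelled out to purely as a stand-in for a Pachyderm language client (flags differ between Pachyderm releases); the environment variable names match the hypothetical ones above; how the sidecar learns that the user container has finished is an open question, so a sentinel file is assumed; and committing a "partial result" to the pipeline's output repo would really go through Pachyderm's worker internals, not a plain put file into a named repo, which is shown here only to make the data flow concrete.

package main

import (
    "log"
    "os"
    "os/exec"
    "time"
)

func main() {
    // Hypothetical environment variables set by the KubeFlow worker when it
    // rewrote the TFJob manifest.
    repo := os.Getenv("PACH_INPUT_REPO")
    branch := os.Getenv("PACH_INPUT_BRANCH")

    // 1. Shim the input data into a volume shared with the TF container.
    get := exec.Command("pachctl", "get", "file", "--recursive",
        "--output", "/pfs/"+repo, repo+"@"+branch+":/")
    get.Stdout, get.Stderr = os.Stdout, os.Stderr
    if err := get.Run(); err != nil {
        log.Fatalf("failed to pull input data: %v", err)
    }

    // 2. Wait for the user's TF code to finish. A sentinel file written next to
    //    the output is assumed here purely for illustration.
    for {
        if _, err := os.Stat("/pfs/out/.done"); err == nil {
            break
        }
        time.Sleep(5 * time.Second)
    }

    // 3. On the master replica only, hand /pfs/out back to Pachyderm. The repo
    //    name, role hint, and use of put file are all illustrative.
    if os.Getenv("TF_REPLICA_ROLE") == "master" {
        put := exec.Command("pachctl", "put", "file", "--recursive",
            "--file", "/pfs/out", "model_training@master:/")
        put.Stdout, put.Stderr = os.Stdout, os.Stderr
        if err := put.Run(); err != nil {
            log.Fatalf("failed to commit output: %v", err)
        }
    }
}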

Pachyderm ksonnet package

KubeFlow users should be able to deploy Pachyderm as they would other components of the KubeFlow ecosystem. This would look like:

$ ks pkg install kubeflow/pachyderm

Possibly with some arguments or a cloud/nocloud option. Some inspiration can be taken from the Helm chart for Pachyderm.

Pachyderm pipeline templates

We should also provide pipeline templates for KubeFlow/TFJob/TF serving, so users can get up and running quickly. This would look like the following for training (where pachkf is the example binary included in the KubeFlow Worker, further discussed above):

{
  "pipeline": {
    "name": "model_training"
  },
  "transform": {
    "image": "pachyderm/kubeflow-worker",
    "cmd": [ "pachkf", “tfjob”, “/pfs/tfjob/tfjob.yaml” ]
  },
  "input": {
    "cross": [
      {
        "atom": {
          "repo": "tfjob",
          "glob": "/"
        }
      },
      {
        "atom": {
          "repo": "training_data",
          "glob": "/"
        }
      }
    ]
  }
}

(note, the TFJob manifest could also be shimmed in directly from a git repo using Pachyderm git repo inputs)

The pipeline spec for serving would look like the following; a sketch of what the export step itself might do appears after the spec:

{
  "pipeline": {
    "name": "model_export"
  },
  "transform": {
    "image": "pachyderm/kubeflow-worker",
    "cmd": [ "pachkf", “tfserving”, “/pfs/model/frozen_model.pb” ]
  },
  "input": {
    "atom": {
      "repo": "model",
      "glob": "/"
    }
  }
}
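
For the export step itself, a hedged sketch of what a pachkf tfserving subcommand might do is shown below: copy the model artifact from the pipeline's input repo into a new integer-named version directory under a model base path watched by the TF Serving deployment that KubeFlow stands up. The base path, model name, and timestamp-based versioning are assumptions, and the artifact would need to be in a format TF Serving can actually load.

package main

import (
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "time"
)

func main() {
    src := "/pfs/model/frozen_model.pb" // model from the pipeline's input repo

    // Hypothetical: a model base path shared with (or synced to) TF Serving.
    // TF Serving picks up new integer-named version directories under the
    // model's base path.
    version := fmt.Sprintf("%d", time.Now().Unix())
    dstDir := filepath.Join("/models/my_model", version)
    if err := os.MkdirAll(dstDir, 0755); err != nil {
        log.Fatal(err)
    }

    in, err := os.Open(src)
    if err != nil {
        log.Fatal(err)
    }
    defer in.Close()

    out, err := os.Create(filepath.Join(dstDir, filepath.Base(src)))
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    if _, err := io.Copy(out, in); err != nil {
        log.Fatal(err)
    }
    log.Printf("exported %s as version %s", src, version)
}

If the serving deployment reads models from an object store rather than a shared volume, the same copy would instead go through that store's client; the versioned-directory convention is the part TF Serving relies on.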

Moving forward and discussion

We look forward to further discussion and fine-tuning of this proposal here. @dwhitena is also in the KubeFlow Slack, and Pachyderm has its own public Slack for discussion: http://slack.pachyderm.io/

We also propose that, to expedite this integration, the Pachyderm team starts working on the KubeFlow worker image and pipeline templates, and the KubeFlow team starts working on some of the functionality for the Pachyderm sidecar (based on their TFJob expertise). Then we can take care of the ksonnet package collaboratively.

@jlewi
Contributor Author

jlewi commented Mar 11, 2018

Thanks, this is great.

A couple questions

  1. Why give special treatment to TFJob? Why not allow Pachyderm to create and wait for any K8s resource? You could establish a convention of looking at a resource's conditions to determine when it is done.

  2. What about the sidecar is specific to TFJob?

Would an init container be better than a sidecar? If you use a sidecar, how does the main TF process know when the data has been loaded and it can start?

  3. Is it possible to resolve the Pachyderm path to the underlying object store path (S3/GCS/HDFS) and read/write directly to that datastore? What happens if you're reading/writing a really large dataset? Do you have to copy the entire dataset to a local volume?

For example, in training with TF one typically uses multiple TFRecord files containing sequences of records. Typically in training we want to shuffle the data. This can be done efficiently using streaming reads in a way that requires reading fewer bytes than copying the entire dataset to a local volume and then reading it.

@dwhitena

Thanks for the feedback @jlewi! Regarding your questions:

(1) Yes, I think this would be a possibility. The TFJob is a specific use case, but I think it would be good to have creating any resource as a goal while also specifically targeting the TFJob (because we have a good use case). I wouldn't want to stall this unnecessarily just for the sake of generality, but the "KubeFlow worker image" here could just as well be called a "K8s resource worker image."

(2) Regarding the sidecar specificity, I think it is probably similar to my position on (1). We know how this should work out operationally for the TFJob. However, I'm not confident in saying that it would/should work the same for all other resources. I will try to think of some examples or edge cases.

Regarding the init container: I don't think it could just be an init container, because we also have to gather data after the job completes. An init container would/could take care of shimming the data in, but I'm not sure how we would gather the output. In normal Pachyderm land, a worker binary is shimmed into the user container, and that binary runs the user command so it knows when the command completes. We need to think about what the parallel would be for the sidecar, or what an alternative might be. I will ping the team internally about this to get feedback.

(3) It is possible to map the Pachyderm path to underlying objects in the object store, but those aren't meant to be human-readable (i.e., they are stored by hash). If you are reading/writing a large data set (in normal Pachyderm), you could read lazily via named pipes instead of copying everything. Also, you can adjust a cache size that would cache data locally on the node. I would guess that we could utilize a similar type of lazy read to perform shuffling of TFRecord files.

@jlewi
Contributor Author

jlewi commented Mar 30, 2018

@dwhitena and I chatted yesterday.

My suggestion was to have Pachyderm launch an Argo workflow that would do the following:

  1. Copy data from Pachyderm datastore to a datastore (e.g. object store) readable by TensorFlow (e.g. GCS/S3)
  2. Submit and wait for the TFJob.
  3. Copy the output from the object store back to Pachyderm.

We'll experiment with this in the context of our examples; I'll open up an issue for that.

@jdoliner

@jlewi That sounds like a good approach to get something up and running now.

Long term, it would probably be a lot nicer if we had a way for TensorFlow to pull directly out of Pachyderm. Can TensorFlow pull data from an HTTP endpoint? Pachyderm exposes its whole filesystem over a simple HTTP scheme. If not, we may look into writing an adapter for TensorFlow so it can read directly from Pachyderm.

@dwhitena

dwhitena commented Mar 30, 2018

@jlewi and @jdoliner, I would also suggest a slight modification of the above. If we did the above, we would have the following data transfer:

(pachyderm bucket) -network-> (other bucket) -network-> (tfjob) -network-> (other bucket) -network-> (pachyderm bucket)

I'm fine with using Argo, but ideally (especially for large data sets) would have the Argo workflow get this data right to the tfjob pods:

(pachyderm bucket) -network-> (tfjob) -network-> (pachyderm bucket)

This was my impression about what we would be doing with Argo, as another temporary bucket in GCS/S3 would be redundant. (Note, this is originally why I proposed using a sidecar with the shared volume. The volume was not for persistent storage of data, it was for exposing the Pachyderm data to the TFJob temporarily.)

@jlewi Can we still utilize the Argo workflow but avoid the redundant GCS/S3 bucket (because any inputs and outputs will already be backed by the Pachyderm object store)? Just to reiterate, my view of what the Argo workflow (or other component) should do would be:

  1. determine which data repository should be exposed to the TFJob (which could be done via environment vars, etc. in one example)
  2. expose that data to the TFJob (this could be an additional bucket, but to prevent unnecessary duplication and copies over the network, it would be great to have the data come directly out of Pachyderm into a volume/space accessible via normal file I/O in the TFJob)
  3. gather output data from the TFJob
  4. commit the output data from the TFJob back into Pachyderm.

It would be fine to accomplish this via Argo, but the place where the data lives in steps 2 and 3 doesn't need to be persistent/durable. It just needs to be accessible from the TFJob.

@jlewi
Contributor Author

jlewi commented Apr 1, 2018

@jdoliner @dwhitena We have a number of services (TF, KVC) that can talk directly to object store and speak object store APIs. So if Pachyderm can expose data via those APIs (e.g. the GCS or S3 API), then I think that would be a good long-term approach.

I think it is preferable to make the Pachyderm server speak GCS/S3 because that's a single implementation, whereas the set of clients that would need updating to read from Pachyderm is unbounded.

Is there a reason Pachyderm can't just return a list of URIs corresponding to the objects? Does Pachyderm munge the data or store diffs?

The answer @dwhitena gave above was that the underlying URIs are hashes and not human readable but I don't see that as being an issue.

For output, TFJob would write directly to object store. If Pachyderm isn't munging the data and is just doing a copy/move then I'd expect most object stores are smart enough to treat that as just a metadata operation and not actually copy data.

@dwhitena I think the network copy is orthogonal to whether we use Argo or a sidecar or init container. I don't think the semantics of an init container/sidecar are what we want. An init container runs for every pod every time the pod starts. I think the semantics we want are to export the data once from Pachyderm before we start the TFJob.

I think the question of network efficiency depends on what type of storage we use and not whether we use sidecars. For example, the export step could copy it to host volumes (e.g. KVC), PDS, NFS, etc., which could then be mounted as PVs to the TFJob.

@dwhitena

dwhitena commented Apr 3, 2018

@jlewi Regarding the network copy and orthogonality, I totally agree with you. I think this is independent of Argo etc., and, in my mind, utilizing the Argo workflow with a KVC or other volume would be best. Actually, I have a discussion planned with the KVC contributors tomorrow, so I will mention this.

Regarding an S3 compatible API to the Pachyderm file system, I think that an HTTP/REST interface is more likely. Actually, the file system is currently available via HTTP, and we are working to make this a little easier. S3 compatibility might be a bit of a challenge due to the versioning semantics that would have to be baked in. Anyway, there might be some options here eventually.

Actually, returning a list of hashes to retrieve is essentially what the Argo workflow will be using. This will likely be a little more convenient though, because we can just use the branch name "master" and repo name to retrieve all of the relevant data via the Pachyderm API. So, really branch name (or commit ID) and repo name will give us what we want.

jlewi added a commit to jlewi/kubeflow that referenced this issue Apr 6, 2018
* Packages were created based on the helm chart
https://github.com/kubernetes/charts/tree/master/stable/pachyderm/templates

* Instructions for adding new packages is being added in kubeflow#609.

* I've confirmed that the resulting components can successfully be deployed
  but I don't know if Pachyderm is working yet.

* We most likely need to expose more options e.g. to control things
  like GCS bucket.

* Related to kubeflow#151
k8s-ci-robot pushed a commit that referenced this issue Apr 9, 2018
* Initial ksonnet package for Pachyderm.

* Packages were created based on the helm chart
https://github.com/kubernetes/charts/tree/master/stable/pachyderm/templates

* Instructions for adding new packages is being added in #609.

* I've confirmed that the resulting components can successfully be deployed
  but I don't know if Pachyderm is working yet.

* We most likely need to expose more options e.g. to control things
  like GCS bucket.

* Related to #151

* Added dash.

* Fix the dashboard; need to set DASH_SERVICE_HOST.

* Remove the dashboard because its behind a paywall.
@jlewi
Contributor Author

jlewi commented Jun 5, 2018

@dwhitena @JoeyZwicker Any update on this? In particular, any thoughts on where you'd like to be for our 0.2 release?

@dwhitena

dwhitena commented Jun 6, 2018

Hi @jlewi. Well, I think before the 0.2 release, we should have the following example merged in: kubeflow/examples#112.

Regarding the implementation itself, there is work going on right now in the KVC project that will facilitate pretty easy data sharing for TFJobs: intel/vck#22. Once that is finished, we should update the end-to-end example with distributed TFJobs for training.

@balajismaniam

xref: intel/vck#22 (comment).

@carmine

carmine commented Oct 25, 2018

Any update on this?

@Nick-Harvey

Just submitted a pull request yesterday to pick up where things left off: kubeflow/examples#282. All the credit goes to @dwhitena; he did all the work, I just got it past the 1-yard line.

@jlewi
Contributor Author

jlewi commented Nov 19, 2018

This is great, thanks.

@carmine

carmine commented Jan 31, 2019

Adjusting to 0.5.0 and p2; can bump higher if we feel this will get released in the 0.5.0 timeframe.

@carmine carmine added this to New in 0.5.0 Jan 31, 2019
@carmine

carmine commented Jan 31, 2019

Have added to kanban board as well - https://github.com/orgs/kubeflow/projects/4

@jlewi jlewi moved this from New to Examples in 0.5.0 Feb 11, 2019
@jlewi
Contributor Author

jlewi commented Mar 10, 2019

@JoeyZwicker @Nick-Harvey Is there more work to be done here?

Looks like the example is still pending in kubeflow/examples#522

@Nick-Harvey

@jlewi I think there's an issue with the Argo pipeline/test suite that's keeping this from getting merged. Argo says that tfjob-test is failing, but the logs are blank. Maybe it was a fluke, so I'll hit retest.

@jlewi
Contributor Author

jlewi commented Mar 26, 2019

@Nick-Harvey @jdoliner @JoeyZwicker The example was merged. I'm going to mark this as closed.
If there are next steps for improving the Pachyderm + Kubeflow story, let's open up new issues.

@jlewi jlewi closed this as completed Mar 26, 2019
0.5.0 automation moved this from Examples to Done Mar 26, 2019
saffaalvi referenced this issue in StatCan/kubeflow Feb 11, 2021