Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google Cloud integrations #720

Closed
MarkEdmondson1234 opened this issue Dec 9, 2021 · 28 comments
Closed

Google Cloud integrations #720

MarkEdmondson1234 opened this issue Dec 9, 2021 · 28 comments
Assignees

Comments

@MarkEdmondson1234
Copy link
Contributor

Prework

  • [x ] Read and agree to the code of conduct and contributing guidelines.
  • [x ] If there is already a relevant issue, whether open or closed, comment on the existing thread instead of posting a new issue.
  • [x ] New features take time and effort to create, and they take even more effort to maintain. So if the purpose of the feature is to resolve a struggle you are encountering personally, please consider first posting a "trouble" or "other" issue so we can discuss your use case and search for existing solutions first.
  • [ x] Format your code according to the tidyverse style guide.

Proposal

Integrate targets with googleCloudRunner and/or googleCloudStorageR. I would like to prepare a pull request to enable this and would appreciate some guidance on where best to spend time.

I was inspired by the recent AWS S3 integration and I would like to have similar functionality for googleCloudStorageR. From what I see the versioning of cloud objects that is required is available via the existing gcs_get_object() function to check for updates.

The most interesting integration I think would be with googleCloudRunner, since via Cloud Build and Cloud Run parallel processing of R jobs in a cheap cloud environment could be achieved.

Cloud Build is based on a yaml format that seems to map closely with targets including an id and wait-for attribute that can create DAGs. I propose targets help create those ids, and then download the Build Logs to check for changes? Get a bit woolly here what's best to do. I anticipate lots of cr_buildstep_r() calls with cr_build() called in the final make. I think this can be done via existing targets code calling R scripts with library(googleCloudRunner) within them, but I would like to see if there is anything deserving a pull request within targets itself that would make the process more streamlined.

Cloud Run can be used to create lots of R micro-service APIs via plumber that could trigger R scripts for target steps. There is an example at the bottom of here showing parallel execution. I propose targets could help create the parallel jobs.

@wlandau
Copy link
Collaborator

wlandau commented Dec 9, 2021

Hi Mark! Thank you for your interest! Google Cloud integration has been in the back of my mind, and I would love to support it in targets.

Storage

To start, it would be great to have nicely abstracted utilities for Google Cloud Storage comparable to the Amazon ones (aws_s3_exists(), aws_s3_head(), aws_s3_download(), and aws_s3_upload()). That and semi-automated unit tests would be an excellent first PR. This may not seem like much from your end because functions in https://code.markedmondson.me/googleCloudStorageR/reference/index.html already look easy to use, but it would really help me.

After that, the next step is to create a new abstract storage class to govern internal behaviors like hashing and metadata storage, as well as concrete subclasses that inherit from both that abstract class and classes specific to each supported file format. (I was thinking of supporting format = "gcs_feather", "gcs_parquet", "gcs_qs", "gcs_keras", "gcs_torch", and "gcs_file", each of which requires its own concrete subclass.) This gets pretty involved for devs not already familiar with targets internals, but it would probably only take me a couple days once I am up and running with Google Cloud. If you are still interested in a PR for this part, please let me know.

Compute

targets relies on clustermq and future to orchestrate distributed jobs. (Internally, there is a clustermq class for persistent workers and a future class for transient workers. Both are sub-classes of "active", which is a sub-class of "algorithm".) I chose these packages as backends because each one supports a wide array of backends, most notably forked processes, callr processes, and traditional schedulers like SLURM, SGE, TORQUE, PBS, and LSF. I would prefer to continue in this direction, with clustermq and future serving as intermediaries between targets and distributed systems to submit, poll, and return the results of jobs. The current setup abstracts away a lot of work that seems a bit low-level for a package like targets.

googleCloudRunner + future seems like an excellent combo, and the setup you document at https://code.markedmondson.me/r-at-scale-on-google-cloud-platform/ seems like a great proof of concept. (Kind of like https://furrr.futureverse.org/articles/advanced-furrr-remote-connections.html, but easier to create the cluster because it does not seem to require looking up an IP address.) What are your thoughts on a dedicated future backend like future.callr or future.aws.lambda to automate the setup and teardown of transient workers without the requirement of a PSOCK cluster? I am sure a lot of non-targets R users would appreciate something like this as well. cc @HenrikBengtsson.

@MarkEdmondson1234
Copy link
Contributor Author

Great thanks, will get started.

To start, it would be great to have nicely abstracted utilities for Google Cloud Storage comparable to the Amazon ones (aws_s3_exists(), aws_s3_head(), aws_s3_download(), and aws_s3_upload()). That and semi-automated unit tests would be an excellent first PR. This may not seem like much from your end because functions in https://code.markedmondson.me/googleCloudStorageR/reference/index.html already look easy to use, but it would really help me.

No problem and first PR will do this.

After that, the next step is to create a new abstract storage class to govern internal behaviors like hashing and metadata storage, as well as concrete subclasses that inherit from both that abstract class and classes specific to each supported file format. (I was thinking of supporting format = "gcs_feather", "gcs_parquet", "gcs_qs", "gcs_keras", "gcs_torch", and "gcs_file", each of which requires its own concrete subclass.) This gets pretty involved for devs not already familiar with targets internals, but it would probably only take me a couple days once I am up and running with Google Cloud. If you are still interested in a PR for this part, please let me know.

Was looking through that as a place to get started, targets is using a more OO approach I'm not very familiar with so it will be a bit of a learning curve getting to grips with it but hopefully one to do.

targets relies on clustermq and future to orchestrate distributed jobs. (Internally, there is a clustermq class for persistent workers and a future class for transient workers. Both are sub-classes of "active", which is a sub-class of "algorithm".) I chose these packages as backends because each one supports a wide array of backends, most notably forked processes, callr processes, and traditional schedulers like SLURM, SGE, TORQUE, PBS, and LSF. I would prefer to continue in this direction, with clustermq and future serving as intermediaries between targets and distributed systems to submit, poll, and return the results of jobs. The current setup abstracts away a lot of work that seems a bit low-level for a package like targets.

This is already supported on Google VMs as googleComputeEngineR has a future backend (big fan of Henrik and future who helped implement it). There is some examples of using the VMs for future workloads here: https://cloudyr.github.io/googleComputeEngineR/articles/massive-parallel.html - is anything more needed for that? As I understand it you could use it today in targets via:

library(future)
library(targets)
library(googleComputeEngineR)

vms <- gce_vm_cluster()
plan <- plan(cluster, workers = as.cluster(vms))
tar_resources_future(plan = plan)
...

But I think there is an opportunity to move this more into a serverless direction, as the cloud build steps seem to seamlessly map to tar_targets() if a way of communicating between the steps can be done.

As an example an equivalent googleCloudRunner to targets minimal example would be:

library(googleCloudRunner)

bs <- c(
    cr_buildstep_gcloud("gsutil", 
                        id = "raw_data_file",
                        args = c("gsutil",
                                 "cp",
                                 "gs://your-bucket/data/raw_data.csv",
                                 "/workspace/data/raw_data.csv")),
    # normally would not use readRDS()/saveRDS() in multiple steps but for sake of example
    cr_buildstep_r("read_csv('/workspace/data/raw_data.csv', col_types = cols()) %>% saveRDS('raw_data')",
                   id = "raw_data",
                   name = "verse"),
    cr_buildstep_r("readRDS('raw_data') %>% filter(!is.na(Ozone)) %>% saveRDS('data')",
                   id = "data",
                   name = "verse"),
    cr_buildstep_r("create_plot(readRDS('data')) %>% saveRDS('hist')",
                   id = "hist", 
                   waitFor = "data", # so it runs concurrently to 'fit'
                   name = "verse"),
    cr_buildstep_r("biglm(Ozone ~ Wind + Temp, readRDS('data'))",
                   waitFor = "data", # so it runs concurrently to 'hist'
                   id = "fit",
                   name = "gcr.io/mydocker/biglm")                          
    
)
bs |> cr_build_yaml() 

Normally I would put all the r steps in one buildstep sourced from a file but have added readRDS() %>% blah() %>% saveRDS() to illustrate functionality that I think targets could take care of.

Makes this yaml object that I think maps to targets closely:

==cloudRunnerYaml==
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:alpine
  entrypoint: gsutil
  args:
  - gsutil
  - cp
  - gs://your-bucket/data/raw_data.csv
  - /workspace/data/raw_data.csv
  id: raw_data_file
- name: rocker/verse
  args:
  - Rscript
  - -e
  - read_csv('/workspace/data/raw_data.csv', col_types = cols()) %>% saveRDS('raw_data')
  id: raw_data
- name: rocker/verse
  args:
  - Rscript
  - -e
  - readRDS('raw_data') %>% filter(!is.na(Ozone)) %>% saveRDS('data')
  id: data
- name: rocker/verse
  args:
  - Rscript
  - -e
  - create_plot(readRDS('data')) %>% saveRDS('hist')
  id: hist
  waitFor:
  - data
- name: gcr.io/mydocker/biglm
  args:
  - Rscript
  - -e
  - biglm(Ozone ~ Wind + Temp, readRDS('data'))
  id: fit
  waitFor:
  - data

(more build args here)

Do the build on GCP via the_build |> cr_build()

And/or each buildstep could be its own dedicated cr_build() and the build's artefacts are uploaded/downloaded after its run.

This holds several advantages:

  • Each step can be executed in its own environment
  • Each step can use differing amount of resources (e.g. a 32 core build step vs a 1 core)
  • Start-up and tear down is handled automatically
  • Multiple languages could be used within a task step
  • Up to 24hrs compute time per step
  • Default 30 steps concurrent usage, quotas up to 100. Unlimited build queue.

I see that as a tool that is better than Airflow for visualising DAGs, taking care of state management on whether each node needs to be run but with a lot of scale to build each step in a cloud environment.

@MarkEdmondson1234
Copy link
Contributor Author

I think looking through another simple addition will be to create a version of tar_github_actions() that will get one step closer to my end vision ;)

@wlandau
Copy link
Collaborator

wlandau commented Dec 12, 2021

Was looking through that as a place to get started, targets is using a more OO approach I'm not very familiar with so it will be a bit of a learning curve getting to grips with it but hopefully one to do.

Sounds good, I am totally willing to work through future PRs with you that add the OO-based functionality. Perhaps the next one could be the class that contains user-defined resources that will get passed to GCS, e.g. the bucket and ACL. The AWS equivalent is at https://github.com/ropensci/targets/blob/main/R/class_resources_aws.R. That PR could include a user-facing function to create an object and an argument to add it to the whole resources object (either for a target or default for the pipeline). With that in place, it will be easier to create GCS classes equivalent to https://github.com/ropensci/targets/blob/main/R/class_aws.R and https://github.com/ropensci/targets/blob/main/R/class_aws_parquet.R, etc.

@wlandau
Copy link
Collaborator

wlandau commented Dec 12, 2021

This is already supported on Google VMs as googleComputeEngineR has a future backend (big fan of Henrik and future who helped implement it). There is some examples of using the VMs for future workloads here: https://cloudyr.github.io/googleComputeEngineR/articles/massive-parallel.html - is anything more needed for that?

Not for Compute Engine, I think.

But I think there is an opportunity to move this more into a serverless direction, as the cloud build steps seem to seamlessly map to tar_targets() if a way of communicating between the steps can be done.

I agree that serverless computing is an ideal direction, and tar_target() steps and cr_buildstep_r() are conceptually similar. (And I didn't realize googleCloudRunner was itself a pipeline tool, constructing the DAG and putting it into a YAML file.) With targets, which already has its own machinery to define and orchestrate the DAG, the smoothest fit I see is to create a new extension to future that could be invoked within each target. With a new plan like future::plan(cloudrunner), each future::future() could define and asynchronously run each R command as its own cr_buildstep_r() step. Then, the existing machinery of targets could orchestrate these cr_buildstep_r()-powered futures in parallel and identify the input/output data required. Does that make sense? Anything I am missing? Usage would look something like this:

# _targets.R file:
library(targets)
library(tarchetypes)
source("R/functions.R")
options(tidyverse.quiet = TRUE)
tar_option_set(packages = c("biglm", "dplyr", "ggplot2", "readr", "tidyr"))

library(future.googlecloudrunner)
plan <- future::tweak(cloudrunner, cores = 4)
resources_gcp <- tar_resources(
  future = tar_resources_future(plan = plan) # Run on the cloud.
)

list(
  tar_target(
    raw_data_file,
    "data/raw_data.csv",
    format = "file",
    deployment = "main" # run locally
  ),
  tar_target(
    raw_data,
    read_csv(raw_data_file, col_types = cols()),
    deployment = "local"
  ),
  tar_target(
    data,
    raw_data %>%
      filter(!is.na(Ozone)),
    resources = resources_gcp
  ),
  tar_target(
    hist,
    create_plot(data),
     resources = resources_gcp
  ),
  tar_target(fit, biglm(Ozone ~ Wind + Temp, data), resources = resources_gcp),
  tar_render(report, "index.Rmd", deployment = "main") # not run on the cloud
)

@wlandau
Copy link
Collaborator

wlandau commented Dec 12, 2021

I think I see your point about directly mapping tar_target() steps to cr_buildstep_r() in targets, but please let me know if I am missing any potential advantages, especially efficiency. In the early days of drake, I tried something similar: have the user define the pipeline with drake_plan() (the equivalent of a list of tar_target()s), then map that pipeline to a Makefile. Multicore computing was possible though make -j 2 (called from drake::make(plan, jobs = 2), and distributed computing was possible by defining a SHELL that talked to a resource manager like SLURM. However, that proved to be an awkward fit and not very generalizable to other modes of parallelism.

So these days, I prefer that targets go through a framework like future to interact with distributed systems. targets is responsible for setting up and orchestrating the DAG and identifying the dependencies that need to be loaded for each target, while future is responsible for

  • Asynchronously running a worker for a given target:

future <- do.call(what = future::future, args = args)

  • Polling it through future::resolved()

if (future::resolved(worker)) {

  • And returning the results through future::value():

tryCatch(future::value(worker, signal = FALSE), error = identity)

These 3 tasks would be cumbersome to handle directly in targets, especially if targets needed to duplicate the implementation across a multitude of parallel backends (e.g. forked processes, callr, SLURM, SGE, AWS Batch, AWS Fargate, Google Cloud Run). And I am concerned that directly mapping to a Cloud Run YAML file may require an entire orchestration algorithm (like https://github.com/ropensci/targets/blob/e144bdb2a5d7a2c80ae022df434b1354bacf5ad9/R/class_future.R) which is lower level and more specialized than I feel targets is prepared to accommodate.

@MarkEdmondson1234
Copy link
Contributor Author

Thanks for valuable feedback :)

I think I can get what I'm looking for building on top of existing code now I've looked at the GitHub trigger. The key thing is how to use targets to signal the state of the pipeline between builds, which I think the GCS integration will do eg can the targets folder be downloaded in between builds to indicate if it should run the step or not. Some boilerplate code to do that could then sit in googleCloudRunner with possibly a S3 method for a target build step, but will see it working first. To prep for that I have built a public Docker image with renv and targets installed that will be a requirement that's on "gcr.io/gcer-public/targets".

@wlandau
Copy link
Collaborator

wlandau commented Dec 12, 2021

The key thing is how to use targets to signal the state of the pipeline between builds, which I think the GCS integration will do eg can the targets folder be downloaded in between builds to indicate if it should run the step or not.

Yeah, if all the target output is in GCS, you only need to download _targets/meta/meta (super light).

Some boilerplate code to do that could then sit in googleCloudRunner with possibly a S3 method for a target build step, but will see it working first. To prep for that I have built a public Docker image with renv and targets installed that will be a requirement that's on "gcr.io/gcer-public/targets".

Awesome! So then are you thinking of using googleCloudRunner to manage what happens before and after the pipeline?

I think I can get what I'm looking for building on top of existing code now I've looked at the GitHub trigger.

Would you elaborate? I am not sure I follow the connection with GitHub actions.

@MarkEdmondson1234
Copy link
Contributor Author

Awesome! So then are you thinking of using googleCloudRunner to manage what happens before and after the pipeline?

I hope something like a normal targets workflow, then a function in googleCloudRunner that will take the pipeline and create the build (if all steps running in one) or builds (somehow specifying which steps to run in their own build instance) with GCS files maintaining state between them.

Would you elaborate? I am not sure I follow the connection with GitHub actions.

I see how the GitHub action deals with loading packages (via renv) and checking the state of the target pipeline (by looking at the .targets-runs branch).

I think if GCS can take the role of the .targets-run branch then I can replicate it with Cloud Build so as on a GitHub push event it builds a target pipeline, which can then be extended to other events such as a PubSub message (e.g. a file hits cloud storage or a BigQuery table is updated or a scheduler)

Replicating it will necessitate including boilerplate code (the docker image, downloading the _targets/meta/meta and target output GCS files). Each Cloud Build ends with "artifacts" such as Docker images or just arbitrary files uploaded to GCS so that takes care of the output, its the input I would like to see how is handled.

The renv step makes it more generic but build steps could also be sped up by adding R packages to the Dockerfile, and I guess that is necessary anyhow for some system dependencies. I don't know of a way to scan for system dependencies yet. Closest is https://github.com/r-hub/sysreqs but its not on CRAN.

@wlandau
Copy link
Collaborator

wlandau commented Dec 13, 2021

I hope something like a normal targets workflow, then a function in googleCloudRunner that will take the pipeline and create the build (if all steps running in one) or builds (somehow specifying which steps to run in their own build instance) with GCS files maintaining state between them.

So kind of like treating targets as a DSL and mapping the target list directly to a googleCloudRunner YAML file? Sounds cool. If you have benchmarks after that, especially with a few hundred targets, please let me know. It would be nice to get an idea of how fast GCR pipelines run vs how fast tar_make_future() with individual cloud futures would run.

Related: so I take it the idea of developing a googleCloudRunner-backed future::plan() is not appealing to you? Anyone else you know who would be interested? I would be, but it will take time to get up and running with GCP, and I usually feel overcommitted.

@wlandau
Copy link
Collaborator

wlandau commented Dec 13, 2021

I think if GCS can take the role of the .targets-run branch then I can replicate it with Cloud Build so as on a GitHub push event it builds a target pipeline, which can then be extended to other events such as a PubSub message (e.g. a file hits cloud storage or a BigQuery table is updated or a scheduler)

Replicating it will necessitate including boilerplate code (the docker image, downloading the _targets/meta/meta and target output GCS files). Each Cloud Build ends with "artifacts" such as Docker images or just arbitrary files uploaded to GCS so that takes care of the output, its the input I would like to see how is handled.

Maybe I'm misunderstanding, does that mean you'll run targets::tar_make() (or equivalent) inside a Cloud Build workflow? (You would need to in order to generate _targets/meta/meta.) That could still work in parallel if you get a single beefy instance for the Cloud Build run and parallelize with tar_make_future(). Sorry, I think I am a few steps behind your vision.

The renv step makes it more generic but build steps could also be sped up by adding R packages to the Dockerfile, and I guess that is necessary anyhow for some system dependencies. I don't know of a way to scan for system dependencies yet. Closest is https://github.com/r-hub/sysreqs but its not on CRAN.

I agree, handling packages beforehand through the Dockerfile seems ideal. I believe Henrik has anticipated some situations where packages are not known in advance and have to be installed dynamically (or marshaled, if that is possible).

Really excited to try out a prototype when this all comes together.

@MarkEdmondson1234
Copy link
Contributor Author

MarkEdmondson1234 commented Dec 14, 2021

So kind of like treating targets as a DSL and mapping the target list directly to a googleCloudRunner YAML file? Sounds cool. If you have benchmarks after that, especially with a few hundred targets, please let me know. It would be nice to get an idea of how fast GCR pipelines run vs how fast tar_make_future() with individual cloud futures would run.

Yes, I think it will start to make sense for long running tasks (>10mins) and/or those that can run in parallel a lot, since there is a long start time but with practically infinite resources if you have the cash, and not as much cash as you would need for running on a traditional VM cluster as its charged per second of job build time. My immediate use case will be for a lot smaller pipelines than that but those that can be triggered by changes in an API, BigQuery table or cloud storage file, since the targets pipeline can then trigger to a PubSub message (this is what I'm testing with at the moment)

Related: so I take it the idea of developing a googleCloudRunner-backed future::plan() is not appealing to you? Anyone else you know who would be interested? I would be, but it will take time to get up and running with GCP, and I usually feel overcommitted.

That sounds like a nice future project that perhaps I can look at in 2022 - I'm not sure if its a fit since Cloud Build is API based not SSH but I have contacted Henrik about it. There is also already the existing googleComputeEngineR workflow available.

Maybe I'm misunderstanding, does that mean you'll run targets::tar_make() (or equivalent) inside a Cloud Build workflow? (You would need to in order to generate _targets/meta/meta.) That could still work in parallel if you get a single beefy instance for the Cloud Build run and parallelize with tar_make_future().

My first example runs one pipeline in one Cloud Build with targets/tar_make() - that is working more or less now. That will always be a simple enough option and already useful enough given the triggers it can use.

For the "one Cloud Build per target" work will think about whats most useful. There is scope to:

  • Nest builds (Cloud build calling Cloud Build)
  • Callfuture within builds (A build can run on a multi-core build machine)
  • Run tar_make() locally but tasks individually call Cloud Builds

...and all the permutations of that ;) For all though, the Cloud Storage bucket keeping state between them.

@wlandau
Copy link
Collaborator

wlandau commented Dec 14, 2021

Thanks, Mark! I really appreciate your openness to all these directions.

@wlandau
Copy link
Collaborator

wlandau commented Dec 14, 2021

For the DSL approach, I think tar_manifest(fields = everything()) is the easiest way to get the target definitions in a machine-readable format.

@MarkEdmondson1234
Copy link
Contributor Author

Nice will take a look at tar_manifest(fields = everything()).

I have the one build per pipeline function working for my local example, but I'd like some tests to check that its not re-running steps etc when it downloads the Cloud Storage artifacts. There were a few rabbit holes but otherwise it turned into not much code for I hope powerful impact.

MarkEdmondson1234/googleCloudRunner#155

The current workflow is:

  1. Create your 'targets'
  2. Create a Dockerfile that holds the R and system dependencies for your workflow. You can build the image using cr_deploy_docker(). Include library(targets) dependencies - a Docker image with targets/renv installed is available at gcr.io/gcer-public/targets that you can FROM in your own Dockerfile..
  3. Run cr_build_targets() to create the cloudbuild yaml file.
  4. Create a build trigger via cr_buildtrigger() e.g. a GitHub commit one.
  5. Trigger a build. The first trigger will run the targets pipeline, subsequent runs will only recompute the outdated targets.

Using renv just took a very long time to build without its caching and I figured one can use cr_deploy_docker() to make the Dockerfile anyhow so I'm just using Docker for the environments at the moment.

The test build I'm doing is taking around 1m30 vs 2m30 the first run (it downloads from an API, does some dplyr transformations then uploads results to BigQuery if API data has updated)

example based off my local tests

The function cr_build_targets() helps set up some boilerplate code to download targets meta data from the specified GCS bucket, run the pipeline and uplaod the artifacts back to the same bucket.

cr_build_targets(path=tempfile())

# adding custom environment args and secrets to the build
cr_build_targets(
  task_image = "gcr.io/my-project/my-targets-pipeline",
  options = list(env = c("ENV1=1234",
                         "ENV_USER=Dave")),
  availableSecrets = cr_build_yaml_secrets("MY_PW","my-pw"),
  task_args = list(secretEnv = "MY_PW"))

Resulting in build:

==cloudRunnerYaml==
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:alpine
  entrypoint: bash
  args:
  - -c
  - gsutil -m cp -r ${_TARGET_BUCKET}/* /workspace/_targets || exit 0
  id: get previous _targets metadata
- name: ubuntu
  args:
  - bash
  - -c
  - ls -lR
  id: debug file list
- name: gcr.io/my-project/my-targets-pipeline
  args:
  - Rscript
  - -e
  - targets::tar_make()
  id: target pipeline
  secretEnv:
  - MY_PW
timeout: 3600s
options:
  env:
  - ENV1=1234
  - ENV_USER=Dave
substitutions:
  _TARGET_BUCKET: gs://mark-edmondson-public-files/googleCloudRunner/_targets
availableSecrets:
  secretManager:
  - versionName: projects/mark-edmondson-gde/secrets/my-pw/versions/latest
    env: MY_PW
artifacts:
  objects:
    location: gs://mark-edmondson-public-files/googleCloudRunner/_targets/meta
    paths:
    - /workspace/_targets/meta/**

Looks like this when build after I commit to the repo. For my use case I would put it also on a daily schedule.

Screenshot 2021-12-14 at 22 51 49

@wlandau
Copy link
Collaborator

wlandau commented Dec 15, 2021

Nice! One pattern I have been thinking about for parallel workflows is tar_make_clustermq(workers = ...) inside a single multicore cloud job. cr_build_targets() pretty much gets us there.

@MarkEdmondson1234
Copy link
Contributor Author

MarkEdmondson1234 commented Dec 16, 2021

I have some tests now which can run without needing a cloudbuild.yaml file or trigger. They confirm

  1. Running a targets pipeline on cloud build
  2. Re-running the pipeline will skip over steps a previous build has done
  3. Changing the source files triggers a redo of the targets build

There is now also a cr_build_targets_artifacts() which downloads the target artifacts to a local session, optionally overwriting your local _targets/ folder.

The minimal example takes about 1minute to run with 20seconds for the tar_make() step, so add around 40 seconds to a target build from deployment to files ready, seems reasonable.

https://github.com/MarkEdmondson1234/googleCloudRunner/blob/master/R/build_targets.R

I will if I get time before Christmas look at the comments from the pull request and create a cr_buildstep_targets() that should prepare for looking at parallel builds. I think I missed the most obvious strategy before for workloads, which is using parallel build steps in one build, so the more complete list is:

  • One build with parallel running build steps using targets meta data to fill in the "id" and "waitFor" yaml fields for each step
  • Nest builds (Cloud build calling Cloud Build)
  • Call future within builds (A build can run on a multi-core build machine)
  • Run tar_make() locally but tasks individually call Cloud Builds

@wlandau
Copy link
Collaborator

wlandau commented Dec 19, 2021

I have some tests now which can run without needing a cloudbuild.yaml file or trigger. They confirm

Running a targets pipeline on cloud build
Re-running the pipeline will skip over steps a previous build has done
Changing the source files triggers a redo of the targets build

There is now also a cr_build_targets_artifacts() which downloads the target artifacts to a local session, optionally overwriting your local _targets/ folder.

The minimal example takes about 1minute to run with 20seconds for the tar_make() step, so add around 40 seconds to a target build from deployment to files ready, seems reasonable.

https://github.com/MarkEdmondson1234/googleCloudRunner/blob/master/R/build_targets.R

Amazing! Excellent alternative to tar_github_actions().

One build with parallel running build steps using targets meta data to fill in the "id" and "waitFor" yaml fields for each step

Yup, the DSL we talked about. That would at least convert targets into an Airflow-like tool tailored to GCP. You can get id from tar_manifest() and waitFor from tar_network()$edges.

Nest builds (Cloud build calling Cloud Build)

As an extension of the current cr_build_targets()?

Call future within builds (A build can run on a multi-core build machine)

Yeah, I saw cr_build_targets() has a tar_make argument, which the user could set to something like "future::plan(future.callr::callr); targets::tar_make_future(workers = 4)".

Run tar_make() locally but tasks individually call Cloud Builds

With HenrikBengtsson/future#567 or https://cloudyr.github.io/googleComputeEngineR/articles/massive-parallel.html#remote-r-cluster, right? Another reason I like these options is that many pipelines do not need distributed computing for all targets. tar_target(deployment = "main") causes a target to run locally instead of a worker.

# _targets.R file
library(targets)
library(tarchetypes)

# For tar_make_clustermq() on a SLURM cluster:
options(
  clustermq.scheduler = "slurm",
  clustermq.template = "my_slurm_template.tmpl"
)

list(
  tar_target(model, run_model()), # Runs on a worker.
  tar_render(report, "report.Rmd", deployment = "main") # Runs locally.
)

@MarkEdmondson1234
Copy link
Contributor Author

MarkEdmondson1234 commented Dec 21, 2021

I've had a bit of a restructure to allow passing in the different strategies outlined above, customising the buildsteps you send up.

  • One build with parallel running build steps using targets meta data to fill in the "id" and "waitFor" yaml fields for each step

Now in via cr_buildstep_targets_multi():

library(googleCloudRunner)
targets::tar_script(
      list(
         targets::tar_target(file1, "targets/mtcars.csv", format = "file"),
         targets::tar_target(input1, read.csv(file1)),
         targets::tar_target(result1, sum(input1$mpg)),
         targets::tar_target(result2, mean(input1$mpg)),
         targets::tar_target(result3, max(input1$mpg)),
         targets::tar_target(result4, min(input1$mpg)),
         targets::tar_target(merge1, paste(result1, result2, result3, result4))
     ),
     ask = FALSE
 )

cr_buildstep_targets_multi()
ℹ 2021-12-21 11:57:07 > targets cloud location: gs://bucket/folder2021-12-21 11:57:07 > Resolving targets::tar_manifest()
 
── # Building DAG: ─────────────────────────────────────────────────────────────2021-12-21 11:57:09 > [ get previous _targets metadata ] -> [ file1 ]
ℹ 2021-12-21 11:57:09 > [ file1 ] -> [ input1 ]
ℹ 2021-12-21 11:57:09 > [ input1 ] -> [ result1 ]
ℹ 2021-12-21 11:57:09 > [ input1 ] -> [ result2 ]
ℹ 2021-12-21 11:57:09 > [ input1 ] -> [ result3 ]
ℹ 2021-12-21 11:57:09 > [ input1 ] -> [ result4 ]
ℹ 2021-12-21 11:57:09 > [ result1, result2, result3, result4 ] -> [ merge1 ]
ℹ 2021-12-21 11:57:09 > [ merge1 ] -> [ Upload Artifacts ]
  • Nest builds (Cloud build calling Cloud Build)
    Perhaps a helper function to put in normal R functions within target steps, to send it up and collect the results.

  • Call future within builds (A build can run on a multi-core build machine)
    Can use now by specifying future() in the tar_config() and choosing a multicore build machine

  • Run tar_make() locally but tasks individually call Cloud Builds
    Can specify in the R functions sent in to targets plus GCS files, although perhaps a deployment argument to specify if to send it up may be nice.

@MarkEdmondson1234
Copy link
Contributor Author

My test works but working with a real _targets file I'm coming across an error in my dag when it seems an edge is existing that is not in nodes.

My target list is similar to:

list(
  tar_target(
    cmd_args,
    parse_args(),
    cue = tar_cue(mode = "always")
  ),
  tar_target(
    surveyid_file,
    "data/surveyids.csv",
    format = "file"
  ),
  tar_target(
    surveyIds,
    parse_surveyIds(surveyid_file, cmd_args)
  ),
...

parse_args() takes command line arguments so is first entry into the DAG.

This creates from tar_network()$edges a dependency of from = "parse_args" to="cmd_args", but "parse_args" is not in the tar_manifest() so the DAG build fails.

Am I approaching this the wrong way, is there a way to handle the above situation?

The current pertinent code is

 myMessage("Resolving targets::tar_manifest()", level = 3)
  nodes <- targets::tar_manifest()
  edges <- targets::tar_network()$edges

  first_id <- nodes$name[[1]]

  myMessage("# Building DAG:", level = 3)
  bst <- lapply(nodes$name, function(x){
    wait_for <- edges[edges$to == x,"from"][[1]]
    if(length(wait_for) == 0){
      wait_for <- NULL
    }

    if(x == first_id){
      wait_for <- "get previous _targets metadata"
    }

    myMessage("[",
              paste(wait_for, collapse = ", "),
              "] -> [", x, "]",
              level = 3)

    cr_buildstep_targets(
      task_args = list(
        waitFor = wait_for
      ),
      tar_make = c(tar_config, sprintf("targets::tar_make('%s')", x)),
      task_image = task_image,
      id = x
    )
  })

  bst <- unlist(bst, recursive = FALSE)

  if(is.null(last_id)){
    last_id <- nodes$name[[nrow(nodes)]]
  }
  last_id <- nodes$name[[nrow(nodes)]]


  myMessage("[",last_id,"] -> [ Upload Artifacts ]", level = 3)

  c(
    cr_buildstep_targets_setup(target_bucket),
    bst,
    cr_buildstep_targets_teardown(target_bucket,
                                  last_id = last_id)
  )

from https://github.com/MarkEdmondson1234/googleCloudRunner/blob/20f6e0836e67f9b802228c467a13e611e9bf4d43/R/buildstep_targets.R#L99

@wlandau
Copy link
Collaborator

wlandau commented Dec 22, 2021

This creates from tar_network()$edges a dependency of from = "parse_args" to="cmd_args", but "parse_args" is not in the tar_manifest() so the DAG build fails.

I think targets_only = TRUE in tar_network() should fix that.

@MarkEdmondson1234
Copy link
Contributor Author

Thanks, all working!

@wlandau
Copy link
Collaborator

wlandau commented Jan 10, 2022

Thanks for all your work on #722, Mark! With that PR merged, I think we can move on to resources, basically replicating https://github.com/ropensci/targets/blob/main/R/tar_resources_aws.R and https://github.com/ropensci/targets/blob/main/R/class_resources_aws.R for GCP and adding a gcp argument to tar_resources(). From a glance back at utils_gcp.R, I think the GCP resource fields should probably be bucket, prefix, and verbose. (We can think about part_size for multi-part uploads later if you think that is necessary.)

Are you still interested in implementing this?

@MarkEdmondson1234
Copy link
Contributor Author

Sure I will take a look see how far I get

@wlandau
Copy link
Collaborator

wlandau commented Jan 19, 2022

Thanks for #748, Mark! I think we are ready for a new "gcp_parquet" storage format much like the current tar_target(..., format = "aws_parquet"). For AWS S3, the allowed format settings are described in the docstring of tar_target(), and the internal functionality is handled with an S3 class hierarchy. R/class_aws.R is the parent class with all the AWS-specific methods, and then R/class_aws_parquet.R inherits its methods from multiple classes to create the desired behavior for "aws_parquet". There are automated tests tests/testthat/test-class_aws_parquet.R to check the architecture and a semi-automated test at tests/aws/test-class_aws_parquet.R to verify it actually works in a pipeline. Porting all that to GCP would be a great next PR. A PR after that could migrate other storage formats to GCP: "gcp_rds", "gcp_qs", "gcp_feather", and "gcp_file". ("gcp_file" will take a little more work because "aws_file" has some methods of its own.)

@wlandau
Copy link
Collaborator

wlandau commented Jan 19, 2022

Also, I think I am coming around to the idea of GCR inside targets via #753. I expect this to take a really long time, but I would like to work up to it. I picture something like tar_make_workers() which uses workers, and then the user could select a workers backend somehow (which could be semitransient future workers).

@MarkEdmondson1234
Copy link
Contributor Author

Thanks I will take a look. May I ask if its going to be a case of a central function with perhaps different parsing of the blobs of bytes for the different formats? (e.g. feather and parquet will look the same on GCS, but not in R). I recently added .rds parsing to avoid a trip to write to disk, would that perhaps be useful? https://code.markedmondson.me/googleCloudStorageR/reference/gcs_parse_download.html

I will take your word on what is best approach for the partial workers, it does seem more complicated than initial blush. It makes sense to me there is some kind of meta layer on top that chooses between Cloud Build, future, local etc. and I would say this is probably the trend on where cloud compute is going so worth looking at.

@wlandau
Copy link
Collaborator

wlandau commented Jan 25, 2022

Thanks I will take a look. May I ask if its going to be a case of a central function with perhaps different parsing of the blobs of bytes for the different formats? (e.g. feather and parquet will look the same on GCS, but not in R).

That's what I was initially picturing. For AWS, the store_read_object() method downloads the file and then calls a store_read_path() method specific to the local format. Makes it easier to write new formats because I do not have to worry about whether readRDS() or fst::read_fst() etc. can accept a connection object instead of a file path.

targets/R/class_aws.R

Lines 91 to 103 in 70846eb

store_read_object.tar_aws <- function(store) {
path <- store$file$path
tmp <- tempfile()
on.exit(unlink(tmp))
aws_s3_download(
key = store_aws_key(path),
bucket = store_aws_bucket(path),
file = tmp,
region = store_aws_region(path),
version = store_aws_version(path)
)
store_convert_object(store, store_read_path(store, tmp))
}

I recently added .rds parsing to avoid a trip to write to disk, would that perhaps be useful? https://code.markedmondson.me/googleCloudStorageR/reference/gcs_parse_download.html

Looks like that could speed things up in cases where connection objects are supported. However, it does require that we hold both the serialized blob and the unserialized R object in memory at the same time, and for a moment the garbage collector cannot clean up either one. This drawback turned out to be limiting in drake because storr serializes and hashes objects in memory, and sometimes the user's memory was less than double the largest object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants