<!--- header table --->
<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/statmike/vertex-ai-mlops/blob/main/08%20-%20R/R%20-%20Dataproc%20Cluster%20Spark-R%20Jobs.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo">
      <br>Run in<br>Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https%3A//raw.githubusercontent.com/statmike/vertex-ai-mlops/main/08%20-%20R/R%20-%20Dataproc%20Cluster%20Spark-R%20Jobs.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo">
      <br>Run in<br>Colab Enterprise
    </a>
  </td>      
  <td style="text-align: center">
    <a href="https://github.com/statmike/vertex-ai-mlops/blob/main/08%20-%20R/R%20-%20Dataproc%20Cluster%20Spark-R%20Jobs.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      <br>View on<br>GitHub
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https%3A//raw.githubusercontent.com/statmike/vertex-ai-mlops/main/08%20-%20R/R%20-%20Dataproc%20Cluster%20Spark-R%20Jobs.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      <br>Open in<br>Vertex AI Workbench
    </a>
  </td>
</table>

# R - Dataproc Cluster Spark-R Jobs

Run an **R** script as a job using [SparkR](https://spark.apache.org/docs/latest/sparkr.html#overview) by submitting a prepared script to a Google Cloud [Dataproc](https://cloud.google.com/dataproc/docs/concepts/overview) cluster. A cluster can typically be started in about 90 seconds.

> For a serverless approach to submitting a job, check out the other workflow in this series:
>- [R - Dataproc Serverless Spark-R Jobs](./R%20-%20Dataproc%20Serverless%20Spark-R%20Jobs.ipynb)

---
Part of the series of [**R**](https://github.com/statmike/vertex-ai-mlops/blob/main/08%20-%20R/readme.md) workflows:

A series of workflows focused on using **R** in Vertex AI and other Google Cloud services to run R code, train models with R, and serve predictions with R.

---

**Prerequisites:**

- This notebook running in a Vertex AI Workbench instance as described in the series [readme](./readme.md)
- Run the workflow: [R - Notebook Based Workflow](./R%20-%20Notebook%20Based%20Workflow.ipynb)
    - This prepares the data source used by the job in this workflow

---
## Installs

The list `packages` contains tuples of package import names and install names. If an import name is not found, the matching install name is used to install the package quietly for the current user.

In [1]:
# tuples of (import name, install name)
packages = [
    ('google.cloud.storage', 'google-cloud-storage'),
    ('google.cloud.dataproc', 'google-cloud-dataproc')
]

import importlib.util  # importing importlib alone does not guarantee importlib.util is loaded
install = False
for package in packages:
    if not importlib.util.find_spec(package[0]):
        print(f'installing package {package[1]}')
        install = True
        !pip install {package[1]} -U -q --user
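For a dotted import name such as `google.cloud.storage`, `importlib.util.find_spec` first imports the parent packages, so it raises `ModuleNotFoundError` (rather than returning `None`) when a parent like `google.cloud` is missing entirely. A minimal sketch of a more defensive check, assuming the same `(import name, install name)` tuples as the cell above; `missing_packages` is a hypothetical helper, not part of the original notebook.

```python
import importlib.util


def missing_packages(packages):
    """Return install names for packages whose import name cannot be found.

    `packages` is a list of (import name, install name) tuples.
    """
    missing = []
    for import_name, install_name in packages:
        try:
            # for dotted names, find_spec imports the parents first, which can raise
            found = importlib.util.find_spec(import_name) is not None
        except ModuleNotFoundError:
            found = False
        if not found:
            missing.append(install_name)
    return missing


print(missing_packages([('json', 'json'), ('not_a_real_pkg_xyz.sub', 'not-a-real-pkg')]))
```

The returned list could then drive the `pip install` loop, installing only what is actually missing.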

### Enable APIs

In [2]:
!gcloud services enable dataproc.googleapis.com

### Restart Kernel (If Installs Occurred)

After a kernel restart, continue running from the cell after this one.

In [3]:
if install:
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

---
## Setup

inputs:

In [13]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'statmike-mlops-349915'

In [14]:
REGION = 'us-central1'
EXPERIMENT = 'dataproc-cluster'
SERIES = 'r'

# BigQuery Parameters
BQ_PROJECT = PROJECT_ID
BQ_DATASET = SERIES
BQ_TABLE = 'bigquery-data'
BQ_REGION = REGION[0:2]

# GCS Parameters: Give bucket name
GCS_BUCKET = PROJECT_ID

# key columns in the data:
VAR_TARGET = 'Class'
VAR_OMIT = ['transaction_id', 'splits']

packages:

In [15]:
from google.cloud import storage
from google.cloud import dataproc_v1

from IPython.display import Markdown as md
from datetime import datetime
import os

parameters:

In [16]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
URI = f"gs://{GCS_BUCKET}/{SERIES}/{EXPERIMENT}"

clients:

In [17]:
gcs = storage.Client(project = PROJECT_ID)
dataproc_cluster = dataproc_v1.ClusterControllerClient(client_options = dict(api_endpoint = f"{REGION}-dataproc.googleapis.com:443"))
dataproc_job = dataproc_v1.JobControllerClient(client_options = dict(api_endpoint = f"{REGION}-dataproc.googleapis.com:443"))
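Both clients are pointed at the regional endpoint `{REGION}-dataproc.googleapis.com:443`, so cluster and job requests go to the Dataproc service in `REGION`. A small sketch that builds the `client_options` dictionary in one place so the two clients cannot drift apart; `dataproc_client_options` is a hypothetical helper, not part of the Dataproc library.

```python
def dataproc_client_options(region):
    """Build client_options that target the regional Dataproc endpoint used above."""
    if not region:
        raise ValueError("a region such as 'us-central1' is required")
    return dict(api_endpoint=f"{region}-dataproc.googleapis.com:443")


print(dataproc_client_options("us-central1"))
```

Usage would look like `dataproc_v1.ClusterControllerClient(client_options = dataproc_client_options(REGION))`, and the same for `JobControllerClient`.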

---
## Prepare Training Code: **SparkR** Script

The prior workflow in this series, [R - Notebook Based Workflow](./R%20-%20Notebook%20Based%20Workflow.ipynb), did the model training work in a notebook using an **R** kernel.  

The first step is converting the workflow of the prior notebook to a script that will run with SparkR. The steps from the notebook workflow have been replicated in the **R** script included with this repository.  The cell below loads and shows this script.  
- review directly in GitHub with [this link](https://github.com/statmike/vertex-ai-mlops/blob/main/08%20-%20R/code/sparkr.R)

**Notes On Script**
- The steps are replicated identically with the following additions:


In [18]:
# load and view the script:
SCRIPT_PATH = './code/sparkr.R'

with open(SCRIPT_PATH, 'r') as file:
    data = file.read()
md(f"```R\n\n{data}\n```")

```R

n <- 1000000  # Number of random points
x <- runif(n, -1, 1)
y <- runif(n, -1, 1)

inside <- x^2 + y^2 <= 1  # Points within the unit circle
pi_estimate <- 4 * sum(inside) / n 
print(pi_estimate)

```
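The script is a Monte Carlo estimate of π: it samples points uniformly from the square [-1, 1] × [-1, 1], and because the unit circle covers π/4 of that square's area, four times the fraction of points that land inside approximates π. A Python equivalent can help sanity-check the value printed in the job's driver output; it uses its own random generator and seed, so its exact value will differ from the R run.

```python
import random


def estimate_pi(n, seed=None):
    """Monte Carlo estimate of pi, mirroring the logic of sparkr.R."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n):
        x = rng.uniform(-1, 1)
        y = rng.uniform(-1, 1)
        if x * x + y * y <= 1:  # point falls within the unit circle
            inside += 1
    return 4 * inside / n


print(estimate_pi(1_000_000, seed=42))
```

With one million points the standard error is roughly 0.0016, so results from either language should agree with π to about two decimal places.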

---
## Run SparkR Job On Dataproc Cluster

### Setup Dataproc

- Network Configuration: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/network

List the current networks:

In [19]:
!gcloud compute networks list

NAME     SUBNET_MODE  BGP_ROUTING_MODE  IPV4_RANGE  GATEWAY_IPV4
default  AUTO         REGIONAL


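Create a firewall rule that allows ingress traffic from internal addresses on the `default` network, so the cluster's nodes can communicate with each other: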

In [20]:
!gcloud compute firewall-rules create allow-internal-ingress \
--network=default \
--source-ranges=10.128.0.0/9 \
--direction=ingress \
--action=allow \
--rules=all

Creating firewall...failed.
ERROR: (gcloud.compute.firewall-rules.create) Could not fetch resource:
 - The resource 'projects/statmike-mlops-349915/global/firewalls/allow-internal-ingress' already exists
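The `10.128.0.0/9` source range is the block from which an auto-mode network like `default` (listed above with `SUBNET_MODE` `AUTO`) allocates its regional subnets, so the rule admits traffic from VMs on those subnets, including the cluster's master and workers. A sketch using Python's standard `ipaddress` module to check whether an internal address falls inside the rule's range; the example addresses are illustrative, not taken from this cluster.

```python
import ipaddress

# source range used by the allow-internal-ingress rule above
RULE_RANGE = ipaddress.ip_network("10.128.0.0/9")


def allowed_by_rule(ip):
    """Return True if `ip` falls inside the firewall rule's source range."""
    return ipaddress.ip_address(ip) in RULE_RANGE


for ip in ["10.128.0.5", "10.200.3.7", "10.0.0.5", "192.168.1.10"]:
    print(ip, allowed_by_rule(ip))
```

A `/9` starting at `10.128.0.0` spans `10.128.0.0` through `10.255.255.255`, so only the upper half of `10.0.0.0/8` is admitted.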



### Create A Dataproc Cluster

A cluster can be created using the `gcloud` CLI, REST, the Console, or client libraries in a number of languages, such as the [Python client](https://cloud.google.com/python/docs/reference/dataproc/latest) used here. [Reference](https://cloud.google.com/dataproc/docs/guides/create-cluster#dataproc-create-cluster-python)

In [54]:
cluster_specs = dict(
    project_id = PROJECT_ID,
    cluster_name = f'{SERIES}-{EXPERIMENT}',
    config = dict(
        master_config = dict(num_instances = 1, machine_type_uri = 'n1-standard-2'),
        worker_config = dict(num_instances = 3, machine_type_uri = 'n1-standard-2')
    )
)
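The cluster name is built as `f'{SERIES}-{EXPERIMENT}'`, so changing either value can produce a name Dataproc rejects. A minimal pre-check sketch, assuming the naming rule as we understand it from the Dataproc documentation (a lowercase letter followed by up to 51 lowercase letters, digits, or hyphens, not ending in a hyphen); confirm the exact limits against the current docs. `is_valid_cluster_name` is a hypothetical helper.

```python
import re

# Assumed rule: one lowercase letter, then up to 51 lowercase letters, digits,
# or hyphens, with no trailing hyphen (verify against current Dataproc docs).
CLUSTER_NAME_PATTERN = re.compile(r"[a-z](?:[a-z0-9-]{0,50}[a-z0-9])?")


def is_valid_cluster_name(name):
    """Return True if `name` matches the assumed Dataproc cluster naming rule."""
    return CLUSTER_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_cluster_name("r-dataproc-cluster"))  # True
print(is_valid_cluster_name("R-Dataproc"))          # False: uppercase letters
print(is_valid_cluster_name("r-dataproc-"))         # False: trailing hyphen
```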

In [55]:
cluster = dataproc_cluster.create_cluster(
    project_id = PROJECT_ID,
    region = REGION,
    cluster = cluster_specs
)

In [56]:
result = cluster.result()

In [57]:
result.cluster_name

'r-dataproc-cluster'

In [58]:
print(f'Review the Dataproc cluster in the console at this link:\nhttps://console.cloud.google.com/dataproc/clusters/{result.cluster_name}/monitoring?region={REGION}&project={PROJECT_ID}')

Review the Dataproc cluster in the console at this link:
https://console.cloud.google.com/dataproc/clusters/r-dataproc-cluster/monitoring?region=us-central1&project=statmike-mlops-349915


### Copy Script To GCS

In [59]:
bucket = gcs.lookup_bucket(GCS_BUCKET)
SOURCEPATH = f'{SERIES}/{EXPERIMENT}/models/{TIMESTAMP}'

In [60]:
blob = bucket.blob(f'{SOURCEPATH}/sparkr.R')
blob.upload_from_filename(SCRIPT_PATH)

In [61]:
blob.name

'r/dataproc-cluster/models/20240129004317/sparkr.R'

### Submit Job

The [script can be submitted](https://cloud.google.com/dataproc/docs/guides/submit-job) with Google Cloud Console, the [`gcloud` CLI](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/spark-r) or [one of the APIs](https://cloud.google.com/dataproc/docs/reference) including the [Python Client](https://cloud.google.com/python/docs/reference/dataproc/latest) used here.

In [62]:
job_specs = dict(
    placement = dict(cluster_name = cluster_specs['cluster_name']),
    spark_r_job = dict(
        main_r_file_uri = f'gs://{GCS_BUCKET}/{blob.name}',
        args = ['1000']
    )
)
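The same submission can be expressed with the [`gcloud dataproc jobs submit spark-r`](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/spark-r) command linked above, where arguments after `--` are passed through to the R script. A sketch that derives the CLI arguments from a `job_specs`-shaped dictionary so the Python and CLI paths stay in sync; `to_gcloud_command` is a hypothetical helper that covers only the fields used here, and the bucket name in the example is made up.

```python
import shlex


def to_gcloud_command(job_specs, region):
    """Build an equivalent `gcloud dataproc jobs submit spark-r` argument list."""
    r_job = job_specs["spark_r_job"]
    cmd = [
        "gcloud", "dataproc", "jobs", "submit", "spark-r",
        r_job["main_r_file_uri"],
        f"--cluster={job_specs['placement']['cluster_name']}",
        f"--region={region}",
    ]
    if r_job.get("args"):
        # everything after `--` is forwarded to the R script as command-line arguments
        cmd += ["--", *r_job["args"]]
    return cmd


example_specs = dict(
    placement=dict(cluster_name="r-dataproc-cluster"),
    spark_r_job=dict(main_r_file_uri="gs://my-bucket/r/sparkr.R", args=["1000"]),  # hypothetical bucket
)
print(shlex.join(to_gcloud_command(example_specs, "us-central1")))
```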

In [63]:
job = dataproc_job.submit_job(
    project_id = PROJECT_ID,
    region = REGION,
    job = job_specs
)

In [64]:
job.reference.job_id

'5ec7978b-37de-4b71-9cce-b500d9913ba2'

### Wait On Job

In [65]:
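import time

while True:
    getjob = dataproc_job.get_job(project_id = PROJECT_ID, region = REGION, job_id = job.reference.job_id)
    state = getjob.status.state.name
    if state == "ERROR":
        raise Exception(getjob.status.details)
    elif state == "CANCELLED":
        raise Exception("Job was cancelled")
    elif state == "DONE":
        print("Finished")
        break
    time.sleep(10)  # pause between status checks instead of polling the API continuously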

Finished
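A reusable variant of the wait loop can add an overall timeout, so a job that hangs cannot block the notebook indefinitely, and treat `DONE`, `ERROR`, and `CANCELLED` as terminal states. This sketch takes a `get_state` callable so it could wrap the Dataproc client, for example `lambda: dataproc_job.get_job(project_id = PROJECT_ID, region = REGION, job_id = job.reference.job_id).status.state.name`; the helper name and default timings are assumptions.

```python
import time

TERMINAL_STATES = {"DONE", "ERROR", "CANCELLED"}


def wait_for_terminal_state(get_state, interval=10, timeout=3600):
    """Poll `get_state()` until it returns a terminal state or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still in state {state} after {timeout} s")
        time.sleep(interval)  # wait before checking again


# usage with a stand-in sequence of states instead of a real job
states = iter(["PENDING", "RUNNING", "RUNNING", "DONE"])
print(wait_for_terminal_state(lambda: next(states), interval=0))
```

The caller still decides what to do with `ERROR` or `CANCELLED`, for example raising with `getjob.status.details`.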


In [66]:
#getjob

In [67]:
print(f'Review job details in the console at this link:\nhttps://console.cloud.google.com/dataproc/jobs/{job.reference.job_id}/monitoring?region={REGION}&project={PROJECT_ID}')

Review job details in the console at this link:
https://console.cloud.google.com/dataproc/jobs/5ec7978b-37de-4b71-9cce-b500d9913ba2/monitoring?region=us-central1&project=statmike-mlops-349915


### Delete Dataproc Cluster

When you are done with the cluster, delete it to avoid additional costs.

In [68]:
delete_cluster = dataproc_cluster.delete_cluster(
    project_id = PROJECT_ID,
    region = REGION,
    cluster_name = cluster_specs['cluster_name']
)

In [69]:
result = delete_cluster.result()

In [70]:
result
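If an error occurs between creating the cluster and deleting it (for example, the exception raised while waiting on a failed job), the delete cell may never run and the cluster keeps incurring costs. A minimal sketch of a context manager that always runs the delete step; `create` and `delete` are hypothetical callables that could wrap `dataproc_cluster.create_cluster(...).result()` and `dataproc_cluster.delete_cluster(...).result()`, shown here with stand-ins that only record calls.

```python
from contextlib import contextmanager


@contextmanager
def temporary_cluster(create, delete):
    """Create a resource, yield it, and always call `delete` afterwards."""
    resource = create()
    try:
        yield resource
    finally:
        delete()  # runs even if the body of the `with` block raises


calls = []


def fake_create():
    calls.append("create")
    return "r-dataproc-cluster"


def fake_delete():
    calls.append("delete")


try:
    with temporary_cluster(fake_create, fake_delete) as name:
        calls.append(f"use {name}")
        raise RuntimeError("job failed")
except RuntimeError:
    pass

print(calls)  # ['create', 'use r-dataproc-cluster', 'delete']
```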

