## Data Pipeline using Spark and BigQuery on Dataproc
Let us see how to manage Dataproc Workflow Templates using `gcloud` commands.
* Step 1: Create Dataproc Workflow Template
* Step 2: Configure an active Dataproc cluster (we can also configure a new cluster)
* Step 3: Add Spark SQL or PySpark Jobs to the Dataproc Workflow Template with Dependencies
* Step 4: Run and Validate the Dataproc Workflow Template

We can take care of all the steps using `gcloud` commands.
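
As a rough sketch of how the four steps map to commands, the snippet below assembles one `gcloud` invocation per step as a Python argument list and prints it. The helper `workflow_command` is a hypothetical name introduced only for illustration, and only the first Spark SQL job is shown for Step 3. Nothing is executed; the commands are only printed.

```python
import shlex

TEMPLATE = "wf-daily-product-revenue-bq"  # template name used throughout this notebook


def workflow_command(action, *args):
    """Hypothetical helper: build a `gcloud dataproc workflow-templates` argument list."""
    return ["gcloud", "dataproc", "workflow-templates", action, *args]


steps = [
    # Step 1: create the workflow template
    workflow_command("create", TEMPLATE),
    # Step 2: point the template at an existing cluster via its label
    workflow_command(
        "set-cluster-selector", TEMPLATE,
        "--cluster-labels", "goog-dataproc-cluster-name=aidataprocdev",
    ),
    # Step 3: add a job (only the cleanup job is shown here)
    workflow_command(
        "add-job", "spark-sql",
        "--step-id=job-cleanup",
        "--file=gs://airetail/scripts/daily_product_revenue/cleanup.sql",
        f"--workflow-template={TEMPLATE}",
    ),
    # Step 4: run the workflow
    workflow_command("instantiate", TEMPLATE),
]

for number, command in enumerate(steps, start=1):
    print(f"Step {number}: {shlex.join(command)}")
```

Keeping each command as an argument list means the same values could later be passed to `subprocess.run` without worrying about shell quoting.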

In [None]:
!gsutil ls -r gs://airetail/scripts

In [None]:
!gsutil cp -r ../../apps/daily_product_revenue_bq gs://airetail/apps

In [None]:
!gsutil ls -r gs://airetail/apps/

In [None]:
!gcloud config set dataproc/region us-central1

In [None]:
!gcloud dataproc workflow-templates list

Here is the command to delete the Dataproc Workflow Template. The multiline form with `\` line continuation does not work on Windows, so run it as a single line there.

```shell
gcloud dataproc workflow-templates \
    delete wf-daily-product-revenue-bq
```

In [None]:
!gcloud dataproc workflow-templates delete wf-daily-product-revenue-bq --quiet

Here is the command to create the Dataproc Workflow Template.

```shell
gcloud dataproc workflow-templates \
    create wf-daily-product-revenue-bq
```

In [None]:
!gcloud dataproc workflow-templates create wf-daily-product-revenue-bq

In [None]:
!gcloud dataproc workflow-templates list

Here is the command to attach a running (active) Dataproc cluster to the Dataproc Workflow Template. We need to specify a label that identifies the cluster.

```shell
gcloud dataproc workflow-templates \
    set-cluster-selector \
    wf-daily-product-revenue-bq \
    --cluster-labels goog-dataproc-cluster-name=aidataprocdev
```
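
To illustrate how a label selector picks a cluster, here is a minimal sketch of the matching rule: a cluster qualifies only when it carries every selector label with the same value. The function `matches_selector`, the second cluster `otherdev`, and the `env` label are made up for illustration. This is a local simulation, not a call to the Dataproc API.

```python
def matches_selector(cluster_labels, selector_labels):
    """Return True when the cluster has every selector label with the same value."""
    return all(
        cluster_labels.get(key) == value
        for key, value in selector_labels.items()
    )


# Selector used in the set-cluster-selector command above
selector = {"goog-dataproc-cluster-name": "aidataprocdev"}

# Hypothetical label sets for two clusters (the `env` label is an assumption)
clusters = {
    "aidataprocdev": {"goog-dataproc-cluster-name": "aidataprocdev", "env": "dev"},
    "otherdev": {"goog-dataproc-cluster-name": "otherdev", "env": "dev"},
}

selected = [
    name for name, labels in clusters.items()
    if matches_selector(labels, selector)
]
print(selected)
```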

In [None]:
!gcloud dataproc workflow-templates set-cluster-selector wf-daily-product-revenue-bq --cluster-labels goog-dataproc-cluster-name=aidataprocdev

Here are the commands to add Spark SQL Jobs to the Dataproc Workflow.

```shell
# Cleanup job, which has no dependencies
gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-cleanup \
    --file=gs://airetail/scripts/daily_product_revenue/cleanup.sql \
    --workflow-template=wf-daily-product-revenue-bq

# File Format Converter jobs with dependency on cleanup
gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-convert-orders \
    --file=gs://airetail/scripts/daily_product_revenue/file_format_converter.sql \
    --params=bucket_name=gs://airetail,table_name=orders \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-cleanup

gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-convert-order-items \
    --file=gs://airetail/scripts/daily_product_revenue/file_format_converter.sql \
    --params=bucket_name=gs://airetail,table_name=order_items \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-cleanup

# Last Job which depends on convert orders and order_items jobs
gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-daily-product-revenue \
    --file=gs://airetail/scripts/daily_product_revenue/compute_daily_product_revenue.sql \
    --params=bucket_name=gs://airetail \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-convert-orders,job-convert-order-items
```
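
The `--start-after` values above form a small dependency graph. As a sketch, the snippet below models that graph with the standard library's `graphlib` (Python 3.9+) and groups the steps into stages that could run in parallel. It only reasons about ordering locally and does not claim to reproduce how Dataproc schedules the jobs.

```python
from graphlib import TopologicalSorter

# step id -> set of step ids it must start after (mirrors --start-after above)
dependencies = {
    "job-cleanup": set(),
    "job-convert-orders": {"job-cleanup"},
    "job-convert-order-items": {"job-cleanup"},
    "job-daily-product-revenue": {"job-convert-orders", "job-convert-order-items"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle

stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose prerequisites are all done
    stages.append(ready)
    sorter.done(*ready)

for number, stage in enumerate(stages, start=1):
    print(f"Stage {number}: {', '.join(stage)}")
```

The two file format converter jobs land in the same stage because both depend only on `job-cleanup`.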

In [None]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-cleanup --file=gs://airetail/scripts/daily_product_revenue/cleanup.sql --workflow-template=wf-daily-product-revenue-bq

In [None]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-convert-orders --file=gs://airetail/scripts/daily_product_revenue/file_format_converter.sql --params=bucket_name=gs://airetail,table_name=orders --workflow-template=wf-daily-product-revenue-bq --start-after=job-cleanup

In [None]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-convert-order-items --file=gs://airetail/scripts/daily_product_revenue/file_format_converter.sql --params=bucket_name=gs://airetail,table_name=order_items --workflow-template=wf-daily-product-revenue-bq --start-after=job-cleanup

In [None]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-daily-product-revenue --file=gs://airetail/scripts/daily_product_revenue/compute_daily_product_revenue.sql --params=bucket_name=gs://airetail --workflow-template=wf-daily-product-revenue-bq --start-after=job-convert-orders,job-convert-order-items

In [None]:
!gcloud dataproc workflow-templates list

In [None]:
!gcloud dataproc workflow-templates add-job

In [None]:
!gcloud dataproc workflow-templates add-job pyspark

* Make sure to specify the right project id in the commands below. Also reformat them to a single line when running on Windows.

You can use the following `gcloud dataproc jobs submit` command as a reference.

```shell
gcloud dataproc jobs submit \
    pyspark --cluster=aidataprocdev \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
    --properties=spark.app.name="BigQuery Loader - Daily Product Revenue" \
    --properties=spark.submit.deployMode=cluster \
    --properties=spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue \
    --properties=spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710 \
    --properties=spark.yarn.appMasterEnv.DATASET_NAME=retail \
    --properties=spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail \
    gs://airetail/apps/daily_product_revenue_bq/app.py
```
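
The `add-job pyspark` command passes all Spark properties as one comma-separated `--properties` value, which is easy to mistype by hand. Here is a minimal sketch that builds that value from a dictionary. The helper name `properties_flag` is hypothetical, and the check that rejects commas is a simplifying assumption to keep the list unambiguous.

```python
# Property names and values taken from the reference command above
properties = {
    "spark.app.name": "BigQuery Loader - Daily Product Revenue",
    "spark.submit.deployMode": "cluster",
    "spark.yarn.appMasterEnv.DATA_URI": "gs://airetail/retail_gold.db/daily_product_revenue",
    "spark.yarn.appMasterEnv.PROJECT_ID": "tidy-fort-361710",
    "spark.yarn.appMasterEnv.DATASET_NAME": "retail",
    "spark.yarn.appMasterEnv.GCS_TEMP_BUCKET": "airetail",
}


def properties_flag(props):
    """Hypothetical helper: join properties into one --properties=key=value,... flag."""
    pairs = []
    for key, value in props.items():
        # A comma inside a key or value would be read as a list separator
        if "," in key or "," in value:
            raise ValueError(f"comma in property {key!r} would split the list")
        pairs.append(f"{key}={value}")
    return "--properties=" + ",".join(pairs)


print(properties_flag(properties))
```

Because the application name contains spaces, the resulting string has to be quoted when pasted into a shell, or passed as a single element of an argument list.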

In [None]:
!gcloud dataproc workflow-templates \
    add-job pyspark \
    --step-id=job-load-dpr-bq \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
    --properties=spark.app.name="BigQuery Loader - Daily Product Revenue",spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue,spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710,spark.yarn.appMasterEnv.DATASET_NAME=retail,spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-daily-product-revenue \
    gs://airetail/apps/daily_product_revenue_bq/app.py

In [None]:
!gcloud dataproc workflow-templates describe wf-daily-product-revenue-bq

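Here is the command to instantiate (run) the Dataproc Workflow Template. Since the template selects an existing cluster, make sure that the cluster is running first.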

```shell
gcloud dataproc workflow-templates \
    instantiate wf-daily-product-revenue-bq
```

In [None]:
!gcloud dataproc clusters list

In [None]:
!gcloud dataproc clusters start aidataprocdev

In [None]:
# This will take some time to run

!gcloud dataproc workflow-templates instantiate wf-daily-product-revenue-bq

In [None]:
!gcloud dataproc workflow-templates remove-job

In [None]:
!gcloud dataproc workflow-templates remove-job wf-daily-product-revenue-bq --step-id=job-load-dpr-bq --quiet

In [None]:
!gcloud dataproc workflow-templates \
    add-job pyspark \
    --step-id=job-load-dpr-bq \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
    --properties=spark.app.name="BigQuery Loader - Daily Product Revenue",spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue,spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710,spark.yarn.appMasterEnv.DATASET_NAME=retail,spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-daily-product-revenue \
    gs://airetail/apps/daily_product_revenue_bq/app.py

In [None]:
!gcloud dataproc workflow-templates instantiate wf-daily-product-revenue-bq