## PySpark Basics in CML

**Apache Spark is a general-purpose framework for distributed computing that offers high performance for both batch and stream processing. It exposes APIs for Java, Python, R, and Scala, as well as an interactive shell for running jobs.**

**In Cloudera Machine Learning (CML), Spark and its dependencies are bundled directly into the CML engine Docker image or CML Runtime. CML supports fully-containerized execution of Spark workloads via Spark's support for the Kubernetes cluster backend. Users can interact with Spark both interactively and in batch mode.**

 

![title](img/sparkonk8s.png)

**The bottom line: Spark on CML is dramatically easier to use than Spark on YARN on CDSW or other platforms. There is no need to distribute dependencies among executors. You can easily spin up multi-executor distributed Spark Sessions and interrupt them as needed, all within quotas and other cost-monitoring constraints.**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt

**A simple, single-pod Spark Session can be instantiated as shown below. Because the master is set to `local`, the whole Spark application runs inside a single pod.**

In [3]:
spark = SparkSession\
    .builder\
    .master("local")\
    .appName("SimpleSession")\
    .getOrCreate()

In [4]:
spark

You can reach the Spark UI from the Spark Session as shown above. If clicking on the Spark UI link doesn't work, try the following. 

Copy the current URL, replace the "editor" portion with "engines" and append "spark-ui" at the end as shown below. 

![title](img/spark_ui.png)
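
The URL rewrite described above can also be sketched in code. This is a minimal illustration only: the helper name `spark_ui_url` and the example URL are hypothetical, and the exact shape of your session URL may differ from what is assumed here.

```python
def spark_ui_url(editor_url: str) -> str:
    """Swap the 'editor' path segment for 'engines' and append 'spark-ui'."""
    parts = editor_url.rstrip("/").split("/")
    if "editor" not in parts:
        raise ValueError("expected an 'editor' segment in the URL")
    # Replace only the first 'editor' path segment.
    parts[parts.index("editor")] = "engines"
    return "/".join(parts + ["spark-ui"])


# Hypothetical URL, for illustration only.
example = "https://cml.example.com/someuser/someproject/editor/abc123"
print(spark_ui_url(example))
# https://cml.example.com/someuser/someproject/engines/abc123/spark-ui
```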

You can stop the Spark Session at any time, as shown below.

In [8]:
spark.stop()

Let's try a Spark Session with more horsepower. You can increase the number of cores and the amount of memory for the Driver and Executors with the following settings.

In [13]:
# The master is left unset here: with .master("local") everything runs in one
# process and the executor settings below would have no effect.
spark = SparkSession\
    .builder\
    .appName("DistributedSession")\
    .config("spark.executor.memory","2g")\
    .config("spark.executor.instances","5")\
    .config("spark.executor.cores","8")\
    .config("spark.driver.memory","2g")\
    .getOrCreate()

You just created a distributed Spark Session with 1 Driver and 5 Executors. Each Executor has 8 cores and 2GB of memory. Not bad!
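
To see what such a request adds up to against a quota, the settings above can be totalled with plain Python. This is a rough sketch: the helper names `gib` and `requested_resources` are hypothetical, only the `g` memory suffix is handled, and any per-pod memory overhead that Spark adds on top of these values is ignored.

```python
settings = {
    "spark.executor.memory": "2g",
    "spark.executor.instances": "5",
    "spark.executor.cores": "8",
    "spark.driver.memory": "2g",
}


def gib(value: str) -> int:
    """Parse a memory string such as '2g' into whole GiB ('g' suffix only)."""
    if not value.lower().endswith("g"):
        raise ValueError(f"unsupported memory string: {value!r}")
    return int(value[:-1])


def requested_resources(conf: dict) -> dict:
    """Total the cores and memory requested by the executor and driver settings."""
    executors = int(conf["spark.executor.instances"])
    return {
        "executor_cores": executors * int(conf["spark.executor.cores"]),
        "executor_memory_gib": executors * gib(conf["spark.executor.memory"]),
        "driver_memory_gib": gib(conf["spark.driver.memory"]),
    }


print(requested_resources(settings))
# {'executor_cores': 40, 'executor_memory_gib': 10, 'driver_memory_gib': 2}
```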

In [15]:
spark.stop()

Now on to some more details. While the above Spark Session will work, you won't be able to save data to Cloud Storage.

In order to do so, you need to set the region and the bucket. But how do we get access to the values needed?

First, navigate to the CML Workspace. Click on the name of the Data Lake associated with the Workspace.

![title](img/workspace_to_dl.png)

The region is displayed at the center of the page, while the cloud storage bucket is listed under the "Cloud Storage" tab. Make sure you only copy the initial prefix that identifies the bucket.

![title](img/data_lake.png)
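
If you have copied a longer storage location, the bucket prefix can be trimmed out programmatically. A minimal sketch, assuming the location is a URI of the form `scheme://bucket/path`; the helper name `bucket_prefix` and the sample path are hypothetical.

```python
from urllib.parse import urlsplit


def bucket_prefix(location: str) -> str:
    """Return only the 'scheme://bucket/' part of a storage location."""
    parts = urlsplit(location)
    if not parts.scheme or not parts.netloc:
        raise ValueError(f"not a bucket URI: {location!r}")
    return f"{parts.scheme}://{parts.netloc}/"


# Hypothetical longer location; only the bucket prefix is kept.
print(bucket_prefix("s3a://gd01-uat2/some/longer/path"))
# s3a://gd01-uat2/
```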

Now we finally have everything we need to launch a Spark Session and read and write DataFrames, models, or files with Cloud Storage.

In [21]:
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")\
    .config("spark.yarn.access.hadoopFileSystems", "s3a://gd01-uat2/")\
    .getOrCreate()

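Notice that this is a perfect use case for CML Environment Variables. A better approach would be to store the region and the bucket as project environment variables (named `REGION` and `STORAGE` in the code further down) rather than hard-coding them in the notebook.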

![title](img/new_env_vars.png)

Don't forget that in order to use the variables you will have to kill this session and start a new one. Come back and launch the new Spark Session as shown below. 

When you resume, you can skip the previous cells but do make sure to run the first two as those are required to import Spark and other libraries. 

In [4]:
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", os.environ["REGION"])\
    .config("spark.yarn.access.hadoopFileSystems", os.environ["STORAGE"])\
    .getOrCreate()
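
Reading `os.environ[...]` directly raises a bare `KeyError` when a variable is missing. One way to get a clearer message is sketched below; the helper name `storage_settings` is hypothetical, while `REGION` and `STORAGE` are the variable names used in the cell above.

```python
import os


def storage_settings(env=os.environ):
    """Build the two Cloud Storage settings, failing early if a variable is unset."""
    missing = [name for name in ("REGION", "STORAGE") if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Set these CML environment variables first: " + ", ".join(missing)
        )
    return {
        "spark.hadoop.fs.s3a.s3guard.ddb.region": env["REGION"],
        "spark.yarn.access.hadoopFileSystems": env["STORAGE"],
    }


# Demonstration with an explicit mapping (values taken from the earlier cell).
settings = storage_settings({"REGION": "us-east-2", "STORAGE": "s3a://gd01-uat2/"})
for key, value in settings.items():
    print(key, "=", value)
```

Each pair could then be passed to the builder with `.config(key, value)` before calling `.getOrCreate()`.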

We can read files from the local project folder, S3, ADLS, GCS or many other sources. Let's start by reading from the data folder in the local CML project.

In [6]:
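df = spark.read.csv(
  "data/LoanStats_2015_subset_071821.csv",
  header=True,        # the first row holds the column names
  inferSchema=True,   # let Spark infer column types from the data
  sep=',',
  nullValue='NA'      # treat the literal string NA as null
)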

We just created a Spark DataFrame. We will learn more about this API in Notebook 3, "Spark SQL Intro".

In [10]:
#Printing number of rows and columns:
print('Dataframe Shape')
print((df.count(), len(df.columns)))

Dataframe Shape
(35578, 105)


Let's create a Spark Table. The fun thing about CML is that this Spark SQL table is immediately picked up by the Hive Metastore.

This means that the data automatically appears in CDW and can be queried via Impala or Hive. 

It also means it can be viewed in the Data Catalog or with Atlas.

In [13]:
# This command would not have worked without configuring the Spark Session as we did above.
df.write.format('parquet').mode("overwrite").saveAsTable('default.my_spark_table')

Next, access the data from a CDW Virtual Warehouse (VW). Make sure to select a Warehouse associated with the same Data Lake as the one used by this CML Workspace. Open the Hue browser from the Hue icon associated with the VW in the far-right column.

![title](img/find_dw.png)

We can run a simple select * statement as this table doesn't have too much data in it. 

![title](img/select_star.png)

Finally, let's save the data to S3. 

In [18]:
# This command would not have worked without configuring the Spark Session as we did above.
# Parquet stores column names in its own schema, so no "header" option is needed.
df.write\
    .parquet("s3a://gd01-uat2/datalake/mydata")
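
Rather than repeating the bucket name, the target path can be derived from the `STORAGE` value used earlier. A small sketch, with a hypothetical helper name `storage_path`; the bucket and folder names are the ones from the cell above.

```python
def storage_path(bucket_uri: str, *parts: str) -> str:
    """Join a bucket URI and path segments without doubling or dropping slashes."""
    return "/".join([bucket_uri.rstrip("/")] + [p.strip("/") for p in parts])


print(storage_path("s3a://gd01-uat2/", "datalake", "mydata"))
# s3a://gd01-uat2/datalake/mydata
```

In a session, `storage_path(os.environ["STORAGE"], "datalake", "mydata")` could be passed to `df.write.parquet(...)` in place of the hard-coded string.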

Congratulations, you are now finished with Spark basics for CML! Next, open Notebook 3 to learn more about the Spark SQL API.