### Introduction to Apache Spark

In this notebook we start to work with Apache Spark. This notebook is based on material supplied by Cloudera under their Cloudera Academic Partner program and the *Spark: The Definitive Guide* book by Bill Chambers and Matei Zaharia. You can find out more about Spark here: [https://spark.apache.org/](https://spark.apache.org/ "Apache Spark"). 

We will use a Databricks Community Edition Spark Cluster for this notebook. Sign up for a free account here: [https://databricks.com/signup#signup/community](https://databricks.com/signup#signup/community)

Topics
- Working with text files
- Working with delimited files
- Working with Parquet files
- Working with Hive tables
- Generating a Spark DataFrame
- Working with Pandas DataFrames

In [0]:
import urllib.parse

# Change the stuXXX number to match your own student ID
# Get the ACCESS_KEY and SECRET_KEY from your stuXXX.txt file
# MAKE THE NEEDED CHANGES TO THE NEXT THREE LINES  
STUDENT_NUMBER = "stuXXX"
ACCESS_KEY = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

# Configure access to the course data on S3
# You should only need to run this once
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, "")
AWS_BUCKET_NAME = "cis442f-course-data"
MOUNT_NAME = "cis442f-data"
dbutils.fs.mount("s3n://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

# Configure access to your own location for storing data on S3
# You should only need to run this once
#AWS_BUCKET_NAME = "cis442f-student-data/stu099"
AWS_BUCKET_NAME = "cis442f-student-data/"+STUDENT_NUMBER
MOUNT_NAME = "my-data"
dbutils.fs.mount("s3n://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)


In [0]:
# If you ever need to unmount these locations for any reason you can do so
# by uncommenting and running the following two lines
# dbutils.fs.unmount ("/mnt/cis442f-data")
# dbutils.fs.unmount ("/mnt/my-data")

You should have:
 - Read access for `s3://cis442f-course-data` as `/mnt/cis442f-data`
 - Read/write access for `s3://cis442f-student-data/stuXXX` as `/mnt/my-data`
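
The mount cell near the top of this notebook URL-encodes the secret key before placing it in the S3 URI, because AWS secret keys can contain characters such as `/` that would otherwise be read as part of the URI. Here is a minimal sketch of that step in plain Python. The secret key, access key and bucket name are made up for illustration and are not real credentials:

```python
from urllib.parse import quote

# Hypothetical secret key for illustration only; real keys come from your stuXXX.txt file
fake_secret = "abc/def+ghi"

# safe="" means no character is exempt, so "/" is percent-encoded too
encoded = quote(fake_secret, safe="")
print(encoded)

# Build the same style of URI that the mount cell passes to dbutils.fs.mount
# ("FAKEACCESSKEY" and "example-bucket" are placeholders)
uri = "s3n://%s:%s@%s" % ("FAKEACCESSKEY", encoded, "example-bucket")
print(uri)
```

Without the encoding step, the `/` inside the key would split the URI in the wrong place and the mount would fail.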

In [0]:
# Check to see that you can see the contents of the S3 bucket
display(dbutils.fs.ls("/mnt/cis442f-data"))

path,name,size
dbfs:/mnt/cis442f-data/duocar/,duocar/,0
dbfs:/mnt/cis442f-data/for-hive/,for-hive/,0
dbfs:/mnt/cis442f-data/input/,input/,0
dbfs:/mnt/cis442f-data/output/,output/,0


In [0]:
# Check to see that you can see the contents of the S3 bucket where you can store data
display(dbutils.fs.ls("/mnt/my-data"))

path,name,size
dbfs:/mnt/my-data/backups/,backups/,0
dbfs:/mnt/my-data/cluster-map.html,cluster-map.html,3450
dbfs:/mnt/my-data/databricks-training/,databricks-training/,0
dbfs:/mnt/my-data/duocar/,duocar/,0
dbfs:/mnt/my-data/homework/,homework/,0
dbfs:/mnt/my-data/map.html,map.html,2637
dbfs:/mnt/my-data/myduocar/,myduocar/,0
dbfs:/mnt/my-data/output/,output/,0
dbfs:/mnt/my-data/practice/,practice/,0
dbfs:/mnt/my-data/today/,today/,0


In [0]:
# The databricks environment automatically creates a SparkSession for us
# We can see it by just typing
spark

In [0]:
# The SparkSession has many properties including its own version
spark.version

A DataFrame is the most common Structured API. It represents a table of data with rows and columns as we have seen before. The list that defines the columns and the data types within the columns is called the _schema_. In this simple example we create a dataframe from individual data elements to illustrate several properties of DataFrames. More commonly the data will be imported from other sources.

In [0]:
# Use the `createDataFrame` method to create a Spark DataFrame
from pyspark.sql.types import *

schema = StructType([StructField("class", StringType()), StructField("student_id", IntegerType())])

df = spark.createDataFrame([("Xiang",1), ("David",2),("Jinghu",3),("Sasha",4),("Bin",5), ("Karthikeyan",6),("Daniel",7),("Luan",8),("Yiqian",9)], schema=schema)


In [0]:
# Use `printSchema` method to print schema of the DataFrame
df.printSchema()

In [0]:
# Use the `show` method to view the DataFrame
df.show()

In [0]:
# A couple of helper functions to clean up directories if needed
# They are used in this notebook to remove the outputs of previous
# executions saved to S3 without throwing an error if they
# do not exist (e.g. the first time the notebook is run)

def file_or_dir_exists(path):
  try:
    dbutils.fs.ls(path)
    return True
  except Exception as e:
    if 'java.io.FileNotFoundException' in str(e):
      return False
    else:
      raise
      
def remove_dir_and_contents_if_exists (path):
  if file_or_dir_exists (path):
    dbutils.fs.rm(path, recurse=True)
    print(path + " removed")
  else:
    print(path + " did not exist")

#### **Working with text files**

As you would expect, there are many ways of loading and saving data. The `text` method of the [DataFrameReader](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader) class reads each line of a text file into a row of a DataFrame with a single column named *value*.

This way of importing data would be suitable for capturing unstructured data e.g. html, xml or text for natural language processing.

In [0]:
# In this example we are reading data from S3
products_txt = spark.read.text("/mnt/cis442f-data/input/examples8/products")

products_txt.show(5, truncate=False) # If 'truncate' set to True strings longer than 20 chars truncated. If set to a number, truncates long strings to that length. 
products_txt.head(5) 

The `text` method of the [DataFrameWriter](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter) class writes a DataFrame with a single string column out as text files (see below).

In [0]:
# If the /practice/products_txt directory already exists the next cell would
# report an error that "path dbfs:/practice/products_txt already exists."

# We can delete it if it already exists using the helper functions we created above
path = "/practice/products_txt"
remove_dir_and_contents_if_exists (path)

In [0]:
# We can store the data in text format to the cluster's storage (equivalent to the HDFS we have been working with)
products_txt.write.text("/practice/products_txt")

You can find out more about the [Databricks file system](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html). This is how to list the contents of a directory:

In [0]:
# We can confirm that the data is there
display(dbutils.fs.ls("/practice/products_txt"))


path,name,size
dbfs:/practice/products_txt/_SUCCESS,_SUCCESS,0
dbfs:/practice/products_txt/_committed_3014625248701639749,_committed_3014625248701639749,113
dbfs:/practice/products_txt/_started_3014625248701639749,_started_3014625248701639749,0
dbfs:/practice/products_txt/part-00000-tid-3014625248701639749-77ee3289-0e4c-4bcb-aabc-9766e1f3512f-174-1-c000.txt,part-00000-tid-3014625248701639749-77ee3289-0e4c-4bcb-aabc-9766e1f3512f-174-1-c000.txt,207


In [0]:
# Extract the full path and file name of the first data file in the results directory

import re

listing = dbutils.fs.ls("/practice/products_txt")
path = "/practice/products_txt/"

dbfs_path_and_file = None  # stays None if no data file is found
for item in listing:
  # item.name is the file name without the directory part
  if re.match(r"part-.*\.txt$", item.name):
    dbfs_path_and_file = path + item.name
    break

print (dbfs_path_and_file)
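
The regular expression used to pick out the data file can be tried without a cluster. Here is a minimal sketch in plain Python, using the file names from the earlier listing as ordinary strings. The `names` list and `pattern` variable are illustrative and not part of the notebook:

```python
import re

# File names copied from the directory listing shown earlier
names = [
    "_SUCCESS",
    "_committed_3014625248701639749",
    "_started_3014625248701639749",
    "part-00000-tid-3014625248701639749-77ee3289-0e4c-4bcb-aabc-9766e1f3512f-174-1-c000.txt",
]

# Keep only names that start with "part-" and end with ".txt"
pattern = re.compile(r"part-.*\.txt$")
data_files = [n for n in names if pattern.match(n)]

print(data_files)
```

Only the `part-` file holds data. The `_SUCCESS`, `_committed_` and `_started_` entries are bookkeeping files written alongside it.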


In [0]:
# Let's confirm that the data stored is what we expected
print(dbutils.fs.head(dbfs_path_and_file))

In [0]:
# We can delete this directory by uncommenting the following statement
# Notice the similarity to Unix and hdfs commands
# dbutils.fs.rm("/practice/products_txt", recurse=True)

There is not much storage on the Databricks Community Edition cluster, so we will use S3 for most of our work.

#### **Working with S3**

As long as you ran the cell with your IAM key you should be able to read from and write to S3. 
- You have read access at `s3://cis442f-course-data` as `/mnt/cis442f-data`
- You have read/write access at `s3://cis442f-student-data/stuXXX` as `/mnt/my-data`

In [0]:
# Reading the riders data set into a DataFrame using the `text` method of the DataFrameReader class
riders_txt = spark.read.text("/mnt/cis442f-data/duocar/raw/riders/")
riders_txt.show(5, truncate=False)
riders_txt.head(5)

The `text` method can also write a compressed file, in this example to an S3 bucket. See the [Compression Formats](https://www.cloudera.com/documentation/enterprise/latest/topics/introduction_compression.html) information from Cloudera.

After running the cells below, use Cloudberry or Cyberduck to check that these directories have been written to your S3 bucket. Of course, the rider data is actually in CSV format, so we would want to read it as such rather than as simple text. We look at that next.

In [0]:
# If we ran the next cell before, the output directories will already exist
# We can delete them if they exist
path = "/mnt/my-data/output/spark/riders_text_compressed/"
remove_dir_and_contents_if_exists (path)

path = "/mnt/my-data/output/spark/riders_text/"
remove_dir_and_contents_if_exists (path)

In [0]:
# Write the riders DataFrame to S3
write_string = "/mnt/my-data/output/spark/riders_text"
riders_txt.write.text(write_string)

# Write the riders DataFrame to S3 in a compressed format
write_string = "/mnt/my-data/output/spark/riders_text_compressed"
# print(write_string)
riders_txt.write.text(write_string, compression="bzip2")

In [0]:
# Check to see that you can see the contents of the S3 bucket where you can store data
display(dbutils.fs.ls("/mnt/my-data/output/spark"))

path,name,size
dbfs:/mnt/my-data/output/spark/riders_text/,riders_text/,0
dbfs:/mnt/my-data/output/spark/riders_text_compressed/,riders_text_compressed/,0


Use Cloudberry or Cyberduck to confirm that the compressed version really does take up less disk space.
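
The size difference can also be seen on a small scale without Spark. Here is a minimal sketch using Python's standard `bz2` module, which implements the same bzip2 codec. The sample text is made up and far more repetitive than the real riders data, so the ratio here is much better than you should expect on S3:

```python
import bz2

# Made-up, repetitive CSV-like text standing in for the riders data
raw = ("220200000014,1998-07-08,2017-01-01,male,White\n" * 1000).encode("utf-8")

# bzip2 is the codec requested with compression="bzip2" in the write cell
compressed = bz2.compress(raw)

print(len(raw), "bytes uncompressed")
print(len(compressed), "bytes compressed")

# Decompressing gives back exactly the original bytes
restored = bz2.decompress(compressed)
print(restored == raw)
```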

#### **Working with Delimited Data**

The rider data is actually a comma-delimited text file. The `csv` method of the `DataFrameReader` class reads a delimited text file.

In the following example we use the `csv` method and let Spark do its best to figure out the schema (`inferSchema`) from the data in each field.

In [0]:
# Note that we did not have to create a new DataFrame to take a peek at what it contains. We asked Spark to just show us a sample of the data
# Also note that backslashes (\) are used to allow the command to span several lines. This can make code easier to read
spark \
  .read \
  .csv("/mnt/cis442f-data/duocar/raw/riders/", sep=",", header=True, inferSchema=True) \
  .show(5)

This is actually a convenience function for the more general syntax shown in the next cell.

**Note:** If you use either method with `header` set to `True`, Spark assumes that a header row occurs in *every* file in the data directory you load.

In [0]:
# Create a DataFrame from the raw data
riders = spark \
  .read \
  .format("csv") \
  .option("sep", ",") \
  .option("header", True) \
  .option("inferSchema", True) \
  .load("/mnt/cis442f-data/duocar/raw/riders/")

Spark takes the column names from the header row and does its best to infer each column's type from the values.
The `printSchema()` method shows us the schema.

In [0]:
riders.printSchema()

You can manually specify the schema instead of inferring it from the header row and column values.
**Note** that the types specified are Spark types imported from `pyspark.sql.types`. See the Spark documentation for more on [Spark Datatypes](http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types).

In [0]:
from pyspark.sql.types import *
# Specify the schema as a StructType wrapping a Python list of StructFields, each giving a field name and a Spark data type
schema = StructType([
    StructField("id", StringType()),
    StructField("birth_date", DateType()),
    StructField("start_date", DateType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("sex", StringType()),
    StructField("ethnicity", StringType()),
    StructField("student", IntegerType()),
    StructField("home_block", StringType()),
    StructField("home_lat", DoubleType()),
    StructField("home_lon", DoubleType()),
    StructField("work_lat", DoubleType()),
    StructField("work_lon", DoubleType())
])

In [0]:
# Pass the schema to the `DataFrameReader`
riders2 = spark \
  .read \
  .format("csv") \
  .option("sep", ",") \
  .option("header", True) \
  .schema(schema) \
  .load("/mnt/cis442f-data/duocar/raw/riders/")

# Note: We must include the header option, otherwise Spark will read the
# header row as a valid record

# Confirm the explicit schema:
riders2.printSchema()

# Note that `printSchema` reports the Spark data types we specified, e.g. `date` for DateType

In [0]:
# Remove the existing tab-delimited output if it exists
path = "/mnt/my-data/practice/riders_tsv"
remove_dir_and_contents_if_exists (path)

# Write the DataFrame as tab-delimited text:
riders.write.csv(path, sep="\t")

In [0]:
# Check to see that you can see the contents of the S3 bucket where you can store data
display(dbutils.fs.ls("/mnt/my-data/practice/riders_tsv"))

path,name,size
dbfs:/mnt/my-data/practice/riders_tsv/_SUCCESS,_SUCCESS,0
dbfs:/mnt/my-data/practice/riders_tsv/_committed_8156922563988798198,_committed_8156922563988798198,113
dbfs:/mnt/my-data/practice/riders_tsv/_started_8156922563988798198,_started_8156922563988798198,0
dbfs:/mnt/my-data/practice/riders_tsv/part-00000-tid-8156922563988798198-e5c62397-1e95-404e-ad82-0a3f19aea076-200-1-c000.csv,part-00000-tid-8156922563988798198-e5c62397-1e95-404e-ad82-0a3f19aea076-200-1-c000.csv,195207


Check that the data is saved the way you expect.

#### **Working with Parquet files**

[Parquet](https://parquet.apache.org/) is a very popular columnar storage format for Hadoop. Use the `parquet` method of the `DataFrameWriter` class to save a DataFrame in Parquet format.

In [0]:
# Remove the existing Parquet output if it exists
path = "/mnt/my-data/practice/riders_parquet/"
remove_dir_and_contents_if_exists (path)

riders.write.parquet(path)

# Note that the schema is stored with the data:
spark.read.parquet(path).printSchema() 

#### **Working with Hive tables in Spark**

Use the `sql` method of the `SparkSession` class to run Hive queries (if the cluster has Hive installed).

In [0]:
# Use `sql` method of `SparkSession` class to run Hive queries
spark.sql("SHOW DATABASES").show()
# spark.sql("USE examples")

In [0]:
# We can register an existing DataFrame as a temporary view and query it like a table
riders.createOrReplaceTempView("riders_data")

In [0]:
# Use `sql` method of `SparkSession` class to run Hive queries
spark.sql("USE default").show()
spark.sql("SHOW TABLES").show()


In [0]:
# Use `sql` method of `SparkSession` class to run Hive queries
spark.sql("DESCRIBE riders_data").show()
spark.sql("SELECT * FROM riders_data LIMIT 10").show()

Note that the result of a Hive query is simply a Spark DataFrame. So, it is possible to use any of the many DataFrame methods that we will learn about on the results of a query from a table in Hive.
It is also possible to save a DataFrame as a Hive table and manipulate it using SQL-type commands.

In [0]:
riders_via_sql = spark.sql("SELECT * FROM riders_data")
riders_via_sql.printSchema()
riders_via_sql.show(5)

In [0]:
# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS stu099").show()  

In [0]:
# Create table name
table_name = "stu099.riders_via_sql"  

# Use the `saveAsTable` method of the `DataFrameWriter`
# class to save a DataFrame as a Hive table
riders.write.saveAsTable(table_name)

# You can now manipulate this table in Hive
query = "DESCRIBE %s" % table_name
print(query)
spark.sql(query).show()

In [0]:
# Now drop the table to clean up our database
query = "DROP TABLE IF EXISTS %s" % table_name
print(query)
spark.sql(query)

#### **Generating a Spark DataFrame**

Sometimes we need to generate a Spark DataFrame from scratch, for example, for testing purposes.
We already saw the use of the `createDataFrame` method to create a Spark DataFrame:

    from pyspark.sql.types import *
    schema = StructType([StructField("class", StringType()), StructField("student_id", IntegerType())])
    df = spark.createDataFrame([("Xiang",1), ("David",2),("Jinghu",3),("Sasha",4),("Bin",5), ("Karthikeyan",6),("Daniel",7),("Luan",8),("Yiqian",9)], schema=schema)

Use the `range` method to generate a sequence of integers and add new columns as appropriate.

In [0]:
# Use the `range` method to generate a sequence of integers and add new columns
# as appropriate.
spark.range(1000).show(5)

# Use the `rand` function to generate a uniform random variable:
from pyspark.sql.functions import rand
spark \
  .range(1000) \
  .withColumn("uniform", rand(12345)) \
  .show(5)

In [0]:
# or a Bernoulli random variable with `p = 0.25`:
bern_df = spark \
  .range(1000) \
  .withColumn("Bernoulli", (rand(12345) < 0.25).cast("int"))
  
# Generate a summary using the functional style:
bern_df.groupby("Bernoulli").count().show()

# Generate a summary using the SQL style:
bern_df.createOrReplaceTempView("bern")
spark.sql("SELECT Bernoulli, COUNT(*) AS count \
    FROM bern \
    GROUP BY Bernoulli") \
  .show()
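
The trick of comparing a uniform draw against `p` works in any language. Here is a minimal sketch in plain Python (not Spark), using the standard `random` module. Its generator differs from Spark's `rand`, so the individual values will not match the cell above, and the sample size is larger than the notebook's 1000 to make the fraction stable:

```python
import random
from collections import Counter

rng = random.Random(12345)   # seeded so the run is repeatable
p = 0.25
n = 100_000                  # larger than the notebook's 1000 rows

# A uniform draw on [0, 1) falls below p with probability p,
# so int(u < p) is 1 with probability p and 0 otherwise
draws = [int(rng.random() < p) for _ in range(n)]

counts = Counter(draws)      # same idea as groupby("Bernoulli").count()
fraction_of_ones = counts[1] / n
print(counts)
print(fraction_of_ones)
```

The fraction of ones should land close to 0.25, just as roughly a quarter of the rows in `bern_df` have `Bernoulli` equal to 1.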

In [0]:
# Use the `randn` function to generate a normal random variable:
from pyspark.sql.functions import randn
ran_df = spark.range(1000).withColumn("normal", 42 +  2 * randn(54321))
ran_df.show(5)

# In Spark we can use the `describe` method to get some overview statistics
# Here the mean of the normal column is ~42 with a standard deviation ~2
ran_df.describe("id", "normal").show()
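
The expression `42 + 2 * randn(...)` shifts and scales a standard normal variable. Here is a minimal sketch of the same arithmetic in plain Python (not Spark), using the standard `random` and `statistics` modules. The generator differs from Spark's `randn`, so only the summary statistics are comparable:

```python
import random
import statistics

rng = random.Random(54321)   # seeded so the run is repeatable
n = 100_000

# Shift and scale a standard normal draw:
# the mean becomes 42 and the standard deviation becomes 2
values = [42 + 2 * rng.gauss(0, 1) for _ in range(n)]

mean = statistics.mean(values)
stdev = statistics.stdev(values)
print(round(mean, 2), round(stdev, 2))
```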

#### **Working with Pandas**

Be very careful not to pull 'Big Data' into Pandas: `toPandas` collects every row into the memory of a single machine, and billions of rows could overwhelm it. You would usually only bring back summary results, aggregations or samples for further analysis or visualization.

In [0]:
# Note the size of the riders Spark DataFrame
riders.count()

In [0]:
import pandas as pd

# take a sample of the riders Spark dataframe and load it into a Pandas dataframe
riders_pd = riders.sample(0.05).toPandas()
riders_pd.head()

Unnamed: 0,id,birth_date,start_date,first_name,last_name,sex,ethnicity,student,home_block,home_lat,home_lon,work_lat,work_lon
0,220200000014,1998-07-08,2017-01-01,Robert,Dunnan,male,White,1,380170003002002,46.897359,-96.801023,,
1,220200000036,1996-12-15,2017-01-02,Ben,Sparks,male,White,1,380170003002004,46.895864,-96.805807,,
2,220200000037,1996-07-27,2017-01-02,Isaac,Schamel,male,White,0,380170103033007,46.8226,-96.82657,,
3,220200000041,1945-05-22,2017-01-02,Emily,Fredrickson,female,White,0,380170406003014,46.649363,-97.016428,46.645552,-97.008445
4,220200000042,1975-06-30,2017-01-02,Courtney,Tarpley,female,White,0,380170405003010,46.785502,-96.82317,46.840588,-96.868087


In [0]:
# Convert the Pandas DataFrame back into a Spark DataFrame
demo_via_pandas = spark.createDataFrame(riders_pd)
demo_via_pandas.show(5)
demo_via_pandas.count()

### Hands On

![Hands-on](https://cis442f-open-data.s3.amazonaws.com/pictures/hands.png "Hands-on")


#### Exercises

(1) Create a small DataFrame called `person` with the following
- data [(42, "Tianyuan", 11), (43, "Ziran", 11), (44, "Yubo", 12), (45, "Ling", 16), (46, "William", 17)] and
- schema ["id", "name", "city_id"]

(2) Use some methods we used above to gain insights about the DataFrame you just created
- print the schema of the DataFrame
- view its contents
- count the number of records using the `count()` method
    
(3) Read the raw driver file from S3 into a Spark DataFrame.

(4) Save the driver DataFrame as a JSON file in your practice directory.


Expect to use the Databricks and/or Spark documentation as well as Google to figure out how to complete the following:

(5) Figure out how to inspect the JSON file.

(6) Read the driver JSON file into a Spark DataFrame.

(7) Figure out how to delete the JSON file from within this Databricks notebook.

(8) Figure out how to remove the practice subdirectory from your S3 storage from within this Databricks notebook.


**References you might need to learn about reading and writing in JSON data format**

[DataFrameReader](http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.json.html#pyspark.sql.DataFrameReader.json)

[DataFrameWriter](http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.json.html#pyspark.sql.DataFrameWriter.json)
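
When inspecting the output for the exercises, it helps to know that Spark's JSON writer saves data in the JSON Lines layout: one JSON object per line rather than a single JSON array. Here is a minimal sketch of parsing text in that shape with Python's standard library. The two records and their field names are made up for illustration and are not taken from the driver data:

```python
import json

# Made-up sample in JSON Lines layout: one JSON object per line
sample = '{"id": 1, "first_name": "Ada"}\n{"id": 2, "first_name": "Grace"}\n'

# Parse each non-empty line separately
records = [json.loads(line) for line in sample.splitlines() if line.strip()]

for record in records:
    print(record["id"], record["first_name"])
```

Calling `json.loads` on the whole text at once would fail, because the text as a whole is not one valid JSON document.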

**The following are essential references more generally**

[PySpark Documentation](http://spark.apache.org/docs/latest/api/python/index.html)

[PySpark API Reference](http://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html)

By the way, Databricks has some datasets available for you to play with.

In [0]:
display(dbutils.fs.ls("/databricks-datasets/"))



path,name,size
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-datasets/COVID/,COVID/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0


In [0]:
print(dbutils.fs.head("/databricks-datasets/README.md"))

In [0]:
display(dbutils.fs.ls("/databricks-datasets/COVID/CORD-19/CORD-19.readme.md"))
print(dbutils.fs.head("/databricks-datasets/COVID/CORD-19/CORD-19.readme.md"))

path,name,size
dbfs:/databricks-datasets/COVID/CORD-19/CORD-19.readme.md,CORD-19.readme.md,2592


In [0]:
# Let's read in some flight data
flightData2015 = spark.read.option("inferSchema", "true") \
    .option("header", "true").csv("/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv")


In [0]:
flightData2015.show(5)


In [0]:
flightData2015.sort("count").show(5)

In [0]:
flightData2015.sort("count").explain()

In [0]:
%sh
ls


