**2022-04-11 `16.1-Big-Data - PySpark Fundamentals`**

**Resources**
* [Spark Docs: The most useful link of all for Spark](https://spark.apache.org/docs/3.2.1/api/sql/index.html)

**Objectives**
* Get up and running with Google Colab Notebook.
* Understand how Spark works.
* Store a dataset in a PySpark DataFrame.
* Filter DataFrames.
* Work with dates in data and plot results.

**Presentation**
* [16.1-Big-Data - PySpark Fundamentals](https://ucb.bootcampcontent.com/UCB-Coding-Bootcamp/ucb-virt-data-pt-10-2021-u-b/-/blob/master/03-Lesson-Plans/16-Big-Data/Slideshows/Data-M_16.1-Big-Data.pdf)

**Google Colab**
* [Colab Welcome](https://colab.research.google.com/notebooks/welcome.ipynb)
* [Google Drive](https://drive.google.com/)

![](../Images/Colab-Menu.png)

# ==========================================

### 1.01 Instructor Do: Spark Overview (0:10)

* [Colab Welcome](https://colab.research.google.com/notebooks/welcome.ipynb)
* [Google Drive](https://drive.google.com/)

# ==========================================

### 1.02 Everyone Do: Set Up Google Colab (0:10)

* [Colab Welcome](https://colab.research.google.com/notebooks/welcome.ipynb)
* [Google Drive](https://drive.google.com/)

# ==========================================

### 1.03 Instructor Do: PySpark DataFrame Basics (0:10)

[Spark distribution](http://www.apache.org/dist/spark/)

* DataFrame Basics

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ and enter it as the Spark version
# The file name below assumes a "-bin-hadoop2.7" build, which Spark 3.2.x and earlier 3.x releases provide
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialize findspark so pyspark can be imported; the SparkSession itself is started in the next cell
import findspark
findspark.init()

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

In [None]:
# Create DataFrame manually
dataframe = spark.createDataFrame([
    (0, "Here is our DataFrame"),
    (1, "We are making one from scratch"),
    (2, "This will look very similar to a Pandas DataFrame")
], ["id", "words"])

dataframe.show()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("food.csv"), sep=",", header=True)

# Show DataFrame
df.show()

In [None]:
# Print our schema
df.printSchema()

In [None]:
# Show the columns
df.columns

In [None]:
# Describe our data (describe() returns a new DataFrame, so call show() to display it)
df.describe().show()

In [None]:
# Import the types needed to define a schema
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [None]:
# Next we need to create the list of struct fields
schema = [StructField("food", StringType(), True), StructField("price", IntegerType(), True),]
schema

In [None]:
# Pass in our fields
final = StructType(fields=schema)
final

In [None]:
# Read our data with our new schema
dataframe = spark.read.csv(SparkFiles.get("food.csv"), schema=final, sep=",", header=True)
dataframe.show()

In [None]:
# Print it out
dataframe.printSchema()
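Why pass a schema at all? Without one (and without `inferSchema=True`), `spark.read.csv` types every column as a string. The sketch below is a plain-Python analogue of what the `StructType` asks for, not Spark's actual CSV parser: each column gets a converter, and an empty value becomes `None`, the way a nullable field holds `null`. The sample CSV text and the `apply_schema` helper are hypothetical.

```python
import csv
import io

# Hypothetical sample data standing in for food.csv; the real file may differ.
raw_csv = "food,price\npizza,10\nsalad,\ntacos,7\n"

# Without a schema every value is a string
# (spark.read.csv likewise types every column as string by default).
raw_prices = [r["price"] for r in csv.DictReader(io.StringIO(raw_csv))]
print(raw_prices)  # ['10', '', '7']

# Plain-Python analogue of the StructType above: each column name maps to a converter.
schema = {"food": str, "price": int}

def apply_schema(text, schema):
    """Parse CSV text and convert every value with the converter for its column.

    An empty value becomes None, mirroring a null in a nullable field.
    """
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        rows.append({
            name: (convert(record[name]) if record[name] != "" else None)
            for name, convert in schema.items()
        })
    return rows

rows = apply_schema(raw_csv, schema)
print(rows)
# [{'food': 'pizza', 'price': 10}, {'food': 'salad', 'price': None}, {'food': 'tacos', 'price': 7}]
```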

* Accessing data

In [None]:
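# Bracket notation returns a Column (an expression describing the column), not its data
dataframe['price']

In [None]:
type(dataframe['price'])

In [None]:
# select() returns a new DataFrame containing only the chosen column; no rows are computed yet
dataframe.select('price')

In [None]:
type(dataframe.select('price'))

In [None]:
# show() is an action: it runs the query and prints the rows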
dataframe.select('price').show()
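The difference between `select()` and `show()` comes from Spark's lazy evaluation: transformations such as `select`, `filter`, and `withColumn` only describe a new DataFrame, while actions such as `show`, `collect`, and `toPandas` actually run the work. The toy `LazyFrame` class below is a hypothetical plain-Python illustration of that split, not how Spark is implemented; real Spark also optimizes the recorded plan before running it, which this sketch does not attempt.

```python
class LazyFrame:
    """A toy stand-in for a Spark DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of pending transformation functions

    def select(self, *columns):
        # Transformation: returns a new LazyFrame with one more step in its plan.
        step = lambda rows: [{c: r[c] for c in columns} for r in rows]
        return LazyFrame(self._rows, self._plan + (step,))

    def pending_steps(self):
        return len(self._plan)

    def collect(self):
        # Action: only now are the recorded steps executed, in order.
        result = self._rows
        for step in self._plan:
            result = step(result)
        return result

frame = LazyFrame([{"food": "pizza", "price": 10}, {"food": "tacos", "price": 7}])
prices = frame.select("price")   # nothing computed yet; the plan just grew by one step
print(prices.pending_steps())    # 1
print(prices.collect())          # [{'price': 10}, {'price': 7}]
```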

* Manipulating Columns

In [None]:
# Add new column
dataframe.withColumn('newprice', dataframe['price']).show()

In [None]:
# Update column name
dataframe.withColumnRenamed('price','newerprice').show()

In [None]:
# Double the price
dataframe.withColumn('doubleprice',dataframe['price']*2).show()

In [None]:
# Add a dollar to the price
dataframe.withColumn('add_one_dollar',dataframe['price']+1).show()

In [None]:
# Halve the price
dataframe.withColumn('half_price',dataframe['price']/2).show()

In [None]:
# Collecting a column as a list
dataframe.select("price").collect()
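`collect()` returns a list of `Row` objects rather than bare values, and it pulls every row back to the driver, so it is only safe on small results. A minimal sketch of unpacking the values, using a `namedtuple` as a hypothetical stand-in for `pyspark.sql.Row` so it runs without Spark (the prices are made up):

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row; a real Row supports both row.price and row["price"].
Row = namedtuple("Row", ["price"])
collected = [Row(price=10), Row(price=5), Row(price=7)]

# With a real DataFrame the equivalent would be:
#   prices = [row["price"] for row in dataframe.select("price").collect()]
prices = [row.price for row in collected]
print(prices)  # [10, 5, 7]
```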

* Converting PySpark DataFrame to Pandas DataFrame

In [None]:
import pandas as pd
pandas_df = dataframe.toPandas() 

# ==========================================

### 1.04 Student Do: Demographic DataFrame Basics (0:15)

# Demographic DataFrame Basics

## Instructions

In this activity, you will get the chance to explore Spark DataFrames. Follow the comments in the Notebook to clean and display demographic data using Spark DataFrames. Remember to consult the [documentation](http://spark.apache.org/docs/latest/api/python/index.html).

---

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ and enter it as the Spark version
# The file name below assumes a "-bin-hadoop2.7" build, which Spark 3.2.x and earlier 3.x releases provide
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialize findspark so pyspark can be imported; the SparkSession itself is started in the next cell
import findspark
findspark.init()

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Demographics").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/demographics.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("demographics.csv"), sep=",", header=True, inferSchema=True)

# Show DataFrame
df.show()

In [None]:
# Print the column names
df.columns

In [None]:
# Print out the first 10 rows
df.show(10)

In [None]:
# Select the age, height_meter, and weight_kg columns and use describe to show the summary statistics
df.select(["age", "height_meter", "weight_kg"]).describe().show()

In [None]:
# Print the schema to see the types
df.printSchema()

In [None]:
# Rename the Salary column to `Salary (1k)` and show only this new column
df = df.withColumnRenamed('Salary', 'Salary (1k)')
df.select("Salary (1k)").show()

In [None]:
# Create a new column called `Salary` where the values are the `Salary (1k)` * 1000
# Show the columns `Salary` and `Salary (1k)`
df = df.withColumn("Salary", df["Salary (1k)"] * 1000)
df.select(["Salary", "Salary (1k)"]).show()

# ==========================================

### 1.05 Instructor Do: PySpark DataFrame Filtering (0:05)

* Filtering Data

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ and enter it as the Spark version
# The file name below assumes a "-bin-hadoop2.7" build, which Spark 3.2.x and earlier 3.x releases provide
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialize findspark so pyspark can be imported; the SparkSession itself is started in the next cell
import findspark
findspark.init()

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkFunctions").getOrCreate()

In [None]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/wine.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("wine.csv"), sep=",", header=True, inferSchema=True)

# Show DataFrame
df.show()

In [None]:
# Order a DataFrame by ascending values
df.orderBy(df["points"].asc()).show(5)

In [None]:
# Order a DataFrame by descending values
df.orderBy(df["points"].desc()).show(5)
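The sort order depends on the column type. If `spark.read.csv` is called without `inferSchema=True` or a schema, `points` is a string column and `orderBy` compares text, not numbers. A plain-Python illustration with hypothetical values:

```python
# Hypothetical "points" values as they would look if read as strings.
points_as_strings = ["90", "100", "85", "88"]

# Strings compare character by character, so "100" sorts before "85".
print(sorted(points_as_strings))                  # ['100', '85', '88', '90']

# The same values compared as integers give the numeric order.
print(sorted(int(p) for p in points_as_strings))  # [85, 88, 90, 100]
```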

In [None]:
# Import average function
from pyspark.sql.functions import avg
df.select(avg("points")).show()

In [None]:
# Using filter
df.filter("price<20").show()

In [None]:
# Filter by price and select only certain columns
df.filter("price<20").select(['points','country', 'winery','price']).show()

* Using Python Comparison Operators

In [None]:
# Same results, this time using a Python comparison on a Column instead of a SQL string
df.filter(df["price"] < 20).show()

In [None]:
# Combine conditions with | (or) and & (and); wrap each condition in parentheses
df.filter((df["price"] < 200) | (df["points"] > 80)).show()

In [None]:
df.filter(df["country"] == "US").show()
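The parentheses in the combined filter are required. Python's `|` and `&` bind more tightly than `<` and `>`, so an unparenthesized condition is parsed as something else entirely. With Spark `Column` objects this usually fails with an error; with plain numbers it silently gives the wrong answer, as this sketch with a hypothetical wine shows:

```python
# Hypothetical single wine: price 250, points 90.
price, points = 250, 90

# Intended condition: price under 200 OR points above 80.
with_parens = (price < 200) | (points > 80)
print(with_parens)     # True

# Without parentheses Python reads this as the chained comparison
# price < (200 | points) > 80, because | binds tighter than < and >.
without_parens = price < 200 | points > 80
print(200 | points)    # 218 (bitwise OR of the two integers)
print(without_parens)  # False
```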

# ==========================================

### 1.06 Student Do: PySpark Demographic Filtering (0:20)

## Instructions

Using PySpark methods and the demographics dataset, answer the following questions:

* Which occupation had the highest salary?
* Which occupation had the lowest salary?
* What is the mean salary of this dataset?
* What is the `max` and `min` of the Salary column?
* Which occupations have salaries above 80k? List all of them.

### Bonus

What is the average age and height for each academic degree type?

**Hint:** You will need to use `groupBy` to answer this question.

---

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ and enter it as the Spark version
# The file name below assumes a "-bin-hadoop2.7" build, which Spark 3.2.x and earlier 3.x releases provide
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialize findspark so pyspark can be imported; the SparkSession itself is started in the next cell
import findspark
findspark.init()

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demographicsFilter").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/demographics.csv"
spark.sparkContext.addFile(url)
df = spark.read.option('header', 'true').csv(SparkFiles.get("demographics.csv"), inferSchema=True, sep=',')

# Show DataFrame
df.show()

In [None]:
# What occupation had the highest salary?
df.orderBy(df["Salary"].desc()).select("occupation", "Salary").limit(1).show()

In [None]:
# What occupation had the lowest salary?
df.orderBy(df["Salary"]).select("occupation", "Salary").limit(1).show()

In [None]:
# What is the mean salary of this dataset?
from pyspark.sql.functions import mean
df.select(mean("Salary")).show()

In [None]:
# What is the max and min of the Salary column?
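# Alias the imports so they don't shadow Python's built-in max() and min()
from pyspark.sql.functions import max as spark_max, min as spark_min
df.select(spark_max("Salary"), spark_min("Salary")).show()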

In [None]:
# Show all of the occupations where salaries were above 80k (Salary is in thousands)
df.filter("Salary > 80").select("occupation").show()

In [None]:
# BONUS
# What is the average age and height for each academic degree type?
# HINT: You will need to use `groupBy` to solve this
avg_df = df.groupBy("academic_degree").avg()
avg_df.select("academic_degree", "avg(age)", "avg(height_meter)").show()
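`df.groupBy("academic_degree").avg()` averages every numeric column within each degree, which is why the result is narrowed with `select` afterwards; `df.groupBy("academic_degree").agg(avg("age"), avg("height_meter"))` is another way to ask for just those two columns. The plain-Python sketch below shows the same grouping logic on hypothetical rows; the `group_avg` helper is illustrative only and, unlike Spark's `avg`, does not skip nulls.

```python
from collections import defaultdict

# Hypothetical rows; the real demographics.csv has more columns and rows.
rows = [
    {"academic_degree": "Bachelor", "age": 30, "height_meter": 1.70},
    {"academic_degree": "Master", "age": 40, "height_meter": 1.80},
    {"academic_degree": "Bachelor", "age": 20, "height_meter": 1.60},
]

def group_avg(rows, key, columns):
    """Average each column within groups of rows that share the same key value."""
    sums = defaultdict(lambda: defaultdict(float))
    counts = defaultdict(int)
    for row in rows:
        group = row[key]
        counts[group] += 1
        for col in columns:
            sums[group][col] += row[col]
    return {g: {col: sums[g][col] / counts[g] for col in columns} for g in counts}

result = group_avg(rows, "academic_degree", ["age", "height_meter"])
print(result["Bachelor"]["age"])  # 25.0
print(result["Master"]["age"])    # 40.0
```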

# ==========================================

### 1.07 Instructor Do: PySpark DataFrame Dates (0:05)

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ and enter it as the Spark version
# The file name below assumes a "-bin-hadoop2.7" build, which Spark 3.2.x and earlier 3.x releases provide
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialize findspark so pyspark can be imported; the SparkSession itself is started in the next cell
import findspark
findspark.init()

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkDates").getOrCreate()

In [None]:
# Load in data
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/rainfall.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("rainfall.csv"), sep=",", header=True, inferSchema=True, timestampFormat="yyyy/MM/dd HH:mm:ss")
df.show()

In [None]:
# Show schema to confirm date type
df.printSchema()
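The `timestampFormat` option uses Spark's datetime pattern letters, where `MM` is the month, `mm` is minutes, and `HH` is the 24-hour clock; mixing up `MM` and `mm` is an easy mistake. Assuming the dates follow the `yyyy/MM/dd HH:mm:ss` layout given in the read above, the equivalent Python `strptime` pattern can be checked like this (the sample value is hypothetical):

```python
from datetime import datetime

# Spark pattern:  "yyyy/MM/dd HH:mm:ss"
# Python pattern: "%Y/%m/%d %H:%M:%S"  (%m = month, %M = minutes, %H = 24-hour clock)
python_pattern = "%Y/%m/%d %H:%M:%S"

# Hypothetical value in the same layout.
parsed = datetime.strptime("2015/07/04 13:05:00", python_pattern)
print(parsed.year, parsed.month, parsed.minute)  # 2015 7 5
```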

In [None]:
# Import date time functions
from pyspark.sql.functions import year

# Show the year for the date column
df.select(year(df["date"])).show()

In [None]:
# Save the year as a new column
df = df.withColumn("year", year(df['date']))
df.show()

In [None]:
# Find the average precipitation per year
averages = df.groupBy("year").avg()
averages.orderBy("year").select("year", "avg(prcp)").show()

In [None]:
from pyspark.sql.functions import month

# Show the month for the date column
df.select(month(df['date'])).show()

In [None]:
df = df.withColumn("month", month(df['date']))
df.head()

In [None]:
maxes = df.groupBy("month").max()
maxes.orderBy("month").select("month", "max(prcp)").show()

In [None]:
# Import the summarized data to a pandas dataframe for plotting
# Note: If your summarized data is still too big for your local memory then your notebook may crash

pandas_df = maxes.orderBy("month").select("month", "max(prcp)").toPandas()
pandas_df.head()

In [None]:
import matplotlib.pyplot as plt
pandas_df.set_index("month", inplace=True)
pandas_df.plot.bar()

# ==========================================

### 1.08 Everyone Do: Plotting Bigfoot (0:15)

# Plotting Bigfoot

## Instructions

**Part 1**

1. Using the Bigfoot data, import the time functions and load in the DataFrame.
2. Create a new DataFrame with the column `Year`.
3. Save Year as a new column.
4. Find the total number of Bigfoot sightings per year.

**Part 2**

1. Import the summarized data to a Pandas DataFrame for plotting.
2. Clean the data and rename the columns to Year and Sightings.
3. Plot the year and sightings.

---

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ and enter it as the Spark version
# The file name below assumes a "-bin-hadoop2.7" build, which Spark 3.2.x and earlier 3.x releases provide
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialize findspark so pyspark can be imported; the SparkSession itself is started in the next cell
import findspark
findspark.init()

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("bigfoot").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/bigfoot.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("bigfoot.csv"), header=True, inferSchema=True, timestampFormat="yyyy/MM/dd HH:mm:ss")

# Show DataFrame
df.show()

In [None]:
# Import date time functions
from pyspark.sql.functions import month, year

In [None]:
# Create a new DataFrame with the column Year
df.select(year(df["timestamp"])).show()

In [None]:
# Save the year as a new column
df = df.withColumn("year", year(df['timestamp']))
df.show()

In [None]:
# Find the total bigfoot sightings per year
yearly_counts = df.groupBy("year").count()
yearly_counts.orderBy("year").select("year", "count").show()

In [None]:
# Import the summarized data to a pandas DataFrame for plotting
# Note: If your summarized data is still too big for your local memory then your notebook may crash
import pandas as pd
pandas_df = yearly_counts.orderBy("year").select("year", "count").toPandas()
pandas_df.head()

In [None]:
# Clean the data and rename the columns to "year" and "sightings"
pandas_df = pandas_df.dropna()
pandas_df = pandas_df.rename(columns={"count": "sightings"})
pandas_df.head()
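The `dropna()` step matters because of how nulls flow through the aggregation: `year()` of a missing timestamp is null, and `groupBy` keeps null as a group of its own, so the pandas DataFrame can end up with a row that has no year. A plain-Python sketch of that chain, with hypothetical timestamps:

```python
from collections import Counter
from datetime import datetime

# Hypothetical sighting timestamps; None stands in for a missing or unparseable timestamp.
timestamps = [datetime(1999, 5, 1), datetime(1999, 8, 12), datetime(2004, 3, 9), None]

# Like year(df["timestamp"]): a missing timestamp gives a missing year.
years = [ts.year if ts is not None else None for ts in timestamps]

# Like df.groupBy("year").count(): the missing year forms its own group.
sightings_per_year = Counter(years)
print(sightings_per_year[1999], sightings_per_year[None])  # 2 1

# Like pandas_df.dropna(): drop the group with no year before plotting.
cleaned = {year: n for year, n in sightings_per_year.items() if year is not None}
print(sorted(cleaned.items()))  # [(1999, 2), (2004, 1)]
```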

In [None]:
# Plot the year and sightings
%matplotlib inline
pandas_df.plot("year", "sightings")

# ==========================================

### Rating Class Objectives

* Rate your understanding of each objective on a scale of 1 to 5.

In [None]:
title = "16.1-Big-Data - PySpark Fundamentals"
objectives = [
    "Get up and running with Google Colab Notebook",
    "Understand how Spark works",
    "Store a dataset in a PySpark DataFrame",
    "Filter DataFrames",
    "Work with dates in data and plot results",
]
rating = []
total = 0
for objective in objectives:
    rate = input(objective + "? ")
    total += int(rate)
    rating.append(objective + ". (" + rate + "/5)")
print("="*96)
print(f"Self Evaluation for: {title}")
print("-"*24)
for i in rating:
    print(i)
print("-"*64)
print("Average: " + str(total/len(objectives)))