In [None]:
# Module 16.4.1 PySpark in Google Colab Notebooks
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Module 16.4.2 Spark DataFrames and Datasets
# Create (or reuse) the Spark session that the following cells rely on
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DataFrameBasics")
    .getOrCreate()
)


In [None]:
# Build a tiny DataFrame from in-memory (id, words) tuples, then print it
sample_rows = [
    (0, "Here is our DataFrame"),
    (1, "We are making one from scratch"),
    (2, "This will look very similar to a Panda DataFrame"),
]
sample_columns = ["id", "words"]
dataframe = spark.createDataFrame(sample_rows, sample_columns)

dataframe.show()


In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
# Register the remote file with Spark, then read the local copy SparkFiles resolves
spark.sparkContext.addFile(url)
food_csv_path = SparkFiles.get("food.csv")
df = spark.read.csv(food_csv_path, sep=",", header=True)


In [None]:
# Show DataFrame -- the first 20 rows, long values truncated (show()'s defaults spelled out)
df.show(n=20, truncate=True)

In [None]:
# Print our schema as a tree of column names, types and nullability.
# The CSV was read without a schema or inferSchema, so expect string columns here.
df.printSchema()

In [None]:
# Show the columns (df.columns is a plain Python list of column-name strings)
df.columns

In [None]:
# Describe our data: describe() returns a new DataFrame of summary statistics
# (count, mean, stddev, min, max). It must be .show()n to actually compute and
# print them -- evaluating the bare DataFrame only displays its column names/types.
df.describe().show()

In [None]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [None]:
# Next we need to create the list of struct fields: one StructField per CSV
# column, giving its name, Spark type, and whether it may contain nulls
schema = [
    StructField("food", StringType(), nullable=True),
    StructField("price", IntegerType(), nullable=True),
]
schema

In [None]:
# Pass in our fields: wrap the StructField list in a StructType schema object
final = StructType(schema)
final

In [None]:
# Read our data with our new schema: the explicit StructType replaces the
# default all-string column typing used by the earlier read
dataframe = (
    spark.read
    .schema(final)
    .csv(SparkFiles.get("food.csv"), sep=",", header=True)
)
dataframe.printSchema()

In [None]:
# Bracket indexing gives a Column expression (a reference to the column), not its data
dataframe["price"]

In [None]:
# Confirm the type of the bracket-indexed expression (a pyspark Column)
type(dataframe['price'])

In [None]:
# select() returns a new single-column DataFrame; nothing is computed until an action like show()
dataframe.select('price')

In [None]:
# Contrast with the Column above: select() yields a DataFrame
type(dataframe.select('price'))


In [None]:

# Trigger computation and print the selected price column
dataframe.select('price').show()

In [None]:

# Every call below returns a NEW DataFrame; `dataframe` itself is never
# modified, so each .show() starts again from the original columns.
price_col = dataframe['price']

# Add new column (a straight copy of price)
dataframe.withColumn('newprice', price_col).show()
# Update column name
dataframe.withColumnRenamed('price', 'newerprice').show()
# Double the price
dataframe.withColumn('doubleprice', price_col * 2).show()
# Add a dollar to the price
dataframe.withColumn('add_one_dollar', price_col + 1).show()
# Half the price
dataframe.withColumn('half_price', price_col / 2).show()

In [None]:
# Module 16.4.3 Spark Functions
# Start Spark session. getOrCreate() hands back the session already active in
# this kernel when there is one, rather than starting a second application.
from pyspark.sql import SparkSession
functions_builder = SparkSession.builder.appName("DataFrameFunctions")
spark = functions_builder.getOrCreate()


In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/wine.csv"
spark.sparkContext.addFile(url)
# inferSchema=True lets Spark type numeric columns (presumably points and
# price) as numbers. Without it every CSV column is a string, and the orderBy
# cells below would sort points lexicographically ("100" < "80" < "99").
df = spark.read.csv(SparkFiles.get("wine.csv"), sep=",", header=True, inferSchema=True)

# Show DataFrame
df.show()

In [None]:
# Order a DataFrame by descending values. orderBy() is lazy: this cell only
# returns the sorted DataFrame (its repr lists column names and types); the
# next cell calls .show() to actually display rows.
df.orderBy(df["points"].desc())

In [None]:
# Top 5 rows by points, highest first
df.orderBy("points", ascending=False).show(5)


In [None]:
# Skill Drill 16.4.3 -- the 50 lowest-scoring rows, in ascending points order
df.orderBy("points", ascending=True).show(50)

In [None]:
# Import Functions
from pyspark.sql.functions import avg

# Whole-table aggregate: a one-row result holding the mean of points
df.agg(avg("points")).show()

In [None]:
# Filter SQL method -- the condition is a SQL expression string (where() is an alias of filter())
df.where("price<20").show(5)

In [None]:
# Filter Python method: build the condition from Column objects instead of a
# SQL string (compare with the SQL-string cell above)
df.filter(df["price"] < 20).show(5)
# Filter by price on certain columns
df.filter(df["price"] < 20).select(['points', 'country', 'winery', 'price']).show(5)

# Filter on exact country
df.filter(df["country"] == "US").show()

In [None]:
# Skill Drill 16.4.3
# Filter Python method: conditions are Column expressions, not SQL strings
df.filter(df["price"] > 15).show(5)
# Filter by price on certain columns
df.filter(df["price"] > 15).select(['points', 'country', 'winery', 'price']).show(5)

# Filter on exact state (the province column)
df.filter(df["province"] == "California").show()

In [None]:
# Skill Drill 16.4.3
# Filter SQL method: the same drill written with SQL expression strings
df.filter("price>15").show(5)
# Filter by price on certain columns
df.filter("price>15").select(['points', 'country', 'winery', 'price']).show(5)

# Filter on exact state (the province column)
df.filter("province = 'California'").show()