In [None]:
# Process to install packages not native to Google CoLab - 
# in this case we are installing PySpark & Java

# MAKE SURE TO USE THIS BLOCK OF CODE FOR EVERY NOTEBOOK THAT YOU WANT TO USE SPARK IN

import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.<enter version>'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.10% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Conn                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Conn                                                                   

In [None]:
# Create the SparkSession for this notebook (or reuse one that already exists)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DataFrameBasics")
    .getOrCreate()
)

In [None]:
# Build a small DataFrame from scratch with Spark's createDataFrame():
# the first argument holds one tuple per row, the second the column titles
sample_rows = [
    (0, "Here is our DataFrame"),
    (1, "We are making onf from scratch"),
    (2, "This will look very similar to a Pandas DataFrame"),
]
column_names = ["id", "words"]
dataframe = spark.createDataFrame(sample_rows, column_names)

dataframe.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|Here is our DataF...|
|  1|We are making onf...|
|  2|This will look ve...|
+---+--------------------+



In [None]:
# SparkFiles lets us read remote data directly into a DataFrame,
# including data hosted on Amazon's Simple Storage Service (S3).
from pyspark import SparkFiles

# Register the S3-hosted CSV with the Spark context
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
spark.sparkContext.addFile(url)

# Look up the path of the added file and parse it, treating the first row as the header
food_csv_path = SparkFiles.get("food.csv")
df = spark.read.csv(food_csv_path, sep=",", header=True)
df.show()

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+



In [None]:
# Check the DataFrame schema by printing it: one line per column with its
# name, data type and nullability.
# No schema was passed when the CSV was read, so every column shows up as a string.
df.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: string (nullable = true)



In [None]:
# We don't want "price" to be a string data-type column
# So now we're going to set our schema manually then apply it to the data

In [None]:
# Show the column names (returned as a plain Python list of strings)
df.columns

['food', 'price']

In [None]:
# Describe our data.
# Note: describe() on its own only returns a new DataFrame holding the
# summary statistics; chain .show() onto it to actually display them.
df.describe()

DataFrame[summary: string, food: string, price: string]

In [None]:
# Import the type classes needed to define a schema by hand
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
)

In [None]:
# Next we'll create the schema by building a StructType - one of Spark's complex types, like a map or array
# Each StructField defines the column name, the data type held, and a Boolean
# that defines whether null values are allowed in that column


In [None]:
# Create the list of struct fields, one per column: (name, data type, nullable)
schema = [
    StructField("food", StringType(), True),
    StructField("price", IntegerType(), True),
]
schema

[StructField(food,StringType,true), StructField(price,IntegerType,true)]

In [None]:
# We'll now pass the manually constructed list of fields into a StructType
# The resulting schema will then be stored in a variable called "final"

In [None]:
# Wrap the list of fields in a StructType to obtain the complete schema
final = StructType(schema)
final

StructType(List(StructField(food,StringType,true),StructField(price,IntegerType,true)))

In [None]:
# Now we can read in the data again, this time passing in 
# our new, manually created predefined schema

In [None]:
# Re-read the food CSV, this time applying the hand-built schema.
# Note: this rebinds `dataframe`, replacing the DataFrame built from scratch earlier.
food_csv_path = SparkFiles.get("food.csv")
dataframe = spark.read.csv(food_csv_path, sep=",", header=True, schema=final)
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: integer (nullable = true)



In [None]:
# Indexing with a column name returns a Column object, not the column's values
dataframe['price']

Column<'price'>

In [None]:
# Confirm the type of the object returned by indexing: a pyspark Column
type(dataframe['price'])

pyspark.sql.column.Column

In [None]:
# select() returns a new DataFrame that contains only the requested column
dataframe.select('price')

DataFrame[price: int]

In [None]:
# Confirm the type of the object returned by select(): a pyspark DataFrame
type(dataframe.select('price'))

pyspark.sql.dataframe.DataFrame

In [None]:
# Calling show() on the selection displays the column's values
dataframe.select('price').show()

+-----+
|price|
+-----+
|    0|
|   12|
|   10|
+-----+



In [None]:
# Now we'll practice manipulating columns in Spark

In [None]:
# Examples of column manipulations using Spark.
# Each call below returns a new DataFrame; `dataframe` itself is not modified.

price_col = dataframe['price']

# Add a new column that copies the price
dataframe.withColumn('newprice', price_col).show()
# Rename an existing column
dataframe.withColumnRenamed('price', 'newerprice').show()
# Double the price in a new column
dataframe.withColumn('doubleprice', price_col * 2).show()
# Add a dollar to the price in a new column
dataframe.withColumn('add_one_dollar', price_col + 1).show()
# Halve the price in a new column
dataframe.withColumn('half_price', price_col / 2).show()

+-------+-----+--------+
|   food|price|newprice|
+-------+-----+--------+
|  pizza|    0|       0|
|  sushi|   12|      12|
|chinese|   10|      10|
+-------+-----+--------+

+-------+----------+
|   food|newerprice|
+-------+----------+
|  pizza|         0|
|  sushi|        12|
|chinese|        10|
+-------+----------+

+-------+-----+-----------+
|   food|price|doubleprice|
+-------+-----+-----------+
|  pizza|    0|          0|
|  sushi|   12|         24|
|chinese|   10|         20|
+-------+-----+-----------+

+-------+-----+--------------+
|   food|price|add_one_dollar|
+-------+-----+--------------+
|  pizza|    0|             1|
|  sushi|   12|            13|
|chinese|   10|            11|
+-------+-----+--------------+

+-------+-----+----------+
|   food|price|half_price|
+-------+-----+----------+
|  pizza|    0|       0.0|
|  sushi|   12|       6.0|
|chinese|   10|       5.0|
+-------+-----+----------+



In [None]:
# Read in the wine data from an S3 bucket.
# No schema is passed here, so every column is read as a string.
url1 = "https://s3.amazonaws.com/dataviz-curriculum/day_1/wine.csv"
spark.sparkContext.addFile(url1)

wine_csv_path = SparkFiles.get("wine.csv")
wine_df = spark.read.csv(wine_csv_path, sep=",", header=True)

# Show DataFrame
wine_df.show()

+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|country|         description|         designation|points|price|          province|            region_1|         region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|        California|         Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|  110|    Northern Spain|                Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|        California|      Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley|
|     US|This spent 20

In [None]:
# Now we'll deeply consider the differences between transformations and actions

In [None]:
# Transformations

In [None]:
# Order a DataFrame by descending points (highest score first).
# wine_df was read without a schema, so "points" is a string column. Cast it
# to an integer so the ordering is numeric rather than character-by-character
# (compared as text, "99" sorts above "100").
wine_df.orderBy(wine_df["points"].cast("int").desc())

DataFrame[country: string, description: string, designation: string, points: string, price: string, province: string, region_1: string, region_2: string, variety: string, winery: string]

In [None]:
# ^ This is a Transformation because we didn't ask spark to run this command,
# We only told it what we want when we do eventually call this command

# These are instructions for Spark, Spark will recognize these instructions
# but won't act on them yet because we haven't asked for it to run/produce anything

In [None]:
# Actions

In [None]:
# Order a DataFrame by descending points (highest score first) and show the result.
# wine_df was read without a schema, so "points" is a string column. Cast it
# to an integer so the ordering is numeric rather than character-by-character
# (compared as text, "99" sorts above "100").
wine_df.orderBy(wine_df["points"].cast("int").desc()).show(15)

+-------+--------------------+--------------------+------+-----+------------+--------------------+------------+--------------------+--------------------+
|country|         description|         designation|points|price|    province|            region_1|    region_2|             variety|              winery|
+-------+--------------------+--------------------+------+-----+------------+--------------------+------------+--------------------+--------------------+
| France|98-100 Barrel sam...|       Barrel sample|    99| null|    Bordeaux|             Margaux|        null|Bordeaux-style Re...|      Ch̢teau Palmer|
| France|98-100 Barrel sam...|       Barrel sample|    99| null|    Bordeaux|            Pauillac|        null|Bordeaux-style Re...|Ch̢teau Pontet-Canet|
| France|98-100 Barrel sam...|       Barrel sample|    99| null|    Bordeaux|           Sauternes|        null|Bordeaux-style Wh...|     Ch̢teau d'Yquem|
| France|A magnificent Cha...|Dom P̩rignon Oeno...|    99|  385|   Champagne

In [None]:
# ^ This is an Action because we are telling Spark to run 
# and pull up our requested organization

# show() is an action that gives Spark the go-ahead to run 
# all of the transformations we gave it and produce a result

In [None]:
# Spark can import more functions, such as averages for example

In [None]:
# Import an aggregate function and use it to compute the mean of "points"
from pyspark.sql.functions import avg

average_points = wine_df.select(avg("points"))
average_points.show()

+-----------------+
|      avg(points)|
+-----------------+
|87.88834105383143|
+-----------------+



In [None]:
# ^ In this example the avg() function is a transformation while show() is an action

In [None]:
# Filter rows with a SQL-style condition, then display the first 8 matches
under_20 = wine_df.filter("price<20")
under_20.show(8)

+--------+--------------------+--------------------+------+-----+---------------+--------------------+--------------------+------------------+--------------------+
| country|         description|         designation|points|price|       province|            region_1|            region_2|           variety|              winery|
+--------+--------------------+--------------------+------+-----+---------------+--------------------+--------------------+------------------+--------------------+
|Bulgaria|This Bulgarian Ma...|             Bergul̩|    90|   15|       Bulgaria|                null|                null|            Mavrud|        Villa Melnik|
|   Spain|Earthy plum and c...|              Amandi|    90|   17|        Galicia|       Ribeira Sacra|                null|           Menc�_a|      Don Bernardino|
|      US|There's a lot to ...|                null|    90|   18|     California|Russian River Valley|              Sonoma|        Chardonnay|            De Loach|
|      US|Massiv

In [None]:
# ^ filter() is a transformation while show() is an action

In [None]:
# We can Filter and select certain columns

In [None]:
# Filter by price, then keep only the columns we care about
columns_of_interest = ['points', 'country', 'winery', 'price']
wine_df.filter("price<25").select(columns_of_interest).show(10)

+------+--------+--------------------+-----+
|points| country|              winery|price|
+------+--------+--------------------+-----+
|    95|      US|               Heitz|   24|
|    90|Bulgaria|        Villa Melnik|   15|
|    90|  France|      Bouvet-Ladubay|   22|
|    90|   Italy|    Casina di Cornia|   23|
|    90|   Spain|      Don Bernardino|   17|
|    90|      US|            De Loach|   18|
|    91|Portugal|    Herdade do Rocim|   23|
|    91|      US|   Trinity Vineyards|   19|
|    91|Portugal|Adega Cooperativa...|   15|
|    91|      US|          Eagle Glen|   22|
+------+--------+--------------------+-----+
only showing top 10 rows



In [None]:
# ^ Filter() and select() are two separate transformations while show() is an action

In [None]:
# Perform the same transformations using Python:

# Filter on price
under_40 = wine_df.filter("price<40")
under_40.show(5)
# Same price filter, keeping only certain columns
under_40.select(['points', 'country', 'winery', 'price']).show()

# Filter by exact country, using a Python column expression instead of a string
wine_df.filter(wine_df['country'] == 'US').show()

+--------+--------------------+--------------------+------+-----+------------+--------------------+--------+---------------+--------------+
| country|         description|         designation|points|price|    province|            region_1|region_2|        variety|        winery|
+--------+--------------------+--------------------+------+-----+------------+--------------------+--------+---------------+--------------+
|      US|Heitz has made th...|          Grignolino|    95|   24|  California|         Napa Valley|    Napa|           Ros̩|         Heitz|
|Bulgaria|This Bulgarian Ma...|             Bergul̩|    90|   15|    Bulgaria|                null|    null|         Mavrud|  Villa Melnik|
|      US|Steely and perfum...|            Babushka|    90|   37|  California|Russian River Valley|  Sonoma|     Chardonnay|      Zepaltas|
|  France|Pale in color, th...|Nonpareil Tr̩sor ...|    90|   22|France Other|        Vin Mousseux|    null|Sparkling Blend|Bouvet-Ladubay|
|   Italy|Aromas of 