# Spark DataFrame - Operations
Now that you know the basics, let's get into operations.

Objective: This exercise is similar to the Basics exercise, but uses DataFrame methods instead of SQL. We'll also work through some more complex operations on a more realistic dataset of Apple stock prices.

In [1]:
!apt-get install openjdk-11-jdk-headless -qq
!pip install -q pyspark findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# No need to download Spark separately: the pip package already bundles the Spark JARs.

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ColabSpark").getOrCreate()
spark

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
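# inferSchema=True makes Spark scan the data to detect column types;
# without it, every CSV column is read as a string.
# Change the path to wherever the CSV lives (e.g. on your mounted Drive).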
df = spark.read.csv('Datasets/apple_stock_data.csv', inferSchema=True, header=True)
df.printSchema()

In [6]:
# Let's get a better look at the data.
# show() works, but with this many wide columns the output is hard to read.
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [7]:
# Instead, let's just grab the first row. Much neater!
df.head(1)

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

## DataFrame Methods

In [8]:
# We know SQL is available, but let's try out some of the DataFrame methods.
# For example, let's look at the opening and closing prices on days where Close is below 500.
df.filter("Close < 500").select('Open','Close').show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [9]:
# Instead of a SQL string, we can also pass a Python column expression to filter().
df.filter(df['Close'] < 500).select('Open','Close').show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [10]:
# We can also combine conditions with & (and), wrapping each one in parentheses.
# Here we're looking for days where the stock opened below 495 but closed above 500.
df.filter( (df['Close'] > 500) & (df['Open'] < 495) ).select('Open','Close').show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        491.500008|502.20999900000004|
|494.63999900000005|506.08998099999997|
+------------------+------------------+



## Using Collect
You may have noticed that showing a DataFrame can get quite messy. Instead, let's try the collect() method, which returns the matching rows to the driver as a Python list of Row objects that we can work with directly. It's not necessarily better, just a different way of getting at the data.

In [11]:
# Let's pick a row of data with a low of $197.16 and collect it.
employeeResult = df.filter(df['Low'] == 197.16).collect()

In [12]:
# collect() returns a Python list of Row objects, even when only one row matches.
employeeResult

[Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [13]:
# Index into the list to get the Row itself, without the outer brackets.
employeeRow = employeeResult[0]

employeeRow

Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [14]:
# And convert it to a plain Python dictionary with asDict().
employeeRow.asDict()

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [15]:
# Why convert it into a dictionary? Because it works with all the usual Python dictionary tools.
# For example, we can look up the volume by its key.
employeeRow.asDict()['Volume']

220441900

## Aggregation and Dates
Let's shift gears a bit and focus on something different. Instead of simply exploring the data, let's find the average closing price for each year. To do this, we'll first need to extract the year from the Date column. Let's begin!

In [16]:
# Let's import the relevant date and formatting functions (year and format_number are used below).
from pyspark.sql.functions import dayofmonth,month,hour,year,format_number

In [17]:
# Create a new Year column by extracting the year from the Date column.
df_with_year = df.withColumn("Year",year(df["Date"]))

df_with_year.head(1)

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039, Year=2010)]

In [18]:
# Now let's group the data by year, take the mean of each numeric column, and select the two columns we'd like to see.
df_summary = df_with_year.groupBy("Year").mean().select(['Year','avg(Close)'])
df_summary.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



The numbers are correct, but long decimal tails and a column named avg(Close) aren't ready for a report. Let's make a few adjustments to tidy it up.

In [19]:
# To make it more visually appealing, let's format the mean to two decimal places.
df_formatted = df_summary.select(['Year', format_number("avg(Close)",2)])
df_formatted.show()

+----+----------------------------+
|Year|format_number(avg(Close), 2)|
+----+----------------------------+
|2015|                      120.04|
|2013|                      472.63|
|2014|                      295.40|
|2012|                      576.05|
|2016|                      104.60|
|2010|                      259.84|
|2011|                      364.00|
+----+----------------------------+



In [20]:
# Let's change the name of the column to something that makes sense.
df_renamed = df_formatted.withColumnRenamed("format_number(avg(Close), 2)","Average Closing Price")
df_renamed.show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2015|               120.04|
|2013|               472.63|
|2014|               295.40|
|2012|               576.05|
|2016|               104.60|
|2010|               259.84|
|2011|               364.00|
+----+---------------------+



In [21]:
# And finally order it by year.
df_renamed.orderBy('Year').show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2010|               259.84|
|2011|               364.00|
|2012|               576.05|
|2013|               472.63|
|2014|               295.40|
|2015|               120.04|
|2016|               104.60|
+----+---------------------+



Great job! At this stage, it's a good idea to keep exploring the basics of DataFrames: try other methods or read through the documentation.

When you feel comfortable, move on to the DataFrame Data Cleaning Exercise.

If you would like a simpler aggregation example, try the DataFrame Aggregation Exercise.