# Spark DataFrame - Operations
Now that you know the basics, let's get into operations. 

Objective: This exercise is similar to the Basics exercise, but uses DataFrame methods instead of SQL. We'll also be going through some more complex operations with a more realistic dataset. 

In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('operations').getOrCreate()



In [3]:
# inferSchema=True makes Spark scan the file to guess each column's type; header=True uses the first row as column names.
df = spark.read.csv('Datasets/apple_stock_data.csv', inferSchema=True, header=True)
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [4]:
# Let's get a better look at the data.
# We know that we can show a DataFrame, but with this many wide columns the output is a mess! 
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [6]:
# Instead, let's grab the first five rows as a list of Row objects. Much neater! 
df.head(5)

[Row(Date='2010-01-04', Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date='2010-01-05', Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date='2010-01-06', Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004),
 Row(Date='2010-01-07', Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265),
 Row(Date='2010-01-08', Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034)]

## DataFrame Methods

In [14]:
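# Even though we know SQL is available, let's try out some of the DataFrame methods.
# For this example, let's look at the opening and closing values where the close is less than 200.
# The first filter uses a SQL-style string; the second builds the condition from columns and also requires the open to be below 200.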
df.filter("Close < 200").select('Date','Open','Close').show()
df.filter((df["Close"] < 200) & (df["Open"] < 200)).select('Date','Open','Close').show()

+----------+------------------+------------------+
|      Date|              Open|             Close|
+----------+------------------+------------------+
|2010-01-22|206.78000600000001|            197.75|
|2010-01-28|        204.930004|        199.289995|
|2010-01-29|        201.079996|        192.060003|
|2010-02-01|192.36999699999998|        194.729998|
|2010-02-02|        195.909998|        195.859997|
|2010-02-03|        195.169994|        199.229994|
|2010-02-04|        196.730003|        192.050003|
|2010-02-05|192.63000300000002|        195.460001|
|2010-02-08|        195.690006|194.11999699999998|
|2010-02-09|        196.419996|196.19000400000002|
|2010-02-10|        195.889997|195.12000700000002|
|2010-02-11|        194.880001|        198.669994|
|2010-02-23|        199.999998|        197.059998|
|2014-06-09|         92.699997|         93.699997|
|2014-06-10|         94.730003|             94.25|
|2014-06-11|         94.129997|         93.860001|
|2014-06-12|         94.040001|

In [21]:
# We can also build the condition with Python comparison operators on DataFrame columns!
df.filter(df['Close'] < 500).select('Open','Close').show()
df.filter((df['Close'] < 100) & (df['Open'] < 100)).select('Open','Close').show()


+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows

+-------------

In [22]:
# And we can combine multiple conditions with & (and), | (or) and ~ (not), wrapping each condition in parentheses.
# Here we're looking for days that opened below $495 but closed above $500.
df.filter((df['Close'] > 500) & (df['Open'] < 495)).select('Open','Close').show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        491.500008|502.20999900000004|
|494.63999900000005|506.08998099999997|
+------------------+------------------+
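The parentheses matter because Python gives `&` higher precedence than comparison operators such as `<` and `>`. Without them, `df['Close'] > 500 & df['Open'] < 495` would be grouped as `df['Close'] > (500 & df['Open']) < 495`, a chained comparison that typically fails because Python tries to turn a Column into a bool. The plain-Python sketch below shows the same grouping with integers, so it runs without Spark.

```python
# Without parentheses this parses as 3 < (5 & 2), i.e. 3 < 0.
unparenthesised = 3 < 5 & 2

# With parentheses each comparison is evaluated first: (True) & (True).
parenthesised = (3 < 5) & (2 > 1)

print(unparenthesised)
print(parenthesised)
```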



## Using Collect
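You may have noticed that the output of show() can be messy and hard to read. Instead, let's try the collect() method, which returns the matching rows to the driver as a Python list of Row objects. It's not necessarily better, just a different way of achieving similar results. Because collect() brings every matching row into the driver's memory, filter the DataFrame down first.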

In [24]:
# Let's pick a row of data with a low of $197.16 and collect it.  
employeeResult = df.filter(df['Low'] == 197.16).collect()

In [25]:
# collect() returns a list of Row objects, which is why the result is wrapped in square brackets.
employeeResult

[Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [33]:
# We can select the first row of data to shed the outer brackets.
employeeRow = employeeResult[0] #employeeResult[0] accesses the first row
#employeeRow = employeeResult[0][1] #employeeResult[0][1] accesses the 2nd column of that row
employeeRow

Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [34]:
# And then convert it into a plain Python dictionary with asDict(). 
employeeRow.asDict()

{'Date': '2010-01-22',
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [36]:
# Why convert it into a dictionary? Because dictionaries come with many familiar Python methods.
# For example, we can look up a single value, such as the date, by its key. 
employeeRow.asDict()['Date']

'2010-01-22'

## Aggregation and Dates
Let's shift gears a bit and focus on something different. Instead of simply exploring the data, let's try to find the average stock opening price per year. To do this, we'll first have to extract the year from the Date column. Let's begin! 

In [37]:
# Let's import the relevant functions.
from pyspark.sql.functions import dayofmonth,month,hour,year,format_number

In [67]:
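# And create a new column using the year function to extract the year from the Date column.
# Date was read as a string, but year() and month() implicitly cast 'yyyy-MM-dd' strings to dates.
df_with_year = df.withColumn("Year",year(df["Date"]))
df.show()
df_with_year.show()

df_with_month = df.withColumn("Month",month(df["Date"]))
df_with_month.show()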


+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [71]:
from pyspark.sql.functions import year, month, dayofmonth
# "Dates" holds the day of the month (1-31).
df_with_ymd = df.withColumn("Year", year(df["Date"])) \
                .withColumn("Month", month(df["Date"])) \
                .withColumn("Dates", dayofmonth(df["Date"]))
df_with_ymd.select('Date','Open','Close', 'Year', 'Month', 'Dates').show()

+----------+------------------+------------------+----+-----+-----+
|      Date|              Open|             Close|Year|Month|Dates|
+----------+------------------+------------------+----+-----+-----+
|2010-01-04|        213.429998|        214.009998|2010|    1|    4|
|2010-01-05|        214.599998|        214.379993|2010|    1|    5|
|2010-01-06|        214.379993|        210.969995|2010|    1|    6|
|2010-01-07|            211.75|            210.58|2010|    1|    7|
|2010-01-08|        210.299994|211.98000499999998|2010|    1|    8|
|2010-01-11|212.79999700000002|210.11000299999998|2010|    1|   11|
|2010-01-12|209.18999499999998|        207.720001|2010|    1|   12|
|2010-01-13|        207.870005|        210.650002|2010|    1|   13|
|2010-01-14|210.11000299999998|            209.43|2010|    1|   14|
|2010-01-15|210.92999500000002|            205.93|2010|    1|   15|
|2010-01-19|        208.330002|        215.039995|2010|    1|   19|
|2010-01-20|        214.910006|            211.7

In [82]:
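# Now let's summarise the data by year, find the mean of each year and select the two columns we'd like to see.
# groupBy() returns groups in no particular order, so we also show the result sorted with orderBy().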
df_summary_Year = df_with_year.groupBy("Year").mean().select(['Year','avg(Open)'])
df_summary_Year.show()
df_summary_Year.orderBy('Year').show()

+----+------------------+
|Year|         avg(Open)|
+----+------------------+
|2015|120.17575393253965|
|2013| 473.1281355634922|
|2014| 295.1426195357143|
|2012|     576.652720788|
|2016|104.50777772619044|
|2010| 259.9576190992064|
|2011|364.06142773412705|
+----+------------------+

+----+------------------+
|Year|         avg(Open)|
+----+------------------+
|2010| 259.9576190992064|
|2011|364.06142773412705|
|2012|     576.652720788|
|2013| 473.1281355634922|
|2014| 295.1426195357143|
|2015|120.17575393253965|
|2016|104.50777772619044|
+----+------------------+



In [83]:
# Now let's summarise the data by month, find the mean of each month and select the two columns we'd like to see.
df_summary_Month = df_with_ymd.groupBy("Month").mean().select(['Month','avg(Open)'])
df_summary_Month.show()
df_summary_Month.orderBy('Month').show()

+-----+------------------+
|Month|         avg(Open)|
+-----+------------------+
|   12|302.76953076510085|
|    1|322.90628572142856|
|    6|288.75166685333335|
|    3| 332.8893468300655|
|    5|351.59870752380965|
|    9| 301.3590970763887|
|    4| 341.0048631506845|
|    8| 300.2057422967742|
|    7| 281.2487831148649|
|   10| 308.5571721973687|
|   11|  306.612237797203|
|    2| 320.7831106888889|
+-----+------------------+

+-----+------------------+
|Month|         avg(Open)|
+-----+------------------+
|    1|322.90628572142856|
|    2| 320.7831106888889|
|    3| 332.8893468300655|
|    4| 341.0048631506845|
|    5|351.59870752380965|
|    6|288.75166685333335|
|    7| 281.2487831148649|
|    8| 300.2057422967742|
|    9| 301.3590970763887|
|   10| 308.5571721973687|
|   11|  306.612237797203|
|   12|302.76953076510085|
+-----+------------------+



In [84]:
# Now let's summarise the data by day of the month (the Dates column), find the mean of each day and select the two columns we'd like to see.
df_summary_Date = df_with_ymd.groupBy("Dates").mean().select(['Dates','avg(Open)'])
df_summary_Date.show()
df_summary_Date.orderBy('Dates').show()

+-----+------------------+
|Dates|         avg(Open)|
+-----+------------------+
|   31|   335.03625115625|
|   28| 316.2089659482759|
|   26| 314.0460731428571|
|   27| 328.8184208245614|
|   12|310.47483123333336|
|   22| 297.0716955423729|
|    1|301.86599863636366|
|   13| 321.4972878305083|
|    6|325.53614014035094|
|   16| 305.5162059827586|
|    3| 324.9258940892858|
|   20| 318.2015779473684|
|    5|323.07684407017535|
|   19| 315.9981353898305|
|   15|  305.063053101695|
|    9| 305.1209995666666|
|   17|308.29421015789484|
|    4| 317.8955350357143|
|    8| 297.7521300983607|
|   23|        308.176668|
+-----+------------------+
only showing top 20 rows

+-----+------------------+
|Dates|         avg(Open)|
+-----+------------------+
|    1|301.86599863636366|
|    2| 310.9580718070175|
|    3| 324.9258940892858|
|    4| 317.8955350357143|
|    5|323.07684407017535|
|    6|325.53614014035094|
|    7| 321.1116946949151|
|    8| 297.7521300983607|
|    9| 305.1209995666666|
| 

The figures are accurate, but long decimals and generated column names like avg(Open) aren't presentation-ready. Let's make a few adjustments to tidy them up.

In [79]:
# To make it more visually appealing, let's format the mean to two decimal places.
df_formatted = df_summary_Year.select(['Year', format_number("avg(Open)",2)])
df_formatted.show()

+----+---------------------------+
|Year|format_number(avg(Open), 2)|
+----+---------------------------+
|2015|                     120.18|
|2013|                     473.13|
|2014|                     295.14|
|2012|                     576.65|
|2016|                     104.51|
|2010|                     259.96|
|2011|                     364.06|
+----+---------------------------+



In [80]:
# Let's change the name of the column to something that makes sense.
df_renamed = df_formatted.withColumnRenamed("format_number(avg(Open), 2)","Average Opening Price")
df_renamed.show()

+----+---------------------+
|Year|Average Opening Price|
+----+---------------------+
|2015|               120.18|
|2013|               473.13|
|2014|               295.14|
|2012|               576.65|
|2016|               104.51|
|2010|               259.96|
|2011|               364.06|
+----+---------------------+



In [81]:
# And finally order it by year.
df_renamed.orderBy('Year').show()

+----+---------------------+
|Year|Average Opening Price|
+----+---------------------+
|2010|               259.96|
|2011|               364.06|
|2012|               576.65|
|2013|               473.13|
|2014|               295.14|
|2015|               120.18|
|2016|               104.51|
+----+---------------------+



Great job! At this stage, it's a good idea to continue exploring the basics of DataFrames. Try different methods or read the documentation.

When you feel comfortable, move on to the DataFrame Data Cleaning Exercise. 

If you would like a simpler aggregation example, try the DataFrame Aggregation Exercise. 