
# Solution Notebook: Spark dataframes project


## Pre-requisites

In [1]:
# Installing pyspark
# ---
#
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# Next, we run a local spark session
# ---
#
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import format_number

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## Data Importation and Exploration

In [3]:
# Peek at the first four lines of the raw CSV file (the file is closed automatically)
with open('saf_stock.csv') as f:
    for _ in range(4):
        print(f.readline())

Date,Open,High,Low,Close,Volume,Adj Close

2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996

2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475

2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539



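Inferring the data types from the first rows:
<br>Date column -> <i>string data type</i> (dates in yyyy-MM-dd format)
<br>Open, High, Low, Close, Adj Close columns -> <i>float data type</i>
<br>Volume column -> <i>integer data type</i> (whole numbers of shares traded)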
<br>
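A plain-Python sketch of this kind of per-column inference, offered as a simplified stand-in for what Spark's `inferSchema` option does rather than a reproduction of its rules: every column starts as `int` and widens to `float` or `str` as soon as a value does not parse. The sample text is the header and first three data rows printed above, inlined so the block runs without `saf_stock.csv`; `infer_type` and `infer_schema` are hypothetical helper names.

```python
import csv
import io

# Header and first three data rows of saf_stock.csv, as printed above
SAMPLE = """Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996
2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
"""

WIDENING_ORDER = [int, float, str]


def infer_type(value):
    """Return the narrowest of int, float, str that can parse `value`."""
    for candidate in (int, float):
        try:
            candidate(value)
            return candidate
        except ValueError:
            pass
    return str


def infer_schema(text):
    """Infer one type per column, widening int -> float -> str as needed."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    types = [int] * len(header)
    for row in reader:
        for i, value in enumerate(row):
            # Keep whichever of the current type and this value's type is wider
            types[i] = max(types[i], infer_type(value), key=WIDENING_ORDER.index)
    return dict(zip(header, (t.__name__ for t in types)))


schema = infer_schema(SAMPLE)
print(schema)
```

Under this strict rule Volume comes out as `int`, because none of its values has a fractional part, while the price columns need `float`.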

### Determine the column names

The column names are as follows:<br>
-Date<br>
-Open<br>
-High<br>
-Low<br>
-Close<br>
-Volume<br>
-Adj Close<br>

### Start a Spark session and load the stock file while inferring the data types. Make observations about the schema.


In [4]:
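# Wrap the SparkContext object `sc` in an SQLContext
# (SQLContext has been superseded by SparkSession since Spark 2.0; it is kept for backward compatibility)
sqlCtx = SQLContext(sc)

# Read the CSV data (first line = header) into a DataFrame object `df`.
# inferSchema is not set, so every column is loaded as a string.
df = sqlCtx.read.csv("saf_stock.csv", header=True)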

df.printSchema()



root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



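Spark loaded a DataFrame with 7 columns, all of type string and nullable = true. No types were actually inferred: `read.csv` only infers column types when `inferSchema=True` is passed, so the numeric columns are cast explicitly in the Data Preparation section.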

### Show the first 5 rows


In [5]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### Use the describe method to learn about the data frame

In [6]:
df.createOrReplaceTempView('saf_stock')
tables = sqlCtx.tableNames()
print(tables)

['saf_stock']


In [7]:
# Summary statistics (count, mean, stddev, min, max) for every column via describe()

query = 'select * from saf_stock'
sqlCtx.sql(query).describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700
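Because describe() ran before any casting, the min and max values above appear to be string comparisons rather than numeric ones (mean and stddev are still numeric). That explains why min Volume is reported as 10010500 here, while the MIN(Volume) query on the cast column in the Data Analysis section returns 2094900. A minimal plain-Python illustration, using the Volume values from the first five rows shown above:

```python
# Volume values from the first five rows, kept as strings the way the
# un-cast DataFrame stores them
volumes_as_strings = ["12668800", "9593300", "12768200", "8069400", "6679300"]

# String comparison goes character by character, so "12..." sorts before "6..."
print(min(volumes_as_strings), max(volumes_as_strings))  # 12668800 9593300

# Converting to numbers first gives the real extremes
volumes = [int(v) for v in volumes_as_strings]
print(min(volumes), max(volumes))  # 6679300 12768200
```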

## Data Preparation

### Format all the data to 2 decimal places, i.e. with format_number()

In [8]:
# Rename 'Adj Close' to remove the space from the column name

from pyspark.sql.types import FloatType
df = df.withColumnRenamed("Adj Close", "Adj_Close")

df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj_Close']

In [9]:
# Cast the numeric columns from string to float to permit later mathematical operations
numeric_cols = ["Open", "High", "Low", "Close", "Volume", "Adj_Close"]
for col_name in numeric_cols:
    df = df.withColumn(col_name, df[col_name].cast(FloatType()))

df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: float (nullable = true)
 |-- Adj_Close: float (nullable = true)



In [10]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [11]:
# Format each numeric column as a string with 2 decimal places
# (format_number also inserts thousands separators)

fmt_df = df.select(
    "Date",
    format_number("Open", 2).alias("Open"),
    format_number("High", 2).alias("High"),
    format_number("Low", 2).alias("Low"),
    format_number("Close", 2).alias("Close"),
    format_number("Volume", 2).alias("Volume"),
    format_number("Adj_Close", 2).alias("Adj_Close"),
)


In [12]:
# Show the first 5 rows of fmt_df
fmt_df.head(5)


[Row(Date='2012-01-03', Open='59.97', High='61.06', Low='59.87', Close='60.33', Volume='12,668,800.00', Adj_Close='52.62'),
 Row(Date='2012-01-04', Open='60.21', High='60.35', Low='59.47', Close='59.71', Volume='9,593,300.00', Adj_Close='52.08'),
 Row(Date='2012-01-05', Open='59.35', High='59.62', Low='58.37', Close='59.42', Volume='12,768,200.00', Adj_Close='51.83'),
 Row(Date='2012-01-06', Open='59.42', High='59.45', Low='58.87', Close='59.00', Volume='8,069,400.00', Adj_Close='51.46'),
 Row(Date='2012-01-09', Open='59.03', High='59.55', Low='58.92', Close='59.18', Volume='6,679,300.00', Adj_Close='51.62')]
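The output shows that format_number returns strings, so fmt_df is suited for display only; the numeric df is what the rest of the analysis uses. A plain-Python sketch of the same kind of formatting, where `format_number_like` is a hypothetical helper and the rounding of borderline values may not match Spark exactly:

```python
def format_number_like(value, decimals):
    """Hypothetical helper: format `value` with thousands separators and a
    fixed number of decimals, similar in spirit to Spark's format_number."""
    return f"{value:,.{decimals}f}"


print(format_number_like(12668800.0, 2))  # 12,668,800.00 (Volume, first row)
print(format_number_like(59.970001, 2))   # 59.97 (Open, first row)

# The result is a string, meant for display rather than arithmetic
formatted = format_number_like(52.619235, 2)
print(type(formatted).__name__, formatted)  # str 52.62
```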

### Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day


In [13]:

# Add HV_Ratio = High / Volume (this reassigns df rather than keeping a separate copy)
df = df.withColumn("HV_Ratio", df.High / df.Volume)

df.head(5)

[Row(Date='2012-01-03', Open=59.970001220703125, High=61.060001373291016, Low=59.869998931884766, Close=60.33000183105469, Volume=12668800.0, Adj_Close=52.61923599243164, HV_Ratio=4.819714682786927e-06),
 Row(Date='2012-01-04', Open=60.209999084472656, High=60.349998474121094, Low=59.470001220703125, Close=59.709999084472656, Volume=9593300.0, Adj_Close=52.07847595214844, HV_Ratio=6.290848662516662e-06),
 Row(Date='2012-01-05', Open=59.349998474121094, High=59.619998931884766, Low=58.369998931884766, Close=59.41999816894531, Volume=12768200.0, Adj_Close=51.825538635253906, HV_Ratio=4.66941298944916e-06),
 Row(Date='2012-01-06', Open=59.41999816894531, High=59.45000076293945, Low=58.869998931884766, Close=59.0, Volume=8069400.0, Adj_Close=51.45922088623047, HV_Ratio=7.36733843444859e-06),
 Row(Date='2012-01-09', Open=59.029998779296875, High=59.54999923706055, Low=58.91999816894531, Close=59.18000030517578, Volume=6679300.0, Adj_Close=51.616214752197266, HV_Ratio=8.915604814435727e-06)]
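The per-row arithmetic behind HV_Ratio, sketched in plain Python with the High and Volume values of the first two rows above. The zero-volume guard is an assumption added for illustration: it mirrors Spark's default (non-ANSI) behaviour of returning null when dividing by zero.

```python
# High and Volume for the first two rows, as shown in the df.head(5) output above
rows = [
    {"Date": "2012-01-03", "High": 61.060001373291016, "Volume": 12668800.0},
    {"Date": "2012-01-04", "High": 60.349998474121094, "Volume": 9593300.0},
]

for row in rows:
    # HV_Ratio = High / Volume; None stands in for Spark's null on a zero Volume
    row["HV_Ratio"] = row["High"] / row["Volume"] if row["Volume"] else None
    print(row["Date"], row["HV_Ratio"])
```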

## Data Analysis

In [14]:
df.createOrReplaceTempView('full_df')
tables = sqlCtx.tableNames()
print(tables)

['full_df', 'saf_stock']


### What day had the Peak High in Price?


In [15]:
query = 'select Date from full_df where High = (select MAX(High) from full_df)'
sqlCtx.sql(query).show()



+----------+
|      Date|
+----------+
|2015-01-13|
+----------+
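The subquery form `High = (SELECT MAX(High) ...)` returns every date that reaches the maximum, so ties would produce several rows; `ORDER BY High DESC LIMIT 1` would return just one. A plain-Python sketch of the subquery's logic on the first five rows shown earlier:

```python
# (Date, High) pairs from the first five rows of the data
sample = [
    ("2012-01-03", 61.06),
    ("2012-01-04", 60.35),
    ("2012-01-05", 59.62),
    ("2012-01-06", 59.45),
    ("2012-01-09", 59.55),
]

peak = max(high for _, high in sample)
# Like the SQL subquery, keep every date that reaches the maximum (ties included)
peak_dates = [date for date, high in sample if high == peak]
print(peak, peak_dates)  # 61.06 ['2012-01-03']
```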



### What is the mean of the Close column?

In [16]:
query = 'select AVG(Close) as Average_Close from full_df'
sqlCtx.sql(query).show()

+-----------------+
|    Average_Close|
+-----------------+
|72.38844997363553|
+-----------------+
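This average (72.38844997363553) differs from the describe() mean (72.38844998012726) after the eighth decimal place. Most likely this is because describe() worked on the original strings at full 64-bit precision, while this query runs on Close after the cast to FloatType, which keeps only about 7 significant digits; casting to DoubleType would avoid the loss. A small sketch of the 32-bit rounding, where `to_float32` is a hypothetical helper built on the standard `struct` module:

```python
import struct


def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", x))[0]


close_text = "60.330002"            # first Close value in the CSV
as_double = float(close_text)       # what a 64-bit parse keeps
as_float32 = to_float32(as_double)  # what a FloatType column keeps

print(as_double)   # 60.330002
print(as_float32)  # 60.33000183105469, as in the df.head(5) output
print(as_float32 - as_double)
```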



### What is the max and min of the Volume column?

In [17]:
query = 'select MAX(Volume) as Max_Volume, MIN(Volume) as Min_Volume from full_df'
sqlCtx.sql(query).show()

+-----------+----------+
| Max_Volume|Min_Volume|
+-----------+----------+
|8.0898096E7| 2094900.0|
+-----------+----------+
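Max_Volume prints in scientific notation because Volume was cast to a float. More importantly, a 32-bit float can represent every integer exactly only up to 2**24 (16,777,216), and the maximum volume here is far above that, so some volumes may be rounded. IntegerType or LongType would keep share counts exact and print them as plain integers. A sketch using the same hypothetical `to_float32` helper idea:

```python
import struct


def to_float32(x):
    """Round a number to the nearest 32-bit float, as a FloatType column stores it."""
    return struct.unpack("f", struct.pack("f", float(x)))[0]


# Up to 2**24, every integer fits exactly in a float32
print(to_float32(2**24))      # 16777216.0
print(to_float32(2**24 + 1))  # 16777216.0: the +1 is lost
# This particular maximum happens to be a multiple of 8, so it is stored exactly
print(to_float32(80898096))   # 80898096.0
```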



### On how many days was the Close lower than 60 dollars?

In [18]:
query = 'Select COUNT(*) as Close_LessThan_60 from full_df where Close < 60'
sqlCtx.sql(query).show()

+-----------------+
|Close_LessThan_60|
+-----------------+
|               81|
+-----------------+
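The query is a filter followed by a count; with the DataFrame API the same result should come from `df.filter(df.Close < 60).count()`. A plain-Python sketch of the filter-and-count on the Close values of the first five rows, where `count_below` is a hypothetical helper:

```python
def count_below(values, threshold):
    """Count the values strictly below `threshold` (like WHERE Close < 60)."""
    return sum(1 for v in values if v < threshold)


# Close prices from the first five rows shown in df.show(5)
closes = [60.33, 59.71, 59.42, 59.0, 59.18]
print(count_below(closes, 60))  # 4
```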



### What percentage of the time was the High greater than 80 dollars?

In [19]:

# Days with High > 80, as a percentage of all days with a High value
query = 'SELECT COUNT(CASE WHEN High > 80 THEN 1 END) * 100 / COUNT(High) AS Percentage FROM full_df'

sqlCtx.sql(query).show()



+-----------------+
|       Percentage|
+-----------------+
|9.141494435612083|
+-----------------+
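The percentage is simply 100 × (days with High > 80) / (all days). Working backwards, 9.141494435612083 % of the 1258 rows counted by describe() corresponds to 115 days. A plain-Python sketch, where `percent_above` is a hypothetical helper and the second call uses made-up values:

```python
def percent_above(values, threshold):
    """Percentage of values strictly greater than `threshold`."""
    if not values:
        raise ValueError("values must not be empty")
    above = sum(1 for v in values if v > threshold)
    return 100 * above / len(values)


# 115 of 1258 days reproduces the query result
print(100 * 115 / 1258)  # roughly 9.1415

# Hypothetical High values: 2 of 5 are above 80
print(percent_above([61.06, 60.35, 79.9, 80.5, 90.97], 80))  # 40.0
```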



### What is the Pearson correlation between High and Volume?

In [20]:
df.stat.corr('High', 'Volume')

-0.3384326095027024
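`df.stat.corr` computes the Pearson coefficient by default; a value of about -0.34 indicates a weak-to-moderate negative linear relationship, so days with higher High prices tended to see somewhat lower volumes. The formula is the covariance of the two columns divided by the product of their standard deviations, sketched here in plain Python (`pearson` is a hypothetical helper, and the sample uses only the first five rows, so its value will differ from the full-data result):

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance numerator and the two sums of squared deviations
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / math.sqrt(var_x * var_y)


# High and Volume for the first five rows
high = [61.06, 60.35, 59.62, 59.45, 59.55]
volume = [12668800, 9593300, 12768200, 8069400, 6679300]
r = pearson(high, volume)
print(r)
```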

### What is the max High per year?


In [21]:
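# newdf: extract year and month values from the Date string

# Import only what is needed; `import *` would shadow Python built-ins such as max, min, sum and round
from pyspark.sql.functions import year, month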

newdf = df.select(year(df.Date).alias('_year'), month(df.Date).alias('_month'), "High", "Close")
newdf.show()

+-----+------+-----+-----+
|_year|_month| High|Close|
+-----+------+-----+-----+
| 2012|     1|61.06|60.33|
| 2012|     1|60.35|59.71|
| 2012|     1|59.62|59.42|
| 2012|     1|59.45| 59.0|
| 2012|     1|59.55|59.18|
| 2012|     1|59.71|59.04|
| 2012|     1|59.53| 59.4|
| 2012|     1| 60.0| 59.5|
| 2012|     1|59.61|59.54|
| 2012|     1|60.11|59.85|
| 2012|     1|60.03|60.01|
| 2012|     1|60.73|60.61|
| 2012|     1|61.25|61.01|
| 2012|     1|60.98|60.91|
| 2012|     1| 62.0|61.39|
| 2012|     1|61.61|61.47|
| 2012|     1|61.84|60.97|
| 2012|     1|61.12|60.71|
| 2012|     1|61.32| 61.3|
| 2012|     1|61.57|61.36|
+-----+------+-----+-----+
only showing top 20 rows



In [22]:
newdf.createOrReplaceTempView('full_df2')
tables = sqlCtx.tableNames()
print(tables)



['full_df', 'full_df2', 'saf_stock']


In [23]:
#Get the MAX High per year ...

query = 'select _year, MAX(High) as year_high from full_df2 group by _year'
sqlCtx.sql(query).show()

+-----+---------+
|_year|year_high|
+-----+---------+
| 2015|    90.97|
| 2013|    81.37|
| 2014|    88.09|
| 2012|     77.6|
| 2016|    75.19|
+-----+---------+
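GROUP BY does not guarantee any row order, which is why the years above appear shuffled; appending `ORDER BY _year` to the query would sort them. The grouping itself can be sketched in plain Python, where `max_high_per_year` is a hypothetical helper and the sample rows are made up:

```python
from datetime import date


def max_high_per_year(rows):
    """Group (date_string, high) pairs by calendar year and keep the max High,
    like: SELECT _year, MAX(High) ... GROUP BY _year ORDER BY _year."""
    best = {}
    for date_str, high in rows:
        yr = date.fromisoformat(date_str).year  # 'yyyy-MM-dd' -> year
        best[yr] = max(high, best.get(yr, high))
    return dict(sorted(best.items()))  # sorted by year, unlike the SQL output above


# Hypothetical sample rows
rows = [("2012-01-03", 61.06), ("2012-06-01", 70.2),
        ("2013-03-04", 81.37), ("2013-09-09", 75.0)]
print(max_high_per_year(rows))  # {2012: 70.2, 2013: 81.37}
```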



### What is the average Close for each calendar month?

In [24]:
query = 'select _month, AVG(Close) as month_average from full_df2 group by _month'
sqlCtx.sql(query).show()

+------+-----------------+
|_month|    month_average|
+------+-----------------+
|    12|72.84792482628012|
|     1| 71.4480196131338|
|     6| 72.4953774506191|
|     3|71.77794376266337|
|     5|72.30971685445533|
|     9|72.18411782208611|
|     4|72.97361900692894|
|     8| 73.0298185521906|
|     7|74.43971944078106|
|    10| 71.5785454489968|
|    11|72.11108927207418|
|     2|71.30680438169499|
+------+-----------------+
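Because the query groups by `_month` alone, each average pools that calendar month across all years from 2012 to 2016; grouping by both `_year` and `_month` would instead give one average per month of each year. A plain-Python sketch of the pooled version, where `average_close_per_month` is a hypothetical helper and the sample rows are made up:

```python
from collections import defaultdict
from datetime import date


def average_close_per_month(rows):
    """Average Close per calendar month (1-12), pooled across all years."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for date_str, close in rows:
        month = date.fromisoformat(date_str).month
        totals[month] += close
        counts[month] += 1
    return {m: totals[m] / counts[m] for m in sorted(totals)}


# Hypothetical sample: two Januaries from different years and one February
rows = [("2012-01-03", 60.0), ("2013-01-02", 70.0), ("2012-02-01", 62.5)]
print(average_close_per_month(rows))  # {1: 65.0, 2: 62.5}
```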

