# Apache Spark DataFrames Project

## 1. Data Importation and Exploration

In [None]:
# To use Pyspark , We'll first install it
# ---
#
!pip install pyspark



In [None]:
# Next, we'll run a local spark session
# ---
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
#preview the first few rows


with open('saf_stock.csv') as f:
  for x in range(0,4):
    print(f.readline())

Date,Open,High,Low,Close,Volume,Adj Close

2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996

2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475

2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539



In [None]:
# To begin working, we'll load our data set into an RDD
# ---
# Dataset Download URL =  https://bit.ly/3pmchka
# Hint: Download and upload the file to colab
# ---
#
from pyspark.sql import SQLContext

# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

# Read CSV data into a DataFrame object `df`
df = sqlCtx.read.csv("saf_stock.csv",header=True, inferSchema = True)

# Print the type
print(type(df))



<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
#next we will determine the column names and infer the data types 
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



Schema Observations:
1. There are 7 columns within the spark dataframe
2. all fields are nullable, meaning they cannot contain null values
3. the date column is in string format while volume column is an integer owing to the lack of decimal points

In [None]:
#Next we will display the first 5 records
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [None]:
#Finally, under exploration we will use the describe method to learn about the new df
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

Obervations:
1. There are 1258 records in the database
2.

## 2. Data Preparation


In [None]:
# for the double dtype columns, we will format all the data to 2 decimal places i.e. format_number()
#we will pass each refomatting into a new df due to the immutable nature of spark DataFrames
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

df1 = df.withColumn('Open',F.format_number('Open', 2).cast(DoubleType()))
df1.show(5)

+----------+-----+---------+---------+------------------+--------+------------------+
|      Date| Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+-----+---------+---------+------------------+--------+------------------+
|2012-01-03|59.97|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.21|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|59.35|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|59.42|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|59.03|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+-----+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [None]:
df2= df1.withColumn('High', F.format_number('High', 2).cast(DoubleType()))
df2.show(5)

+----------+-----+-----+---------+------------------+--------+------------------+
|      Date| Open| High|      Low|             Close|  Volume|         Adj Close|
+----------+-----+-----+---------+------------------+--------+------------------+
|2012-01-03|59.97|61.06|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.21|60.35|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|59.35|59.62|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|59.42|59.45|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|59.03|59.55|58.919998|             59.18| 6679300|51.616215000000004|
+----------+-----+-----+---------+------------------+--------+------------------+
only showing top 5 rows



In [None]:
df3=df2.withColumn('Low', F.format_number('Low', 2).cast(DoubleType()))
df3.show(5)

+----------+-----+-----+-----+------------------+--------+------------------+
|      Date| Open| High|  Low|             Close|  Volume|         Adj Close|
+----------+-----+-----+-----+------------------+--------+------------------+
|2012-01-03|59.97|61.06|59.87|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.21|60.35|59.47|59.709998999999996| 9593300|         52.078475|
|2012-01-05|59.35|59.62|58.37|         59.419998|12768200|         51.825539|
|2012-01-06|59.42|59.45|58.87|              59.0| 8069400|          51.45922|
|2012-01-09|59.03|59.55|58.92|             59.18| 6679300|51.616215000000004|
+----------+-----+-----+-----+------------------+--------+------------------+
only showing top 5 rows



In [None]:
df4=df3.withColumn('Close', F.format_number('Close', 2).cast(DoubleType()))
df4.show(5)

+----------+-----+-----+-----+-----+--------+------------------+
|      Date| Open| High|  Low|Close|  Volume|         Adj Close|
+----------+-----+-----+-----+-----+--------+------------------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|52.619234999999996|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|         52.078475|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|         51.825539|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|          51.45922|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|51.616215000000004|
+----------+-----+-----+-----+-----+--------+------------------+
only showing top 5 rows



In [None]:
stocks=df4.withColumn('Adj Close', F.format_number('Adj Close', 2).cast(DoubleType()))
stocks.show(5)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows



In [None]:
# Next we need to create a new data frame with a column called HV Ratio that is the ratio of the
# High Price versus volume of stock traded for a day

hv_stocks = stocks.withColumn('HV Ratio', F.format_number(F.col('High')/F.col('Volume'),6))
hv_stocks.show(5)

+----------+-----+-----+-----+-----+--------+---------+--------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+--------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|0.000005|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|0.000006|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|0.000005|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|0.000007|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|0.000009|
+----------+-----+-----+-----+-----+--------+---------+--------+
only showing top 5 rows



## 3. Data Analysis


In [None]:
#Q1: What day had the Peak High in Price?

peak_price = hv_stocks.agg({"High": "max"}).collect()[0][0]
high = hv_stocks.filter(hv_stocks.High == peak_price)
high.show()

+----------+----+-----+-----+-----+-------+---------+--------+
|      Date|Open| High|  Low|Close| Volume|Adj Close|HV Ratio|
+----------+----+-----+-----+-----+-------+---------+--------+
|2015-01-13|90.8|90.97|88.93|89.31|8215400|    83.83|0.000011|
+----------+----+-----+-----+-----+-------+---------+--------+



In [None]:
#Q2: What is the mean of the Close column?
close_mean = hv_stocks.agg({'Close': 'mean'}).show()


+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+



In [None]:
#Q3: What is the max and min of the Volume column?
peak_volume = hv_stocks.agg({"Volume": "max"}).show()
min_volume = hv_stocks.agg({"Volume": "min"}).show()


+-----------+
|max(Volume)|
+-----------+
|   80898100|
+-----------+

+-----------+
|min(Volume)|
+-----------+
|    2094900|
+-----------+



In [None]:
#Q4: How many days was the Close lower than 60 dollars?
low_close= hv_stocks.filter(hv_stocks.Close < 60).show()


+----------+-----+-----+-----+-----+--------+---------+--------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+--------+
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|0.000006|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|0.000005|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|0.000007|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|0.000009|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300|    51.49|0.000009|
|2012-01-11|59.06|59.53|59.04| 59.4| 6365600|    51.81|0.000009|
|2012-01-12|59.79| 60.0| 59.4| 59.5| 7236400|     51.9|0.000008|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|    51.93|0.000008|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000|     52.2|0.000007|
|2012-02-22|59.58| 59.9|58.37| 58.6|28630200|    51.11|0.000002|
|2012-02-23|58.59| 58.9|58.21|58.54|14880300|    51.06|0.000004|
|2012-02-24|58.75|58.95| 58.5|58.79| 9925900|    51.28|0.000006|
|2012-02-27| 58.7|58.78|5

In [None]:
#Q5: What percentage of the time was the High greater than 80 dollars?
total_days = hv_stocks.count()
high_greater_80 = hv_stocks[hv_stocks['High'] > 80]
days_high_greater_80 = high_greater_80.count()
print('Total days:',total_days,'\nNumber of days High was greater than 80:',days_high_greater_80)
print('Percentage of days High was greater than 80: {}%'.format((days_high_greater_80/total_days) * 100))
high_greater_80.show()

Total days: 1258 
Number of days High was greater than 80: 115
Percentage of days High was greater than 80: 9.141494435612083%
+----------+-----+-----+-----+-----+--------+---------+--------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+--------+
|2013-11-25|80.06|80.57|79.91|80.43| 5670400|    73.22|0.000014|
|2013-11-26|80.44|80.68|80.11|80.68| 5537800|    73.45|0.000015|
|2013-11-27|80.55| 81.0|80.38|80.93| 4813300|    73.67|0.000017|
|2013-11-29|81.17|81.35|80.82|81.01| 3447200|    73.75|0.000024|
|2013-12-02|80.89|81.28|80.37|81.11| 6178400|    73.84|0.000013|
|2013-12-03|81.21|81.33| 80.7|81.21| 7506400|    73.93|0.000011|
|2013-12-04|80.64|81.37|79.91|80.22| 7641200|    73.45|0.000011|
|2013-12-06|79.71|80.23|79.64|79.94| 5088100|     73.2|0.000016|
|2013-12-09|80.24|80.43| 79.7|79.95| 4491600|    73.21|0.000018|
|2014-11-10| 78.6|80.13|78.42|79.44|12640500|    74.15|0.000006|
|2014-11-13|80.96|83.06|80.8

In [None]:
#Q6: What is the Pearson correlation between High and Volume?
# hv_stocks.dtypes
hv_stocks.corr('High','Volume')

-0.33843260582148915

In [None]:
#Q7: What is the max High per year?
hv_stocks.groupby(F.date_format('Date','yyyy').alias('Year')).agg({'High': 'max'}).sort('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|     77.6|
|2013|    81.37|
|2014|    88.09|
|2015|    90.97|
|2016|    75.19|
+----+---------+



In [None]:
#Q8: What is the average Close for each Calendar Month?
hv_stocks.groupby(F.date_format('Date','MM').alias('Month')).agg({'Close': 'mean'}).sort('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|   01|71.44801980198022|
|   02|71.30680412371134|
|   03|71.77794392523363|
|   04|72.97361904761907|
|   05|72.30971698113206|
|   06|72.49537735849057|
|   07|74.43971962616824|
|   08|73.02981818181819|
|   09|72.18411764705883|
|   10|71.57854545454546|
|   11|72.11108910891085|
|   12|72.84792452830189|
+-----+-----------------+

