# Instructions
As a Data professional, you need to perform an analysis by answering questions about
some stock market data on Safaricom from the years 2012-2017.
You will need to perform the following:


# Pre-requisites

In [45]:
# import pyspark module for analysis
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Data Importation and Exploration


In [46]:
#Read the csv saf_stock

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

df1 = sqlContext.read.load('saf_stock.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')






In [47]:
#Determine the column names


df1.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### observation: there are spaces on columns names that need to be removed

In [48]:
#remove spaces on headers and replace with '_'

from pyspark.sql import functions as F

df = df1.select([F.col(col).alias(col.replace(' ', '_')) for col in df1.columns])
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj_Close']

In [49]:
#● Make observations about the schema.

df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj_Close: double (nullable = true)



In [50]:
#Show the first 5 rows of column
df.head(5)

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj_Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj_Close=52.078475),
 Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj_Close=51.825539),
 Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj_Close=51.45922),
 Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj_Close=51.616215000000004)]

In [51]:
#first we create an sql table from sqlcontext
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df.createOrReplaceTempView('saf_stock')
tables = sqlCtx.tableNames()
print(tables)

['saf_stock']


In [52]:
#Use the describe method to learn about the data frame
query = 'select Open,High, Low, Close, Volume,Adj_Close from saf_stock'
sqlCtx.sql(query).describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj_Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

# Data Preparation


In [53]:
#Format all the data to 2 decimal places i.e. format_number()
import pyspark.sql.functions as func


df_1 = df.withColumn("Open", func.round(df["Open"], 2))
df_2 = df_1.withColumn("High", func.round(df_1["High"], 2))
df_3 = df_2.withColumn("Low", func.round(df_2["Low"], 2))
df_4= df_3.withColumn("Close", func.round(df_3["Close"], 2))
df_5 = df_4.withColumn("Volume", func.round(df_4["Volume"], 2))
df2 = df_5.withColumn("Adj_Close", func.round(df_5["Adj_Close"], 2))
df2.show()

+-------------------+-----+-----+-----+-----+--------+---------+
|               Date| Open| High|  Low|Close|  Volume|Adj_Close|
+-------------------+-----+-----+-----+-----+--------+---------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06 00:00:00|59.42|59.45|58.87| 59.0| 8069400|    51.46|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|    51.62|
|2012-01-10 00:00:00|59.43|59.71|58.98|59.04| 6907300|    51.49|
|2012-01-11 00:00:00|59.06|59.53|59.04| 59.4| 6365600|    51.81|
|2012-01-12 00:00:00|59.79| 60.0| 59.4| 59.5| 7236400|     51.9|
|2012-01-13 00:00:00|59.18|59.61|59.01|59.54| 7729300|    51.93|
|2012-01-17 00:00:00|59.87|60.11|59.52|59.85| 8500000|     52.2|
|2012-01-18 00:00:00|59.79|60.03|59.65|60.01| 5911400|    52.34|
|2012-01-19 00:00:00|59.93|60.73|59.75|60.61| 9234600|    52.86|
|2012-01-20 00:00:00|60.7

In [54]:
####Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day


new_df = df2.withColumn('HV_Ratio', df2.High/df2.Volume)
new_df.show()

+-------------------+-----+-----+-----+-----+--------+---------+--------------------+
|               Date| Open| High|  Low|Close|  Volume|Adj_Close|            HV_Ratio|
+-------------------+-----+-----+-----+-----+--------+---------+--------------------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|    52.62|4.819714574387472E-6|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|    52.08|6.290848821573389...|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|    51.83|4.669413073103491E-6|
|2012-01-06 00:00:00|59.42|59.45|58.87| 59.0| 8069400|    51.46|7.367338339901356E-6|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|    51.62|8.915604928660188E-6|
|2012-01-10 00:00:00|59.43|59.71|58.98|59.04| 6907300|    51.49|8.644477581688938E-6|
|2012-01-11 00:00:00|59.06|59.53|59.04| 59.4| 6365600|    51.81| 9.35182857861003E-6|
|2012-01-12 00:00:00|59.79| 60.0| 59.4| 59.5| 7236400|     51.9| 8.29141562102703E-6|
|2012-01-13 00:00:00|59.18|59.61|59.01|59.54| 7729300|

# Data Analysis

In [55]:
#What day had the Peak High in Price?

#new_df.agg({'High': 'max' }).collect()

#df.groupBy('address').max('height').collect()

#de = new_df.agg({'High': 'max' })
#de.show()
#de = select High.Max
import pandas as pd
t = new_df.agg({'High': 'max' })

x = t.collect()[0][0]

x



90.97

In [56]:
new_df.head()

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.97, High=61.06, Low=59.87, Close=60.33, Volume=12668800, Adj_Close=52.62, HV_Ratio=4.819714574387472e-06)

In [57]:
#What is the mean of the Close column?

from pyspark.sql.functions import avg
new_df.select(avg(new_df.Close)).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+



In [58]:
#What is the max and min of the Volume column?
from pyspark.sql.functions import max
from pyspark.sql.functions import min
new_df.select(max(new_df.Volume).alias("Volume_max"), 
          min(new_df.Volume).alias("Volume_min")
    ).show()

+----------+----------+
|Volume_max|Volume_min|
+----------+----------+
|  80898100|   2094900|
+----------+----------+



In [59]:
#How many days was the Close lower than 60 dollars?
from pyspark.sql.functions import count
below_60_dolars = ''
below_60_dolars = 'select Close from saf_stock where Close < 60'
x = sqlCtx.sql(below_60_dolars).count()

print('Counts of below 60 dollars = ', x)

Counts of below 60 dollars =  81


In [60]:
#What percentage of the time was the High greater than 80 dollars?
#count the total times of High
High_count = ''

High_count = 'select High from saf_stock'
y = sqlCtx.sql(High_count).count()

#count the number of times High is greater than 80
High_Above_80_dolars = ''
High_Above_80_dolars = 'select High from saf_stock where High > 80'
z = sqlCtx.sql(High_Above_80_dolars).count()

# calculate the % using the occurence

a = 100*(z/y)

print ('What percentage of the time was the High greater than 80 dollars = ',a , '%' )

What percentage of the time was the High greater than 80 dollars =  9.141494435612083 %


In [61]:
#What is the Pearson correlation between High and Volume?

from pyspark.ml.stat import Correlation

new_df.corr('High' , 'Volume')

-0.33843260582148915

In [62]:
#What is the max High per year?

#split the column Date to year month and day 

#import split from pyspark

from pyspark.sql.functions import split

#extract the year from the date column

year_df = new_df.withColumn('year', split(new_df['Date'], '-').getItem(0))


#use the year to group by max of high per year

year_df.groupBy("year").max("High").show()

+----+---------+
|year|max(High)|
+----+---------+
|2016|    75.19|
|2012|     77.6|
|2014|    88.09|
|2013|    81.37|
|2015|    90.97|
+----+---------+



In [63]:
#What is the average Close for each Calendar Month?

#create a column for month.
calendar_month = new_df.withColumn('month', split(new_df['Date'], '-').getItem(1)) 

#calendar_month.show()

#group by the monthly average of close
calendar_month.groupBy("month").avg("Close").show()

+-----+-----------------+
|month|       avg(Close)|
+-----+-----------------+
|   07|74.43971962616824|
|   11|72.11108910891085|
|   01|71.44801980198022|
|   09|72.18411764705883|
|   05|72.30971698113206|
|   08|73.02981818181819|
|   03|71.77794392523363|
|   02|71.30680412371134|
|   06|72.49537735849057|
|   10|71.57854545454546|
|   12|72.84792452830189|
|   04|72.97361904761907|
+-----+-----------------+

