## Apache Spark DataFrames Project_2021

## Project Deliverable

You will be required to submit:

● A GitHub repository with your project written in PySpark.

#### Instructions

As a data professional, you need to perform an analysis by answering questions about
some stock market data on Safaricom from the years 2012-2017.
You will need to perform the following:

#### Data Importation and Exploration

● Start a spark session and load the stock file while inferring the data types.

● Determine the column names

● Make observations about the schema.

● Show the first 5 rows

● Use the describe method to learn about the data frame

#### Data Preparation
● Format all the data to 2 decimal places, i.e. with format_number()

● Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day

#### Data Analysis

● What day had the Peak High in Price?

● What is the mean of the Close column?

● What is the max and min of the Volume column?

● How many days was the Close lower than 60 dollars?

● What percentage of the time was the High greater than 80 dollars?

● What is the Pearson correlation between High and Volume?

● What is the max High per year?

● What is the average Close for each Calendar Month?

In [1]:
##Pre-requisites
# Installing pyspark
!pip install pyspark



In [3]:
!pip install findspark



In [4]:
# Next, we run a local spark session
# ---
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [5]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
# Read the CSV with a header row; no schema inference is requested, so every column loads as a string
df_data_1 = sqlCtx.read.format("csv").option("header","true").load("saf_stock.csv")
# Register the DataFrame as a SQL temporary view
df_data_1.registerTempTable('saf_stock')



In [6]:
# Check the type of the loaded object
display(type(df_data_1))
print(type(df_data_1))

pyspark.sql.dataframe.DataFrame

<class 'pyspark.sql.dataframe.DataFrame'>


#### Determine the column names

In [7]:
df_data_1.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

The stocks DataFrame has the following columns:

- Date
- Open
- High
- Low
- Close
- Volume
- Adj Close

In [8]:
# SQL query that returns all columns of the DataFrame and displays the first 20 rows
sqlCtx.sql('SELECT * FROM saf_stock').show(20)

+---------+---------+---------+---------+---------+--------+---------+
|     Date|     Open|     High|      Low|    Close|  Volume|Adj Close|
+---------+---------+---------+---------+---------+--------+---------+
| 1/3/2012|59.970001|61.060001|59.869999|60.330002|12668800|52.619235|
| 1/4/2012|60.209999|60.349998|59.470001|59.709999| 9593300|52.078475|
| 1/5/2012|59.349998|59.619999|58.369999|59.419998|12768200|51.825539|
| 1/6/2012|59.419998|59.450001|58.869999|       59| 8069400| 51.45922|
| 1/9/2012|59.029999|59.549999|58.919998|    59.18| 6679300|51.616215|
|1/10/2012|    59.43|59.709999|    58.98|59.040001| 6907300|51.494109|
|1/11/2012|59.060001|59.529999|59.040001|59.400002| 6365600|51.808098|
|1/12/2012|59.790001|       60|59.400002|     59.5| 7236400|51.895316|
|1/13/2012|    59.18|59.610001|59.009998|59.540001| 7729300|51.930204|
|1/17/2012|59.869999|60.110001|    59.52|59.849998| 8500000|52.200581|
|1/18/2012|59.790001|60.029999|59.650002|60.009998| 5911400|52.340131|
|1/19/

#### Make observations about the schema

In [9]:
df_data_1.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



- Our DataFrame has seven columns, all of type string, because the CSV was read without schema inference.
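
The assignment asks for the file to be loaded while inferring data types. A minimal sketch of one way to do that, assuming an existing SparkSession is passed in as `spark` and the same `saf_stock.csv` path is used; the helper name `load_stock_inferred` is hypothetical:

```python
def load_stock_inferred(spark, path="saf_stock.csv"):
    """Read the CSV with a header row and ask Spark to infer column types.

    `spark` is assumed to be an existing SparkSession; the function name and
    the default path are illustrative, not part of the original notebook.
    """
    return (
        spark.read
        .option("header", "true")       # first line holds the column names
        .option("inferSchema", "true")  # extra pass over the data to guess types
        .csv(path)
    )
```

Calling `load_stock_inferred(spark).printSchema()` would then be expected to report numeric types for the price and volume columns; the Date column may still load as a string because of its M/D/YYYY format.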

#### Show the first 5 rows

In [10]:
sqlCtx.sql('SELECT * FROM saf_stock').show(5)

+--------+---------+---------+---------+---------+--------+---------+
|    Date|     Open|     High|      Low|    Close|  Volume|Adj Close|
+--------+---------+---------+---------+---------+--------+---------+
|1/3/2012|59.970001|61.060001|59.869999|60.330002|12668800|52.619235|
|1/4/2012|60.209999|60.349998|59.470001|59.709999| 9593300|52.078475|
|1/5/2012|59.349998|59.619999|58.369999|59.419998|12768200|51.825539|
|1/6/2012|59.419998|59.450001|58.869999|       59| 8069400| 51.45922|
|1/9/2012|59.029999|59.549999|58.919998|    59.18| 6679300|51.616215|
+--------+---------+---------+---------+---------+--------+---------+
only showing top 5 rows



#### Use the describe method to learn about the data frame

In [11]:
#Statistical summary of the dataframe
sqlCtx.sql('SELECT * FROM saf_stock').describe().show(5)

+-------+---------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|     Date|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+---------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|     1258|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|     null|72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|     null| 6.76809024470826|6.768186808159218|6.744075756255496| 6.75685916373299|  4519780.8431556|6.722609449996858|
|    min|1/10/2012|        56.389999|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max| 9/9/2016|        90.800003|        90.970001|            89.25|        90.470001|          999

The describe() method calculates summary statistics for the DataFrame, and the show() method displays the results.
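
Because every column was loaded as a string, the min and max rows compare text rather than numbers. That is why the reported min for Volume, 10010500, is larger than values visible in the first rows, such as 9593300. A small plain-Python illustration of the same effect, using Volume values taken from the outputs above:

```python
# Volume values copied from the outputs above, kept as text like the string columns
volumes_as_text = ["12668800", "9593300", "8069400", "10010500"]

# Text is compared character by character, so "1..." sorts before "8..." and "9..."
text_min = min(volumes_as_text)
text_max = max(volumes_as_text)

# Converting to integers first gives the numeric extremes
numeric_min = min(int(v) for v in volumes_as_text)
numeric_max = max(int(v) for v in volumes_as_text)

print(text_min, text_max)        # 10010500 9593300
print(numeric_min, numeric_max)  # 8069400 12668800
```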

In [12]:
# The same summary, with the query held in a variable
query = 'select * from saf_stock'
sqlCtx.sql(query).describe().show()

+-------+---------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|     Date|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+---------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|     1258|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|     null|72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|     null| 6.76809024470826|6.768186808159218|6.744075756255496| 6.75685916373299|  4519780.8431556|6.722609449996858|
|    min|1/10/2012|        56.389999|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max| 9/9/2016|        90.800003|        90.970001|            89.25|        90.470001|          999

### 3. Data Preparation

- Format all the data to 2 decimal places i.e. format_number()

In [13]:
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import format_number
# Price columns to display with two decimal places
price_cols = ['Open', 'High', 'Low', 'Close', 'Adj Close']
for name in price_cols:
  # format_number returns a string column, so these columns remain strings
  df_data_1 = df_data_1.withColumn(name, format_number(df_data_1[name].cast("float"), 2))
# Cast Volume to a decimal with two fractional digits
df_data_1 = df_data_1.withColumn('Volume', df_data_1.Volume.cast(DecimalType(18, 2)))

In [14]:
df_data_1.show(10)

+---------+-----+-----+-----+-----+-----------+---------+
|     Date| Open| High|  Low|Close|     Volume|Adj Close|
+---------+-----+-----+-----+-----+-----------+---------+
| 1/3/2012|59.97|61.06|59.87|60.33|12668800.00|    52.62|
| 1/4/2012|60.21|60.35|59.47|59.71| 9593300.00|    52.08|
| 1/5/2012|59.35|59.62|58.37|59.42|12768200.00|    51.83|
| 1/6/2012|59.42|59.45|58.87|59.00| 8069400.00|    51.46|
| 1/9/2012|59.03|59.55|58.92|59.18| 6679300.00|    51.62|
|1/10/2012|59.43|59.71|58.98|59.04| 6907300.00|    51.49|
|1/11/2012|59.06|59.53|59.04|59.40| 6365600.00|    51.81|
|1/12/2012|59.79|60.00|59.40|59.50| 7236400.00|    51.90|
|1/13/2012|59.18|59.61|59.01|59.54| 7729300.00|    51.93|
|1/17/2012|59.87|60.11|59.52|59.85| 8500000.00|    52.20|
+---------+-----+-----+-----+-----+-----------+---------+
only showing top 10 rows
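
format_number() returns text and inserts thousands separators, so applying it to Volume would produce values like 12,668,800.00. That may be why Volume is cast to a decimal type instead (an assumption, not stated in the notebook). Python's format specification gives a rough analogue for illustration; rounding of exact ties may differ from Spark:

```python
def format_two_places(value):
    """Rough Python analogue of format_number(value, 2): grouped thousands, two decimals."""
    return format(value, ",.2f")

print(format_two_places(59.970001))  # 59.97
print(format_two_places(12668800))   # 12,668,800.00
```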



#### Create a new data frame with a column called HV Ratio

- HV Ratio is the ratio of the High Price versus volume of stock traded for a day.

In [15]:
# Add the HV_Ratio column: High divided by Volume
df_data_2 = df_data_1.withColumn("HV_Ratio", df_data_1.High/df_data_1.Volume)

In [16]:
df_data_2.show(8)

+---------+-----+-----+-----+-----+-----------+---------+--------------------+
|     Date| Open| High|  Low|Close|     Volume|Adj Close|            HV_Ratio|
+---------+-----+-----+-----+-----+-----------+---------+--------------------+
| 1/3/2012|59.97|61.06|59.87|60.33|12668800.00|    52.62|4.819714574387472E-6|
| 1/4/2012|60.21|60.35|59.47|59.71| 9593300.00|    52.08|6.290848821573389...|
| 1/5/2012|59.35|59.62|58.37|59.42|12768200.00|    51.83|4.669413073103491E-6|
| 1/6/2012|59.42|59.45|58.87|59.00| 8069400.00|    51.46|7.367338339901356E-6|
| 1/9/2012|59.03|59.55|58.92|59.18| 6679300.00|    51.62|8.915604928660188E-6|
|1/10/2012|59.43|59.71|58.98|59.04| 6907300.00|    51.49|8.644477581688938E-6|
|1/11/2012|59.06|59.53|59.04|59.40| 6365600.00|    51.81| 9.35182857861003E-6|
|1/12/2012|59.79|60.00|59.40|59.50| 7236400.00|    51.90| 8.29141562102703E-6|
+---------+-----+-----+-----+-----+-----------+---------+--------------------+
only showing top 8 rows
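
As a check on the first row, the ratio can be reproduced by hand: a High of 61.06 divided by a Volume of 12668800. A plain-Python sketch (the function name `hv_ratio` is illustrative):

```python
def hv_ratio(high, volume):
    """Ratio of the day's High price to the number of shares traded."""
    return high / volume

# Values from the first row of the table above
first_row = hv_ratio(61.06, 12668800)
print(f"{first_row:.6e}")  # 4.819715e-06
```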



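- The price columns were already formatted to two decimal places above. HV_Ratio is left unformatted, since its values are on the order of 1E-6 and would all display as 0.00 at two decimal places.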



### 4. Data Analysis

- Use the registerTempTable() or createOrReplaceTempView() method to register the DataFrame df_data_2 as a table named saf_stock_analysis.

- Then run the SQLContext method tableNames() to return the list of tables.

- Assign the resulting list to tables, and use the print function to display it.

In [18]:

# Register the DataFrame df_data_2 as a table named saf_stock_analysis
# by running df_data_2.registerTempTable('saf_stock_analysis').

from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
df_data_2.registerTempTable('saf_stock_analysis')
tables = sqlCtx.tableNames()  # list the names of the registered tables
print(tables)

['saf_stock', 'saf_stock_analysis']
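
registerTempTable() has been deprecated since Spark 2.0 in favor of createOrReplaceTempView(). A sketch of the same registration with the newer call, assuming a SparkSession `spark` and a DataFrame `df` are passed in; `register_and_list` is a hypothetical helper name:

```python
def register_and_list(spark, df, name):
    """Register `df` as a temporary view and return the names of known tables."""
    df.createOrReplaceTempView(name)
    # spark.catalog.listTables() returns Table objects; keep only their names
    return [table.name for table in spark.catalog.listTables()]
```

For example, `register_and_list(spark, df_data_2, 'saf_stock_analysis')` would register the analysis table and return the table names.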




In [19]:
# SQL query that returns all columns from saf_stock_analysis;
# show() displays the first 20 results
sqlCtx.sql('select * from saf_stock_analysis').show(20)

+---------+-----+-----+-----+-----+-----------+---------+--------------------+
|     Date| Open| High|  Low|Close|     Volume|Adj Close|            HV_Ratio|
+---------+-----+-----+-----+-----+-----------+---------+--------------------+
| 1/3/2012|59.97|61.06|59.87|60.33|12668800.00|    52.62|4.819714574387472E-6|
| 1/4/2012|60.21|60.35|59.47|59.71| 9593300.00|    52.08|6.290848821573389...|
| 1/5/2012|59.35|59.62|58.37|59.42|12768200.00|    51.83|4.669413073103491E-6|
| 1/6/2012|59.42|59.45|58.87|59.00| 8069400.00|    51.46|7.367338339901356E-6|
| 1/9/2012|59.03|59.55|58.92|59.18| 6679300.00|    51.62|8.915604928660188E-6|
|1/10/2012|59.43|59.71|58.98|59.04| 6907300.00|    51.49|8.644477581688938E-6|
|1/11/2012|59.06|59.53|59.04|59.40| 6365600.00|    51.81| 9.35182857861003E-6|
|1/12/2012|59.79|60.00|59.40|59.50| 7236400.00|    51.90| 8.29141562102703E-6|
|1/13/2012|59.18|59.61|59.01|59.54| 7729300.00|    51.93|7.712211972623653E-6|
|1/17/2012|59.87|60.11|59.52|59.85| 8500000.00|    5

#### What day had the Peak High in Price?

In [20]:
#create an sql query
query_1 = 'select Date, max(High) from saf_stock_analysis group by 1 order by 2 desc'
#Print the results to the console
sqlCtx.sql(query_1).show(1)

+---------+---------+
|     Date|max(High)|
+---------+---------+
|1/13/2015|    90.97|
+---------+---------+
only showing top 1 row



- Therefore, 1/13/2015 had the peak High price, at 90.97.
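
The same question can be phrased as "find the row with the largest High". A plain-Python sketch over the first five rows shown earlier; because High is stored as text after formatting, it is converted with float() before comparing:

```python
# (Date, High) pairs copied from the first five rows of the formatted table
rows = [
    ("1/3/2012", "61.06"),
    ("1/4/2012", "60.35"),
    ("1/5/2012", "59.62"),
    ("1/6/2012", "59.45"),
    ("1/9/2012", "59.55"),
]

# Pick the row whose High is numerically largest
peak_date, peak_high = max(rows, key=lambda row: float(row[1]))
print(peak_date, peak_high)  # 1/3/2012 61.06
```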

#### What is the mean of the Close column?

In [21]:
#create an sql query
query_2 = 'select round(avg(Close),2) as Avg_Close from saf_stock_analysis'
#Print the results to the console
sqlCtx.sql(query_2).show()

+---------+
|Avg_Close|
+---------+
|    72.39|
+---------+



#### What is the max and min of the Volume column?

In [22]:
#create an sql query

query_3 = 'select max(Volume) as Max_Volume, min(Volume) as Min_Volume from saf_stock_analysis'
#Print the results to the console
sqlCtx.sql(query_3).show()

+-----------+----------+
| Max_Volume|Min_Volume|
+-----------+----------+
|80898100.00|2094900.00|
+-----------+----------+



#### How many days was the Close lower than 60 dollars?

In [23]:
#create an sql query

query_4 = 'select count(Date) from saf_stock_analysis where Close < 60'
#Print the results to the console
sqlCtx.sql(query_4).show()

+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+



#### What percentage of the time was the High greater than 80 dollars?

In [24]:
# create an sql query
query_5 = """select round(Over_80/Total * 100,2) as Over_80_Prop from 
          (Select count(Date) as Total, sum(Case when High > 80 then 1 else 0 end) as Over_80 
          From saf_stock_analysis)"""
#Print the results to the console
sqlCtx.sql(query_5).show()

+------------+
|Over_80_Prop|
+------------+
|        8.43|
+------------+
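
The query counts all rows, counts the rows where High exceeds 80, and divides one by the other. The same arithmetic in plain Python, on a small made-up list of prices (hypothetical values, not taken from the data set):

```python
def percent_above(values, threshold):
    """Percentage of values strictly greater than threshold, rounded to 2 places."""
    over = sum(1 for v in values if v > threshold)
    return round(over / len(values) * 100, 2)

# Hypothetical High prices for illustration only
sample_highs = [79.5, 80.0, 81.2, 85.0, 60.1, 90.97, 70.0, 82.3]
print(percent_above(sample_highs, 80))  # 50.0
```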



#### What is the Pearson correlation between High and Volume?

In [25]:
# Create an sql query
query_6 = 'select round(corr(High,Volume),2) as High_Vol_Corr from saf_stock_analysis'
#Print the results to the console

sqlCtx.sql(query_6).show()

+-------------+
|High_Vol_Corr|
+-------------+
|        -0.34|
+-------------+
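
corr() computes the Pearson coefficient: the covariance of the two columns divided by the product of their standard deviations. A from-scratch sketch of that formula in plain Python, on made-up numbers rather than the stock data:

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (numerator) and sums of squared deviations
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical values: as x rises, y falls, so the coefficient is negative
print(round(pearson([1, 2, 3, 4], [8, 6, 5, 1]), 2))  # -0.96
```

Read this way, the -0.34 reported above suggests a modest negative relationship: days with a higher High tended to have somewhat lower Volume.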



#### What is the max High per year?

In [26]:
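# Create an SQL query. Caveat: Date is an M/D/YYYY string, so substr(Date,1,4)
# returns prefixes such as '1/10' rather than a year, as the output below shows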
query_7 = """select substr(Date,1,4) as Year, max(High) as Year_High from saf_stock_analysis 
        group by 1 order by 1"""
#Print the results to the console
sqlCtx.sql(query_7).show()

+----+---------+
|Year|Year_High|
+----+---------+
|1/10|    78.72|
|1/11|    68.79|
|1/12|    90.31|
|1/13|    90.97|
|1/14|    88.52|
|1/15|    87.78|
|1/16|    87.46|
|1/17|    76.82|
|1/18|    69.20|
|1/19|    62.80|
|1/2/|    86.72|
|1/20|    87.70|
|1/21|    86.91|
|1/22|    88.40|
|1/23|    89.26|
|1/24|    75.12|
|1/25|    70.00|
|1/26|    89.16|
|1/27|    88.46|
|1/28|    88.23|
+----+---------+
only showing top 20 rows
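
The labels in the output above (1/10, 1/11, and so on) are the first four characters of each M/D/YYYY date string, not years, so the rows are grouped by a month-and-day prefix. One possible correction, assuming Spark's to_date() with the pattern 'M/d/yyyy' parses these strings, is sketched as `query_7_by_year` (a hypothetical name); its results are not reproduced here. The plain-Python lines show the difference between slicing and parsing:

```python
from datetime import datetime

# Hypothetical corrected Spark SQL: parse the date, then group by its year.
# High is cast to double so max() compares numbers rather than text.
query_7_by_year = """select year(to_date(Date, 'M/d/yyyy')) as Year,
        max(cast(High as double)) as Year_High
        from saf_stock_analysis group by 1 order by 1"""

date_text = "1/13/2015"
print(date_text[:4])                                  # 1/13
print(datetime.strptime(date_text, "%m/%d/%Y").year)  # 2015
```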



#### What is the average Close for each Calendar Month?

In [27]:
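# Create an SQL query. Caveat: Date is an M/D/YYYY string, so substr(Date,5,2)
# returns fragments such as '/2' or '20' rather than a calendar month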
query_8 = """select substr(Date,5,2) as Month, round(avg(Close),2) as Monthly_Avg_Close 
            from saf_stock_analysis group by Month order by Month"""
# Display the results
sqlCtx.sql(query_8).show()

+-----+-----------------+
|Month|Monthly_Avg_Close|
+-----+-----------------+
|   /2|            72.39|
|   0/|            71.65|
|   1/|            72.29|
|   2/|            72.69|
|   20|            72.56|
|   3/|            71.58|
|   4/|            72.45|
|   5/|             72.8|
|   6/|             72.3|
|   7/|             71.4|
|   8/|            72.09|
|   9/|             72.6|
+-----+-----------------+
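
The Month labels above are string fragments: substr(Date,5,2) takes the fifth and sixth characters, which for 1/3/2012 is 20 and for 1/13/2015 is /2. A sketch that mimics the 1-based substr in plain Python and extracts the real month by parsing; `query_8_by_month` is a hypothetical corrected query, assuming to_date() with the pattern 'M/d/yyyy' parses these strings, and is shown without results:

```python
from datetime import datetime

def sql_substr(text, pos, length):
    """Mimic SQL substr for positive pos: 1-based start position, fixed length."""
    return text[pos - 1 : pos - 1 + length]

def calendar_month(date_text):
    """Month number (1-12) of an M/D/YYYY date string."""
    return datetime.strptime(date_text, "%m/%d/%Y").month

# Hypothetical corrected Spark SQL: group by the parsed month instead of a substring
query_8_by_month = """select month(to_date(Date, 'M/d/yyyy')) as Month,
            round(avg(Close), 2) as Monthly_Avg_Close
            from saf_stock_analysis group by 1 order by 1"""

print(sql_substr("1/3/2012", 5, 2), sql_substr("1/13/2015", 5, 2))  # 20 /2
print(calendar_month("1/13/2015"))                                   # 1
```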

