# Apache Spark DataFrames Project Simeon_Omeda

> As a data professional, you need to perform an analysis by answering questions about some stock market data on Safaricom from the years 2012-2017.



In [96]:
# Installing pyspark
# ---

!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [97]:
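from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Handle to the underlying SparkContext
sc = spark.sparkContext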

**Data Importation and Exploration**

*● Start a spark session and load the stock file while inferring the data types.*

*● Determine the column names*

*● Make observations about the schema.*

*● Show the first 5 rows*

*● Use the describe method to learn about the data frame*



In [98]:
# Start a spark session and load the stock file while inferring the data types
# Dataset URL (CSV File): https://bit.ly/3pmchka
saf_stock_df = spark.read.csv("saf_stock.csv", header=True, inferSchema=True)

In [99]:
# Determine the column names
print("Column Names:")
print(saf_stock_df.columns)

Column Names:
['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [100]:
# Make observations about the schema.
saf_stock_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [101]:
# Observations: Date was inferred as a timestamp; Open, High, Low, Close and Adj Close as doubles; Volume as an integer. Every column is nullable.

In [102]:
# Use the describe method to learn about the data frame
saf_stock_df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

In [103]:
# Show the first 5 rows
saf_stock_df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



**Data Preparation**

*● Format all the data to 2 decimal places, e.g. with format_number()*

*● Create a new data frame with a column called HV Ratio: the ratio of the High price to the Volume of stock traded that day*


In [104]:
import pyspark.sql.functions as F

# Round every floating-point column to 2 decimal places.
# F.round keeps the columns numeric; importing `round` directly would shadow Python's built-in round().
for c_name, c_type in saf_stock_df.dtypes:
    if c_type in ('double', 'float'):
        saf_stock_df = saf_stock_df.withColumn(c_name, F.round(c_name, 2))
saf_stock_df.show(5)

+-------------------+-----+-----+-----+-----+--------+---------+
|               Date| Open| High|  Low|Close|  Volume|Adj Close|
+-------------------+-----+-----+-----+-----+--------+---------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06 00:00:00|59.42|59.45|58.87| 59.0| 8069400|    51.46|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|    51.62|
+-------------------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows
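
The cell above uses `F.round` rather than `format_number()`, and the two behave differently: `round` keeps the column numeric (so 59.0 still prints as `59.0`), while Spark's `format_number` returns a string with a fixed number of decimals and thousands separators (e.g. `12,668,800.00`). The plain-Python sketch below mimics that difference with the `,.2f` format spec. It is an analogy for illustration, not Spark code, and the helper name `format_like_format_number` is hypothetical; the two values are taken from the rows printed above.

```python
def format_like_format_number(value, decimals=2):
    """Hypothetical helper (not a Spark API): render a number with thousands
    separators and a fixed number of decimals, similar in spirit to format_number()."""
    return f"{value:,.{decimals}f}"

# Values copied from the first rows shown above
close_value = 59.0
volume_value = 12668800

# round() keeps a number, so trailing zeros are not displayed
print(round(close_value, 2))                     # 59.0
# String formatting always shows two decimals and adds separators
print(format_like_format_number(close_value))    # 59.00
print(format_like_format_number(volume_value))   # 12,668,800.00
```

Because `format_number` produces strings, it is usually best applied only to a display copy of the data, keeping numeric columns for calculations such as the HV Ratio.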



In [105]:
from pyspark.sql.functions import col

# Volume was inferred as an integer, so there are no thousands separators to strip.
# HV Ratio = High price / number of shares traded that day
stock_df = saf_stock_df.withColumn("HV Ratio", col("High") / col("Volume"))
stock_df.show(5)

+-------------------+-----+-----+-----+-----+--------+---------+--------------------+
|               Date| Open| High|  Low|Close|  Volume|Adj Close|            HV Ratio|
+-------------------+-----+-----+-----+-----+--------+---------+--------------------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|    52.62|4.819714574387472E-6|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|    52.08|6.290848821573389...|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|    51.83|4.669413073103491E-6|
|2012-01-06 00:00:00|59.42|59.45|58.87| 59.0| 8069400|    51.46|7.367338339901356E-6|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|    51.62|8.915604928660188E-6|
+-------------------+-----+-----+-----+-----+--------+---------+--------------------+
only showing top 5 rows
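
As a quick arithmetic check, the HV Ratio is just High / Volume for each day. Using the five (High, Volume) pairs printed above, plain Python reproduces the values Spark shows (the second one is truncated in Spark's display). The ratios are tiny because prices are in the tens while daily volumes are in the millions of shares.

```python
# (High, Volume) pairs copied from the first five rows shown above
rows = [
    (61.06, 12668800),
    (60.35, 9593300),
    (59.62, 12768200),
    (59.45, 8069400),
    (59.55, 6679300),
]

# HV Ratio = High price divided by the number of shares traded that day
hv_ratios = [high / volume for high, volume in rows]

for ratio in hv_ratios:
    print(f"{ratio:.6e}")
```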



**Data Analysis**

*● What day had the Peak High in Price?*

*● What is the mean of the Close column?*

*● What is the max and min of the Volume column?*

*● How many days was the Close lower than 60 dollars?*

*● What percentage of the time was the High greater than 80 dollars?*

*● What is the Pearson correlation between High and Volume?*

*● What is the max High per year?*

*● What is the average Close for each Calendar Month?*


In [106]:
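from pyspark.sql import SQLContext

# SQLContext is deprecated since Spark 3.0 (spark.sql() is the modern equivalent),
# but it is kept here because the queries below use it.
sqlCtx = SQLContext(sc)
saf_stock_df.createOrReplaceTempView("stock_data")
stock_df.createOrReplaceTempView("stock_df")

print("Tables in SQLContext:", sqlCtx.tableNames())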

Tables in SQLContext: ['stock_data', 'stock_df']


In [107]:
stock_df.createOrReplaceTempView('stock_df')
tables = sqlCtx.tableNames()


In [108]:
# What day had the Peak High in Price?
stock_df[['date','High']].sort(stock_df.High.desc()).show(1)

+-------------------+-----+
|               date| High|
+-------------------+-----+
|2015-01-13 00:00:00|90.97|
+-------------------+-----+
only showing top 1 row
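
Sorting by High in descending order and showing one row returns a single date even if several days reached the same peak. A small plain-Python sketch of a more explicit approach (find the maximum first, then keep every day that matches it), using the five sample rows shown earlier; in Spark the analogous steps would be aggregating `max("High")` and then filtering on that value.

```python
# (Date, High) pairs copied from the first five rows shown earlier
rows = [
    ("2012-01-03", 61.06),
    ("2012-01-04", 60.35),
    ("2012-01-05", 59.62),
    ("2012-01-06", 59.45),
    ("2012-01-09", 59.55),
]

# Step 1: find the peak High value
peak_high = max(high for _, high in rows)

# Step 2: keep every date that reached the peak (this handles ties)
peak_days = [day for day, high in rows if high == peak_high]

print(peak_high, peak_days)  # 61.06 ['2012-01-03']
```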



In [109]:
# What is the mean of the Close column?
stock_df[['close']].describe().show()

+-------+-----------------+
|summary|            close|
+-------+-----------------+
|  count|             1258|
|   mean|72.38844992050863|
| stddev|6.756859155425468|
|    min|            56.42|
|    max|            90.47|
+-------+-----------------+



In [110]:
# What is the max and min of the Volume column?
stock_df[['Volume']].describe().show()

+-------+-----------------+
|summary|           Volume|
+-------+-----------------+
|  count|             1258|
|   mean|8222093.481717011|
| stddev|  4519780.8431556|
|    min|          2094900|
|    max|         80898100|
+-------+-----------------+



In [111]:
# How many days was the Close lower than 60 dollars?
less_60= stock_df[stock_df['close'] < 60]
less_60.count()

81

In [112]:
# What percentage of the time was the High greater than 80 dollars?

# Days with High > 80, as a percentage of all days with a High value
query = 'SELECT 100 * SUM(CASE WHEN High > 80 THEN 1 ELSE 0 END) / COUNT(High) AS Percentage FROM stock_df'
sqlCtx.sql(query).show()

+-----------------+
|       Percentage|
+-----------------+
|9.141494435612083|
+-----------------+
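
The percentage is (days with High > 80) × 100 / (total days). Working backwards from the output, 9.141494435612083% of the 1,258 rows is 115 days. The sketch below checks that arithmetic and shows the same count-over-total logic as a small function; the name `percent_above` is hypothetical, and the sample values are the five High prices shown earlier.

```python
def percent_above(values, threshold):
    """Hypothetical helper: share of values strictly above threshold, in percent."""
    values = list(values)
    if not values:
        raise ValueError("need at least one value")
    above = sum(1 for v in values if v > threshold)
    return above * 100 / len(values)

# Arithmetic check against the Spark output: 115 of 1,258 trading days
total_days = 1258
days_above_80 = 115
print(days_above_80 * 100 / total_days)  # about 9.1415, matching the Spark output

# The same logic applied to the five sample High values shown earlier
sample_highs = [61.06, 60.35, 59.62, 59.45, 59.55]
print(percent_above(sample_highs, 60))  # 40.0
```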



In [113]:
# What is the Pearson correlation between High and Volume?
from pyspark.sql.functions import corr
pearson_corr = stock_df.select(corr("High", "Volume").alias("correlation")).collect()[0][0]
pearson_corr

-0.33843260582148915
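
`corr` computes the Pearson coefficient r = cov(X, Y) / (σ_X σ_Y). A negative r means that, over this period, days with a higher High price tended to have lower trading volume, and a magnitude around 0.34 suggests a fairly weak linear relationship. The plain-Python version of the formula below, applied to the five (High, Volume) pairs shown earlier, is only meant to make the computation concrete; a five-row result says nothing about the full-data value.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two equal-length sequences with at least 2 points")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of co-deviations and squared deviations (the 1/n factors cancel out)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    syy = sum((y - mean_y) ** 2 for y in ys)
    return sxy / math.sqrt(sxx * syy)

# High and Volume values copied from the first five rows shown earlier
highs = [61.06, 60.35, 59.62, 59.45, 59.55]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]
r = pearson(highs, volumes)
print(round(r, 3))
```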

In [114]:
# What is the max High per year?
max_high = sqlCtx.sql(
    "SELECT year(Date) AS year, max(High) AS MAXX "
    "FROM stock_df GROUP BY year(Date) ORDER BY MAXX DESC"
)
max_high.show()

+----+-----+
|year| MAXX|
+----+-----+
|2015|90.97|
|2014|88.09|
|2013|81.37|
|2012| 77.6|
|2016|75.19|
+----+-----+
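
`GROUP BY year(Date)` with `max(High)` is a keyed reduction: bucket rows by year and keep the largest High in each bucket. A plain-Python sketch of the same idea with a dictionary follows; since it uses the five sample rows shown earlier, only 2012 appears. Note that the Spark query orders the result by the maximum, not by year; `ORDER BY year` would list the years chronologically.

```python
from datetime import date

# (Date, High) pairs copied from the first five rows shown earlier
rows = [
    (date(2012, 1, 3), 61.06),
    (date(2012, 1, 4), 60.35),
    (date(2012, 1, 5), 59.62),
    (date(2012, 1, 6), 59.45),
    (date(2012, 1, 9), 59.55),
]

max_high_by_year = {}
for day, high in rows:
    # Keep the larger of the stored maximum and this day's High
    current = max_high_by_year.get(day.year)
    if current is None or high > current:
        max_high_by_year[day.year] = high

print(max_high_by_year)  # {2012: 61.06}
```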



In [115]:
# What is the average Close for each Calendar Month?
query = 'select MONTH(Date), AVG(Close) as month_average from stock_df group by MONTH(Date) order by MONTH(Date) asc'
sqlCtx.sql(query).show()

+-----------+-----------------+
|month(Date)|    month_average|
+-----------+-----------------+
|          1|71.44801980198022|
|          2|71.30680412371134|
|          3|71.77794392523363|
|          4|72.97361904761907|
|          5|72.30971698113206|
|          6|72.49537735849057|
|          7|74.43971962616824|
|          8|73.02981818181819|
|          9|72.18411764705883|
|         10|71.57854545454546|
|         11|72.11108910891085|
|         12|72.84792452830189|
+-----------+-----------------+
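
Grouping by `MONTH(Date)` pools the same calendar month across all years (every January in the data lands in bucket 1), so these averages describe seasonality rather than any single month. A plain-Python sketch that keeps a running sum and count per month, using the five January rows shown earlier:

```python
from collections import defaultdict
from datetime import date

# (Date, Close) pairs copied from the first five rows shown earlier
rows = [
    (date(2012, 1, 3), 60.33),
    (date(2012, 1, 4), 59.71),
    (date(2012, 1, 5), 59.42),
    (date(2012, 1, 6), 59.0),
    (date(2012, 1, 9), 59.18),
]

sums = defaultdict(float)
counts = defaultdict(int)
for day, close in rows:
    # Bucket by month number only, ignoring the year
    sums[day.month] += close
    counts[day.month] += 1

# Average Close per calendar month, in month order
month_average = {m: sums[m] / counts[m] for m in sorted(sums)}
print(month_average)
```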

