# Project - Apache Spark Dataframes

## Pre-requisites

In [96]:
# Installing pyspark
# ---
#
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [97]:
# Running a local spark session
# ---
#
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkFiles
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## Data Importation and Exploration

### Register the DataFrame as a Table

In [98]:
# Dataset URL: https://raw.githubusercontent.com/wambasisamuel/DE_Week06_Wednesday/main/saf_stock.csv

from pyspark import SparkFiles

sqlCtx = SQLContext(sc)
spark.sparkContext.addFile("https://raw.githubusercontent.com/wambasisamuel/DE_Week06_Wednesday/main/saf_stock.csv")
df = spark.read.options(header=True, inferSchema='True', delimiter=',', dateFormat='yyyy-mm-dd').csv("file://" + SparkFiles.get("saf_stock.csv"))

df.createOrReplaceTempView('saf_stock')
tables = sqlCtx.tableNames()
print(tables)

#df.show(10)



['saf_stock']


### Determine column names

In [99]:
print(df.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


### Observations about the schema

In [100]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### The first five rows

In [101]:
df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### Describing the dataframe

In [102]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

## Data Preparation

In [103]:
# Format all the data to 2 decimal places i.e. format_number()
from pyspark.sql.functions import format_number

df = df.select("Date",
               format_number("Open", 2).alias("Open"),
                format_number("High", 2).alias("High"),
                format_number("Low", 2).alias("Low"),
                format_number("Close", 2).alias("Close"),
                format_number("Volume", 2).alias("Volume"),
                format_number("Adj Close", 2).alias("Adj Close")
                )
df.show(3)

+-------------------+-----+-----+-----+-----+-------------+---------+
|               Date| Open| High|  Low|Close|       Volume|Adj Close|
+-------------------+-----+-----+-----+-----+-------------+---------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
+-------------------+-----+-----+-----+-----+-------------+---------+
only showing top 3 rows



In [104]:
# A new data frame with a column called HV Ratio
df2 = df.withColumn("HV Ratio",df['High']/df['Volume'])
print(df2.select('HV Ratio').show())

+--------+
|HV Ratio|
+--------+
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
+--------+
only showing top 20 rows

None


## Data Analysis

In [105]:
# What day had the Peak High in Price?
peak_high_day = df.orderBy(df['High'].desc()).head(1)
print(peak_high_day[0][0])

2015-01-13 00:00:00


In [106]:
# What is the mean of the Close column?
from pyspark.sql.functions import mean
close_mean = df.select(mean('Close')).show()
#print(close_mean)

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844992050863|
+-----------------+



In [107]:
# What is the max and min of the Volume column?
from pyspark.sql.functions import max, min
df.select(max("Volume"), min("Volume")).show()

+------------+-------------+
| max(Volume)|  min(Volume)|
+------------+-------------+
|9,994,400.00|10,010,500.00|
+------------+-------------+



In [108]:
# How many days was the Close lower than 60 dollars?
close_lt_60_days = df.filter(df['Close'] < 60).count()
print(close_lt_60_days)

81


In [109]:
# What percentage of the time was the High greater than 80 dollars?
perc = (df.filter(df['High']>80).count()/df.count()) * 100
print(f"{perc}%")

8.426073131955485%


In [110]:
# What is the Pearson correlation between High and Volume?
from pyspark.sql.functions import corr

df.select(corr('High','Volume')).show()

+------------------+
|corr(High, Volume)|
+------------------+
|              null|
+------------------+



In [111]:
# What is the max High per year?
from pyspark.sql.functions import year
yeardf = df.withColumn("Year",year(df['Date']))
max_df = yeardf.groupBy('Year').max()
#print(max_df.select('Year',"max(Year)").show())

df.withColumn("Year", year(df["Date"])).groupBy('Year').agg(max("High").alias("High")).show()

+----+-----+
|Year| High|
+----+-----+
|2012|77.60|
|2013|81.37|
|2014|88.09|
|2015|90.97|
|2016|75.19|
+----+-----+



In [112]:
# What is the average Close for each Calendar Month
from pyspark.sql.functions import month, avg

df.withColumn("Month", month(df["Date"])).groupBy('Month').agg(avg("Close").alias("Close")).show()

+-----+-----------------+
|Month|            Close|
+-----+-----------------+
|   12|72.84792452830189|
|    1|71.44801980198022|
|    6|72.49537735849057|
|    3|71.77794392523363|
|    5|72.30971698113206|
|    9|72.18411764705883|
|    4|72.97361904761907|
|    8|73.02981818181819|
|    7|74.43971962616824|
|   10|71.57854545454546|
|   11|72.11108910891085|
|    2|71.30680412371134|
+-----+-----------------+

