# Apache Spark DataFrames Project


## Data Importation and Exploration

### Prerequisites

In [31]:
# Installing pyspark
# ---

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [32]:
#Then run a local spark session
# ---

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

### Open the CSV file

In [33]:
f = open('saf_stock.csv')

for i in range(0,4):
    print(f.readline())

Date,Open,High,Low,Close,Volume,Adj Close

2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619234999999996

2012-01-04,60.209998999999996,60.349998,59.470001,59.709998999999996,9593300,52.078475

2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539



### Make observations about the schema.


In [34]:
from pyspark.sql import SQLContext

# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

# Read CSV data into a DataFrame object `df`

df = sqlCtx.read.csv("saf_stock.csv", header=True, inferSchema=True)

# Print the type
df.printSchema()




root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Determine the column names


In [35]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

### Show first 5 rows

In [36]:
df.limit(5).show()

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+



### Use the describe method to learn about the data frame

In [37]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

## Data Preparation

###  Format all the data to 2 decimal places i.e. format_number()


In [38]:
from pyspark.sql.functions import *

columns = ['Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

for column in columns:
  df = df.withColumn(column, format_number(column,2))

df.show(10)


+-------------------+-----+-----+-----+-----+-------------+---------+
|               Date| Open| High|  Low|Close|       Volume|Adj Close|
+-------------------+-----+-----+-----+-----+-------------+---------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|
|2012-01-06 00:00:00|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|
|2012-01-10 00:00:00|59.43|59.71|58.98|59.04| 6,907,300.00|    51.49|
|2012-01-11 00:00:00|59.06|59.53|59.04|59.40| 6,365,600.00|    51.81|
|2012-01-12 00:00:00|59.79|60.00|59.40|59.50| 7,236,400.00|    51.90|
|2012-01-13 00:00:00|59.18|59.61|59.01|59.54| 7,729,300.00|    51.93|
|2012-01-17 00:00:00|59.87|60.11|59.52|59.85| 8,500,000.00|    52.20|
+-------------------+-----+-----+-----+-----+-------------+---------+
only showing top 10 

### Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day

In [39]:
df_new = df.withColumn("HV Ratio", df.High/df.Volume).show()

+-------------------+-----+-----+-----+-----+-------------+---------+--------+
|               Date| Open| High|  Low|Close|       Volume|Adj Close|HV Ratio|
+-------------------+-----+-----+-----+-----+-------------+---------+--------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12,668,800.00|    52.62|    null|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9,593,300.00|    52.08|    null|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12,768,200.00|    51.83|    null|
|2012-01-06 00:00:00|59.42|59.45|58.87|59.00| 8,069,400.00|    51.46|    null|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6,679,300.00|    51.62|    null|
|2012-01-10 00:00:00|59.43|59.71|58.98|59.04| 6,907,300.00|    51.49|    null|
|2012-01-11 00:00:00|59.06|59.53|59.04|59.40| 6,365,600.00|    51.81|    null|
|2012-01-12 00:00:00|59.79|60.00|59.40|59.50| 7,236,400.00|    51.90|    null|
|2012-01-13 00:00:00|59.18|59.61|59.01|59.54| 7,729,300.00|    51.93|    null|
|2012-01-17 00:00:00|59.87|60.11|59.52|59.85| 8,500,

## Data Analysis


### What day had the Peak High in Price?

In [40]:
from pyspark.sql.functions import desc

peak_high_price = df.orderBy(desc("High")).select("Date").first()[0] 
bold = '\033[1m' 
print(bold + "The day with the highest peak price is:", peak_high_price)
    

[1mThe day with the highest peak price is: 2015-01-13 00:00:00


###  What is the mean of the Close column?


In [41]:
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min 

close_mean = df.select(mean("Close")).first()[0] 
print(bold +"The mean of the close column is:", close_mean)      



[1mThe mean of the close column is: 72.38844992050863


### What is the max and min of the Volume column?

In [42]:
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min 

max_vol = df.select(max("Volume")).first()[0] 
min_vol = df.select(min("Volume")).first()[0] 
print(bold +"The maximum volume is:", max_vol) 
print("The minimum volume is:", min_vol)

[1mThe maximum volume is: 9,994,400.00
The minimum volume is: 10,010,500.00


###  How many days was the Close lower than 60 dollars?

In [43]:
days_less_60 = df.filter("Close < 60").count() 
print(bold +"The number of days when the Close was lower than 60 dollars is:", days_less_60, "days" )

[1mThe number of days when the Close was lower than 60 dollars is: 81 days


###  What percentage of the time was the High greater than 80 dollars?

In [44]:
from pyspark.sql.functions import count 
 
high_more_than_80 = df.filter("High > 80") 
percent = (high_more_than_80.count() / df.count()) * 100 
print(bold + "The percentage of the time when the High was greater than 80 dollars is: {:.2f}%".format(percent))      

[1mThe percentage of the time when the High was greater than 80 dollars is: 8.43%


###  What is the max High per year?


In [45]:
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min 

max_high_year = df.groupBy(year("Date")).agg(max("High")) 
max_high_year.show()

+----------+---------+
|year(Date)|max(High)|
+----------+---------+
|      2012|    77.60|
|      2013|    81.37|
|      2014|    88.09|
|      2015|    90.97|
|      2016|    75.19|
+----------+---------+



###  What is the average Close for each Calendar Month?

In [46]:
from pyspark.sql.functions import format_number, year, month, dayofmonth, max, mean, corr, min, avg, desc 

avg_close_per_month = df.groupBy(month("Date")).agg(avg("Close")) 
avg_close_per_month.show()

+-----------+-----------------+
|month(Date)|       avg(Close)|
+-----------+-----------------+
|         12|72.84792452830189|
|          1|71.44801980198022|
|          6|72.49537735849057|
|          3|71.77794392523363|
|          5|72.30971698113206|
|          9|72.18411764705883|
|          4|72.97361904761907|
|          8|73.02981818181819|
|          7|74.43971962616824|
|         10|71.57854545454546|
|         11|72.11108910891085|
|          2|71.30680412371134|
+-----------+-----------------+

