In [37]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import format_number, mean, min, max, corr
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format)

spark = SparkSession.builder.appName('walmart').getOrCreate()

## Load the Walmart data

In [8]:
df = spark.read.csv('/home/jovyan/work/data/SP_500_Historical.csv', inferSchema=True, header=True)

In [9]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

In [46]:
df.head()

Row(Date='1927-12-30', Open=17.66, High=17.66, Low=17.66, Close=17.66, Adj Close=17.66, Volume=0)

In [47]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)



## Print the first 5 rows

In [10]:
for line in df.head(5):
    print(line, '\n')

Row(Date='1927-12-30', Open=17.66, High=17.66, Low=17.66, Close=17.66, Adj Close=17.66, Volume=0) 

Row(Date='1928-01-03', Open=17.76, High=17.76, Low=17.76, Close=17.76, Adj Close=17.76, Volume=0) 

Row(Date='1928-01-04', Open=17.719999, High=17.719999, Low=17.719999, Close=17.719999, Adj Close=17.719999, Volume=0) 

Row(Date='1928-01-05', Open=17.549999, High=17.549999, Low=17.549999, Close=17.549999, Adj Close=17.549999, Volume=0) 

Row(Date='1928-01-06', Open=17.66, High=17.66, Low=17.66, Close=17.66, Adj Close=17.66, Volume=0) 



In [11]:
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-------------------+
|summary|      Date|              Open|             High|              Low|            Close|        Adj Close|             Volume|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-------------------+
|  count|     23394|             23394|            23394|            23394|            23394|            23394|              23394|
|   mean|      null|476.11160499572713| 499.573078690306|493.5473208786023|496.7595207246288|496.7595207246288|7.818766662392067E8|
| stddev|      null| 758.6405523320375|750.7131015121531|742.3163058669743|746.8156180683476|746.8156180683476|1.501735206029603E9|
|    min|1927-12-30|               0.0|              4.4|              4.4|              4.4|              4.4|                  0|
|    max|2021-02-18|       3939.610107|      3950.429932|      3923.850098| 

## Format columns to show just 2 decimal places

In [15]:
summary = df.describe()
summary.select(summary['summary'],
              format_number(summary['Open'].cast('float'), 2).alias('Open'),
              format_number(summary['High'].cast('float'), 2).alias('High'),
              format_number(summary['Low'].cast('float'), 2).alias('Low'),
              format_number(summary['Close'].cast('float'), 2).alias('Close'),
              format_number(summary['Volume'].cast('float'), 2).alias('Volume'),
              ).show()

+-------+---------+---------+---------+---------+-----------------+
|summary|     Open|     High|      Low|    Close|           Volume|
+-------+---------+---------+---------+---------+-----------------+
|  count|23,394.00|23,394.00|23,394.00|23,394.00|        23,394.00|
|   mean|   476.11|   499.57|   493.55|   496.76|   781,876,672.00|
| stddev|   758.64|   750.71|   742.32|   746.82| 1,501,735,168.00|
|    min|     0.00|     4.40|     4.40|     4.40|             0.00|
|    max| 3,939.61| 3,950.43| 3,923.85| 3,934.83|11,456,230,400.00|
+-------+---------+---------+---------+---------+-----------------+



## Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day

In [25]:
df_hv = df.withColumn('HV Ratio', df['High']/df['Volume']).select(['HV Ratio'])
df_hv.show()

+--------+
|HV Ratio|
+--------+
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
|    null|
+--------+
only showing top 20 rows



## Which day has the peak high in price?

In [26]:
df.orderBy(df['High'].desc()).select(['Date']).head(1)[0]['Date']

'2021-02-16'

## What is the mean of the "Close" column

In [28]:
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|496.7595207246288|
+-----------------+



## What is the maximum and minimum value of the "Volumn" column

In [31]:
df.select(max('Volume'), min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|11456230000|          0|
+-----------+-----------+



## How many days did the stocks close lower than 60 dollars?

In [32]:
df.filter(df['Close'] < 60).count()

8381

## What percentage of the time was the "High" greater than 80 dollars?

In [33]:
df.filter('High > 80').count() * 100/df.count()

60.13935197059075

## What is the Pearson correlation between "High" and "Volume"

In [34]:
df.corr('High', 'Volume')

0.8243719072537106

In [36]:
df.select(corr(df['High'], df['Volume'])).show()

+------------------+
|corr(High, Volume)|
+------------------+
|0.8243719072537106|
+------------------+



## What is the max "High" per year?

In [41]:
year_df = df.withColumn('Year', year(df['Date']))

year_df.groupBy('Year').max()['Year','max(High)'].show()

+----+-----------+
|Year|  max(High)|
+----+-----------+
|1959|  60.709999|
|1990| 369.779999|
|1975|  96.580002|
|1977| 107.970001|
|2003|1112.560059|
|2007|1576.089966|
|2018|2940.909912|
|1974| 101.050003|
|2015|2134.719971|
|1927|      17.66|
|1955|      46.41|
|2006|1431.810059|
|1978| 108.050003|
|1961|  72.639999|
|2013|1849.439941|
|1942|       9.77|
|1939|      13.23|
|1944|      13.28|
|1952|      26.59|
|1934|      11.82|
+----+-----------+
only showing top 20 rows



## What is the average 'Close' for each calender month?

In [43]:
# New column Month
month_df = df.withColumn('Month', month(df['Date']))

# Group by month and take average of all other columns
month_df = month_df.groupBy('Month').mean()

# Sort by month
month_df = month_df.orderBy('Month')

#Display only month and avg(Close), the desired columns
month_df['Month', 'avg(Close)'].show()

+-----+------------------+
|Month|        avg(Close)|
+-----+------------------+
|    1|  496.468688647595|
|    2|508.39007361958375|
|    3| 477.3591337443125|
|    4|482.42328686337675|
|    5| 487.1987677683734|
|    6| 491.1081333945652|
|    7|497.47155179134825|
|    8|497.00198786410095|
|    9|494.81639784761904|
|   10| 501.5918244367927|
|   11| 520.5698029569008|
|   12|509.57525815349265|
+-----+------------------+

