# Initialize a SparkSession

In [1]:
# Keep this order: findspark.init() runs before the pyspark import below.
# findspark's documented job is to locate the local Spark installation and
# make its Python bindings importable for this kernel.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# getOrCreate() hands back a session that is already running instead of
# building a second one; the WARN line in this cell's output shows that
# an existing session was reused here.
spark = SparkSession.builder.appName('walmart').getOrCreate()

22/11/10 21:42:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Load the Walmart Data

In [2]:
# The file carries an .xls extension but is read with the CSV reader:
# the first line supplies the column names and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./WMT.xls')
)

# Display the Column Names

In [3]:
# Column names of the loaded DataFrame, in file order.
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

# Display the first row of the dataframe

In [4]:
# Fetch the first row of the DataFrame as a single Row object.
df.first()

Row(Date=datetime.datetime(2011, 11, 16, 0, 0), Open=57.099998, High=57.419998, Low=56.639999, Close=56.68, Adj Close=44.899456, Volume=11780800)

# Check the Schema of the Dataframe

In [5]:
# Print each column's name, inferred type and nullability as a tree.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



# Print the first 5 rows

In [6]:
# Bring the first five rows to the driver and print each one,
# followed by a blank line to keep the long Row reprs apart.
for row in df.take(5):
    print(row, '\n')

Row(Date=datetime.datetime(2011, 11, 16, 0, 0), Open=57.099998, High=57.419998, Low=56.639999, Close=56.68, Adj Close=44.899456, Volume=11780800) 

Row(Date=datetime.datetime(2011, 11, 17, 0, 0), Open=56.540001, High=57.189999, Low=56.259998, Close=56.73, Adj Close=44.939064, Volume=10223800) 

Row(Date=datetime.datetime(2011, 11, 18, 0, 0), Open=57.029999, High=57.360001, Low=56.610001, Close=57.23, Adj Close=45.335129, Volume=8982300) 

Row(Date=datetime.datetime(2011, 11, 21, 0, 0), Open=56.93, High=57.290001, Low=56.380001, Close=56.66, Adj Close=44.883606, Volume=9932200) 

Row(Date=datetime.datetime(2011, 11, 22, 0, 0), Open=56.560001, High=57.130001, Low=56.5, Close=56.849998, Adj Close=45.034107, Volume=7497300) 



# Print general Stats

In [7]:
# Summary table (count, mean, stddev, min, max) for the numeric columns.
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+------------------+----------------+
|summary|              Open|              High|               Low|            Close|         Adj Close|          Volume|
+-------+------------------+------------------+------------------+-----------------+------------------+----------------+
|  count|              2516|              2516|              2516|             2516|              2516|            2516|
|   mean| 90.25209059817178| 90.94377592090628| 89.61924488434002|90.28167732551657| 82.77843126589822|8267158.86327504|
| stddev|25.818932768463412|26.070702860968247|25.565153063174236|25.79883827640457|29.207787105719017|4587081.88545677|
|    min|         56.389999|              57.0|         56.259998|        56.419998|         44.867764|         2094900|
|    max|        153.600006|        153.660004|        151.660004|       152.789993|        151.449997|        80898100|
+-------+------------------+------------------+------------------+-----------------+------------------+----------------+

# Format Columns to show just 2 Decimal Places

In [8]:
from pyspark.sql.functions import format_number

# Start from the same summary table as the previous cell.
summary = df.describe()

# Price columns: cast to float, then render with two decimal places.
price_columns = [
    format_number(summary[name].cast('float'), 2).alias(name)
    for name in ('Open', 'High', 'Low', 'Close')
]

# Volume: cast to int, then render as a whole number with thousands separators.
volume_column = format_number(summary['Volume'].cast('int'), 0).alias('Volume')

summary.select(summary['summary'], *price_columns, volume_column).show()

+-------+--------+--------+--------+--------+----------+
|summary|    Open|    High|     Low|   Close|    Volume|
+-------+--------+--------+--------+--------+----------+
|  count|2,516.00|2,516.00|2,516.00|2,516.00|     2,516|
|   mean|   90.25|   90.94|   89.62|   90.28| 8,267,158|
| stddev|   25.82|   26.07|   25.57|   25.80| 4,587,081|
|    min|   56.39|   57.00|   56.26|   56.42| 2,094,900|
|    max|  153.60|  153.66|  151.66|  152.79|80,898,100|
+-------+--------+--------+--------+--------+----------+



# Create a new DataFrame with a column called HV Ratio: the ratio of the High price to the volume of stock traded that day

In [9]:
# Ratio of the day's High price to the day's traded Volume; only this
# derived column is kept in the new DataFrame.
hv_ratio = (df['High'] / df['Volume']).alias('HV Ratio')
df_hv = df.select(hv_ratio)
df_hv.show()

+--------------------+
|            HV Ratio|
+--------------------+
| 4.87403215401331E-6|
|5.593810422739099E-6|
|6.385892366097769E-6|
|5.768107871367874E-6|
|7.620076694276606E-6|
|6.900392232821655E-6|
|1.346153893115431...|
|6.244549477288143E-6|
|5.389564674777206...|
|3.974348610998842E-6|
|6.731305634267636E-6|
|5.265601789429508E-6|
|5.564325737979306E-6|
|5.175589215548695E-6|
|3.757632754877940...|
|5.656840767900448E-6|
|5.814751314756086E-6|
|5.822398800450329...|
|5.318033740122675E-6|
|6.469134591839006E-6|
+--------------------+
only showing top 20 rows



# Which day had the peak "High" price?

In [10]:
# Sort with the highest High first, then read the Date of the top row.
by_high_desc = df.orderBy('High', ascending=False)
by_high_desc.select('Date').head(1)[0]['Date']

datetime.datetime(2020, 12, 1, 0, 0)

# What is the mean of the "Close" column

In [11]:
from pyspark.sql.functions import mean

# Average closing price over the whole data set, as a one-row DataFrame.
close_mean = mean('Close')
df.agg(close_mean).show()

+-----------------+
|       avg(Close)|
+-----------------+
|90.28167732551657|
+-----------------+



# What are the maximum and minimum values of the "Volume" column?

In [12]:
# Import the functions module under an alias. A bare
# `from pyspark.sql.functions import min, max` would replace Python's
# built-in min()/max() for every cell that runs after this one.
from pyspark.sql import functions as F

# Largest and smallest daily traded volume, computed in one aggregation.
df.select(F.max('Volume'), F.min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



# How many days did the stocks close lower than 60 dollars?

In [13]:
# Keep only the trading days that closed under 60 dollars and count them.
closed_below_60 = df.where(df['Close'] < 60)
closed_below_60.count()

112

# What percentage of the time was the "High" greater than 80 dollars?

In [14]:
# Share of trading days, in percent, on which High exceeded 80 dollars.
days_above_80 = df.where(df['High'] > 80).count()
total_days = df.count()
days_above_80 * 100 / total_days

46.939586645469

# What is the Pearson correlation between "High" and "Volume"

In [15]:
# Pearson correlation between High and Volume, returned as a plain float.
df.stat.corr('High', 'Volume', method='pearson')

-0.07509967259246798

In [16]:
from pyspark.sql.functions import corr

# Same coefficient, computed as a column aggregate so the result
# comes back as a one-row DataFrame instead of a float.
df.agg(corr('High', 'Volume')).show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.07509967259246798|
+--------------------+



# What is the max "High" per year?

In [17]:
# Only `year` is needed in this cell; the other names stay imported
# because later cells may use them (for example `month`).
from pyspark.sql.functions import (dayofmonth, hour,
                                   dayofyear, month,
                                   year, weekofyear,
                                   format_number, date_format)

# Tag every row with the calendar year taken from its Date.
year_df = df.withColumn('Year', year(df['Date']))

# Aggregate High only. A bare .max() would compute the maximum of every
# numeric column (including Year itself) just to throw all but one away.
year_df.groupBy('Year').max('High').show()

+----+----------+
|Year| max(High)|
+----+----------+
|2018|109.980003|
|2015| 90.970001|
|2013| 81.370003|
|2014| 88.089996|
|2019|125.379997|
|2020|153.660004|
|2012| 77.599998|
|2016| 75.190002|
|2011|      60.0|
|2017|100.129997|
|2021|152.570007|
+----+----------+



# What is the average "Close" for each calendar month?

In [18]:
# Imported here so the cell does not depend on an import made in another cell.
from pyspark.sql.functions import month

# Tag every row with its calendar month, taken from the Date column.
with_month = df.withColumn('Month', month(df['Date']))

# Average only Close per month (a bare .mean() would also average every
# other numeric column, including Month itself), then sort by month.
month_df = with_month.groupBy('Month').mean('Close').orderBy('Month')

# The result holds exactly the two desired columns: Month and avg(Close).
month_df.show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|88.47398988177339|
|    2|87.60531266666669|
|    3|86.47244217050691|
|    4|89.30846177403849|
|    5|88.39650949999998|
|    6|89.40037571361498|
|    7|92.05450699530518|
|    8|92.63425376018098|
|    9|93.86182297536949|
|   10| 94.2081447013575|
|   11|92.62975491176469|
|   12|88.01454553110052|
+-----+-----------------+

