In [2]:
import findspark
findspark.init()

In [3]:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [4]:
#1 Start a simple Spark Session
spark = SparkSession.builder\
                .master("local[*]")\
                .appName("Walmart_Stock")\
                .getOrCreate()

In [5]:
#2 Load the Walmart Stock CSV File
df = spark.read\
        .option("header",True)\
        .csv("walmart_stock.csv")
        
  

In [6]:
df.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



In [7]:
#3 column names
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [8]:
# 4 schema look?
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [9]:
#5 New column HV_Ratio =Ratio High price/ Volume of stock per day
df2= df.withColumn("HV_Ratio", col("High")/col("Volume"))
df2.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
only showing top 3 rows



In [10]:
#6 What Day had the peak high price?
df.orderBy(col("High").desc())\
    .select(col("Date"))\
    .head(3)



In [11]:
#6 What Day had the peak high price? SQL
df.createOrReplaceTempView("stock")
spark.sql(""" 
select Date 
from stock
order By High desc """).show(1)

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+
only showing top 1 row



In [12]:
#7 what is the mean of the Close column?
df.createOrReplaceTempView("stock")
spark.sql("""
select avg(Close) as mean_Close
from stock 
""").show()

+-----------------+
|       mean_Close|
+-----------------+
|72.38844998012726|
+-----------------+



In [92]:
#7 what is the mean of the Close column?
df.select(col("Close")).agg(avg("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [14]:
#8 What is the max and min of the volumn column?
df.createOrReplaceTempView("stock")
spark.sql("""
select min(Volume), max(Volume)
from stock 
""").show()


+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|   10010500|    9994400|
+-----------+-----------+



In [93]:
#8 What is the max and min of the volumn column? (spark)
df.select(col("Volume")).agg(min("Volume"), max("Volume")).show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|   10010500|    9994400|
+-----------+-----------+



In [15]:
#9 How many days was the CLose lower than 60 dollars?
df.createOrReplaceTempView("stock")
spark.sql("""
select count(Close)  as jour_close_inf_60
from stock
where Close < 60

""").show()

+-----------------+
|jour_close_inf_60|
+-----------------+
|               81|
+-----------------+



In [112]:
## 9) How many days was the Close lower than 60 dollars? (spark)
df.filter(df['Close'] < 60).count()


81

In [48]:
df.count()

1258

In [107]:
#10 percentage of the time was the High greater than 80 dollars 
spark.sql(""" 
select count(Date)*100/(select count(*) from stock) as Pourcentage
from stock
where High > 80""").show()

+-----------------+
|      Pourcentage|
+-----------------+
|8.426073131955485|
+-----------------+



In [114]:
#10 percentage of the time was the High greater than 80 dollars (spark)
df.filter('High > 80').count() * 100/df.count()

8.426073131955485

In [85]:
# 11 Max high per year
df.createOrReplaceTempView("stock")
spark.sql("""
SELECT substr(Date,1,4) as year, max(High )
FROM stock
group by year
order by year

""").show()

+----+---------+
|year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



In [125]:
# 11 Max high per year (Spark)
df.groupby(year("date")).agg(max("high")).orderBy(year("date")).show()

+----------+---------+
|year(date)|max(high)|
+----------+---------+
|      2012|77.599998|
|      2013|81.370003|
|      2014|88.089996|
|      2015|90.970001|
|      2016|75.190002|
+----------+---------+



In [87]:
# 12  average Close for each calendar Month
df.createOrReplaceTempView("stock")
spark.sql("""
SELECT substr(Date,6,2) as month, avg(Close ) as avg_close_per_month
FROM stock
group by month
order by month

""").show()

+-----+-------------------+
|month|avg_close_per_month|
+-----+-------------------+
|   01|  71.44801958415842|
|   02|    71.306804443299|
|   03|  71.77794377570092|
|   04|  72.97361900952382|
|   05|  72.30971688679247|
|   06|   72.4953774245283|
|   07|  74.43971943925233|
|   08|  73.02981855454546|
|   09|  72.18411785294116|
|   10|  71.57854545454543|
|   11|   72.1110893069307|
|   12|  72.84792478301885|
+-----+-------------------+



In [127]:
# 12  average Close for each calendar Month (Spark)
df.groupby(month("date")).agg(avg("Close")).orderBy(month("date")).show()

+-----------+-----------------+
|month(date)|       avg(Close)|
+-----------+-----------------+
|          1|71.44801958415842|
|          2|  71.306804443299|
|          3|71.77794377570092|
|          4|72.97361900952382|
|          5|72.30971688679247|
|          6| 72.4953774245283|
|          7|74.43971943925233|
|          8|73.02981855454546|
|          9|72.18411785294116|
|         10|71.57854545454543|
|         11| 72.1110893069307|
|         12|72.84792478301885|
+-----------+-----------------+

