In [1]:
import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession
# from pyspark.sql.functions import round, col
from pyspark.sql.functions import *

In [2]:
# Build (or reuse, if one already exists) the Spark session used throughout this notebook
spark = (
    SparkSession.builder
    .appName('Big data Walmart Project')
    .getOrCreate()
)

In [3]:
# Grab the SparkContext behind the session; it is used below to create RDDs (sc.textFile)
sc = spark.sparkContext

In [4]:
# Bare expression: shows the SparkContext as this cell's output
sc

# using PySpark -------------> RDD ( Resilient Distributed Dataset)

In [5]:
# 1. Creating RDD for walmart dataset
#    textFile() yields one string per line of the file, so the header line is still included here

walmart = sc.textFile('walmart_stock.csv')
walmart.take(5)

['Date,Open,High,Low,Close,Volume,Adj Close',
 '03-01-2012,59.970001,61.060001,59.869999,60.330002,12668800,52.619235',
 '04-01-2012,60.209999,60.349998,59.470001,59.709999,9593300,52.078475',
 '05-01-2012,59.349998,59.619999,58.369999,59.419998,12768200,51.825539',
 '06-01-2012,59.419998,59.450001,58.869999,59,8069400,51.45922']

In [6]:
# 2. Dropping the header line from the RDD:
#    pair every line with its position, keep the positions after the header, then discard the position

first_row_index = 0
walmartRDD = (
    walmart.zipWithIndex()
    .filter(lambda pair: pair[1] > first_row_index)
    .map(lambda pair: pair[0])
)

In [7]:
# 2.1 What zipWithIndex() did to the RDD: every line becomes a (line, index) pair

first_row_index = 0  # index of the header line (same value as assigned in step 2)
w1 = walmart.zipWithIndex()
w1.take(5)

[('Date,Open,High,Low,Close,Volume,Adj Close', 0),
 ('03-01-2012,59.970001,61.060001,59.869999,60.330002,12668800,52.619235', 1),
 ('04-01-2012,60.209999,60.349998,59.470001,59.709999,9593300,52.078475', 2),
 ('05-01-2012,59.349998,59.619999,58.369999,59.419998,12768200,51.825539', 3),
 ('06-01-2012,59.419998,59.450001,58.869999,59,8069400,51.45922', 4)]

In [8]:
# 2.2 What filter() did to the indexed RDD: only pairs positioned after the header remain

w2 = (
    walmart.zipWithIndex()
    .filter(lambda pair: pair[1] > first_row_index)
)
w2.take(5)

[('03-01-2012,59.970001,61.060001,59.869999,60.330002,12668800,52.619235', 1),
 ('04-01-2012,60.209999,60.349998,59.470001,59.709999,9593300,52.078475', 2),
 ('05-01-2012,59.349998,59.619999,58.369999,59.419998,12768200,51.825539', 3),
 ('06-01-2012,59.419998,59.450001,58.869999,59,8069400,51.45922', 4),
 ('09-01-2012,59.029999,59.549999,58.919998,59.18,6679300,51.616215', 5)]

In [9]:
# 2.3 Finally getting the real workable RDD: the index is dropped and only the line text is kept

w3 = (
    walmart.zipWithIndex()
    .filter(lambda pair: pair[1] > first_row_index)
    .map(lambda pair: pair[0])
)
w3.take(5)[0]

'03-01-2012,59.970001,61.060001,59.869999,60.330002,12668800,52.619235'

In [10]:
# 3. Alternative way to remove the header:
#    read the first line, then keep every line whose text differs from it

header = walmart.first()
w4 = walmart.filter(lambda line: line != header)
w4.take(2)

['03-01-2012,59.970001,61.060001,59.869999,60.330002,12668800,52.619235',
 '04-01-2012,60.209999,60.349998,59.470001,59.709999,9593300,52.078475']

In [11]:
# 4. Splitting each record on the comma delimiter -> one list of string fields per line

walmart_split = walmartRDD.map(lambda line: line.split(','))
walmart_split.take(1)[0][1]

'59.970001'

In [12]:
# 5. Converting Data types for required columns using function

def asInt(x):
    """Convert one split CSV record into typed values.

    Despite the name, every numeric field is parsed with ``float``.

    Args:
        x: Indexable record with at least seven fields; field 0 is kept
            unchanged and fields 1 through 6 must be parseable by ``float``.

    Returns:
        A list ``[x[0], f1, f2, f3, f4, f5, f6]`` where ``f1``..``f6`` are the
        float values of fields 1 through 6.

    Raises:
        ValueError: If one of fields 1 through 6 is not a valid float string.
        IndexError: If the record has fewer than seven fields.
    """
    return [x[0], *(float(x[position]) for position in range(1, 7))]

# Apply the conversion to every split record, then preview the first typed row
walmart_fnl = walmart_split.map(asInt)
walmart_fnl.take(1)

[['03-01-2012',
  59.970001,
  61.060001,
  59.869999,
  60.330002,
  12668800.0,
  52.619235]]

## Problem Statements

### 1.  What is the mean of the Close column? ----> (RDD)

In [13]:
# Project every typed record down to its "Close" value (position 4 in the record)

close_prices = walmart_fnl.map(lambda record: record[4])

# Mean of the Close prices, computed with the RDD's own mean()

meanClose = close_prices.mean()
print(f"\nMean of Close column in Walmart : \n{meanClose}")


Mean of Close column in Walmart : 
72.38844998012719


### 2. What is the max High per year? ----> (RDD)

In [14]:
# Build (year, High) key/value pairs.
# The year is the text from position 6 onward of the date string (dates look like dd-MM-yyyy).

yearHigh_paired = walmart_fnl.map(
    lambda record: (int(record[0][6:]), record[2])
)
yearHigh_paired.take(5)

[(2012, 61.060001),
 (2012, 60.349998),
 (2012, 59.619999),
 (2012, 59.450001),
 (2012, 59.549999)]

In [35]:
# Using reduceByKey() to reduce each year's High values to their maximum.
#
# NOTE: `from pyspark.sql.functions import *` rebinds the name `max` to Spark's
# column aggregate, which takes a single column and cannot combine two floats.
# Passing `max` to reduceByKey() therefore fails once an action runs, so the
# pairwise maximum is written out explicitly here.

maxHigh_perYr = yearHigh_paired.reduceByKey(lambda a, b: a if a >= b else b)

# Show the (year, max High) pairs, highest High first
print('Max High of Walmart stocks every Year(2012-2016): \n\n' ,maxHigh_perYr.sortBy(lambda x: x[1], ascending=False).take(5))

# using -----------> DataFrames

In [36]:
# 1. Read the Walmart CSV into a DataFrame (first line is the header, column types are inferred)

walmartDF = (
    spark.read
    .format('csv')
    .options(sep=',', header='true', inferSchema=True)
    .load('walmart_stock.csv')
)

# Parse the dd-MM-yyyy Date strings into a proper date column
walmartDF = walmartDF.withColumn('Date', to_date(walmartDF['Date'], 'dd-MM-yyyy'))
walmartDF.show(5)
walmartDF.printSchema()

+----------+---------+---------+---------+---------+--------+---------+
|      Date|     Open|     High|      Low|    Close|  Volume|Adj Close|
+----------+---------+---------+---------+---------+--------+---------+
|2012-01-03|59.970001|61.060001|59.869999|60.330002|12668800|52.619235|
|2012-01-04|60.209999|60.349998|59.470001|59.709999| 9593300|52.078475|
|2012-01-05|59.349998|59.619999|58.369999|59.419998|12768200|51.825539|
|2012-01-06|59.419998|59.450001|58.869999|     59.0| 8069400| 51.45922|
|2012-01-09|59.029999|59.549999|58.919998|    59.18| 6679300|51.616215|
+----------+---------+---------+---------+---------+--------+---------+
only showing top 5 rows

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [37]:
# Summary statistics (count, mean, stddev, min, max) for the DataFrame's columns
walmartDF.describe().show()

+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev| 6.76809024470826|6.768186808159218|6.744075756255496| 6.75685916373299|  4519780.8431556|6.722609449996858|
|    min|        56.389999|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|        90.800003|        90.970001|            89.25|        90.470001|         80898100|        84.914216|
+-------+-----------------+-----------------+-----------

### Bonus Question!

#### There are too many decimal places for mean and stddev in the describe() dataframe. Format the numbers to show only up to two decimal places. Pay careful attention to the datatypes that .describe() returns; we didn't cover how to do this exact formatting, but we covered something very similar.


In [38]:
# Keep the summary statistics in their own DataFrame
wmt_desc = walmartDF.describe()

# Cast every statistics column to double; "summary" (the statistic's name) is kept as is
wmt_desc = wmt_desc.select(
    "summary",
    *[
        wmt_desc[name].cast('double')
        for name in ("Open", "High", "Low", "Close", "Volume", "Adj Close")
    ]
)
wmt_desc.printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Adj Close: double (nullable = true)



In [39]:
# Round every statistics column to two decimals, keeping the original column names.
# `round` here is Spark's column function (brought in by the star import), not the Python builtin.
wmt_desc.select(
    "summary",
    *[
        round(name, 2).alias(name)
        for name in ("Open", "High", "Low", "Close", "Volume", "Adj Close")
    ]
).show()

+-------+------+------+------+------+----------+---------+
|summary|  Open|  High|   Low| Close|    Volume|Adj Close|
+-------+------+------+------+------+----------+---------+
|  count|1258.0|1258.0|1258.0|1258.0|    1258.0|   1258.0|
|   mean| 72.36| 72.84| 71.92| 72.39|8222093.48|    67.24|
| stddev|  6.77|  6.77|  6.74|  6.76|4519780.84|     6.72|
|    min| 56.39| 57.06|  56.3| 56.42| 2094900.0|    50.36|
|    max|  90.8| 90.97| 89.25| 90.47| 8.08981E7|    84.91|
+-------+------+------+------+------+----------+---------+



### 3. Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

In [40]:
# Add "HV Ratio" = High / Volume for every row, then preview just that column
wmt_HVRatioDF = walmartDF.withColumn(
    "HV Ratio",
    walmartDF["High"] / walmartDF["Volume"]
)
wmt_HVRatioDF.select("HV Ratio").show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001474E-6|
|7.071764823529411...|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



# using -------------> SparkSQL

### Creating SQL table from DF

In [41]:
# Register the DataFrame as a temporary view named "walmart" so it can be queried with SQL,
# then preview five rows through the SQL interface
walmartDF.createOrReplaceTempView("walmart")
spark.sql("select * from walmart limit 5").show()

+----------+---------+---------+---------+---------+--------+---------+
|      Date|     Open|     High|      Low|    Close|  Volume|Adj Close|
+----------+---------+---------+---------+---------+--------+---------+
|2012-01-03|59.970001|61.060001|59.869999|60.330002|12668800|52.619235|
|2012-01-04|60.209999|60.349998|59.470001|59.709999| 9593300|52.078475|
|2012-01-05|59.349998|59.619999|58.369999|59.419998|12768200|51.825539|
|2012-01-06|59.419998|59.450001|58.869999|     59.0| 8069400| 51.45922|
|2012-01-09|59.029999|59.549999|58.919998|    59.18| 6679300|51.616215|
+----------+---------+---------+---------+---------+--------+---------+



### 3. What day had the Peak High in Price?

In [42]:
# Select the date(s) whose High equals the overall maximum High, then read the Date of the first match
peakHighDF = spark.sql(
    "SELECT Date FROM walmart WHERE High = (SELECT MAX(High) FROM walmart)"
)
peakHighDF.first()['Date']

datetime.date(2015, 1, 13)

### 4. What percentage of the time was the High greater than 80 dollars?

In [43]:
# Rows whose High exceeded 80, expressed as a percentage of all rows in the DataFrame
high_greater80DF = spark.sql("SELECT Date FROM walmart WHERE High > 80")
days_above_80 = high_greater80DF.count()
total_days = walmartDF.count()
(days_above_80 / total_days) * 100

9.141494435612083

### 5. What is the average Close for each Calendar Month? 


In [44]:
# Average Close per (year, month) pair, ordered chronologically.
# The trailing backslash continues the SQL string literal onto the next line.
avgCloseDF = spark.sql("SELECT YEAR(Date) AS year, MONTH(Date) AS month, AVG(Close) AS avg_close\
                       FROM walmart GROUP BY YEAR(Date), month(Date) ORDER BY YEAR(Date), MONTH(Date)")
# Print up to 60 rows instead of show()'s default
avgCloseDF.show(60)

+----+-----+------------------+
|year|month|         avg_close|
+----+-----+------------------+
|2012|    1|        60.2354999|
|2012|    2|            60.898|
|2012|    3|60.433636818181796|
|2012|    4|60.149000150000006|
|2012|    5|61.456363409090905|
|2012|    6| 67.50380961904762|
|2012|    7| 72.40666661904763|
|2012|    8| 73.04478265217392|
|2012|    9| 74.18157921052631|
|2012|   10| 75.30619061904761|
|2012|   11| 71.10952333333333|
|2012|   12| 69.71100009999999|
|2013|    1| 69.09476142857143|
|2013|    2| 70.62315857894738|
|2013|    3| 73.43649940000002|
|2013|    4| 77.68954572727273|
|2013|    5| 77.81636368181817|
|2013|    6| 74.97800020000001|
|2013|    7| 77.11545418181818|
|2013|    8| 75.22409204545455|
|2013|    9|        74.4395005|
|2013|   10| 74.97913104347826|
|2013|   11| 78.97300075000001|
|2013|   12|  78.7752382857143|
|2014|    1| 76.53142833333334|
|2014|    2| 74.05578978947368|
|2014|    3| 75.30238076190474|
|2014|    4| 77.80857085714285|
|2014|  