#### Import Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

#### 1- Start a simple Spark Session

In [2]:
spark = SparkSession.builder \
    .master('local') \
    .appName('Walmart_Stock') \
    .getOrCreate()

#### 2-Load the Walmart Stock CSV File, have Spark infer the data types.

In [3]:
stock = spark.read.csv('data/walmart_stock.csv')

In [4]:
stock.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+
|       _c0|               _c1|      _c2|      _c3|               _c4|     _c5|               _c6|
+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



#### So, we need to define the header

In [5]:
stock = spark\
        .read\
        .option('header', True)\
        .csv('data/walmart_stock.csv')
stock.show(4)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 4 rows



#### 3-What are the column names?

In [6]:
stock.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### 4-What does the Schema look like?

In [7]:
stock.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



#### 5-Create a new dataframe with a column called HV_Ratio that is the ratio of the High Price versus volume of stock traded for a day

In [8]:
newStock = stock\
            .withColumn('HV_Ratio', col('High')/col('Volume'))

In [9]:
newStock.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
only showing top 3 rows



#### 6-What day had the Peak High in Price?

In [10]:
stock\
    .orderBy(col('High').desc())\
    .select('Date')\
    .head()

Row(Date='2015-01-13')

#### Same with sql
* Create sql table from stock named stockSQL

In [11]:
stock.createOrReplaceTempView("stockSQL") 

In [12]:
spark.sql("""
select Date from stockSQL
where High = (select max(High) from stockSQL)
""").show()

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+



In [13]:
# Complete the work using sql and spark (both)

#### 7-What is the mean of the Close column?
* Spark

In [14]:
stock\
    .agg(mean('Close').alias('Moyenne Close'))\
    .show()

+-----------------+
|    Moyenne Close|
+-----------------+
|72.38844998012726|
+-----------------+



* SQL

In [15]:
spark.sql("""
select mean(Close) as `Moyenne Close` 
from stockSQL
""").show()

+-----------------+
|    Moyenne Close|
+-----------------+
|72.38844998012726|
+-----------------+



#### 8-What is the max and min of the Volume column?
* Saprk

In [16]:
stock\
    .agg(min('Volume').alias('Minimum  volume'),\
         max('Volume').alias('Maximum  volume'))\
    .show()

+---------------+---------------+
|Minimum  volume|Maximum  volume|
+---------------+---------------+
|       10010500|        9994400|
+---------------+---------------+



In [17]:
spark.sql("""
select  min(Volume) as `Minimum  volume`,
        max(Volume) as `Maximum  volume`
from stockSQL
""").show()

+---------------+---------------+
|Minimum  volume|Maximum  volume|
+---------------+---------------+
|       10010500|        9994400|
+---------------+---------------+



#### 9-How many days was the Close lower than 60 dollars?
* Spark

In [18]:
stock\
    .filter(col('Close')<60)\
    .agg(count('*').alias('Nb Jours'))\
    .show()

+--------+
|Nb Jours|
+--------+
|      81|
+--------+



* SQL

In [19]:
spark.sql("""
select  count(*) as `Nb Jours`
from stockSQL
where Close < 60
""").show()

+--------+
|Nb Jours|
+--------+
|      81|
+--------+



#### 10-What percentage of the time was the High greater than 80 dollars ?(In other words, (Number of Days High>80)/(Total Days in the dataset))
* Spark

In [20]:
p = stock\
    .filter(col('High')>80)\
    .agg(count('*'))\
    .collect()[0][0]
stock\
    .agg(round(p/count('*')*100, 2)\
    .alias('Percentage')).show()

+----------+
|Percentage|
+----------+
|      8.43|
+----------+



* SQL

In [21]:
spark.sql("""
select round(
                (select count(*) from stockSQL
                where High>80)
                /
                (count(*))
                * 100
                , 2
            )
as Percentage 
from stockSQL
""").show()

+----------+
|Percentage|
+----------+
|      8.43|
+----------+



#### 11-What is the max High per year?
* Spark

In [22]:
byYear = stock\
            .withColumn('Year', substring('Date', 1, 4).cast('int'))
byYear\
    .groupBy(col('Year'))\
    .agg(max('High').alias('Maximum'))\
    .orderBy('Year')\
    .show()

+----+---------+
|Year|  Maximum|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



In [23]:
spark.sql("""
select  substring(Date, 1, 4) as `Year`, max(High) as `Maximum`
from stockSQL
group by Year 
order by Year
""").show()

+----+---------+
|Year|  Maximum|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



#### 12-What is the average Close for each Calendar Month? In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.
* Spark

In [24]:
byMonth = stock\
            .withColumn('Month', substring('Date', 6, 2).cast('int'))
byMonth\
    .groupBy(col('Month'))\
    .agg(round(mean('Close'), 2).alias('Average'))\
    .orderBy('Month')\
    .show()

+-----+-------+
|Month|Average|
+-----+-------+
|    1|  71.45|
|    2|  71.31|
|    3|  71.78|
|    4|  72.97|
|    5|  72.31|
|    6|   72.5|
|    7|  74.44|
|    8|  73.03|
|    9|  72.18|
|   10|  71.58|
|   11|  72.11|
|   12|  72.85|
+-----+-------+



* SQL

In [25]:
spark.sql("""
select  substring(Date, 6, 2) as `Month`, 
        round(mean(Close), 2) as `Average`
from stockSQL
group by Month 
order by Month
""").show()

+-----+-------+
|Month|Average|
+-----+-------+
|   01|  71.45|
|   02|  71.31|
|   03|  71.78|
|   04|  72.97|
|   05|  72.31|
|   06|   72.5|
|   07|  74.44|
|   08|  73.03|
|   09|  72.18|
|   10|  71.58|
|   11|  72.11|
|   12|  72.85|
+-----+-------+



* Spark but transforming months numbers to names 

In [26]:
## Additional needed packages 
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType


byMonth = stock\
            .withColumn('Month', substring('Date', 6, 2).cast('int'))

month_lst = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

udf = UserDefinedFunction(lambda x: month_lst[int(x%12) - 1], StringType())
byMonth = byMonth\
            .select(*[udf(column)\
            .alias('Month') if column == 'Month' else column for column in byMonth.columns])       

byMonth\
    .groupBy(col('Month'))\
    .agg(round(mean('Close'), 2).alias('Average'))\
    .show()

+-----+-------+
|Month|Average|
+-----+-------+
|  Oct|  71.58|
|  Sep|  72.18|
|  Dec|  72.85|
|  Aug|  73.03|
|  May|  72.31|
|  Jun|   72.5|
|  Feb|  71.31|
|  Nov|  72.11|
|  Mar|  71.78|
|  Jan|  71.45|
|  Apr|  72.97|
|  Jul|  74.44|
+-----+-------+



### <center> Don't forget to stop the session !</center>

In [27]:
spark.stop()