# Spark Assignment 

#### Use the walmart_stock.csv file to answer and complete the tasks below!

#### Start a simple Spark Session

In [1]:
import findspark
findspark.init()

In [2]:
# SparkSession is an entry point to PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
import pyspark.sql.functions as func

In [3]:
# use a variable 'spark' to store the SparkSession instance
spark = SparkSession.builder.appName('Assignment1').getOrCreate()

#### Load the Walmart Stock CSV File into a data frame called df, have Spark infer the data types.

In [4]:
# load the Walmart dataset
dframe = spark.read.csv('walmart_stock.csv', inferSchema=True, header=True)

#### Print the column names

In [5]:
# print the column names of the Walmart dataset
dframe.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### What does the Schema look like?

In [6]:
# viewing the dataset schema
dframe.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### Print out the first 5 rows.

In [7]:
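# register the DataFrame as a temporary view, then select the first 5 rows with Spark SQL
# (createOrReplaceTempView returns None, so its result is not assigned)
dframe.createOrReplaceTempView('walmart_stock')
sql = spark.sql('SELECT * FROM walmart_stock LIMIT 5')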
sql.show()

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+



#### Use describe() to learn about the DataFrame.

In [8]:
# Describing the dataframe
dframe.describe()

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

In [9]:
# Store the DataFrame description in a new DataFrame called 'dfDescribe'
dfDescribe = dframe.describe()
# Notice the data types in dfDescribe
dfDescribe.printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



#### Format the numbers in dfDescribe to just show up two decimal places
#### If you get stuck here, you can continue the rest of the assignment and try this part later


In [10]:
# use a for loop to round every column except 'summary' to two decimal places
for v in dfDescribe.columns[1:]:
    dfDescribe = dfDescribe.withColumn(v, func.round(v, 2))

dfDescribe.show()

+-------+------+------+------+------+----------+---------+
|summary|  Open|  High|   Low| Close|    Volume|Adj Close|
+-------+------+------+------+------+----------+---------+
|  count|1258.0|1258.0|1258.0|1258.0|    1258.0|   1258.0|
|   mean| 72.36| 72.84| 71.92| 72.39|8222093.48|    67.24|
| stddev|  6.77|  6.77|  6.74|  6.76|4519780.84|     6.72|
|    min| 56.39| 57.06|  56.3| 56.42| 2094900.0|    50.36|
|    max|  90.8| 90.97| 89.25| 90.47| 8.08981E7|    84.91|
+-------+------+------+------+------+----------+---------+



#### Create a new dataframe with a column called Ratio that is the ratio of the High Price versus volume of stock traded for a day. Print the first five rows in that column

In [11]:
# Create a new DataFrame with a column called Ratio.
# Note: as written, Ratio is Volume / High (shares traded per dollar of the High price),
# which is the inverse of the High / Volume ratio described in the task.
dframe_ratio = dframe.withColumn('Ratio', (dframe['Volume'] / dframe['High']))
dframe_ratio.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|             Ratio|
+-------------------+------------------+---------+---------+------------------+--------+------------------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|207481.16266817617|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475| 158961.0657485026|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|214159.68155249383|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|135734.22816258657|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|1121

#### What day had the Peak in the column 'High' in Price?

In [12]:
# the 'walmart_stock' temp view was registered earlier;
# max_by(Date, High) returns the Date of the row with the largest High
sql = spark.sql('SELECT max_by(Date, High) Peak_day FROM walmart_stock')
sql.show()

+-------------------+
|           Peak_day|
+-------------------+
|2015-01-13 00:00:00|
+-------------------+



#### What is the mean of the Close column (print it as a float number)?

In [13]:
# the mean of the Close column
dframe.select(format_number(avg('Close'),5).alias('Average(Close)')).show()

+--------------+
|Average(Close)|
+--------------+
|      72.38845|
+--------------+



#### What is the max and min of the Volume column?

In [14]:
# collect the max and the min of the Volume column, each as a one-entry dictionary
dict_1 = dframe.select(max(dframe['Volume']).alias('Max_Volume')).collect()[0].asDict()
dict_2 = dframe.select(min(dframe['Volume']).alias('Min_Volume')).collect()[0].asDict()

# merge both results into 'dict_1'
dict_1.update(dict_2)

In [15]:
# wrap the dictionary in a list so it becomes a single row
data = [dict_1]

# create a one-row DataFrame from the list
df = spark.createDataFrame(data)
df.show()

+----------+----------+
|Max_Volume|Min_Volume|
+----------+----------+
|  80898100|   2094900|
+----------+----------+



#### How many days was the Close lower than 60 dollars?

In [16]:
# the number of days the Close is lower than 60 dollars
lower = dframe.filter("Close < 60").count()
print(f"the number of days the Close is lower than 60 dollars is {lower}")

the number of days the Close is lower than 60 dollars is 81


#### What percentage of the time was the High greater than 80 dollars ?
#### In other words, (Number of Days High>80)/(Total Days in the dataset)

In [17]:
# count the total number of rows (one per trading day) and the days with High > 80
total_days = dframe.count()
high_gr_than_80 = dframe.filter("High > 80").count()

In [18]:
# percentage of days on which High was greater than 80 dollars
percentage = (high_gr_than_80 / total_days) * 100
print(f"The percentage of time High is greater than 80 dollars is {percentage}")

The percentage of time High is greater than 80 dollars is 9.141494435612083


#### What is the Pearson correlation between High and Volume?
#### make sure to use spark functionality

In [19]:
# calculating the Pearson correlation between High and Volume
dframe.select(corr("High", "Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+
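
For readers who want to see what `corr` computes, here is a minimal plain-Python sketch of the Pearson coefficient (covariance divided by the product of the standard deviations). The helper name `pearson` is made up for this illustration, and the sample uses only the first five rows printed earlier, so its value is not comparable to the full-dataset result above.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance scaled by both standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of (co)deviations; the 1/n factors cancel in the final ratio.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# High and Volume from the first five rows shown earlier in this notebook.
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]

r = pearson(highs, volumes)
print(r)
```

The result always lies between -1 and 1; a negative value, like the one Spark reports for the full dataset, means that higher prices tend to coincide with lower traded volume.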



#### What is the max High for each year?

In [20]:
# max values for each year
dframe.groupBy(year(dframe['Date']).alias('Years'))\
    .agg({"High":'max'})\
    .sort(asc(year(dframe['Date'])))\
    .show()

+-----+---------+
|Years|max(High)|
+-----+---------+
| 2012|77.599998|
| 2013|81.370003|
| 2014|88.089996|
| 2015|90.970001|
| 2016|75.190002|
+-----+---------+



#### What is the average Close for each  month (output should be 12 values)?

In [21]:
# the average Close value for each month
dframe.groupBy(month(dframe['Date']).alias('Month'))\
    .agg({"Close":'avg'})\
    .sort(asc(month(dframe['Date'])))\
    .show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



#### Answer the following with just a few sentences:
#### Consider pandas: the fillna() method has an argument called 'inplace'.
#### (1) What is the purpose of that argument?
#### (2) Do we have something similar in spark? 
#### (3) Explain your answer to (2)

--------
#### ANSWERS

#### (1) What is the purpose of that argument?

The inplace parameter of the pandas DataFrame fillna() method controls whether the missing values are filled in the existing DataFrame or in a new one. With inplace=True, the method modifies the original DataFrame and returns None, so no new object is handed back to the caller.

This is convenient when you want to update the original DataFrame, and it may save memory by avoiding a copy. With inplace=False (the default), the method returns a new DataFrame with the missing values filled and leaves the original DataFrame unchanged.
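
The difference can be checked with a small pandas sketch. The `scores` frame and its values are invented for illustration only.

```python
import pandas as pd

# Hypothetical frame with one missing value in each column.
scores = pd.DataFrame({"Sales": [None, 345.0], "Years": [3.0, None]})

# Default (inplace=False): a new DataFrame is returned and the original keeps its NaNs.
filled_copy = scores.fillna(0)
nans_left_in_original = int(scores.isna().sum().sum())

# inplace=True: the object itself is modified, so no reassignment is needed.
scores.fillna(0, inplace=True)
nans_after_inplace = int(scores.isna().sum().sum())

print(nans_left_in_original, nans_after_inplace)
```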

#### (2) Do we have something similar in spark? 

No. The fillna() method of a Spark DataFrame has no inplace parameter. It always returns a new DataFrame with the missing values filled and leaves the original DataFrame unchanged.

This is because Spark DataFrames are immutable: once created, they cannot be modified. To "update" a DataFrame in Spark, you assign the result of fillna() back to the same variable, which rebinds the name to the new DataFrame.

#### (3) Explain your answer to (2)

In [22]:
# May take a little while on a local computer
spark = SparkSession.builder.appName("filldata").getOrCreate()
df = spark.read.csv("ContainsNull1.csv",header=True,inferSchema=True)
df.select(['Id','Sales', 'Years']).show()

+----+-----+-----+
|  Id|Sales|Years|
+----+-----+-----+
|emp1| null|    3|
|emp2| null| null|
|emp3|  345|    2|
|emp4|  456|    4|
+----+-----+-----+



In [23]:
# fill the missing values in the 'Sales' and 'Years' columns
df = df.fillna(0, subset=['Sales', 'Years'])
df.show()

+----+-----+-----+-----+
|  Id| Name|Sales|Years|
+----+-----+-----+-----+
|emp1| John|    0|    3|
|emp2| null|    0|    0|
|emp3| null|  345|    2|
|emp4|Cindy|  456|    4|
+----+-----+-----+-----+



This creates a new DataFrame in which the missing values in the 'Sales' and 'Years' columns are filled with 0, and assigns the result back to the variable df. The 'Name' column is not in the subset, so its nulls are left as they are.

In summary, while the fillna() method in pandas can modify the original DataFrame in place, the fillna() method in Spark always returns a new DataFrame, which must be assigned to a variable (often the same one) for the change to take effect.