### **Exercise 1**
Start a simple Spark session

In [1]:
# Look into https://spark.apache.org/downloads.html for the latest version
spark_mirror = "https://mirrors.sonic.net/apache/spark"
spark_version = "3.3.0"
hadoop_version = "3"

# Install Java 8 (Spark does not work with newer Java versions)
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and extract Spark binary distribution
! rm -rf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz spark-{spark_version}-bin-hadoop{hadoop_version}
! wget -q {spark_mirror}/spark-{spark_version}/spark-{spark_version}-bin-hadoop{hadoop_version}.tgz
! tar xzf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz

# The only 2 environment variables needed to set up Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{spark_version}-bin-hadoop{hadoop_version}"

# Set up the Spark environment based on the environment variable SPARK_HOME 
! pip install -q findspark
import findspark
findspark.init()

In [2]:
#import SparkSesion
import findspark
findspark.init()

#import SparkSession
appname = "TestApp"
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appname).getOrCreate()

#Import Pandas
import pandas as pd

### **Exercise 2**
Load the Walmart Stock CSV file, let Spark infer the data types

In [3]:
from google.colab import files 
  
  
walmart = files.upload()

Saving walmart_stock.csv to walmart_stock.csv


In [4]:
df = spark.read.options(inferSchema=True, header=True).csv('walmart_stock.csv')
df.show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000

### **Exercise 3**
Show the column names

In [None]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

### **Exercise 4**

What does the Schema look like?

In [None]:
df = spark.read.options(header=True,inferSchema=True).csv("walmart_stock.csv")

In [None]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### **Exercise 5**
Print out the first 5 rows

In [None]:
df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### **exercise 6**
Use describe() to learn about the DataFrame

In [None]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

### **Exercise 7**
(optional and not easy) Format the numbers to show only 2 decimal places

In [6]:
#we need to convert the dataframe into pandas format in order to use
#the commands
df2 = df.toPandas()
df2.round(decimals=2)


Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.97,61.06,59.87,60.33,12668800,52.62
1,2012-01-04,60.21,60.35,59.47,59.71,9593300,52.08
2,2012-01-05,59.35,59.62,58.37,59.42,12768200,51.83
3,2012-01-06,59.42,59.45,58.87,59.00,8069400,51.46
4,2012-01-09,59.03,59.55,58.92,59.18,6679300,51.62
...,...,...,...,...,...,...,...
1253,2016-12-23,69.43,69.75,69.36,69.54,4803900,69.03
1254,2016-12-27,69.30,69.82,69.25,69.70,4435700,69.19
1255,2016-12-28,69.94,70.00,69.26,69.31,4875700,68.80
1256,2016-12-29,69.21,69.52,69.12,69.26,4298400,68.75


### **Exercise 8**
Create a new DataFrame with a column called  'HV Ratio' that is the ratio of the High Price vs Volume of Stock traded for a day

In [7]:
dfc = df.withColumn('HV Ratio',df.High/df.Volume).show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51

### **Exercise 9**
What day had the Peak High in Price?

In [None]:
df.createOrReplaceTempView('table')
spark.sql("SELECT Date FROM table where High = (SELECT (MAX(High)) from table);").show()

+-------------------+
|               Date|
+-------------------+
|2015-01-13 00:00:00|
+-------------------+



### **Exercise 10**
What is the mean of the Close column?

In [None]:
from pyspark.sql.functions import mean
df.select(mean ('close')).show()

+-----------------+
|       avg(close)|
+-----------------+
|72.38844998012726|
+-----------------+



### **Exercise 11**
What is the max and min of the Volume column?

In [None]:
#min
from pyspark.sql.functions import min
df.select(min ('Volume')).show()

+-----------+
|min(Volume)|
+-----------+
|    2094900|
+-----------+



In [None]:
#max
from pyspark.sql.functions import max
df.select(max ('Volume')).show()

+-----------+
|max(Volume)|
+-----------+
|   80898100|
+-----------+



### **Exercise 12**
How many days was the Close lower than 60 dollars?

In [None]:
dfclose=df.select('Close')
dfclose.createOrReplaceTempView('table')

In [None]:
spark.sql('SELECT * FROM table WHERE close < 60').count()

81

### **Exercise 13**
What percentage of time was the High greater than 80 dollars?

In [8]:
dfhighper=df.select('Date','High')
dfhighper.createOrReplaceTempView('table')

In [9]:
spark.sql('SELECT Date from table WHERE high > 80').count()

115

### **Exercise 14**
What is the Pearson correlation between High and Volume?

In [None]:
df.corr('High', 'Volume')

-0.3384326061737161

### **Exercise 15**
What is the max High per year?

In [None]:
#pd.to_datetime converts an argument to datetime
#.groupby groups DataFrame using a mapper or by a Series of columns
#pd.DatetimeIndex(df).year/month extracts whatever you want
#we need to have the dataframe in pandas format in order to use
#the commands
#invalid test= spark.sql("SELECT YEAR (Date) AS Year, MAX(High) FROM table;").show()
#I could have used .dt.year, which extracts the year of the datetime.
df2['Date']=pd.to_datetime(df2['Date'], format='%Y-%m-%d')
year=pd.DatetimeIndex(df2['Date']).year
df2.groupby(year)['High'].max()

Date
2012    77.599998
2013    81.370003
2014    88.089996
2015    90.970001
2016    75.190002
Name: High, dtype: float64

### **Exercise 16**
What is the average Close for each calendar month?

In [None]:
#I could have used .dt.month which converts months as january=1...december=12
#valid test=df2.groupby(df2['Date'].dt.month)['Close'].mean()
month=pd.DatetimeIndex(df2['Date']).month
df2.groupby(month)['Close'].mean()


Date
1     71.448020
2     71.306804
3     71.777944
4     72.973619
5     72.309717
6     72.495377
7     74.439719
8     73.029819
9     72.184118
10    71.578545
11    72.111089
12    72.847925
Name: Close, dtype: float64