In [1]:
# 0: Start a Spark Session and import the libraries you will need
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
# Read in the data from the AMZN.csv file from the assignmnet folder
amzn_df = spark.read.format("csv").option("header", "true").load("gs://lgenders-5132/assignment/AMZN.csv")
amzn_df.show(10)

                                                                                

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2013-01-02|12.804000|12.905000|12.663000|12.865500|12.865500|65420000|
|2013-01-03|12.863500|13.044000|12.818500|12.924000|12.924000|55018000|
|2013-01-04|12.879000|12.990000|12.832500|12.957500|12.957500|37484000|
|2013-01-07|13.148500|13.486500|13.133500|13.423000|13.423000|98200000|
|2013-01-08|13.353500|13.449000|13.178500|13.319000|13.319000|60214000|
|2013-01-09|13.408500|13.475000|13.270000|13.317500|13.317500|45312000|
|2013-01-10|13.427000|13.437000|13.115000|13.267000|13.267000|57268000|
|2013-01-11|13.255000|13.421500|13.205500|13.397000|13.397000|48266000|
|2013-01-14|13.400000|13.713000|13.377000|13.636500|13.636500|85500000|
|2013-01-15|13.534000|13.636500|13.465000|13.595000|13.595000|46538000|
+----------+---------+---------+---------+---------+---------+--

In [3]:
######################################################################################################

In [4]:
# 1: Display the Schema of the data frame.
amzn_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [5]:
# Need to change col types to correct data types
amzn_df_clean = amzn_df.withColumn("High", amzn_df["High"].cast("double")).withColumn("Open", amzn_df["Open"].cast("double")).withColumn("Low", amzn_df["Low"].cast("double")).withColumn("Close", amzn_df["Close"].cast("double"))\
                        .withColumn("Adj Close", amzn_df["Adj Close"].cast("double")).withColumn("Volume", amzn_df["Volume"].cast("integer")).withColumn("Date", amzn_df["Date"].cast("date")).withColumnRenamed('Adj Close', 'Adj_Close')
amzn_df_clean.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj_Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [6]:
######################################################################################################

In [7]:
#2: Display the summary statistics for all columns in the data frame.
# Dropping duplicate values first
# Show number of records before dropping
amzn_df_clean.count()

                                                                                

2518

In [8]:
# Drop duplicates and get new record count
amzn_df_clean_nodup = amzn_df.dropDuplicates()
amzn_df_clean_nodup.count()
# No duplicate data

                                                                                

2518

In [9]:
# Removing NAs
amzn_df_clean_noNAs = amzn_df.na.drop()
amzn_df_clean_noNAs.count()
# no NAs

2518

In [10]:
# Summary Statistics
amzn_df_clean.describe().show()

                                                                                

+-------+-----------------+-----------------+-----------------+------------------+------------------+-------------------+
|summary|             Open|             High|              Low|             Close|         Adj_Close|             Volume|
+-------+-----------------+-----------------+-----------------+------------------+------------------+-------------------+
|  count|             2518|             2518|             2518|              2518|              2518|               2518|
|   mean|73.82353615726771|74.66189798570285| 72.8813564964257| 73.78004878752976| 73.78004878752976|8.025619868943606E7|
| stddev|53.34565607175615|53.99876250199932|52.61413456378813|53.289557886169824|53.289557886169824|4.230300092993142E7|
|    min|           12.447|          12.6465|          12.2875|           12.4115|           12.4115|           17626000|
|    max|       187.199997|       188.654007|       184.839493|        186.570496|        186.570496|          477122000|
+-------+---------------

In [11]:
######################################################################################################

In [12]:
#3: Find and display the five records that has the highest price in the High column.
amzn_df_clean.createOrReplaceTempView('high_table')
# Retrieve the top five records that have the highest price in the High col (currently as a string value)
amzn_df2_clean = spark.sql("SELECT * \
                    FROM high_table \
                    ORDER BY High DESC").show(5)

# Answer: The top 5 records with the highest price are 188.654007, 188.107498, 187.999496, 187.864502, and 187.399994
# On the respective dates of 2021-07-13, 2021-11-19, 2021-07-08, 2021-07-12, and 2021-07-09

+----------+----------+----------+----------+----------+----------+---------+
|      Date|      Open|      High|       Low|     Close| Adj_Close|   Volume|
+----------+----------+----------+----------+----------+----------+---------+
|2021-07-13|185.104996|188.654007|183.565994|183.867996|183.867996| 76918000|
|2021-11-19|185.634506|188.107498|183.785995|183.828506|183.828506| 98734000|
|2021-07-08|182.177994|187.999496|   181.056|186.570496|186.570496|103612000|
|2021-07-12|187.199997|187.864502|184.839493|185.927505|185.927505| 51432000|
|2021-07-09|186.126007|187.399994|184.669998|185.966995|185.966995| 74964000|
+----------+----------+----------+----------+----------+----------+---------+
only showing top 5 rows



                                                                                

In [13]:
######################################################################################################

In [14]:
#4: What day had the lowest price? Display the date (as given in the Date column) and the price?
amzn_df_clean.createOrReplaceTempView('low_table')
# Retrieve the lowest record and only show the date and price
amzn_df3_clean = spark.sql("SELECT Date, Low \
                            FROM low_table \
                            ORDER BY Low ASC").show(1)

# Answer: 2023-05-01 had the lowest price of 12.2875

+----------+-------+
|      Date|    Low|
+----------+-------+
|2013-05-01|12.2875|
+----------+-------+
only showing top 1 row



In [15]:
######################################################################################################

In [16]:
#5: What is the range of the Volume column? (Hint: Range is the difference between max and min values)
amzn_df_clean.createOrReplaceTempView('range_table')
# Retrieve the difference between MAX(VOLUME) and MIN(VOLUME)
amzn_df4_clean = spark.sql("SELECT MAX(Volume) - MIN(Volume) as Volume_range \
                            FROM range_table").show()
# Answer: 459,496,000

+------------+
|Volume_range|
+------------+
|   459496000|
+------------+



In [17]:
######################################################################################################

In [18]:
#6: Calculate the average of the High column. What percent of the observations had a High price greater than the average?
amzn_df_clean.createOrReplaceTempView('avg_table')
# Retrive the average of the High column first
high_avg = spark.sql("SELECT AVG(High) \
                            FROM avg_table").collect()[0][0] # returns the value 74.66189798570285 by using indices
# show the average of the High column
high_avg
# Answer: 74.66189798570285

74.66189798570285

In [19]:
# What percent of the observations had a High price greater than the average?
obs_greater_avg = amzn_df_clean.filter(amzn_df_clean['High'] > high_avg).count()
obs_greater_avg # 1208 obs greater than the average, now need total observation count

1208

In [20]:
# total obs
total_count = amzn_df_clean.count()
total_count # 2518 (matches summary stats from above)

2518

In [21]:
# calculate percent of obs had a High price greater than the average
prcnt_above_avg = (obs_greater_avg/total_count)*100
prcnt_above_avg
# Answer: 47.97458300238284 % of the observations had a High price greater than the average

47.97458300238284

In [22]:
######################################################################################################

In [23]:
#7: What is the total Volume per year? Sort the results based on the total Volume in a decreasing order.
# Make new col of year data
# import sql.functions
from pyspark.sql.functions import year
amzn_df6_clean = amzn_df_clean.withColumn('year', year('Date'))
amzn_df6_clean

DataFrame[Date: date, Open: double, High: double, Low: double, Close: double, Adj_Close: double, Volume: int, year: int]

In [24]:
# Group by year, sort in descending order, show it
total_vol = amzn_df6_clean.groupBy('year').sum('volume').sort('sum(volume)', ascending = False).show()

[Stage 31:>                                                         (0 + 1) / 1]

+----+-----------+
|year|sum(volume)|
+----+-----------+
|2018|28357952000|
|2020|24950814000|
|2016|20775126000|
|2014|20581334000|
|2019|19493002000|
|2015|19142040000|
|2022|19096256300|
|2017|17654108000|
|2021|17076362000|
|2013|14958114000|
+----+-----------+



                                                                                

In [25]:
######################################################################################################

In [26]:
#8: What is the average Adjusted Close price for each month? Sort the results in an increasing order based on the average Adjusted Close price
# Make new col of month data
from pyspark.sql.functions import month
amzn_df7_clean = amzn_df_clean.withColumn('month', month('Date'))
amzn_df7_clean

DataFrame[Date: date, Open: double, High: double, Low: double, Close: double, Adj_Close: double, Volume: int, month: int]

In [27]:
amzn_df7_clean.createOrReplaceTempView('avg_monthly_table')
# Retrive the average of the adj close price column first
avg_mon_sorted = spark.sql("SELECT month, AVG(Adj_Close) as adj_close_avg FROM avg_monthly_table GROUP BY month ORDER BY adj_close_avg ASC").show()
avg_mon_sorted

+-----+-----------------+
|month|    adj_close_avg|
+-----+-----------------+
|    1|67.00627561083743|
|    2|68.95255491623037|
|    5|69.37138398578199|
|    3|69.88013780275226|
|    4|71.78249272596155|
|    6|73.02435191079809|
|   12|75.53066592417065|
|   10|76.20908837556561|
|   11|76.53358077450977|
|    7|77.88898079245286|
|    9|78.86672661951219|
|    8| 79.5366561040724|
+-----+-----------------+



In [28]:
######################################################################################################

In [29]:
#9: Using SQL commands, create a new data frame that includes the records for the days where 
# closing price is lower than the opening price. Display the number of records in this new data frame.
amzn_df_clean.createOrReplaceTempView('close_lwr_open')
# Retrieve the df where close < open first
# Creating a col called open_close_diff to check work
close_lessthan = spark.sql("SELECT Date, Close, Open, ROUND((Open - Close),2) AS open_close_diff FROM close_lwr_open WHERE Close < Open").show()

+----------+-------+-------+---------------+
|      Date|  Close|   Open|open_close_diff|
+----------+-------+-------+---------------+
|2013-01-08| 13.319|13.3535|           0.03|
|2013-01-09|13.3175|13.4085|           0.09|
|2013-01-10| 13.267| 13.427|           0.16|
|2013-01-16|13.4465|13.5265|           0.08|
|2013-01-17| 13.524| 13.575|           0.05|
|2013-01-22|13.5095| 13.581|           0.07|
|2013-01-23|13.4055|13.5285|           0.12|
|2013-01-28| 13.802| 14.189|           0.39|
|2013-01-29|13.0175|13.7675|           0.75|
|2013-01-30| 13.638|  14.15|           0.51|
|2013-01-31| 13.275| 13.552|           0.28|
|2013-02-01|  13.25|13.4465|            0.2|
|2013-02-04| 12.999| 13.139|           0.14|
|2013-02-06| 13.111| 13.258|           0.15|
|2013-02-07|13.0115| 13.205|           0.19|
|2013-02-11|12.8605|  13.16|            0.3|
|2013-02-12| 12.935|12.9595|           0.02|
|2013-02-15|13.2545|13.3815|           0.13|
|2013-02-20|13.3205|  13.51|           0.19|
|2013-02-2

In [30]:
# Display the number of records in this new data frame (close_lessthan) - continuing to use SQL
amzn_df_clean.createOrReplaceTempView('close_lwr_open')
# Retrieve the number of records in close_lessthan
num_records = spark.sql("SELECT COUNT(*) FROM close_lwr_open WHERE Close < Open").show()
# Answer: 1272 records where close is less than open price

+--------+
|count(1)|
+--------+
|    1272|
+--------+



In [31]:
######################################################################################################

In [32]:
#10: Perform the following using SQL commands: Count the number of days in each month of each
# year during the last three years, where closing price is at least $5 less than the opening price. The
# resulting data frame should be sorted based on Year first and then Month in increasing order.
# Make new col of month data and year data
from pyspark.sql.functions import month
from pyspark.sql.functions import year
amzn_df10_clean = amzn_df_clean.withColumn('Year', year('Date')).withColumn('Month', month('Date'))
amzn_df10_clean.show()

+----------+-------+-------+-------+-------+---------+---------+----+-----+
|      Date|   Open|   High|    Low|  Close|Adj_Close|   Volume|Year|Month|
+----------+-------+-------+-------+-------+---------+---------+----+-----+
|2013-01-02| 12.804| 12.905| 12.663|12.8655|  12.8655| 65420000|2013|    1|
|2013-01-03|12.8635| 13.044|12.8185| 12.924|   12.924| 55018000|2013|    1|
|2013-01-04| 12.879|  12.99|12.8325|12.9575|  12.9575| 37484000|2013|    1|
|2013-01-07|13.1485|13.4865|13.1335| 13.423|   13.423| 98200000|2013|    1|
|2013-01-08|13.3535| 13.449|13.1785| 13.319|   13.319| 60214000|2013|    1|
|2013-01-09|13.4085| 13.475|  13.27|13.3175|  13.3175| 45312000|2013|    1|
|2013-01-10| 13.427| 13.437| 13.115| 13.267|   13.267| 57268000|2013|    1|
|2013-01-11| 13.255|13.4215|13.2055| 13.397|   13.397| 48266000|2013|    1|
|2013-01-14|   13.4| 13.713| 13.377|13.6365|  13.6365| 85500000|2013|    1|
|2013-01-15| 13.534|13.6365| 13.465| 13.595|   13.595| 46538000|2013|    1|
|2013-01-16|

In [33]:
amzn_df10_clean.createOrReplaceTempView('question_10')
# Retrieve the dates where closing price is at least $5 less than the opening price and sort by year then month in increasing order
# Adding a col that takes the open-close difference to check work and naming Open_Close_Diff
q_10 = spark.sql("SELECT Date, Year, Month, Open, Close, ROUND((Open - Close), 1) as Open_Close_Diff FROM question_10 WHERE Open - Close >= 5 ORDER BY Year ASC, Month ASC").show()

+----------+----+-----+----------+----------+---------------+
|      Date|Year|Month|      Open|     Close|Open_Close_Diff|
+----------+----+-----+----------+----------+---------------+
|2018-10-29|2018|   10|      83.0|    76.944|            6.1|
|2018-10-24|2018|   10| 88.684998| 83.209999|            5.5|
|2018-10-10|2018|   10| 92.894501| 87.762497|            5.1|
|2020-07-13|2020|    7|162.552994|155.199997|            7.4|
|2020-07-23|2020|    7|154.913498|149.327499|            5.6|
|2020-09-16|2020|    9|158.999496|153.904999|            5.1|
|2020-09-10|2020|    9|165.360992|158.755493|            6.6|
|2020-09-23|2020|    9|  156.0215|149.992996|            6.0|
|2020-09-03|2020|    9|    174.25|168.399994|            5.9|
|2020-10-30|2020|   10|157.887497|151.807495|            6.1|
|2021-01-27|2021|    1|167.074493|161.628998|            5.4|
|2021-02-03|2021|    2|171.250504|165.626495|            5.6|
|2021-11-22|2021|   11|   183.819|178.628494|            5.2|
|2021-12

In [34]:
# Retrieve the number of records in q_10 by each year and each month
amzn_df10_clean.createOrReplaceTempView('question_10')
num_records_10 = spark.sql("SELECT Year, Month, COUNT(*) FROM question_10 WHERE Open - Close >= 5 GROUP BY Year, Month").show()

+----+-----+--------+
|Year|Month|count(1)|
+----+-----+--------+
|2022|   10|       1|
|2018|   10|       3|
|2020|    9|       4|
|2022|    2|       1|
|2021|   11|       1|
|2022|   11|       2|
|2022|    3|       2|
|2021|   12|       1|
|2021|    2|       1|
|2022|    1|       3|
|2022|    5|       1|
|2021|    1|       1|
|2020|    7|       2|
|2020|   10|       1|
|2022|    6|       1|
|2022|    4|       3|
|2022|    8|       1|
+----+-----+--------+

