In [32]:
# I am performing this task with both pandas and pyspark

Spark Walmart Data Analysis Project

Let's get some quick practice with your new Spark DataFrame skills, you will be asked some basic questions about some stock market data, in this case Walmart Stock from the years 2012-2017. This exercise will just ask a bunch of questions, unlike the machine learning exercises, which will be a little looser and be in the form of "Consulting Projects".

For now, just answer the questions and complete the tasks below.

Use the walmart_stock.csv file to Answer and complete the tasks below!


1.Load the Walmart Stock CSV File, have Spark infer the data types.


In [44]:
#pandas
import pandas as pd
stock=pd.read_csv('walmart_stock.csv')
print(stock.head(5))
print(stock.dtypes)

         Date       Open       High        Low      Close    Volume  Adj Close
0  2012-01-03  59.970001  61.060001  59.869999  60.330002  12668800  52.619235
1  2012-01-04  60.209999  60.349998  59.470001  59.709999   9593300  52.078475
2  2012-01-05  59.349998  59.619999  58.369999  59.419998  12768200  51.825539
3  2012-01-06  59.419998  59.450001  58.869999  59.000000   8069400  51.459220
4  2012-01-09  59.029999  59.549999  58.919998  59.180000   6679300  51.616215
Date          object
Open         float64
High         float64
Low          float64
Close        float64
Volume         int64
Adj Close    float64
dtype: object


In [45]:
#pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('stock').getOrCreate()
spark
df_pyspark=spark.read.csv("walmart_stock.csv",inferSchema=True, header=True)
print(df_pyspark)

DataFrame[Date: date, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]


2.What are the column names?

In [2]:
#pandas
print(stock.columns)

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close'], dtype='object')


In [46]:
#pyspark
df_pyspark.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

3.What does the Schema look like?
A.root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

In [3]:
#pandas
print(stock.info())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1258 entries, 0 to 1257
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Date       1258 non-null   object 
 1   Open       1258 non-null   float64
 2   High       1258 non-null   float64
 3   Low        1258 non-null   float64
 4   Close      1258 non-null   float64
 5   Volume     1258 non-null   int64  
 6   Adj Close  1258 non-null   float64
dtypes: float64(5), int64(1), object(1)
memory usage: 68.9+ KB
None


In [48]:
#pyspark
df_pyspark.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



4.Print out the first 5 columns.

In [4]:
#pandas
print(stock.iloc[:, :5].head())


         Date       Open       High        Low      Close
0  2012-01-03  59.970001  61.060001  59.869999  60.330002
1  2012-01-04  60.209999  60.349998  59.470001  59.709999
2  2012-01-05  59.349998  59.619999  58.369999  59.419998
3  2012-01-06  59.419998  59.450001  58.869999  59.000000
4  2012-01-09  59.029999  59.549999  58.919998  59.180000


In [49]:
#pyspark
for line in df_pyspark.head(5):
    print(line,'\n')

Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996) 

Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475) 

Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539) 

Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922) 

Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004) 



5.Use describe() to learn about the DataFrame.
A.+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+


In [9]:
#pandas
print(stock.describe())

              Open         High          Low        Close        Volume  \
count  1258.000000  1258.000000  1258.000000  1258.000000  1.258000e+03   
mean     72.357854    72.839388    71.918601    72.388450  8.222093e+06   
std       6.768090     6.768187     6.744076     6.756859  4.519781e+06   
min      56.389999    57.060001    56.299999    56.419998  2.094900e+06   
25%      68.627503    69.059998    68.162503    68.632497  5.791100e+06   
50%      73.235000    73.725002    72.839996    73.265000  7.093500e+06   
75%      76.629997    77.094999    76.250000    76.709999  9.394675e+06   
max      90.800003    90.970001    89.250000    90.470001  8.089810e+07   

         Adj Close  
count  1258.000000  
mean     67.238838  
std       6.722609  
min      50.363689  
25%      63.778335  
50%      68.541162  
75%      71.105668  
max      84.914216  


In [52]:
#pyspark
df_pyspark.describe().show()

24/03/08 12:51:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

6.There are too many decimal places for mean and stddev in the describe() dataframe. Format the numbers to just show up to two decimal places. Pay careful attention to the datatypes that .describe() returns, we didn't cover how to do this exact formatting, but we covered something very similar.

A.+-------+--------+--------+--------+--------+----------+
|summary|    Open|    High|     Low|   Close|    Volume|
+-------+--------+--------+--------+--------+----------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,780|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,100|
+-------+--------+--------+--------+--------+----------+

In [15]:
#pandas
summary_df = stock.describe()
# Format the 'Volume' column to include commas as thousands separator
summary_df['Volume'] = summary_df['Volume'].apply(lambda x: f'{x:,.0f}')
# Define the formatting dictionary
format_dict = {'Open': '{:,.2f}','High': '{:,.2f}', 'Low': '{:,.2f}', 'Close': '{:,.2f}'}
# Apply formatting to the DataFrame
summary_df = summary_df.style.format(format_dict)
summary_df


Unnamed: 0,Open,High,Low,Close,Volume,Adj Close
count,1258.0,1258.0,1258.0,1258.0,1258,1258.0
mean,72.36,72.84,71.92,72.39,8222093,67.238838
std,6.77,6.77,6.74,6.76,4519781,6.722609
min,56.39,57.06,56.3,56.42,2094900,50.363689
25%,68.63,69.06,68.16,68.63,5791100,63.778335
50%,73.24,73.73,72.84,73.26,7093500,68.541162
75%,76.63,77.09,76.25,76.71,9394675,71.105668
max,90.8,90.97,89.25,90.47,80898100,84.914216


In [58]:
#pyspark
from pyspark.sql.functions import format_number
summary = df_pyspark.describe()
# Apply the format_number function to each column and alias them
summary_formatted = summary.select(
    summary['summary'],
    format_number(summary['Open'].cast('float'), 2).alias('Open'),
    format_number(summary['High'].cast('float'), 2).alias('High'),
    format_number(summary['Low'].cast('float'), 2).alias('Low'),
    format_number(summary['Close'].cast('float'), 2).alias('Close'),
    format_number(summary['Volume'].cast('int'), 0).alias('Volume')
)
summary_formatted.show()


+-------+--------+--------+--------+--------+----------+
|summary|    Open|    High|     Low|   Close|    Volume|
+-------+--------+--------+--------+--------+----------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,780|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,100|
+-------+--------+--------+--------+--------+----------+



7.Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

A.+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows


In [16]:
#pandas
hv_ratio_df = stock.assign(HV_Ratio=stock['High'] / stock['Volume'])
print(hv_ratio_df[['HV_Ratio']])

      HV_Ratio
0     0.000005
1     0.000006
2     0.000005
3     0.000007
4     0.000009
...        ...
1253  0.000015
1254  0.000016
1255  0.000014
1256  0.000016
1257  0.000010

[1258 rows x 1 columns]


In [59]:
#pyspark
df2 = df_pyspark.withColumn("HV Ratio",df_pyspark["High"]/df_pyspark["Volume"])
df2.select('HV Ratio').show()


+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



8.What day had the Peak High in Price?
A.datetime.datetime(2015, 1, 13, 0, 0)

In [17]:
#pandas
peak_high_row = stock[stock['High'] == stock['High'].max()]
peak_high_date = peak_high_row['Date'].iloc[0]
print(peak_high_date)

2015-01-13


In [60]:
#pyspark
df_pyspark.orderBy(df_pyspark["High"].desc()).head(1)[0][0]


datetime.date(2015, 1, 13)

9.What is the max and min of the Volume column?
A.+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+

In [21]:
#pandas
print(min(stock.Volume))
print(max(stock.Volume))

2094900
80898100


In [62]:
#pyspark
from pyspark.sql.functions import max,min
df_pyspark.select(max("Volume"),min("Volume")).show()


+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



10.How many days was the Close lower than 60 dollars?
A.81

In [18]:
#pandas
lower_than_60_df = stock[stock['Close'] < 60]
days_lower_than_60 = len(lower_than_60_df)
print(days_lower_than_60)

81


In [68]:
#pyspark
df_pyspark.filter(df_pyspark['Close'] < 60).count()


81

11.What percentage of the time was the High greater than 80 dollars ?
A. 9.141494

In [24]:
#pandas
totaldays = len(stock)
high_greater_than_80_days = stock[stock['High'] > 80]
num_high_greater_than_80_days = len(high_greater_than_80_days)
percentage_high_greater_than_80 = (num_high_greater_than_80_days / totaldays) * 100
print(round(percentage_high_greater_than_80, 6))

9.141494


In [67]:
#pyspark
df_pyspark.filter('High > 80').count() * 100/df_pyspark.count()


9.141494435612083

12.What is the Pearson correlation between High and Volume?
A.+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+

In [31]:
#pandas
pearson_corr = stock['High'].corr(stock['Volume'])
print(pearson_corr)

-0.3384326061737166


In [69]:
#pyspark
df_pyspark.corr("High","Volume")

-0.3384326061737161

13.What is the max High per year?
A.+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+

In [29]:
#pandas
stock['Date'] = pd.to_datetime(stock['Date'])
stock['Year'] = stock['Date'].dt.year
# group by 'Year' and find the maximum high for each year
max_high_per_year = stock.groupby('Year')['High'].max().reset_index()

print(max_high_per_year)

   Year       High
0  2012  77.599998
1  2013  81.370003
2  2014  88.089996
3  2015  90.970001
4  2016  75.190002


In [65]:
#Pyspark
from pyspark.sql.functions import year
yeardf = df_pyspark.withColumn("Year",year(df_pyspark["Date"]))
max_df = yeardf.groupBy('Year').max()
max_df.select('Year','max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



14.What is the average Close for each Calendar Month?
In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.

A.+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

In [30]:
#pandas
stock['Date'] = pd.to_datetime(stock['Date'])
stock['Month'] = stock['Date'].dt.month
#group by 'Month' and find the average close for each month
avg_close_per_month = stock.groupby('Month')['Close'].mean().reset_index()

print(avg_close_per_month)

    Month      Close
0       1  71.448020
1       2  71.306804
2       3  71.777944
3       4  72.973619
4       5  72.309717
5       6  72.495377
6       7  74.439719
7       8  73.029819
8       9  72.184118
9      10  71.578545
10     11  72.111089
11     12  72.847925


In [72]:
#pyspark
month_df = df_pyspark.withColumn('Month',(df_pyspark['Date']))
#group by month and take average of all other columns
month_df = month_df.groupBy('Month').mean()
#sort by month
month_df = month_df.orderBy('Month')
month_df['Month', 'avg(Close)'].show()

+----------+------------------+
|     Month|        avg(Close)|
+----------+------------------+
|2012-01-03|         60.330002|
|2012-01-04|59.709998999999996|
|2012-01-05|         59.419998|
|2012-01-06|              59.0|
|2012-01-09|             59.18|
|2012-01-10|59.040001000000004|
|2012-01-11|         59.400002|
|2012-01-12|              59.5|
|2012-01-13|59.540001000000004|
|2012-01-17|         59.849998|
|2012-01-18|60.009997999999996|
|2012-01-19|60.610001000000004|
|2012-01-20|61.009997999999996|
|2012-01-23|             60.91|
|2012-01-24|61.389998999999996|
|2012-01-25|         61.470001|
|2012-01-26|         60.970001|
|2012-01-27|60.709998999999996|
|2012-01-30|         61.299999|
|2012-01-31|61.360001000000004|
+----------+------------------+
only showing top 20 rows

