# Start a Spark Session

In [162]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WalmartStocks").getOrCreate()

## Load the SF Fire calls CSV File, have Spark infer the data types.

In [163]:
df = spark.read.csv('walmart_stock.csv',header=True,inferSchema=True)

### What are the column names?

In [41]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

### What does the Schema look like?

In [42]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Print out the first 5 columns.

In [43]:
df.head(5)

[Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [44]:
for row in df.head(5):
    print(row)
    print('\n')

Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




### Use describe() to learn about the DataFrame.

In [45]:
df.describe()

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

In [46]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [47]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [48]:
# To reduce the decimal points.
from pyspark.sql.functions import format_number

In [50]:
result = df.describe()
result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'),2).alias('High'),
              format_number(result['Low'].cast('float'),2).alias('Low'),
              format_number(result['Close'].cast('float'),2).alias('Close'),
              result['Volume'].cast('int').alias('Volume')
             ).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



### Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

In [165]:
df_2 = df.withColumn("HV Ratio",df["High"]/df["Volume"])
df_2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



### What day had the Peak High in Price?

In [170]:
df.orderBy(df["High"].desc()).head(1)[0][0]

datetime.date(2015, 1, 13)

In [57]:
df.select("High").show()

+------------------+
|              High|
+------------------+
|         61.060001|
|         60.349998|
|         59.619999|
|         59.450001|
|         59.549999|
|59.709998999999996|
|         59.529999|
|              60.0|
|59.610001000000004|
|60.110001000000004|
|         60.029999|
|             60.73|
|             61.25|
|             60.98|
|              62.0|
|61.610001000000004|
|             61.84|
|         61.119999|
|             61.32|
|             61.57|
+------------------+
only showing top 20 rows



### What is the mean of the Close column?

In [58]:
from pyspark.sql.functions import mean

In [59]:
df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



### What is the max and min of the Volume column?

In [60]:
from pyspark.sql.functions import max,min

In [61]:
df.select(max("Volume"), min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



### How many days was the Close lower than 60 dollars?

In [70]:
df.filter("Close < 60").show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             58.98|59.040001000000004| 6907300|         51.494109|
|2012-01-11|         59.060001|         59.529999|59.04000100000

In [73]:
close = df.select("Close")

In [76]:
close.filter("Close<60").show()

+------------------+
|             Close|
+------------------+
|59.709998999999996|
|         59.419998|
|              59.0|
|             59.18|
|59.040001000000004|
|         59.400002|
|              59.5|
|59.540001000000004|
|         59.849998|
|         58.599998|
|58.540001000000004|
|58.790001000000004|
|58.459998999999996|
|             58.93|
|         59.080002|
|             58.82|
|59.009997999999996|
|         59.400002|
|         58.970001|
|59.860001000000004|
+------------------+
only showing top 20 rows



In [86]:
close.filter("Close<60").count()

81

In [110]:
from pyspark.sql.functions import count
result = df.filter(df['Close'] < 60)
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



### What percentage of the time was the High greater than 80 dollars ?
In other words, (Number of Days High>80)/(Total Days in the dataset)

In [164]:
Percentage = (df.filter(df["High"]>80).count()/df.count())*100

In [107]:
Percentage

9.141494435612083

### What is the Pearson correlation between High and Volume?

In [113]:
from pyspark.sql.functions import corr
df.select(corr("High","Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



### What is the max High per year?

In [114]:
from pyspark.sql.functions import to_date,year,month

In [125]:
df_year = df.withColumn("df_year", year(df["Date"]))
df_year.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+-------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|df_year|
+----------+------------------+------------------+------------------+------------------+--------+------------------+-------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|   2012|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|   2012|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|   2012|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|   2012|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|   2012|


In [128]:
max_year = df_year.groupBy('df_year').max()
max_year.show()

+-------+-----------------+---------+---------+----------+-----------+-----------------+------------+
|df_year|        max(Open)|max(High)| max(Low)|max(Close)|max(Volume)|   max(Adj Close)|max(df_year)|
+-------+-----------------+---------+---------+----------+-----------+-----------------+------------+
|   2015|        90.800003|90.970001|    89.25| 90.470001|   80898100|84.91421600000001|        2015|
|   2013|        81.209999|81.370003|    80.82| 81.209999|   25683700|        73.929868|        2013|
|   2014|87.08000200000001|88.089996|86.480003| 87.540001|   22812400|81.70768000000001|        2014|
|   2012|        77.599998|77.599998|76.690002| 77.150002|   38007300|        68.568371|        2012|
|   2016|             74.5|75.190002|73.629997| 74.300003|   35076700|        73.233524|        2016|
+-------+-----------------+---------+---------+----------+-----------+-----------------+------------+



In [131]:
max_year.select("df_year","max(High)").show()

+-------+---------+
|df_year|max(High)|
+-------+---------+
|   2015|90.970001|
|   2013|81.370003|
|   2014|88.089996|
|   2012|77.599998|
|   2016|75.190002|
+-------+---------+



### What is the average Close for each Calendar Month?
In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.

In [134]:
df_month = df.withColumn("df_month", month(df["Date"]))
df_month.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|df_month|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|       1|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|       1|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|       1|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|       1|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|  

In [158]:
AVG_month = df_month.groupBy('df_month').mean()
AVG_month.show()

+--------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-------------+
|df_month|        avg(Open)|        avg(High)|         avg(Low)|       avg(Close)|      avg(Volume)|   avg(Adj Close)|avg(df_month)|
+--------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-------------+
|      12|72.87952850943395|73.35566025471698|72.44481152830188|72.84792478301885|7967959.433962264|68.46031040566042|         12.0|
|       1|71.40811884158416|71.97009924752473|70.90425712871289|71.44801958415842|8761851.485148516|65.56887865346533|          1.0|
|       6| 72.5100938962264| 72.9262265471698|72.12198099056603| 72.4953774245283|8303756.603773585|67.43827906603772|          6.0|
|       3|71.69046716822429|72.20289709345795|71.31878489719628|71.77794377570092|7721836.448598131| 66.2763403084112|          3.0|
|       5|72.24349083962262|72.71783049056604|71.85292466981134|72.30

In [160]:
 AVG_month.select("df_month","avg(Close)").orderBy("df_month").show()

+--------+-----------------+
|df_month|       avg(Close)|
+--------+-----------------+
|       1|71.44801958415842|
|       2|  71.306804443299|
|       3|71.77794377570092|
|       4|72.97361900952382|
|       5|72.30971688679247|
|       6| 72.4953774245283|
|       7|74.43971943925233|
|       8|73.02981855454546|
|       9|72.18411785294116|
|      10|71.57854545454543|
|      11| 72.1110893069307|
|      12|72.84792478301885|
+--------+-----------------+

