### Start a simple spark session

In [145]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import format_number,max,min,mean,countDistinct,corr,year,month
from pyspark.sql.types import StructField,StructType,FloatType

In [82]:
spark = SparkSession.builder.appName('walmart_stocks').getOrCreate()

### Load the walmart_stock csv file

In [83]:
df = spark.read.csv('/home/lalit/Desktop/spark-2.4.5-bin-hadoop2.7/python/PySpark/Python-and-Spark-for-Big-Data-master/Spark_DataFrame_Project_Exercise/walmart_stock.csv',inferSchema=True, header = True)

In [84]:
df.show(5)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [85]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [86]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### first five rows

In [87]:
for k in range(5):
    rows = df.head(5)[k]
    print(rows,'\n')

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996) 

Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475) 

Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539) 

Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922) 

Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004) 



### Adjusting summary table for a clean look

In [88]:
df_summary = df.describe()

In [89]:
df_summary.printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [90]:
df_summary.select(df_summary['summary'],format_number(df_summary['Open'].cast('float'),2).alias('Open'),
                 format_number(df_summary['High'].cast('float'),2).alias('High'),
                 format_number(df_summary['Low'].cast('float'),2).alias('Low'),
                 format_number(df_summary['Close'].cast('float'),2).alias('Close'),
                 df_summary['Volume'].cast('int').alias('Volume'),
                 format_number(df_summary['Adj Close'].cast('float'),2).alias('Adj CLose')).show()

+-------+--------+--------+--------+--------+--------+---------+
|summary|    Open|    High|     Low|   Close|  Volume|Adj CLose|
+-------+--------+--------+--------+--------+--------+---------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258| 1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|    67.24|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|     6.72|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|    50.36|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|    84.91|
+-------+--------+--------+--------+--------+--------+---------+



### Calculating HV Ratio (High/Volume)

In [95]:
df_HVratio = df.withColumn('HV Ratio', df['High']/df['Volume'])

In [96]:
df_HVratio.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- HV Ratio: double (nullable = true)



In [97]:
df_HVratio = df_HVratio.select(df_HVratio['Date'],format_number(df_HVratio['Open'],2).alias('Open'),
                                 format_number(df_HVratio['High'],2).alias('High'),
                                 format_number(df_HVratio['Low'],2).alias('Low'),
                                 format_number(df_HVratio['Close'],2).alias('Close'),
                                 df_HVratio['Volume'].alias('Volume'),
                                 format_number(df_HVratio['Adj Close'],2).alias('Adj CLose'),
                                  format_number(df_HVratio['HV Ratio'],8).alias('HV Ratio'))

In [99]:
df_HVratio.show(10)

+-------------------+-----+-----+-----+-----+--------+---------+----------+
|               Date| Open| High|  Low|Close|  Volume|Adj CLose|  HV Ratio|
+-------------------+-----+-----+-----+-----+--------+---------+----------+
|2012-01-03 00:00:00|59.97|61.06|59.87|60.33|12668800|    52.62|0.00000482|
|2012-01-04 00:00:00|60.21|60.35|59.47|59.71| 9593300|    52.08|0.00000629|
|2012-01-05 00:00:00|59.35|59.62|58.37|59.42|12768200|    51.83|0.00000467|
|2012-01-06 00:00:00|59.42|59.45|58.87|59.00| 8069400|    51.46|0.00000737|
|2012-01-09 00:00:00|59.03|59.55|58.92|59.18| 6679300|    51.62|0.00000892|
|2012-01-10 00:00:00|59.43|59.71|58.98|59.04| 6907300|    51.49|0.00000864|
|2012-01-11 00:00:00|59.06|59.53|59.04|59.40| 6365600|    51.81|0.00000935|
|2012-01-12 00:00:00|59.79|60.00|59.40|59.50| 7236400|    51.90|0.00000829|
|2012-01-13 00:00:00|59.18|59.61|59.01|59.54| 7729300|    51.93|0.00000771|
|2012-01-17 00:00:00|59.87|60.11|59.52|59.85| 8500000|    52.20|0.00000707|
+-----------

### What day had the peak high in price

In [114]:
df_HVratio.orderBy(df['High'].desc()).head(1)[0][0]

datetime.datetime(2015, 1, 13, 0, 0)

### Find min and max of the HV Ratio

In [121]:
df_HVratio.select(min(df_HVratio['HV ratio']),max(df_HVratio['HV ratio'])).show()

+-------------+-------------+
|min(HV ratio)|max(HV ratio)|
+-------------+-------------+
|   0.00000084|   0.00003735|
+-------------+-------------+



### No of days close lower than 60 dollars

In [126]:
df_HVratio.filter(df['Close']<60).count()

81

### Percentage of the time high greater than 80 dollars

In [139]:
(df_HVratio.filter(df['High']>80).count()/df_HVratio.select(['High']).count()) * 100

9.141494435612083

### Pearson correlation between high and volume

In [142]:
df_HVratio.select(corr(df_HVratio['High'],df_HVratio['Volume'])).show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.33843260582148915|
+--------------------+



### Max high per year

In [150]:
high_perYear = df_HVratio.groupBy(year(df['Date'])).agg({'High':'max'})

In [153]:
high_perYear.orderBy(high_perYear['year(Date)']).show()

+----------+---------+
|year(Date)|max(High)|
+----------+---------+
|      2012|    77.60|
|      2013|    81.37|
|      2014|    88.09|
|      2015|    90.97|
|      2016|    75.19|
+----------+---------+



### Average close for each calendar month

In [154]:
avg_perMonth = df_HVratio.groupBy(month(df_HVratio['Date'])).agg({'Close':'mean'})

In [155]:
avg_perMonth.orderBy(avg_perMonth['month(Date)']).show()

+-----------+-----------------+
|month(Date)|       avg(Close)|
+-----------+-----------------+
|          1|71.44801980198022|
|          2|71.30680412371134|
|          3|71.77794392523363|
|          4|72.97361904761907|
|          5|72.30971698113206|
|          6|72.49537735849057|
|          7|74.43971962616824|
|          8|73.02981818181819|
|          9|72.18411764705883|
|         10|71.57854545454546|
|         11|72.11108910891085|
|         12|72.84792452830189|
+-----------+-----------------+

