In [2]:
## Sample application using Python/PySpark DataFrames and
## Spark SQL on the sample stock_sales data set.
 
import os

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (format_number, mean, max,min, 
                                   corr, year, month, count)
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

## Create (or reuse) the Spark session for this application
sparkSession = SparkSession.builder.appName('AurumApplication').getOrCreate()

## Location of the input CSV. The original local path stays the default, so
## nothing changes when STOCK_SALES_CSV is unset; set that environment
## variable to run the notebook against a copy of the file stored elsewhere.
stockSalesCsvPath = os.environ.get('STOCK_SALES_CSV',
                                   'C:/Users/nagaj/Desktop/AurumDS/stock_sales.csv')

## Load the input CSV file into a DataFrame: the first line is treated as the
## header and the column types are inferred from the data.
stockSalesDF = sparkSession.read.csv(stockSalesCsvPath, inferSchema=True, header=True)

## Summary statistics of the Stock Sales data set, formatted for display:
## price columns are shown with two decimals, Volume as a whole number.
descStockSalesDF = stockSalesDF.describe()
summaryColumns = [descStockSalesDF['summary']]
for priceColumn in ('Open', 'High', 'Low', 'Close'):
    ## each price statistic is cast to float before being formatted
    summaryColumns.append(
        format_number(descStockSalesDF[priceColumn].cast('float'), 2).alias(priceColumn))
summaryColumns.append(descStockSalesDF['Volume'].cast('int').alias('Volume'))
descStockSalesDF.select(*summaryColumns).show()



+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [6]:
## Ratio of the High price to the Volume of stock traded, for each day.
highToVolume = stockSalesDF.High / stockSalesDF.Volume
stockSalesDF.select(highToVolume.alias('High vs Volume ratio')).show()

+--------------------+
|High vs Volume ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [7]:
## Record with the highest 'High' price: sort descending, keep the top row.
## Indexing [0] raises IndexError if the data set has no rows.
byHighDescending = stockSalesDF.sort(stockSalesDF["High"].desc())
highPriceRecord = byHighDescending.take(1)[0]
print(highPriceRecord)

Row(Date=datetime.datetime(2015, 1, 13, 0, 0), Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)


In [8]:
## Mean of the High price over the whole data set
averageHigh = mean(stockSalesDF["High"]).alias('Average High')
stockSalesDF.agg(averageHigh).show()

+-----------------+
|     Average High|
+-----------------+
|72.83938807631165|
+-----------------+



In [9]:
## Overall maximum of High and overall minimum of Low.
## NOTE: max/min here are the pyspark.sql.functions aggregates imported at the
## top of the notebook, not the Python builtins.
overallHigh = max("High").alias('Over all High')
overallLow = min("Low").alias('Over all Low')
stockSalesDF.agg(overallHigh, overallLow).show()

+-------------+------------+
|Over all High|Over all Low|
+-------------+------------+
|    90.970001|   56.299999|
+-------------+------------+



In [113]:
## Number of days on which 'Close' was more than $60
closeAbove60 = stockSalesDF.where(stockSalesDF.Close > 60)
closeAbove60.agg(count('Close').alias('No. of days Close more than $60')).show()

+-------------------------------+
|No. of days Close more than $60|
+-------------------------------+
|                           1177|
+-------------------------------+



In [117]:
## Percentage of days on which High was greater than 80 dollars.
## Two separate count() actions are run: matching rows and all rows.
daysHighAbove80 = stockSalesDF.where(stockSalesDF.High > 80).count()
totalDays = stockSalesDF.count()
float(daysHighAbove80) / totalDays * 100

9.141494435612083

In [10]:
## Correlation coefficient between the High price and the traded Volume
highVolumeCorr = corr(stockSalesDF["High"], stockSalesDF["Volume"])
stockSalesDF.agg(highVolumeCorr.alias("Correlation between High and Volume")).show()

+-----------------------------------+
|Correlation between High and Volume|
+-----------------------------------+
|                -0.3384326061737161|
+-----------------------------------+



In [11]:
## Highest 'High' price in each calendar year.
## A Year column is derived from Date, then rows are grouped on it.
stockSalesPerYear = stockSalesDF.withColumn("Year", year("Date"))
## max() is called without column names, so it is not restricted to High;
## only the 'max(High)' column of the result is displayed.
## Rows are shown in whatever order the aggregation returns them (no sort).
maxStockSalesYear = stockSalesPerYear.groupBy("Year").max()
maxStockSalesYear.select(["Year", "max(High)"]).show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [12]:
## Average 'Close' price for each calendar month.
## A Month column is derived from Date; rows are grouped on the month only.
monthCol = stockSalesDF.withColumn("Month", month(stockSalesDF["Date"]))
monthlyClose = monthCol.select(["Month", "Close"])
monthSalesAvg = monthlyClose.groupBy("Month").mean()
## Display the monthly averages in ascending month order.
monthSalesAvg.select(["Month", "avg(Close)"]).sort("Month").show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

