In [1]:
# Installing pyspark
# ---
#
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 61.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=d5f4167f82971e1e742acf0cacf34064e984b4e4a25db442da82a92382b23c94
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


### 1. Data Importation and Exploration

* Start a spark session and load the stock file while inferring the data types.
* Determine the column names
* Make observations about the schema.
* Show the first 5 rows
* Use the describe method to learn about the data frame

In [2]:
# Next, we run a local spark session
# ---
#
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

In [3]:
# Start a spark session and load the stock file while inferring the data types.
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)

stocks = sqlCtx.read.options(header=True,inferSchema=True).csv('saf_stock.csv')



In [4]:
# Determine the column names
stocks.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



The SQL context automatically determined the data types of the columns when importing the data

In [5]:
# Show the first 5 rows
stocks.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [6]:
# Use the describe method to learn about the data frame
stocks.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

* The dataset contains 1,258 records
* Opening stock prices ranged from 56.39 to 90.8 with the average across the period being 72.34
* The highest prices per day fluctuated between 57.06 and 90.97 with the average across the period being 72.84
* The lowest prices per day dangled between 56.3 and 89.25 with the average across the period being 71.92

### 2. Data Preparation

* Format all the data to 2 decimal places i.e. format_number()
* Create a new data frame with a column called HV Ratio that is the ratio of the
High Price versus volume of stock traded for a day

In [7]:
# Format all the data to 2 decimal places i.e. format_number()
stocks_formated = stocks.withColumn('Open',F.format_number('Open',2).cast(DoubleType()))\
                        .withColumn('High',F.format_number('High',2).cast(DoubleType()))\
                        .withColumn('Low',F.format_number('Low',2).cast(DoubleType()))\
                        .withColumn('Close',F.format_number('Close',2).cast(DoubleType()))\
                        .withColumn('Adj Close',F.format_number('Adj Close',2).cast(DoubleType()))

stocks_formated.show(5)

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows



In [8]:
stocks_formated.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [9]:
# Create a new data frame with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day
stocks_hv = stocks_formated.withColumn('HV Ratio', F.format_number(F.col('High')/F.col('Volume'),10))
stocks_hv.show(5)

+----------+-----+-----+-----+-----+--------+---------+------------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|    HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+------------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|0.0000048197|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|0.0000062908|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|0.0000046694|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|0.0000073673|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|0.0000089156|
+----------+-----+-----+-----+-----+--------+---------+------------+
only showing top 5 rows



### 3. Data Analysis

* What day had the Peak High in Price?
* What is the mean of the Close column?
* What is the max and min of the Volume column?
* How many days was the Close lower than 60 dollars?
* What percentage of the time was the High greater than 80 dollars?
* What is the Pearson correlation between High and Volume?
* What is the max High per year?
* What is the average Close for each Calendar Month?

In [10]:
stocks_hv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- HV Ratio: string (nullable = true)



In [11]:
# What day had the Peak High in Price?
peak_high = 	stocks_hv.agg({'High': 'max'}).collect()[0][0]
stocks_hv[stocks_hv['High'] == peak_high].show()

+----------+----+-----+-----+-----+-------+---------+------------+
|      Date|Open| High|  Low|Close| Volume|Adj Close|    HV Ratio|
+----------+----+-----+-----+-----+-------+---------+------------+
|2015-01-13|90.8|90.97|88.93|89.31|8215400|    83.83|0.0000110731|
+----------+----+-----+-----+-----+-------+---------+------------+



In [12]:
# What is the mean of the Close column?
mean_close = 	stocks_hv.agg({'Close': 'mean'}).collect()[0][0]
mean_close

72.38844992050863

In [13]:
# What is the max and min of the Volume column?
max_vol = 	stocks_hv.agg({'Volume': 'max'}).collect()[0][0]
min_vol = 	stocks_hv.agg({'Volume': 'min'}).collect()[0][0]
print('Maximum Volume:',max_vol,'\nMinimum Volume:',min_vol)

Maximum Volume: 80898100 
Minimum Volume: 2094900


In [14]:
# How many days was the Close lower than 60 dollars?
close_less_60 = stocks_hv[stocks_hv['Close'] < 60]
print(close_less_60.count(),'days')
close_less_60.show(5)

81 days
+----------+-----+-----+-----+-----+--------+---------+------------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|    HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+------------+
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08|0.0000062908|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|0.0000046694|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400|    51.46|0.0000073673|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62|0.0000089156|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300|    51.49|0.0000086445|
+----------+-----+-----+-----+-----+--------+---------+------------+
only showing top 5 rows



In [15]:
# What percentage of the time was the High greater than 80 dollars?
total_days = stocks_hv.count()
high_greater_80 = stocks_hv[stocks_hv['High'] > 80]
days_high_greater_80 = high_greater_80.count()
print('Total days:',total_days,'\nNumber of days High was greater than 80:',days_high_greater_80)
print('Percentage of days High was greater than 80: {}%'.format((days_high_greater_80/total_days) * 100))
high_greater_80.show(5)

Total days: 1258 
Number of days High was greater than 80: 115
Percentage of days High was greater than 80: 9.141494435612083%
+----------+-----+-----+-----+-----+-------+---------+------------+
|      Date| Open| High|  Low|Close| Volume|Adj Close|    HV Ratio|
+----------+-----+-----+-----+-----+-------+---------+------------+
|2013-11-25|80.06|80.57|79.91|80.43|5670400|    73.22|0.0000142089|
|2013-11-26|80.44|80.68|80.11|80.68|5537800|    73.45|0.0000145690|
|2013-11-27|80.55| 81.0|80.38|80.93|4813300|    73.67|0.0000168284|
|2013-11-29|81.17|81.35|80.82|81.01|3447200|    73.75|0.0000235989|
|2013-12-02|80.89|81.28|80.37|81.11|6178400|    73.84|0.0000131555|
+----------+-----+-----+-----+-----+-------+---------+------------+
only showing top 5 rows



In [16]:
# What is the Pearson correlation between High and Volume?
stocks_hv.corr('High','Volume')

-0.33843260582148915

In [17]:
# What is the max High per year?
stocks_hv.groupby(F.date_format('Date','yyyy').alias('Year')).agg({'High': 'max'}).sort('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|     77.6|
|2013|    81.37|
|2014|    88.09|
|2015|    90.97|
|2016|    75.19|
+----+---------+



In [18]:
# What is the average Close for each Calendar Month?
stocks_hv.groupby(F.date_format('Date','MM').alias('Month')).agg({'Close': 'mean'}).sort('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|   01|71.44801980198022|
|   02|71.30680412371134|
|   03|71.77794392523363|
|   04|72.97361904761907|
|   05|72.30971698113206|
|   06|72.49537735849057|
|   07|74.43971962616824|
|   08|73.02981818181819|
|   09|72.18411764705883|
|   10|71.57854545454546|
|   11|72.11108910891085|
|   12|72.84792452830189|
+-----+-----------------+

