In [1]:
# Project-local helpers (their behaviour is defined outside this notebook).
from utils.quandl_data_fetcher import QuandlDataFetcher
from utils.spark_initializer import SparkInitializer
# NOTE(review): init_spark() is deliberately called before the pyspark imports below --
# presumably it prepares the environment so that `import pyspark` succeeds; confirm against
# utils/spark_initializer before reordering these lines.
SparkInitializer.init_spark()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, avg, format_number
from pyspark.sql.types import StructType, StructField, DateType, DoubleType

In [2]:
# Create (or reuse) the Spark session for this notebook; the application name is 'Quandl'.
spark = (
    SparkSession.builder
    .appName('Quandl')
    .getOrCreate()
)

### Consume data from the Quandl API

In [3]:
# Fetch the ING GROEP N.V. stock price data (Euronext) through the project's Quandl helper.
dataset_code = 'EURONEXT/INGA'
pdf = QuandlDataFetcher.fetch_data(dataset_code)

In [4]:
# Preview the first 10 rows of the fetched data.
pdf.head(10)

Unnamed: 0,Date,Open,High,Low,Last,Volume,Turnover
0,2014-02-14,10.52,10.64,10.515,10.6,11457475.0,121313500.0
1,2014-02-17,10.655,10.68,10.59,10.61,7394038.0,78533650.0
2,2014-02-18,10.65,10.65,10.515,10.605,11580469.0,122761600.0
3,2014-02-19,10.575,10.595,10.415,10.475,11410268.0,119646300.0
4,2014-02-20,10.3,10.525,10.255,10.525,12632060.0,131836300.0
5,2014-02-21,10.6,10.625,10.5,10.545,11276993.0,118934200.0
6,2014-02-24,10.5,10.615,10.47,10.615,10083180.0,106631900.0
7,2014-02-25,10.605,10.67,10.51,10.665,10274855.0,109056800.0
8,2014-02-26,10.68,10.69,10.51,10.525,8740996.0,92497150.0
9,2014-02-27,10.56,10.57,10.35,10.505,12037920.0,125965200.0


### Transform the pandas DataFrame into a Spark DataFrame

In [5]:
# Explicit Spark schema: a nullable date column followed by six nullable double columns.
# NOTE(review): 'Volumn' looks like a misspelling of 'Volume'. It is kept unchanged here because
# the column name appears in every downstream output of this notebook; rename it deliberately
# (and re-run the notebook) if the typo should be fixed.
numeric_columns = ['Open', 'High', 'Low', 'Last', 'Volumn', 'Turnover']
stock_schema = StructType(
    [StructField('Date', DateType(), True)]
    + [StructField(column_name, DoubleType(), True) for column_name in numeric_columns]
)

In [6]:
# Build the Spark DataFrame from the pandas frame using the explicit schema, then preview it.
# The Spark column names come from stock_schema rather than from the pandas column labels.
sdf = spark.createDataFrame(data=pdf, schema=stock_schema)
sdf.show(n=10)

+----------+------+------+------+------+-----------+---------------+
|      Date|  Open|  High|   Low|  Last|     Volumn|       Turnover|
+----------+------+------+------+------+-----------+---------------+
|2014-02-14| 10.52| 10.64|10.515|  10.6|1.1457475E7| 1.2131352577E8|
|2014-02-17|10.655| 10.68| 10.59| 10.61|  7394038.0|  7.853364632E7|
|2014-02-18| 10.65| 10.65|10.515|10.605|1.1580469E7| 1.2276160587E8|
|2014-02-19|10.575|10.595|10.415|10.475|1.1410268E7| 1.1964634487E8|
|2014-02-20|  10.3|10.525|10.255|10.525| 1.263206E7| 1.3183630136E8|
|2014-02-21|  10.6|10.625|  10.5|10.545|1.1276993E7| 1.1893421816E8|
|2014-02-24|  10.5|10.615| 10.47|10.615| 1.008318E7|1.06631915945E8|
|2014-02-25|10.605| 10.67| 10.51|10.665|1.0274855E7| 1.0905679842E8|
|2014-02-26| 10.68| 10.69| 10.51|10.525|  8740996.0|  9.249715499E7|
|2014-02-27| 10.56| 10.57| 10.35|10.505| 1.203792E7| 1.2596515846E8|
+----------+------+------+------+------+-----------+---------------+
only showing top 10 rows



In [7]:
# Confirm the column types that resulted from applying stock_schema.
sdf.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Last: double (nullable = true)
 |-- Volumn: double (nullable = true)
 |-- Turnover: double (nullable = true)



In [8]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
sdf.describe().show()

+-------+------------------+------------------+------------------+------------------+--------------------+--------------------+
|summary|              Open|              High|               Low|              Last|              Volumn|            Turnover|
+-------+------------------+------------------+------------------+------------------+--------------------+--------------------+
|  count|              1445|              1445|              1445|              1445|                1445|                1445|
|   mean|12.205299653979235|12.324435294117649|12.067369550173012|12.195552941176468|1.7261211542560555E7|2.0791255484968305E8|
| stddev|2.0229024862829093|2.0193729621124454|2.0247483405506155|2.0242312157165845|   7576531.360696883|  8.86876913860459E7|
|    min|              8.35|             8.471|             8.197|             8.335|           1730312.0|       2.023141016E7|
|    max|             16.64|            16.692|            16.516|            16.666|         9.7104846E

### Compare the SQL query approach with the built-in DataFrame functions

1. Filter by conditions

In [9]:
# Filter using the built-in DataFrame.filter with a SQL expression string.
# Both predicates must be inside ONE string. Writing `'high > 16.5' and 'open > 16.6'` is a
# Python boolean expression over two non-empty strings, which evaluates to the second string
# only -- the `high` predicate would be silently dropped before Spark ever sees it.
spk_rslt = sdf.filter('high > 16.5 and open > 16.6')
spk_rslt.show()

+----------+-----+-----+------+------+-----------+---------------+
|      Date| Open| High|   Low|  Last|     Volumn|       Turnover|
+----------+-----+-----+------+------+-----------+---------------+
|2018-01-12|16.63|16.69|16.494|16.612|1.4569804E7|2.41759308908E8|
|2018-01-23|16.64|16.65|16.206|16.364|2.1670624E7|3.55128283566E8|
+----------+-----+-----+------+------+-----------+---------------+



In [10]:
# Better way to apply multiple conditions: build Column expressions and combine them with `&`.
condition1 = sdf.High > 16.5
condition2 = sdf.Open > 16.6
py_rslt = sdf.where(condition1 & condition2)  # `where` is an alias of `filter`
py_rslt.show()

+----------+-----+-----+------+------+-----------+---------------+
|      Date| Open| High|   Low|  Last|     Volumn|       Turnover|
+----------+-----+-----+------+------+-----------+---------------+
|2018-01-12|16.63|16.69|16.494|16.612|1.4569804E7|2.41759308908E8|
|2018-01-23|16.64|16.65|16.206|16.364|2.1670624E7|3.55128283566E8|
+----------+-----+-----+------+------+-----------+---------------+



In [11]:
# Same filter expressed as a plain SQL query: register sdf as a temp view, then query it.
sdf.createOrReplaceTempView('temp1')
filter_query = 'select * from temp1 where high > 16.5 and open > 16.6'
sql_rslt = spark.sql(filter_query)
sql_rslt.show()

+----------+-----+-----+------+------+-----------+---------------+
|      Date| Open| High|   Low|  Last|     Volumn|       Turnover|
+----------+-----+-----+------+------+-----------+---------------+
|2018-01-12|16.63|16.69|16.494|16.612|1.4569804E7|2.41759308908E8|
|2018-01-23|16.64|16.65|16.206|16.364|2.1670624E7|3.55128283566E8|
+----------+-----+-----+------+------+-----------+---------------+



2. Aggregation Function - Group By

In [12]:
# Derive a 'Year' column from the 'Date' column and preview the result.
year_of_date = year(sdf.Date)
sdf1 = sdf.withColumn('Year', year_of_date)
sdf1.show(n=10)

+----------+------+------+------+------+-----------+---------------+----+
|      Date|  Open|  High|   Low|  Last|     Volumn|       Turnover|Year|
+----------+------+------+------+------+-----------+---------------+----+
|2014-02-14| 10.52| 10.64|10.515|  10.6|1.1457475E7| 1.2131352577E8|2014|
|2014-02-17|10.655| 10.68| 10.59| 10.61|  7394038.0|  7.853364632E7|2014|
|2014-02-18| 10.65| 10.65|10.515|10.605|1.1580469E7| 1.2276160587E8|2014|
|2014-02-19|10.575|10.595|10.415|10.475|1.1410268E7| 1.1964634487E8|2014|
|2014-02-20|  10.3|10.525|10.255|10.525| 1.263206E7| 1.3183630136E8|2014|
|2014-02-21|  10.6|10.625|  10.5|10.545|1.1276993E7| 1.1893421816E8|2014|
|2014-02-24|  10.5|10.615| 10.47|10.615| 1.008318E7|1.06631915945E8|2014|
|2014-02-25|10.605| 10.67| 10.51|10.665|1.0274855E7| 1.0905679842E8|2014|
|2014-02-26| 10.68| 10.69| 10.51|10.525|  8740996.0|  9.249715499E7|2014|
|2014-02-27| 10.56| 10.57| 10.35|10.505| 1.203792E7| 1.2596515846E8|2014|
+----------+------+------+------+-----

In [13]:
# Average yearly closing price using the built-in grouped aggregation.
# Note: chaining `.alias('avg')` onto the aggregated result aliases the DataFrame, not the
# column, so it has no effect on the `avg(Last)` header. To rename the column, use
# `sdf1.groupBy('Year').agg(avg('Last').alias('avg'))` instead.
sdf1.groupBy('Year').mean('Last').show()

+----+------------------+
|Year|         avg(Last)|
+----+------------------+
|2018|12.720397637795275|
|2015|13.368586065573764|
|2014|10.546440191387559|
|2019|10.193730434782614|
|2016|11.072203124999994|
|2017|14.866706349206348|
+----+------------------+



In [14]:
# Same yearly average of `Last`, expressed as a plain SQL query over a temp view of sdf1.
sdf1.createOrReplaceTempView('temp2')
yearly_avg_query = 'select year, avg(last) from temp2 group by year'
spark.sql(yearly_avg_query).show()

+----+------------------+
|year|         avg(last)|
+----+------------------+
|2018|12.720397637795275|
|2015|13.368586065573764|
|2014|10.546440191387559|
|2019|10.193730434782614|
|2016|11.072203124999994|
|2017|14.866706349206348|
+----+------------------+

