## start Spark

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load market data

In [82]:
from pyspark.sql.functions import regexp_replace, input_file_name
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, FloatType)

# sample: 202004011320,Abe,Ajuste,Fec,Max,Min,Ofc,Ofv,QOfc,QOfv,QUlt,Ult,Grupo,
#         NNeg,Nome,QTot,Strike,VCTO,VolTot
# NOTE: for some reason, I should first import the data THEN cast it to the
#  correct types

schema = StructType([
    StructField("ticker", StringType(), True),
    StructField("open", StringType(), True),
    StructField("settle", StringType(), True),
    StructField("lclose", StringType(), True),
    StructField("max", StringType(), True),
    StructField("min", StringType(), True),
    StructField("bid", StringType(), True),
    StructField("ask", StringType(), True),
    StructField("qbid", StringType(), True),
    StructField("qask", StringType(), True),
    StructField("qlast", IntegerType(), True),
    StructField("last", StringType(), True),
    StructField("group", StringType(), True),
    StructField("ntrades", StringType(), True),
    StructField("name", StringType(), True),
    StructField("qtotal", StringType(), True),
    StructField("strike", StringType(), True),
    StructField("expdate", StringType(), True),
    StructField("totalvol", StringType(), True)])

def load_data(s_path2s3, this_schema=schema):
    # read csv
    df = (spark.read
              .option("header", "true")
              .schema(this_schema)
              .csv(s_path2s3)
              .withColumn("input_file", input_file_name()))

    # drop rows where ticker is null
    df = df.na.drop(subset=["ticker"])
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [84]:
# 202004011320_flashpanel.csv 
data_quotes = 's3a://data-from-quotes/20200411*_flashpanel.csv'
data_quotes = 's3a://data-from-quotes/*_flashpanel.csv'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [85]:
df = spark.read.csv(data_quotes)
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

918702

In [86]:
df = load_data(data_quotes)
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ticker: string (nullable = true)
 |-- open: string (nullable = true)
 |-- settle: string (nullable = true)
 |-- lclose: string (nullable = true)
 |-- max: string (nullable = true)
 |-- min: string (nullable = true)
 |-- bid: string (nullable = true)
 |-- ask: string (nullable = true)
 |-- qbid: string (nullable = true)
 |-- qask: string (nullable = true)
 |-- qlast: integer (nullable = true)
 |-- last: string (nullable = true)
 |-- group: string (nullable = true)
 |-- ntrades: string (nullable = true)
 |-- name: string (nullable = true)
 |-- qtotal: string (nullable = true)
 |-- strike: string (nullable = true)
 |-- expdate: string (nullable = true)
 |-- totalvol: string (nullable = true)
 |-- input_file: string (nullable = false)

## Exploring the data

In [5]:
(df.filter(df.ticker == 'PETR4')
   .select('ticker', 'bid', 'ask')
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-----+-----+
|ticker|  bid|  ask|
+------+-----+-----+
| PETR4|15,73|15,74|
| PETR4|15,68|15,69|
| PETR4|14,33|14,34|
| PETR4|15,83|15.84|
| PETR4|15,69| 15.7|
+------+-----+-----+
only showing top 5 rows

In [6]:
(df.filter(df.ticker == 'PETR4').count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

196

In [7]:
(df.filter(df.ticker == 'PETR4')
   .select('input_file')
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|          input_file|
+--------------------+
|s3a://data-from-q...|
|s3a://data-from-q...|
|s3a://data-from-q...|
|s3a://data-from-q...|
|s3a://data-from-q...|
+--------------------+
only showing top 5 rows

## Transforming the data

In [70]:
# create timestamp column from original timestamp column

import time
import numpy as np
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType, IntegerType, DateType, StringType, FloatType


get_ints_from_file = udf(lambda x: int(str(x).split('/')[-1].split('_')[0]), LongType())
df2 = df.withColumn('intdate', get_ints_from_file('input_file'))


get_datetime = udf(lambda x: datetime(*time.strptime(str(x), '%Y%m%d%H%S')[:6]), DateType())
df2 = df2.withColumn('datetime', get_datetime('intdate'))


def convert_str2float(x):
    if isinstance(x, str):
        if x == 'Abe':
            return np.nan
        if 'm' in x:
            l_nm = [float(xx + '0' * (3-len(xx))) for xx in x.split('m')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**5 + l_nm[1] * 1000
            return l_nm[0] * 10**5
        elif 'k' in x:
            l_nm = [float(xx + '0' * (3-len(xx))) for xx in x.split('k')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**3 + l_nm[1]
            return l_nm[0] * 10**3
        elif 'b' in x:
            l_nm = [float(xx + '0' * (3-len(xx))) for xx in x.split('b')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**7 + l_nm[1] * 10 ** 6
            return l_nm[0] * 10**7
        return float(x.replace(',', '.'))
    return x


# 28m938
# 223k
# 111k5
# 40m876
# 895k1

def convert_str2int(x):
    if isinstance(x, str):
        if 'm' in x:
            l_nm = [int(xx + '0' * (3-len(xx))) for xx in x.split('m')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**5 + l_nm[1] * 1000
            return l_nm[0] * 10**5
        elif 'k' in x:
            l_nm = [int(xx + '0' * (3-len(xx))) for xx in x.split('k')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**3 + l_nm[1]
            return l_nm[0] * 10**3
        elif 'b' in x:
            l_nm = [int(xx + '0' * (3-len(xx))) for xx in x.split('b')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**7 + l_nm[1] * 10 ** 6
            return l_nm[0] * 10**7
        return int(x.replace('.', ''))
    return x


get_str2float = udf(lambda x: convert_str2float(x), FloatType())
for s_s2cast in ['bid', 'ask', 'open', 'settle', 'lclose', 'max', 'min', 'last', 'strike', 'totalvol']:
    df2 = df2.withColumn(f'{s_s2cast}2', get_str2float(f'{s_s2cast}'))
    
get_str2int = udf(lambda x: convert_str2int(x), IntegerType())
for s_s2cast in ['qbid', 'qask', 'qlast', 'qtotal', 'ntrades']:
    df2 = df2.withColumn(f'{s_s2cast}2', get_str2int(f'{s_s2cast}'))

df2.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ticker: string (nullable = true)
 |-- open: string (nullable = true)
 |-- settle: string (nullable = true)
 |-- lclose: string (nullable = true)
 |-- max: string (nullable = true)
 |-- min: string (nullable = true)
 |-- bid: string (nullable = true)
 |-- ask: string (nullable = true)
 |-- qbid: string (nullable = true)
 |-- qask: string (nullable = true)
 |-- qlast: integer (nullable = true)
 |-- last: string (nullable = true)
 |-- group: string (nullable = true)
 |-- ntrades: string (nullable = true)
 |-- name: string (nullable = true)
 |-- qtotal: string (nullable = true)
 |-- strike: string (nullable = true)
 |-- expdate: string (nullable = true)
 |-- totalvol: string (nullable = true)
 |-- input_file: string (nullable = false)
 |-- intdate: long (nullable = true)
 |-- datetime: date (nullable = true)
 |-- bid2: float (nullable = true)
 |-- ask2: float (nullable = true)
 |-- open2: float (nullable = true)
 |-- settle2: float (nullable = true)
 |-- lclose2: float (nullable 

In [58]:
df2.select(['bid', 'ask', 'bid2', 'ask2', 'ticker']).limit(5).show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+-----+-----+--------+
|bid  |ask  |bid2 |ask2 |ticker  |
+-----+-----+-----+-----+--------+
|8.84 |8.86 |8.84 |8.86 |AALR3   |
|8.84 |8.87 |8.84 |8.87 |AALR3F  |
|24.79|24.86|24.79|24.86|ALSO3   |
|52.45|52.49|52.45|52.49|CPLE6   |
|3.0  |3.41 |3.0  |3.41 |B3SAR342|
+-----+-----+-----+-----+--------+

## Checking functions

In [71]:
x = '8,27'
print(x, convert_str2float(x), type(convert_str2float(x)))
x = '8.27'
print(x, convert_str2float(x), type(convert_str2float(x)))
x = 'Abe'
print(x, convert_str2float(x), type(convert_str2float(x)))
x = '350k4'
print(x, convert_str2int(x), type(convert_str2int(x)))
x = '223k'
print(x, convert_str2int(x), type(convert_str2int(x)))
x = '40m876'
print(x, convert_str2int(x), type(convert_str2int(x)))
x = '28m938'
print(x, convert_str2float(x), type(convert_str2float(x)))

x = '28m938'
print(x, convert_str2int(x), type(convert_str2int(x)))

x = '28m'
print(x, convert_str2int(x), type(convert_str2int(x)))

x = '28m'
print(x, convert_str2float(x), type(convert_str2float(x)))


x = '133k3'
print(x, convert_str2int(x), type(convert_str2int(x)))

x = '133k3'
print(x, convert_str2float(x), type(convert_str2float(x)))


x = '3b375'
print(x, convert_str2float(x), type(convert_str2float(x)))

x = '3b375'
print(x, convert_str2int(x), type(convert_str2int(x)))





VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8,27 8.27 <class 'float'>
8.27 8.27 <class 'float'>
Abe nan <class 'float'>
350k4 350400 <class 'int'>
223k 223000 <class 'int'>
40m876 40876000 <class 'int'>
28m938 28938000.0 <class 'float'>
28m938 28938000 <class 'int'>
28m 28000000 <class 'int'>
28m 28000000.0 <class 'float'>
133k3 133300 <class 'int'>
133k3 133300.0 <class 'float'>
3b375 3375000000.0 <class 'float'>
3b375 3375000000 <class 'int'>

## Testing the transformations

In [72]:
(df2.filter(df2.ticker == 'B3SAR342')
   .select('datetime', 'intdate', 'ticker', 'bid2', 'ask2')
   .sort(df2.intdate.asc())
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+--------+----+----+
|  datetime|     intdate|  ticker|bid2|ask2|
+----------+------------+--------+----+----+
|2020-04-01|202004011320|B3SAR342|3.49|3.94|
|2020-04-01|202004011330|B3SAR342|3.44|3.88|
|2020-04-01|202004011340|B3SAR342|3.69|4.15|
|2020-04-01|202004011350|B3SAR342|3.65|4.09|
|2020-04-01|202004011400|B3SAR342|3.66|4.11|
+----------+------------+--------+----+----+
only showing top 5 rows

In [73]:
(df2.filter(df2.ticker == 'PETR4')
   .select('datetime', 'intdate', 'ticker', 'bid2', 'ask2')
   .sort(df2.intdate.asc())
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+------+-----+-----+
|  datetime|     intdate|ticker| bid2| ask2|
+----------+------------+------+-----+-----+
|2020-04-01|202004011320| PETR4|13.85|13.86|
|2020-04-01|202004011330| PETR4|13.82|13.83|
|2020-04-01|202004011340| PETR4|13.76|13.78|
|2020-04-01|202004011350| PETR4|13.95|13.96|
|2020-04-01|202004011400| PETR4|13.99| 14.0|
+----------+------------+------+-----+-----+
only showing top 5 rows

## Create quotes fact table

In [74]:
from pyspark.sql.functions import (year, month, dayofmonth, hour, weekofyear,
                                   date_format, dayofweek)

quotes_table = df2.select(
        col('datetime').alias('date'),
        col('intdate').alias('intdate'),
        year('datetime').alias('year'),
        month('datetime').alias('month'),
        dayofmonth('datetime').alias('day'),
        col('ticker').alias('ticker'),
        col('lclose2').alias('last.close'),
        col('open2').alias('open'),
        col('max2').alias('max'),
        col('min2').alias('min'),
        col('last2').alias('last'),
        col('bid2').alias('bid'),
        col('ask2').alias('ask'),
        col('ntrades2').alias('num.trades'),
        col('qtotal2').alias('qty.total'),
        col('totalvol2').alias('total.volume'),
     )

quotes_table.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- intdate: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- ticker: string (nullable = true)
 |-- last.close: float (nullable = true)
 |-- open: float (nullable = true)
 |-- max: float (nullable = true)
 |-- min: float (nullable = true)
 |-- last: float (nullable = true)
 |-- bid: float (nullable = true)
 |-- ask: float (nullable = true)
 |-- num.trades: integer (nullable = true)
 |-- qty.total: integer (nullable = true)
 |-- total.volume: float (nullable = true)

In [75]:
quotes_table.limit(5).show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+----+-----+---+------+----------+-----+-----+-----+-----+-----+-----+----------+---------+------------+
|date      |intdate     |year|month|day|ticker|last.close|open |max  |min  |last |bid  |ask  |num.trades|qty.total|total.volume|
+----------+------------+----+-----+---+------+----------+-----+-----+-----+-----+-----+-----+----------+---------+------------+
|2020-04-07|202004071530|2020|4    |7  |AALR3 |8.92      |9.61 |9.77 |9.23 |9.38 |9.35 |9.38 |13910     |379100   |3.0603E7    |
|2020-04-07|202004071530|2020|4    |7  |AALR3F|8.92      |9.21 |9.75 |8.94 |9.37 |9.34 |9.38 |1950      |6050     |57.646      |
|2020-04-07|202004071530|2020|4    |7  |ALSO3 |25.74     |29.79|29.79|25.19|25.44|25.42|25.49|100390    |20840000 |7.524E7     |
|2020-04-07|202004071530|2020|4    |7  |CPLE6 |53.19     |56.22|58.3 |54.91|55.65|55.63|55.66|65350     |916000   |5.175E7     |
|2020-04-07|202004071530|2020|4    |7  |BBASQ3|6.81      |null |null |null |null |4.02 |5.0  |nul

In [76]:
quotes_table.filter(quotes_table.ticker == "PETR4").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+----+-----+---+------+----------+-----+-----+-----+-----+-----+-----+----------+---------+------------+
|      date|     intdate|year|month|day|ticker|last.close| open|  max|  min| last|  bid|  ask|num.trades|qty.total|total.volume|
+----------+------------+----+-----+---+------+----------+-----+-----+-----+-----+-----+-----+----------+---------+------------+
|2020-04-02|202004021600|2020|    4|  2| PETR4|      14.3| 15.4|16.55|15.01|15.74|15.73|15.74|   1977610| 22007000|3.37500006E9|
|2020-04-02|202004021610|2020|    4|  2| PETR4|      14.3| 15.4|16.55|15.01|15.68|15.68|15.69|   2010770| 21653000|     3.412E9|
|2020-04-01|202004011500|2020|    4|  1| PETR4|     13.99|13.45|14.37|13.32|14.34|14.33|14.34|   1082950| 81357000|      1.13E9|
|2020-04-02|202004021550|2020|    4|  2| PETR4|      14.3| 15.4|16.55|15.01|15.84|15.83|15.84|   1933530| 21393000|3.32199987E9|
|2020-04-01|202004011630|2020|    4|  1| PETR4|     13.99|13.45|14.55|13.32|14.12|14.11|14.12|   

In [79]:
output_data = 'quotes-from-spark'
s_s3_path = f"s3a://{output_data}/quotes.parquet"
quotes_table.write.mode('overwrite').partitionBy('year', 'month', 'day').parquet(s_s3_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Testing functions created to ETL script

In [22]:
from datetime import datetime
import time
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (udf, col, row_number, regexp_replace,
                                   input_file_name)
from pyspark.sql.functions import (year, month, dayofmonth, hour, weekofyear,
                                   date_format, dayofweek)
from pyspark.sql.types import (LongType, IntegerType, DateType, StringType,
                               FloatType, StructType, StructField)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [105]:
def convert_str2float(x):
    if isinstance(x, str):
        if x == 'Abe':
            return np.nan
        if 'm' in x:
            l_nm = [float(xx + '0' * (3-len(xx))) for xx in x.split('m')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**5 + l_nm[1] * 1000
            return l_nm[0] * 10**5
        elif 'k' in x:
            l_nm = [float(xx + '0' * (3-len(xx))) for xx in x.split('k')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**3 + l_nm[1]
            return l_nm[0] * 10**3
        elif 'b' in x:
            l_nm = [float(xx + '0' * (3-len(xx))) for xx in x.split('b')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**7 + l_nm[1] * 10 ** 6
            return l_nm[0] * 10**7
        return float(x.replace(',', '.'))
    return x


def convert_str2int(x):
    if isinstance(x, str):
        if 'm' in x:
            l_nm = [int(xx + '0' * (3-len(xx))) for xx in x.split('m')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**5 + l_nm[1] * 1000
            return l_nm[0] * 10**5
        elif 'k' in x:
            l_nm = [int(xx + '0' * (3-len(xx))) for xx in x.split('k')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**3 + l_nm[1]
            return l_nm[0] * 10**3
        elif 'b' in x:
            l_nm = [int(xx + '0' * (3-len(xx))) for xx in x.split('b')]
            if len(l_nm) > 1:
                return l_nm[0] * 10**7 + l_nm[1] * 10 ** 6
            return l_nm[0] * 10**7
        return int(x.replace('.', ''))
    return x

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
def load_data(spark, s_path2s3):

    # define schema
    print('loading {}...'.format(s_path2s3))
    this_schema = StructType([
        StructField("ticker", StringType(), True),
        StructField("open", StringType(), True),
        StructField("settle", StringType(), True),
        StructField("lclose", StringType(), True),
        StructField("max", StringType(), True),
        StructField("min", StringType(), True),
        StructField("bid", StringType(), True),
        StructField("ask", StringType(), True),
        StructField("qbid", StringType(), True),
        StructField("qask", StringType(), True),
        StructField("qlast", IntegerType(), True),
        StructField("last", StringType(), True),
        StructField("group", StringType(), True),
        StructField("ntrades", StringType(), True),
        StructField("name", StringType(), True),
        StructField("qtotal", StringType(), True),
        StructField("strike", StringType(), True),
        StructField("expdate", StringType(), True),
        StructField("totalvol", StringType(), True)])

    # read csv
    df = (spark.read
          .option("header", "true")
          .schema(this_schema)
          .csv(s_path2s3)
          .withColumn("input_file", input_file_name()))

    # drop rows where ticker is null
    df = df.na.drop(subset=["ticker"])
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
def process_quotes_data(spark, input_data, output_data):
    '''
    Porcess data from quotes files

    :param spark: Spark session object.
    :param input_data: string. S3 path data to transform
    :param output_data: string. S3 root path to store transformed data
    '''

    # read log data file
    df = load_data(spark, input_data)

    # create intdate column to help filter and sort data in the future
    get_ints_from_file = udf(lambda x: int(
        str(x).split('/')[-1].split('_')[0]), LongType())
    df = df.withColumn('intdate', get_ints_from_file('input_file'))

    # create datetime column
    get_datetime = udf(lambda x: datetime(
        *time.strptime(str(x), '%Y%m%d%H%S')[:6]), DateType())
    df = df.withColumn('datetime', get_datetime('intdate'))

    # convert strings to floats in finatial related fields
    get_str2float = udf(lambda x: convert_str2float(x), FloatType())
    for s_s2cast in ['bid', 'ask', 'open', 'settle', 'lclose', 'max', 'min',
                     'last', 'strike', 'totalvol']:
        # df = df.withColumn(f'{s_s2cast}2', get_str2float(f'{s_s2cast}'))
        df = df.withColumn(
            '{}2'.format(s_s2cast), get_str2float('{}'.format(s_s2cast)))

    # convert strings to integers in quantity related fields
    get_str2int = udf(lambda x: convert_str2int(x), IntegerType())
    for s_s2cast in ['qbid', 'qask', 'qlast', 'qtotal', 'ntrades']:
        # df = df.withColumn(f'{s_s2cast}2', get_str2int(f'{s_s2cast}'))
        df = df.withColumn(
            '{}2'.format(s_s2cast), get_str2int('{}'.format(s_s2cast)))

    # extract columns to create quotes table
    quotes_table = df.select(
            col('intdate').alias('intdate'),
            year('datetime').alias('year'),
            month('datetime').alias('month'),
            dayofmonth('datetime').alias('day'),
            col('ticker').alias('ticker'),
            col('lclose2').alias('last.close'),
            col('open2').alias('open'),
            col('max2').alias('max'),
            col('min2').alias('min'),
            col('last2').alias('last'),
            col('bid2').alias('bid'),
            col('ask2').alias('ask'),
            col('ntrades2').alias('num.trades'),
            col('qtotal2').alias('qty.total'),
            col('totalvol2').alias('total.volume'),
         )
#     write time table to parquet files partitioned by year and month

#     spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
#     data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")
    s_s3_path = "{}/quotes.parquet".format(output_data)
    print('.. writing into {}'.format(s_s3_path))
    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    quotes_table.write.mode('overwrite').partitionBy(
        'year', 'month', 'day').parquet(s_s3_path)
    
    return quotes_table

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [79]:
# spark_etl.py -q data-from-quotes -op data-from-options -d 20200402 -o data-from-etl

class foo():
    quotes = 'data-from-quotes'
    options = 'data-from-options'
    output = 'data-from-etl'
    date = 20200402



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [80]:
args = foo()

s_input_quotes = 's3a://{}/{}*_flashpanel.csv'.format(
    args.quotes, args.date)
s_input_options = 's3a://{}/{}*_options.csv'.format(
    args.options, args.date)
s_output = 's3a://{}'.format(args.output)

s_err = 'Enter an S3 path to both input and output files'
assert s_output and s_input_quotes and s_input_options, s_err

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [59]:
input_quotes_data = s_input_quotes
input_options_data = s_input_options
output_data = s_output

quotes_table = process_quotes_data(spark, input_quotes_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

loading s3a://data-from-quotes/20200402*_flashpanel.csv...
s3a://data-from-etl/quotes.parquet

In [56]:
quotes_table.filter(quotes_table.ticker == "PETR4").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+----+-----+---+------+----------+-----+-----+-----+-----+-----+-----+----------+---------+------------+
|     intdate|year|month|day|ticker|last.close| open|  max|  min| last|  bid|  ask|num.trades|qty.total|total.volume|
+------------+----+-----+---+------+----------+-----+-----+-----+-----+-----+-----+----------+---------+------------+
|202004071640|2020|    4|  7| PETR4|     15.77|16.61|17.07|16.34|16.44|16.44|16.45|   1452530| 13584000|2.15299994E9|
|202004071550|2020|    4|  7| PETR4|     15.77|16.61|17.07| 16.5|16.52|16.52|16.54|   1295620| 11906000|1.94700006E9|
|202004071540|2020|    4|  7| PETR4|     15.77|16.61|17.07|16.55| 16.6| 16.6|16.61|   1255010| 12018000|      1.89E9|
|202004071510|2020|    4|  7| PETR4|     15.77|16.61|17.07|16.55|16.69|16.68|16.69|   1152040| 10580000|1.73100006E9|
|202004071600|2020|    4|  7| PETR4|     15.77|16.61|17.07| 16.5|16.57|16.57|16.58|   1322530| 11910000|     1.978E9|
+------------+----+-----+---+------+----------+-----+---

## Load options data

In [89]:
# s3://data-from-options/202004011340_options.csv
data_options = 's3a://data-from-options/*_options.csv'
data_options = 's3a://data-from-options/20200401*_options.csv'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# 2020-04-01 13:51:41.017496;Du;ExrcPric;OptnStyle;OptnTp;TradgEndDt;Underlying;bid_opton;
# bid_undly;ask_opton;ask_undly;sector;bs_ask;bs_bid;my_ask;my_bid;bid_vol_imp;ask_vol_imp;#
# bid_vol_cone;ask_vol_cone;bs_delta;new_bid_vol_imp;new_ask_vol_im

In [90]:
df = spark.read.option("sep", ";").option("header", "true").csv(data_options)
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- 2020-04-01 16:22:11.652497: string (nullable = true)
 |-- Du: string (nullable = true)
 |-- ExrcPric: string (nullable = true)
 |-- OptnStyle: string (nullable = true)
 |-- OptnTp: string (nullable = true)
 |-- TradgEndDt: string (nullable = true)
 |-- Underlying: string (nullable = true)
 |-- bid_opton: string (nullable = true)
 |-- bid_undly: string (nullable = true)
 |-- ask_opton: string (nullable = true)
 |-- ask_undly: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- bs_ask: string (nullable = true)
 |-- bs_bid: string (nullable = true)
 |-- my_ask: string (nullable = true)
 |-- my_bid: string (nullable = true)
 |-- bid_vol_imp: string (nullable = true)
 |-- ask_vol_imp: string (nullable = true)
 |-- bid_vol_cone: string (nullable = true)
 |-- ask_vol_cone: string (nullable = true)
 |-- bs_delta: string (nullable = true)
 |-- new_bid_vol_imp: string (nullable = true)
 |-- new_ask_vol_imp: string (nullable = true)

In [72]:
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

599694

In [73]:
from pyspark.sql.functions import regexp_replace, input_file_name
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, FloatType)

def load_options(spark, s_path2s3):

    # define schema
    print('loading {}...'.format(s_path2s3))
    this_schema = StructType([
        StructField("ticker", StringType(), True),
        StructField("Du", StringType(), True),
        StructField("ExrcPric", StringType(), True),
        StructField("OptnStyle", StringType(), True),
        StructField("OptnTp", StringType(), True),
        StructField("TradgEndDt", StringType(), True),
        StructField("Underlying", StringType(), True),
        StructField("bid_opton", StringType(), True),
        StructField("bid_undly", StringType(), True),
        StructField("ask_opton", StringType(), True),
        StructField("ask_undly", StringType(), True),
        StructField("sector", StringType(), True),
        StructField("bs_ask", StringType(), True),
        StructField("bs_bid", StringType(), True),
        StructField("my_ask", StringType(), True),
        StructField("my_bid", StringType(), True),
        StructField("bid_vol_imp", StringType(), True),
        StructField("ask_vol_imp", StringType(), True),
        StructField("bid_vol_cone", StringType(), True),
        StructField("ask_vol_cone", StringType(), True),
        StructField("bs_delta", StringType(), True),
        StructField("new_bid_vol_imp", StringType(), True),
        StructField("new_ask_vol_imp", StringType(), True)
    ])

    # read csv
    df = (spark.read
          .option("sep", ";")
          .option("header", "true")
          .schema(this_schema)
          .csv(s_path2s3)
          .withColumn("input_file", input_file_name()))

    # drop rows where ticker is null
    df = df.na.drop(subset=["ticker"])
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
df = load_options(spark, data_options)
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

loading s3a://data-from-options/20200401*_options.csv...
root
 |-- ticker: string (nullable = true)
 |-- Du: string (nullable = true)
 |-- ExrcPric: string (nullable = true)
 |-- OptnStyle: string (nullable = true)
 |-- OptnTp: string (nullable = true)
 |-- TradgEndDt: string (nullable = true)
 |-- Underlying: string (nullable = true)
 |-- bid_opton: string (nullable = true)
 |-- bid_undly: string (nullable = true)
 |-- ask_opton: string (nullable = true)
 |-- ask_undly: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- bs_ask: string (nullable = true)
 |-- bs_bid: string (nullable = true)
 |-- my_ask: string (nullable = true)
 |-- my_bid: string (nullable = true)
 |-- bid_vol_imp: string (nullable = true)
 |-- ask_vol_imp: string (nullable = true)
 |-- bid_vol_cone: string (nullable = true)
 |-- ask_vol_cone: string (nullable = true)
 |-- bs_delta: string (nullable = true)
 |-- new_bid_vol_imp: string (nullable = true)
 |-- new_ask_vol_imp: string (nullable = true)
 

In [75]:
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

599694

## Exploring the data

In [95]:
(df.filter(df.Underlying == 'B3SA3')
   .select('ticker', 'Du', 'bid_opton', 'ask_opton', 'bid_undly', 'ask_undly', 'sector')
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---+---------+---------+---------+---------+------+
|  ticker| Du|bid_opton|ask_opton|bid_undly|ask_undly|sector|
+--------+---+---------+---------+---------+---------+------+
|B3SAR342| 50|  3,21000|  3,66000| 34,43000| 34,46000|     4|
| B3SAR35| 50|  3,80000|  4,27000| 34,43000| 34,46000|     4|
|B3SAR350| 50|  3,56000|  4,01000| 34,43000| 34,46000|     4|
|B3SAR352| 50|  3,68000|  4,14000| 34,43000| 34,46000|     4|
| B3SAR36| 50|  4,33000|  4,80000| 34,43000| 34,46000|     4|
+--------+---+---------+---------+---------+---------+------+
only showing top 5 rows

In [88]:
(df.filter(df.Underlying == 'PETR4')
   .select('input_file')
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|          input_file|
+--------------------+
|s3a://data-from-o...|
|s3a://data-from-o...|
|s3a://data-from-o...|
|s3a://data-from-o...|
|s3a://data-from-o...|
+--------------------+
only showing top 5 rows

In [100]:
df.schema.names

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['ticker', 'Du', 'ExrcPric', 'OptnStyle', 'OptnTp', 'TradgEndDt', 'Underlying', 'bid_opton', 'bid_undly', 'ask_opton', 'ask_undly', 'sector', 'bs_ask', 'bs_bid', 'my_ask', 'my_bid', 'bid_vol_imp', 'ask_vol_imp', 'bid_vol_cone', 'ask_vol_cone', 'bs_delta', 'new_bid_vol_imp', 'new_ask_vol_imp', 'input_file']

## Transforming the data and creating options fact table

In [99]:

from datetime import datetime
import time
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (udf, col, row_number, regexp_replace,
                                   input_file_name)
from pyspark.sql.functions import (year, month, dayofmonth, hour, weekofyear,
                                   date_format, dayofweek)
from pyspark.sql.types import (LongType, IntegerType, DateType, StringType,
                               FloatType, StructType, StructField)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [176]:
def process_options_data(spark, input_data, output_data):
    '''
    Porcess data from quotes files

    :param spark: Spark session object.
    :param input_data: string. S3 path data to transform
    :param output_data: string. S3 root path to store transformed data
    '''

    # read log data file
    df = load_options(spark, input_data)

    # create intdate column to help filter and sort data in the future
    get_ints_from_file = udf(lambda x: int(
        str(x).split('/')[-1].split('_')[0]), LongType())
    df = df.withColumn('intdate', get_ints_from_file('input_file'))

    # create datetime column
    get_datetime = udf(lambda x: datetime(
        *time.strptime(str(x), '%Y%m%d%H%S')[:6]), DateType())
    df = df.withColumn('datetime', get_datetime('intdate'))

    # convert strings to floats in finatial related fields
    get_str2float = udf(lambda x: convert_str2float(x), FloatType())
    for s_s2cast in ['ExrcPric', 'bid_opton', 'bid_undly', 'ask_opton', 'ask_undly',
                     'bs_ask', 'bs_bid', 'my_ask', 'my_bid', 'bid_vol_imp', 'ask_vol_imp',
                     'bid_vol_cone', 'ask_vol_cone', 'bs_delta', 'new_bid_vol_imp',
                     'new_ask_vol_imp']:
        # df = df.withColumn(f'{s_s2cast}2', get_str2float(f'{s_s2cast}'))
        df = df.withColumn(
            '{}2'.format(s_s2cast), get_str2float('{}'.format(s_s2cast)))

    # convert strings to integers in quantity related fields
    get_str2int = udf(lambda x: convert_str2int(x), IntegerType())
    for s_s2cast in ['Du',  'sector']:
        # df = df.withColumn(f'{s_s2cast}2', get_str2int(f'{s_s2cast}'))
        df = df.withColumn(
            '{}2'.format(s_s2cast), get_str2int('{}'.format(s_s2cast)))

    # extract columns to create quotes table
    options_table = df.select(
        col('intdate').alias('intdate'),
        year('datetime').alias('year'),
        month('datetime').alias('month'),
        dayofmonth('datetime').alias('day'),
        col('ticker').alias('ticker'),
        col('Underlying').alias('optn_ua'),  # underlying asset
        col('OptnStyle').alias('optn_style'),
        col('OptnTp').alias('optn_tp'),
        col('DU2').alias('optn_expn_days'),
        col('TradgEndDt').alias('optn_expn_date'),
        col('ExrcPric2').alias('optn_strike'),
        col('bs_delta2').alias('optn_bs_delta'),
        col('bid_opton2').alias('optn_mkt_bid'),
        col('bs_bid2').alias('optn_bs_bid'),
        col('my_bid2').alias('optn_mktbs_bid'),
        col('bid_undly2').alias('ua_mkt_bid'),
        col('ask_opton2').alias('optn_mkt_ask'),
        col('bs_ask2').alias('optn_bs_ask'),
        col('my_ask2').alias('optn_mktbs_ask'),
        col('ask_undly2').alias('ua_mkt_ask'),
        col('bid_vol_imp2').alias('bid_volimp'),
        col('bid_vol_cone2').alias('bid_volcone'),
        col('ask_vol_imp2').alias('ask_volimp'),
        col('ask_vol_cone2').alias('ask_volcone'),
     )
    # write time table to parquet files partitioned by year and month
    s_s3_path = "{}/options.parquet".format(output_data)
    print('.. writing into {}'.format(s_s3_path))
    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    options_table.write.mode('overwrite').partitionBy(
        'year', 'month', 'day').parquet(s_s3_path)
    
    return options_table

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [177]:
class foo():
    quotes = 'data-from-quotes'
    options = 'data-from-options'
    output = 'data-from-etl'
    date = 20200402

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [178]:
s_input_quotes = 's3a://{}/{}*_flashpanel.csv'.format(
    args.quotes, args.date)
s_input_options = 's3a://{}/{}*_options.csv'.format(
    args.options, args.date)
s_output = 's3a://{}'.format(args.output)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [179]:
options_table = process_options_data(spark, s_input_options, s_output)
options_table.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

loading s3a://data-from-options/20200402*_options.csv...
.. writing into s3a://data-from-etl/options.parquet
root
 |-- intdate: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- ticker: string (nullable = true)
 |-- optn_ua: string (nullable = true)
 |-- optn_style: string (nullable = true)
 |-- optn_tp: string (nullable = true)
 |-- optn_expn_days: integer (nullable = true)
 |-- optn_expn_date: string (nullable = true)
 |-- optn_strike: float (nullable = true)
 |-- optn_bs_delta: float (nullable = true)
 |-- optn_mkt_bid: float (nullable = true)
 |-- optn_bs_bid: float (nullable = true)
 |-- optn_mktbs_bid: float (nullable = true)
 |-- ua_mkt_bid: float (nullable = true)
 |-- optn_mkt_ask: float (nullable = true)
 |-- optn_bs_ask: float (nullable = true)
 |-- optn_mktbs_ask: float (nullable = true)
 |-- ua_mkt_ask: float (nullable = true)
 |-- bid_volimp: float (nullable = true)
 |-- bid_volcone: 

In [180]:
(options_table.filter(options_table["optn_ua"] == 'B3SA3')
   .select('ticker', 'optn_ua', 'optn_expn_days', 'optn_expn_date', 'ua_mkt_bid', 'optn_mktbs_bid', 'optn_mktbs_ask')
   .show(5))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-------+--------------+--------------+----------+--------------+--------------+
|  ticker|optn_ua|optn_expn_days|optn_expn_date|ua_mkt_bid|optn_mktbs_bid|optn_mktbs_ask|
+--------+-------+--------------+--------------+----------+--------------+--------------+
|B3SAR342|  B3SA3|            48|    2020-06-15|     35.97|          2.56|          2.92|
| B3SAR35|  B3SA3|            48|    2020-06-15|     35.97|          3.09|          3.45|
|B3SAR350|  B3SA3|            48|    2020-06-15|     35.97|          2.87|          3.23|
|B3SAR352|  B3SA3|            48|    2020-06-15|     35.97|          2.98|          3.34|
| B3SAR36|  B3SA3|            48|    2020-06-15|     35.97|          3.55|          3.92|
+--------+-------+--------------+--------------+----------+--------------+--------------+
only showing top 5 rows