# Market Data ETL & Query

This notebook demonstrates output from a toy implementation. It shows the market data load required for Black-76 options pricing for `BRENT`, `WTI` and `HH` (from multiple exchanges).

We use the `RestClient` to persist and query market data; `LocalClient` can also be used for local development. A new data ETL can be created by extending

`market.etl_adapter.BaseETLAdapter`

and implementing the `transform(self, url: str, params: dict)` method.

Alternatively, you can munge your market data yourself and use the `RestClient.save` endpoint.
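The extension pattern above can be sketched as a template method: `run` calls the subclass's `transform` and optionally persists the result. This is a minimal, self-contained sketch; the stand-in `BaseETLAdapter` below is hypothetical and only mirrors the constructor arguments and `run(save=...)` usage seen in this notebook, not the real `market.etl_adapter` implementation. It returns plain Python rows instead of a pandas DataFrame to stay dependency-free, and `StaticExpiriesAdapter` is likewise a hypothetical example subclass.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

Rows = List[Dict[str, Any]]


class BaseETLAdapter(ABC):
    """Hypothetical stand-in for market.etl_adapter.BaseETLAdapter."""

    def __init__(self, symbol: str, url: str, params: Optional[dict] = None):
        self.symbol = symbol
        self.url = url
        self.params = params or {}

    @abstractmethod
    def transform(self, url: str, params: dict) -> Rows:
        """Extract and transform raw data into rows; implemented by subclasses."""

    def save(self, rows: Rows) -> None:
        # Placeholder persistence step; the real adapter persists via the data API.
        print(f'Symbol {self.symbol} saved successfully.')

    def run(self, save: bool = False) -> Rows:
        # Template method: the subclass supplies transform, the base class handles the rest.
        rows = self.transform(self.url, self.params)
        if save:
            self.save(rows)
        return rows


class StaticExpiriesAdapter(BaseETLAdapter):
    """Hypothetical adapter that 'extracts' from an in-memory CSV string."""

    def transform(self, url: str, params: dict) -> Rows:
        assert url
        lines = params['raw_csv'].strip().splitlines()
        header = lines[0].split(',')
        return [dict(zip(header, line.split(','))) for line in lines[1:]]


rows = StaticExpiriesAdapter(
    symbol='CALENDAR_OPTION_DEMO',
    url='in-memory',
    params={'raw_csv': 'CONTRACT,EXPIRATION_DATE\nG2023,2022-12-22\nH2023,2023-01-26'},
).run(save=True)
print(rows[0])
```

Only `transform` has to be written per data source; persistence stays in one place in the base class.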

Below are the data adapter implementations that support ETL across several exchanges:
- Some raw market files are held locally, given account access restrictions.
- Concurrent compute is used below for the Quandl OWF implied vol models, given the number of files required; in a production setting this would instead be orchestrated by a robust DAG scheduler, e.g. Luigi or Apache Airflow.

A rudimentary symbology is assumed, as described in the Design/Architecture notes; this is an area for further work.
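To make the symbology concrete: the implied vol symbols built in the OWF cell of this notebook follow the pattern `{contract}_{exchange_code}_{futures_code}_{options_code}_{expiration}_IVM`, e.g. `BRENT_ICE_B_B_G2023_IVM`. A minimal sketch of composing and parsing such symbols, assuming no component ever contains an underscore; the `IVMSymbol` type is hypothetical and not part of the codebase.

```python
from typing import NamedTuple


class IVMSymbol(NamedTuple):
    contract: str
    exchange_code: str
    futures_code: str
    options_code: str
    expiration: str  # delivery month code + year, e.g. 'G2023'

    def __str__(self) -> str:
        # Join the five components and append the 'IVM' suffix.
        return '_'.join((*self, 'IVM'))

    @classmethod
    def parse(cls, symbol: str) -> 'IVMSymbol':
        parts = symbol.split('_')
        # Assumes exactly five underscore-free components followed by 'IVM'.
        if len(parts) != 6 or parts[-1] != 'IVM':
            raise ValueError(f'Not an IVM symbol: {symbol!r}')
        return cls(*parts[:-1])


sym = IVMSymbol.parse('BRENT_ICE_B_B_G2023_IVM')
print(sym.futures_code, sym.expiration)
print(str(sym) == 'BRENT_ICE_B_B_G2023_IVM')
```

The underscore restriction is exactly the kind of limitation a more robust symbology would need to remove.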

In [1]:
import os
import sys
sys.path.insert(0, os.path.abspath('../'))

from analytics.constants import CONTRACT_EXCHANGE_MAP, FUTURES_DELIVERY_MAP
from api import RestClient as c
from market.etl_option_expiries import CMEOptionExpiriesAdapter, ICEOptionExpiriesAdapter
from market.etl_yield_curve import FEDUSTAdapter

***** Shell Trading API *****
TRADE_DATE = 2022-12-09
MARKET_DATE = 2022-12-09
ROOT_PATH = C:\Users\magicmonk\PycharmProjects\shell
MARKET_DATA_PATH = C:\Users\magicmonk\PycharmProjects\shell\data
REST_API_URL = http://127.0.0.1:5000


A separate, abstracted data persist/query API exists for use by the ETL adapters. In this toy implementation data is saved as parquet files; it is straightforward to extend this to support SQL or NoSQL datastores.

See `market.datastore_adapter.DataAPI` for more details
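As an illustration of how such a persist/query API could be extended to a SQL store, here is a minimal sketch of an abstract store with an in-memory backend and a standard-library `sqlite3` backend. The `DataStore`, `MemoryStore` and `SQLiteStore` names and their `save`/`load` methods are hypothetical and do not reflect the actual `DataAPI` interface; payloads are serialised as JSON rather than parquet to keep the example dependency-free.

```python
import json
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Dict


class DataStore(ABC):
    @abstractmethod
    def save(self, symbol: str, data: Any) -> None: ...

    @abstractmethod
    def load(self, symbol: str) -> Any: ...


class MemoryStore(DataStore):
    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def save(self, symbol: str, data: Any) -> None:
        # Serialise to JSON so behaviour matches the SQL backend.
        self._data[symbol] = json.dumps(data)

    def load(self, symbol: str) -> Any:
        return json.loads(self._data[symbol])  # KeyError if the symbol is unknown


class SQLiteStore(DataStore):
    def __init__(self, path: str = ':memory:') -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            'CREATE TABLE IF NOT EXISTS market_data '
            '(symbol TEXT PRIMARY KEY, payload TEXT NOT NULL)'
        )

    def save(self, symbol: str, data: Any) -> None:
        # Upsert: re-saving a symbol replaces its previous payload; 'with' commits.
        with self._conn:
            self._conn.execute(
                'INSERT OR REPLACE INTO market_data (symbol, payload) VALUES (?, ?)',
                (symbol, json.dumps(data)),
            )

    def load(self, symbol: str) -> Any:
        row = self._conn.execute(
            'SELECT payload FROM market_data WHERE symbol = ?', (symbol,)
        ).fetchone()
        if row is None:
            raise KeyError(symbol)
        return json.loads(row[0])


for store in (MemoryStore(), SQLiteStore()):
    store.save('CALENDAR_OPTION_BRENT', {'G2023': '2022-12-22'})
    print(type(store).__name__, store.load('CALENDAR_OPTION_BRENT'))
```

Because ETL code would depend only on `DataStore`, swapping the backend does not touch the adapters.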

In [2]:
# ICE EU Style Brent Option Expiries
ICEOptionExpiriesAdapter(
    symbol='CALENDAR_OPTION_BRENT', url='ProductSpecExpiryDates_BRENT_OPTIONS.csv'
).run(save=True).head()

Symbol CALENDAR_OPTION_BRENT saved successfully.


,EXPIRATION_DATE
G2023,2022-12-22
H2023,2023-01-26
J2023,2023-02-23
K2023,2023-03-28
M2023,2023-04-25


You can view the transform source with IPython's `??` operator, e.g. `ICEOptionExpiriesAdapter.transform??`:

Signature: ICEOptionExpiriesAdapter.transform(self, url: str, params: dict) -> pandas.core.frame.DataFrame
Docstring: <no docstring>
Source:   
    def transform(self, url: str, params: dict) -> pd.DataFrame:
        assert url

        dataframe = pd.read_csv(url)
        dataframe.index = [
            ''.join((FUTURES_DELIVERY_MAP[contract[2:5].upper()], '20', contract[5:-1]))
            for contract in dataframe.index
        ]
        dataframe = dataframe[['OPTIONS FTD']]
        dataframe.columns = ['EXPIRATION_DATE']
        dataframe['EXPIRATION_DATE'] = dataframe['EXPIRATION_DATE'].astype(np.datetime64)

        return dataframe
File:      c:\users\magicmonk\pycharmprojects\shell\market\etl_option_expiries.py
Type:      function
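Outside IPython, the standard library's `inspect` module exposes the same information that `??` prints. A small sketch on a local function (`transform_demo` is hypothetical); in the notebook you would pass `ICEOptionExpiriesAdapter.transform` instead.

```python
import inspect


def transform_demo(self, url: str, params: dict) -> list:
    """Hypothetical transform used only to demonstrate introspection."""
    return []


# Equivalent to the 'Signature:' and 'Docstring:' lines printed by IPython's ??
print('Signature:', inspect.signature(transform_demo))
print('Docstring:', inspect.getdoc(transform_demo) or '<no docstring>')
# inspect.getsource(transform_demo) would return the 'Source:' text, provided
# the function was defined in a file that Python can read back.
```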

We can query the saved data through the `RestClient` API:

In [3]:
c.data(symbol='CALENDAR_OPTION_BRENT').head()

,EXPIRATION_DATE
G2023,2022-12-22
H2023,2023-01-26
J2023,2023-02-23
K2023,2023-03-28
M2023,2023-04-25


In [4]:
# ICE EU Style WTI Option Expiries
ICEOptionExpiriesAdapter(
    symbol='CALENDAR_OPTION_WTI', url='ProductSpecExpiryDates_WTI_OPTIONS.csv'
).run(save=True).head()

Symbol CALENDAR_OPTION_WTI saved successfully.


,EXPIRATION_DATE
F2023,2022-12-15
G2023,2023-01-17
H2023,2023-02-15
J2023,2023-03-16
K2023,2023-04-17


In [5]:
# CME EU Style HH Option Expiries
CMEOptionExpiriesAdapter(
    symbol='CALENDAR_OPTION_HH', url='product-calendar_HH_OPTIONS.xls'
).run(save=True).head()

Symbol CALENDAR_OPTION_HH saved successfully.


,EXPIRATION_DATE
F2023,2022-12-27
G2023,2023-01-26
H2023,2023-02-23
J2023,2023-03-28
K2023,2023-04-25


In [6]:
# Fed Reserve US Treasury Constant Maturity
FEDUSTAdapter(
    symbol='RIFLGFC', url='FRB_H15.csv'
).run(save=True).head()

Symbol RIFLGFC saved successfully.


Time Period,RIFLGFCM01_N.B,RIFLGFCM03_N.B,RIFLGFCM06_N.B,RIFLGFCY01_N.B,RIFLGFCY02_N.B,RIFLGFCY03_N.B,RIFLGFCY05_N.B,RIFLGFCY07_N.B,RIFLGFCY10_N.B,RIFLGFCY20_N.B,RIFLGFCY30_N.B
1962-01-02,,,,0.031944,,0.036662,0.038428,,0.040193,0.040291,
1962-01-03,,,,0.03214,,0.036662,0.03833,,0.039899,0.040291,
1962-01-04,,,,0.03214,,0.036564,0.038232,,0.039507,0.040193,
1962-01-05,,,,0.032337,,0.03676,0.038526,,0.039801,0.040291,
1962-01-08,,,,0.032829,,0.03676,0.038723,,0.039899,0.04039,
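For Black-76 discounting, the H.15 curve has to be interpolated to each option's expiry. A minimal sketch under several assumptions: the column names encode the tenor as `M`/`Y` plus a two-digit count (`RIFLGFCM01_N.B` is 1 month, `RIFLGFCY10_N.B` is 10 years), the stored values are decimal rates, linear interpolation in tenor with flat extrapolation is acceptable, and discount factors use continuous compounding. Empty cells are skipped. The helper names are hypothetical; the sample row is the 1962-01-02 row shown above.

```python
import math
import re
from bisect import bisect_left
from typing import Dict, List, Optional, Tuple

TENOR_PATTERN = re.compile(r'RIFLGFC([MY])(\d{2})_N\.B')


def tenor_in_years(column: str) -> float:
    match = TENOR_PATTERN.fullmatch(column)
    if match is None:
        raise ValueError(f'Unrecognised column: {column!r}')
    unit, count = match.group(1), int(match.group(2))
    return count / 12.0 if unit == 'M' else float(count)


def build_curve(row: Dict[str, Optional[float]]) -> List[Tuple[float, float]]:
    # Keep only populated tenors, sorted by maturity.
    return sorted((tenor_in_years(col), rate) for col, rate in row.items() if rate is not None)


def interpolate(curve: List[Tuple[float, float]], t: float) -> float:
    tenors = [tenor for tenor, _ in curve]
    if t <= tenors[0]:
        return curve[0][1]  # flat extrapolation at the short end
    if t >= tenors[-1]:
        return curve[-1][1]  # flat extrapolation at the long end
    i = bisect_left(tenors, t)
    (t0, r0), (t1, r1) = curve[i - 1], curve[i]
    return r0 + (r1 - r0) * (t - t0) / (t1 - t0)


row_1962_01_02 = {
    'RIFLGFCM01_N.B': None, 'RIFLGFCM03_N.B': None, 'RIFLGFCM06_N.B': None,
    'RIFLGFCY01_N.B': 0.031944, 'RIFLGFCY02_N.B': None, 'RIFLGFCY03_N.B': 0.036662,
    'RIFLGFCY05_N.B': 0.038428, 'RIFLGFCY07_N.B': None, 'RIFLGFCY10_N.B': 0.040193,
    'RIFLGFCY20_N.B': 0.040291, 'RIFLGFCY30_N.B': None,
}
curve = build_curve(row_1962_01_02)
r = interpolate(curve, 2.0)
print(round(r, 6), round(math.exp(-r * 2.0), 6))  # rate and discount factor at T = 2y
```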


In [7]:
# Quandl OWF Option Implied Vols
CONTRACT_EXCHANGE_MAP

{('BRENT', 'ICE'): ('B', 'B', 1000),
 ('WTI', 'NYM'): ('CL', 'CL', 1000),
 ('WTI', 'ICE'): ('T', 'T', 1000),
 ('HH', 'NYM'): ('NG', 'NG', 10000)}
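The ETL cell below fans out one task per (contract, exchange, year, month) combination. A minimal sketch of enumerating that grid with `itertools.product`. The two maps are local stand-ins for `analytics.constants.CONTRACT_EXCHANGE_MAP` (values as printed above) and `FUTURES_DELIVERY_MAP` (assumed here to hold the standard futures month codes, F for January through Z for December); reading the third tuple element as a lot size is also an assumption.

```python
from itertools import product

# Stand-in for analytics.constants.CONTRACT_EXCHANGE_MAP (values as printed above).
CONTRACT_EXCHANGE_MAP = {
    ('BRENT', 'ICE'): ('B', 'B', 1000),
    ('WTI', 'NYM'): ('CL', 'CL', 1000),
    ('WTI', 'ICE'): ('T', 'T', 1000),
    ('HH', 'NYM'): ('NG', 'NG', 10000),
}
# Stand-in for analytics.constants.FUTURES_DELIVERY_MAP: standard futures month codes.
FUTURES_DELIVERY_MAP = dict(zip(
    ('JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'),
    'FGHJKMNQUVXZ',
))
YEARS = [str(year) for year in range(2022, 2031)]

# One symbol per task, in the same order as the nested loops in the ETL cell.
jobs = [
    f'{contract}_{exchange}_{futures_code}_{options_code}_{FUTURES_DELIVERY_MAP[month]}{year}_IVM'
    for ((contract, exchange), (futures_code, options_code, _lot_size)), year, month
    in product(CONTRACT_EXCHANGE_MAP.items(), YEARS, FUTURES_DELIVERY_MAP)
]
print(len(jobs))  # 4 contract/exchange pairs x 9 years x 12 months = 432
print(jobs[0], jobs[-1])
```

With several hundred mostly I/O-bound downloads, running them concurrently rather than sequentially is what motivates the parallelism below.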

Here we use `ray` multiprocessing to speed up the market data ETL for the OptionWorks Futures (OWF) implied vol model data.
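For comparison, the same fan-out/gather pattern can be sketched with the standard library's `concurrent.futures` instead of Ray. This is a swapped-in technique, not what the notebook uses, and `load_one` is a hypothetical stand-in for the real adapter call. The sketch records which symbols failed and why, rather than silently discarding errors.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Tuple


def load_one(symbol: str) -> Dict[str, str]:
    """Hypothetical loader; raises for a deliberately bad symbol to mimic a failed download."""
    if 'BAD' in symbol:
        raise LookupError(f'no data for {symbol}')
    return {'symbol': symbol}


def load_all(symbols: List[str], max_workers: int = 8) -> Tuple[Dict[str, dict], Dict[str, str]]:
    results: Dict[str, dict] = {}
    failures: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(load_one, symbol): symbol for symbol in symbols}
        for future in as_completed(futures):
            symbol = futures[future]
            try:
                results[symbol] = future.result()
            except Exception as exc:  # keep going, but record the failure
                failures[symbol] = repr(exc)
    return results, failures


results, failures = load_all(['BRENT_ICE_B_B_F2023_IVM', 'HYPOTHETICAL_BAD_SYMBOL'])
print(sorted(results), sorted(failures))
```

Threads suffice here because each task mostly waits on HTTP; Ray becomes more attractive once the transforms are CPU-heavy or need to scale beyond one machine.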

In [9]:
import ray

ray.init()

2022-12-17 01:55:36,573	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8266


Python version:,3.9.15
Ray version:,2.2.0
Dashboard:,http://127.0.0.1:8266


In [10]:
%%time

@ray.remote
def load_quandl_owf_implied_vols(
    contract: str,
    exchange_code: str,
    futures_code: str,
    options_code: str,
    year: str,
    month: str,
    url: str,
    save: bool,
    expiration: str,
    python_path: str
):
    assert contract
    assert exchange_code
    assert futures_code
    assert options_code
    assert year
    assert month
    assert url
    assert expiration
    assert python_path

    import os
    import sys
    sys.path.insert(0, os.path.abspath(python_path))  # make project packages importable on the worker
    
    from market.etl_implied_vols import QuandlOWFImpliedVolsAdapter
    
    params = {
        'EXCHANGE_CODE': exchange_code,
        'FUTURES_CODE': futures_code,
        'OPTIONS_CODE': options_code,
        'YEAR': year,
        'MONTH': month
    }
    
    symbol = f'{contract}_{exchange_code}_{futures_code}_{options_code}_{expiration}_IVM'

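    try:
        return QuandlOWFImpliedVolsAdapter(
            symbol=symbol,
            url=url,
            params=params
        ).run(save=save)
    except Exception as exc:
        # Some contract months may not be available upstream; log and return None
        # instead of failing the whole batch.
        print(f'Symbol {symbol} failed: {exc!r}')
        return None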

ray_ids = []
url = 'https://data.nasdaq.com/api/v3/datasets/OWF'
python_path = os.path.dirname(os.getcwd())  # parent of the notebook directory
for (contract, exchange_code), (futures_code, options_code, __) in CONTRACT_EXCHANGE_MAP.items():

    for year in ('2022', '2023', '2024', '2025', '2026', '2027', '2028', '2029', '2030'):

        for month in ('JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'):
            
            expiration = f'{FUTURES_DELIVERY_MAP[month]}{year}'
            ray_ids.append(
                load_quandl_owf_implied_vols.remote(
                    contract=contract,
                    exchange_code=exchange_code,
                    futures_code=futures_code,
                    options_code=options_code,
                    year=year,
                    month=month,
                    url=url,
                    save=True,
                    expiration=expiration,
                    python_path=python_path
                )
            )

dataframes = ray.get(ray_ids)

[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m ***** Shell Trading API *****
[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m TRADE_DATE = 2022-12-09
[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m MARKET_DATE = 2022-12-09
[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m ROOT_PATH = C:\Users\magicmonk\PycharmProjects\shell
[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m MARKET_DATA_PATH = C:\Users\magicmonk\PycharmProjects\shell\data
[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m REST_API_URL = http://127.0.0.1:5000
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m ***** Shell Trading API *****
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m TRADE_DATE = 2022-12-09
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m MARKET_DATE = 2022-12-09
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m ROOT_PATH = C:\Users\magicmonk\PycharmProjects\shell
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m MARKET_DATA_PATH = C:\Users\magicmonk\Pycharm

[2m[36m(load_quandl_owf_implied_vols pid=8176)[0m Symbol BRENT_ICE_B_B_Q2022_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=10460)[0m Symbol BRENT_ICE_B_B_G2022_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=20504)[0m Symbol BRENT_ICE_B_B_X2022_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=20112)[0m Symbol BRENT_ICE_B_B_K2022_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=4588)[0m Symbol BRENT_ICE_B_B_N2022_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=14908)[0m Symbol BRENT_ICE_B_B_G2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=6728)[0m Symbol BRENT_ICE_B_B_M2022_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m Symbol BRENT_ICE_B_B_F2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=1368)[0m Symbol BRENT_ICE_B_B_H2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=7304)[0m Symbol 

[2m[36m(load_quandl_owf_implied_vols pid=23848)[0m Symbol WTI_NYM_CL_CL_F2024_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m Symbol WTI_NYM_CL_CL_X2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=20504)[0m Symbol WTI_NYM_CL_CL_V2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=10460)[0m Symbol WTI_NYM_CL_CL_U2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=8176)[0m Symbol WTI_NYM_CL_CL_M2023_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=9316)[0m Symbol WTI_NYM_CL_CL_V2024_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=6728)[0m Symbol WTI_NYM_CL_CL_F2025_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=6244)[0m Symbol WTI_NYM_CL_CL_N2024_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=22580)[0m Symbol WTI_NYM_CL_CL_Q2024_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=12628)[0m Symbol

[2m[36m(load_quandl_owf_implied_vols pid=7304)[0m Symbol WTI_ICE_T_T_V2026_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=12160)[0m Symbol WTI_ICE_T_T_U2026_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=4588)[0m Symbol WTI_ICE_T_T_X2026_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=6244)[0m Symbol WTI_ICE_T_T_Z2025_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=1368)[0m Symbol WTI_ICE_T_T_M2026_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=14908)[0m Symbol WTI_ICE_T_T_M2027_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=6728)[0m Symbol WTI_ICE_T_T_Z2026_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=12160)[0m Symbol WTI_ICE_T_T_Z2027_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=20112)[0m Symbol WTI_ICE_T_T_M2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=1368)[0m Symbol WTI_ICE_T_T_Z2028_

[2m[36m(load_quandl_owf_implied_vols pid=20504)[0m Symbol HH_NYM_NG_NG_F2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=7304)[0m Symbol HH_NYM_NG_NG_V2027_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=22580)[0m Symbol HH_NYM_NG_NG_Z2027_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=4588)[0m Symbol HH_NYM_NG_NG_X2027_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=4960)[0m Symbol HH_NYM_NG_NG_M2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=1368)[0m Symbol HH_NYM_NG_NG_H2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=14908)[0m Symbol HH_NYM_NG_NG_J2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=8176)[0m Symbol HH_NYM_NG_NG_G2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=12628)[0m Symbol HH_NYM_NG_NG_K2028_IVM saved successfully.
[2m[36m(load_quandl_owf_implied_vols pid=9316)[0m Symbol HH_NYM_NG

In [11]:
dataframes[0].head()

Date,Future,AtM,RR25,RR10,Fly25,Fly10,Beta1,Beta2,Beta3,Beta4,Beta5,Beta6,MinMoney,MaxMoney,DtE,DtT
2021-11-24,82.25,0.448341,0.018241,0.023389,0.017135,0.06649,0.615285,75.819885,-417.058197,-11622.731445,57509.78125,1491270.0,-0.058196,0.053267,0.85,6.0
2021-11-23,82.309998,0.428443,0.011347,0.007743,0.014353,0.05209,0.283453,32.99017,-143.453384,-3660.596436,10506.664062,335872.8,-0.079759,0.075331,1.85,7.0
2021-11-22,79.699997,0.415653,-0.057008,-0.130627,0.018386,0.074118,-1.088839,27.212183,47.159893,-1640.96228,127.687584,65330.61,-0.119117,0.081875,2.85,8.0
2021-11-19,78.889999,0.369036,-0.069729,-0.124354,0.011291,0.040405,-1.137661,9.49088,89.881325,-199.624283,-3864.152344,-4288.779,-0.130331,0.09209,5.85,11.0
2021-11-18,81.239998,0.323595,-0.032377,-0.06402,0.008199,0.032928,-0.549235,8.666416,30.197849,-146.91275,-1009.595154,1469.119,-0.120742,0.096831,6.85,12.0
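With a futures price, a vol, a year fraction and a rate, Black-76 prices European options on futures. A minimal sketch that takes `Future` and `AtM` from the 2021-11-24 row above, assuming `AtM` is an annualised decimal vol and using it as a flat vol (ignoring the smile described by the `RR`, `Fly` and `Beta` columns). The strike, year fraction and rate are hypothetical inputs, and `black76` is not a function from the codebase.

```python
import math


def norm_cdf(x: float) -> float:
    """Standard normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def black76(forward: float, strike: float, vol: float, t: float, rate: float, is_call: bool = True) -> float:
    """Black-76 price of a European option on a futures contract."""
    if t <= 0 or vol <= 0:
        raise ValueError('t and vol must be positive')
    sd = vol * math.sqrt(t)
    d1 = (math.log(forward / strike) + 0.5 * sd * sd) / sd
    d2 = d1 - sd
    discount = math.exp(-rate * t)
    if is_call:
        return discount * (forward * norm_cdf(d1) - strike * norm_cdf(d2))
    return discount * (strike * norm_cdf(-d2) - forward * norm_cdf(-d1))


F, VOL = 82.25, 0.448341          # Future and AtM from the 2021-11-24 row above
K, T, R = 82.25, 30 / 365, 0.04   # hypothetical strike, year fraction and rate
call = black76(F, K, VOL, T, R, is_call=True)
put = black76(F, K, VOL, T, R, is_call=False)
print(round(call, 4), round(put, 4))
# Put-call parity for options on futures: C - P = exp(-rT) * (F - K)
print(abs((call - put) - math.exp(-R * T) * (F - K)) < 1e-9)
```

In practice `T` would come from the option calendars and `R` from the interpolated treasury curve loaded earlier in this notebook.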


[2m[36m(load_quandl_owf_implied_vols pid=22580)[0m Symbol HH_NYM_NG_NG_F2030_IVM saved successfully.


Exception in thread 2022-12-17 01:57:23,377	ERROR import_thread.py:75 -- ImportThread: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Stream removed"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:127.0.0.1:65491 {created_time:"2022-12-17T01:57:23.336661737+00:00", grpc_status:2, grpc_message:"Stream removed"}"
>
ray_listen_error_messages:
Traceback (most recent call last):
  File "C:\Users\magicmonk\anaconda3\envs\shell\lib\threading.py", line 980, in _bootstrap_inner
2022-12-17 01:57:23,396	ERROR worker.py:813 -- print_logs: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Stream removed"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:127.0.0.1:65491 {created_time:"2022-12-17T01:57:23.336678927+00:00", grpc_status:2, grpc_message:"Stream removed"}"
>
