In [17]:
import pandas as pd
import requests
from secrets_config import api_key_id, api_secret_key 

Extract data for Alpaca Stocks API

In [19]:
# Data retrieved for tesla stock from date between 2020-01-01 t0 2020-01-02
# Documentation for authorisation and api reference at URL https://alpaca.markets/docs/api-references/trading-api/

stock_ticker = "tsla"

## f string format for base url
base_url = f"https://data.alpaca.markets/v2/stocks/{stock_ticker}/trades"

## start date and end date in unix format
start_time = "2020-01-01T00:00:00.00Z"
end_time = "2020-01-02T00:00:00.00Z"

## array to store response data
response_data = []

## parameters for passing for rest API
params = {
    "start": start_time,
    "end": end_time
}

## Headers for secrets authorisation
headers = {
    "APCA-API-KEY-ID": api_key_id,
    "APCA-API-SECRET-KEY": api_secret_key
}

response = requests.get(base_url, params=params, headers=headers)

if response.json().get("trades") is not None:
    response_data.extend(response.json().get("trades"))

In [20]:
# read trade data into a dataframe
df = pd.json_normalize(data=response_data, max_level=0)

In [23]:
# read exchange codes data into a dataframe 
df_exchange_codes = pd.read_csv("data/exchange_codes.csv")

Load data

In [27]:
# import libraries for sql 
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, Float, JSON 
from sqlalchemy.engine import URL
from sqlalchemy.dialects import postgresql
from secrets_config import db_user, db_password, db_server_name, db_database_name

In [28]:
# create connection to database(Postgres)
connection_url = URL.create(
    drivername = "postgresql+pg8000", 
    username = db_user,
    password = db_password,
    host = db_server_name, 
    port = 5432,
    database = db_database_name, 
)

engine = create_engine(connection_url)

In [29]:
# create meta object 
meta = MetaData()

# specify trade data table schema 
# see field definition here: https://alpaca.markets/docs/api-references/market-data-api/stock-pricing-data/historical/#trade 
trades_table = Table(
    "raw_trades", meta, 
    Column("i", Integer, primary_key=True),
    Column("t", String),
    Column("x", String),
    Column("p", Float),
    Column("s", Integer),
    Column("c", String),
    Column("z", String)
)

# specify exchange codes table schema 
exchange_codes_table = Table(
    "raw_exchange_codes", meta, 
    Column("exchange_code", String, primary_key=True), 
    Column("exchange_name", String)
)

# creates table if it does not exist 
meta.create_all(engine) 

In [32]:
# upsert data to trade table 
insert_statement = postgresql.insert(trades_table).values(df.to_dict(orient='records'))
upsert_statement = insert_statement.on_conflict_do_update(
    index_elements=['i'],
    set_={c.key: c for c in insert_statement.excluded if c.key not in ['i']})
engine.execute(upsert_statement)

# upsert data to exchange codes table 
insert_statement = postgresql.insert(exchange_codes_table).values(df_exchange_codes.to_dict(orient='records'))
upsert_statement = insert_statement.on_conflict_do_update(
    index_elements=['exchange_code'],
    set_={c.key: c for c in insert_statement.excluded if c.key not in ['exchange_code']})
engine.execute(upsert_statement)

AttributeError: 'Engine' object has no attribute 'execute'

Transform

In [21]:
# create a staging table for trades and rename columns to more meaningful names 

target_table = "staging_trades"
engine.execute(f"drop table if exists {target_table}")

engine.execute(f"""
    create table {target_table} as (
        select 
            i as id, 
            to_timestamp(t, 'YYYY-MM-DDTHH:MI:SS') as timestamp, 
            x as exchange, 
            p as trade_price, 
            s as trade_size, 
            c as trade_conditions, 
            z as tape
        from 
            raw_trades 
    )
""")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f9dbe98ee20>

In [23]:
# create a staging table for exchange codes

target_table = "staging_exchange_codes"
engine.execute(f"drop table if exists {target_table}")

engine.execute(f"""
    create table {target_table} as (
        select 
            exchange_code, 
            exchange_name
        from 
            raw_exchange_codes 
    )
""")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f9dbe98e130>

In [25]:
# create a serving table by joining staging_trades with staging_exchange_codes

target_table = "serving_trades"
engine.execute(f"drop table if exists {target_table}")

engine.execute(f"""
    create table {target_table} as (
        select 
            id, 
            timestamp,  
            exchange_name,
            trade_price, 
            trade_size, 
            trade_conditions, 
            tape
        from 
            staging_trades st inner join staging_exchange_codes sec 
            on st.exchange = sec.exchange_code
    )
""")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f9dbebdbb20>