## Question 5

- Modify Question 3 to write data to the database asynchronously.
- Read from the database 5 times concurrently using async (hint: asyncio.gather())


In [1]:
# Import Python packages
import numpy as np
import pandas as pd
from datetime import date, datetime
import functools
import logging

In [28]:
# Import sqlalchemy packages
from sqlalchemy import create_engine, inspect, Column, Integer, String, Float, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import text, select
import asyncio
import nest_asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

In [3]:
# Read data in
data_file = "../data/MarketData.csv"
try:
    # Read in data set from the csv file, skip initial metadata rows (first 3 rows).
    # The header spans 4 rows, which pandas turns into a 4-level column MultiIndex.
    # `with` guarantees the file handle is closed even if parsing fails.
    with open(data_file, 'rb') as f:
        df = pd.read_csv(f, skiprows=3, header=[0, 1, 2, 3])

    # Flatten the MultiIndex: the first column becomes "Dates"; every other column is
    # named from its first two header levels as "<level 0> (<level 1>)".
    df.columns = ['Dates'] + [f"{col[0]} ({col[1]})" for col in df.columns[1:]]

except (OSError, ValueError) as exc:
    # OSError: missing/unreadable file; ValueError covers pandas parse/decode errors.
    # Report the real cause and stop here -- every later cell depends on `df`.
    print(f'Error opening file/loading data: {exc}')
    raise

In [4]:
# Parse the "Dates" column into datetimes; the source uses day-first (European) dates
df["Dates"] = df["Dates"].pipe(pd.to_datetime, dayfirst=True)

In [5]:
# Keep the "Dates", copper and zinc columns for the 2020 and 2021 rows using a single
# .loc selection (no chained indexing), then index the result by date without inplace mutation.
copper_zinc = ['Dates', 'LME COPPER    3MO ($) (LMCADS03 Comdty)', 'LME ZINC      3MO ($) (LMZSDS03 Comdty)']
in_2020_2021 = df['Dates'].dt.year.isin([2020, 2021])
df_filtered = df.loc[in_2020_2021, copper_zinc].set_index('Dates')

In [6]:
# Define MACD function
def macd(df, fast_window=12, slow_window=26, signal_window=9):
    """Compute the MACD indicator for each column of ``df``.

    Parameters
    ----------
    df : pandas DataFrame (or Series) of prices; the EMAs run down the rows,
        so rows are expected in chronological order.
    fast_window : span of the fast exponential moving average.
    slow_window : span of the slow exponential moving average.
    signal_window : span of the EMA applied to the MACD line (signal line).

    Returns
    -------
    tuple (macd_line, signal_line, histogram), each aligned with ``df``:
        macd_line   = EMA(fast) - EMA(slow)
        signal_line = EMA(macd_line, signal_window)
        histogram   = macd_line - signal_line
    """
    def ema(values, span):
        # Recursive (adjust=False) exponential moving average
        return values.ewm(span=span, adjust=False).mean()

    macd_line = ema(df, fast_window) - ema(df, slow_window)
    signal_line = ema(macd_line, signal_window)
    histogram = macd_line - signal_line
    return macd_line, signal_line, histogram

In [7]:
# EMA spans: fast EMA, slow EMA, and the signal line
fast_window, slow_window, signal_window = 12, 26, 9

df_macd_line, df_signal_line, df_macd_histogram = macd(
    df_filtered,
    fast_window=fast_window,
    slow_window=slow_window,
    signal_window=signal_window,
)

# Show the MACD line per metal
df_macd_line

Unnamed: 0_level_0,LME COPPER 3MO ($) (LMCADS03 Comdty),LME ZINC 3MO ($) (LMZSDS03 Comdty)
Dates,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-01-01,0.000000,0.000000
2020-01-02,1.116809,3.031339
2020-01-03,-2.687592,5.052686
2020-01-06,-4.919671,8.054566
2020-01-07,-5.774780,12.029776
...,...,...
2021-12-27,-2.500190,66.849430
2021-12-28,0.815513,69.361656
2021-12-29,12.378352,70.060845
2021-12-30,22.173985,71.365785


In [8]:
# Define function to calculate the RSI line
def rsi(df, period=14):
    """Compute Wilder's Relative Strength Index (RSI) for each column of ``df``.

    The first average gain/loss is the simple mean of the first ``period`` price
    changes; every later average uses Wilder's smoothing
    ``avg[i] = (avg[i - 1] * (period - 1) + value[i]) / period``.

    Parameters
    ----------
    df : pandas DataFrame (or Series) of prices, rows in chronological order.
    period : look-back length (e.g. 14).

    Returns
    -------
    RSI values aligned with ``df``. The first ``period`` rows are NaN, because
    ``period`` price changes need ``period + 1`` prices. Where the average loss is 0
    the RSI is 100; where both averages are 0 it is NaN.
    """
    # Daily price changes; row 0 has no previous price, so its change is NaN.
    price_change = df.diff()

    # Split changes into gains and (positive) losses. Missing changes count as
    # "no movement", except row 0, which must stay NaN so it is not counted as a
    # real zero change inside the initial SMA window.
    gains = price_change.clip(lower=0).fillna(0.0)
    losses = (-price_change).clip(lower=0).fillna(0.0)
    gains.iloc[:1] = np.nan
    losses.iloc[:1] = np.nan

    # Step 1: seed with the SMA of the first `period` changes (rows 1..period).
    # min_periods=period leaves every earlier row NaN.
    avg_gain = gains.rolling(window=period, min_periods=period).mean()
    avg_loss = losses.rolling(window=period, min_periods=period).mean()

    # Step 2: Wilder smoothing from the row after the seed onwards; this overwrites
    # the plain rolling means computed above for those rows.
    for i in range(period + 1, len(df)):
        avg_gain.iloc[i] = ((avg_gain.iloc[i - 1] * (period - 1)) + gains.iloc[i]) / period
        avg_loss.iloc[i] = ((avg_loss.iloc[i - 1] * (period - 1)) + losses.iloc[i]) / period

    # Relative strength and the RSI formula
    df_rs = avg_gain / avg_loss
    df_rsi = 100 - (100 / (1 + df_rs))

    return df_rsi

In [9]:
# RSI look-back period (e.g. 14)
rsi_period = 14

df_rsi = rsi(df_filtered, period=rsi_period)

In [12]:
# Reshape prices from wide (one column per metal) to long format: one row per (Dates, Metal)
df_long = (
    df_filtered
    .reset_index()
    .melt(id_vars=['Dates'], var_name='Metal', value_name='Price')
)

In [13]:
# Reshape each indicator frame into the same long (Dates, Metal, value) layout as df_long
def _to_long(wide, value_name):
    """Melt a Dates-indexed wide frame into (Dates, Metal, <value_name>) rows."""
    return wide.reset_index().melt(id_vars=['Dates'], var_name='Metal', value_name=value_name)


df_macd_line_long = _to_long(df_macd_line, 'MACD_line')
df_signal_line_long = _to_long(df_signal_line, 'Signal_line')
df_macd_histogram_long = _to_long(df_macd_histogram, 'MACD_histogram')
df_rsi_long = _to_long(df_rsi, 'RSI')

In [14]:
# Inner-join prices with each indicator on (Dates, Metal), left to right
df_merged = functools.reduce(
    lambda left, right: pd.merge(left, right, on=['Dates', 'Metal'], how='inner'),
    [df_macd_line_long, df_signal_line_long, df_macd_histogram_long, df_rsi_long],
    df_long,
)
df_merged

Unnamed: 0,Dates,Metal,Price,MACD_line,Signal_line,MACD_histogram,RSI
0,2020-01-01,LME COPPER 3MO ($) (LMCADS03 Comdty),6174.0,0.000000,0.000000,0.000000,
1,2020-01-02,LME COPPER 3MO ($) (LMCADS03 Comdty),6188.0,1.116809,0.223362,0.893447,
2,2020-01-03,LME COPPER 3MO ($) (LMCADS03 Comdty),6129.5,-2.687592,-0.358829,-2.328763,
3,2020-01-06,LME COPPER 3MO ($) (LMCADS03 Comdty),6138.5,-4.919671,-1.270997,-3.648674,
4,2020-01-07,LME COPPER 3MO ($) (LMCADS03 Comdty),6149.0,-5.774780,-2.171754,-3.603026,
...,...,...,...,...,...,...,...
1041,2021-12-27,LME ZINC 3MO ($) (LMZSDS03 Comdty),3519.0,66.849430,41.187284,25.662145,66.260897
1042,2021-12-28,LME ZINC 3MO ($) (LMZSDS03 Comdty),3519.0,69.361656,46.822159,22.539497,66.260897
1043,2021-12-29,LME ZINC 3MO ($) (LMZSDS03 Comdty),3513.0,70.060845,51.469896,18.590949,65.418907
1044,2021-12-30,LME ZINC 3MO ($) (LMZSDS03 Comdty),3532.5,71.365785,55.449074,15.916711,66.891416


Define a new database (SQLite, accessed asynchronously via aiosqlite)

In [17]:
# Async database setup: a SQLite file accessed through the aiosqlite async driver
DATABASE_URL = "sqlite+aiosqlite:///metal_prices_async.db"

# Declarative base class for the ORM models
Base = declarative_base()

engine = create_async_engine(DATABASE_URL, echo=False)

# Factory producing AsyncSession objects bound to `engine`;
# expire_on_commit=False keeps loaded attribute values available after commit()
async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

In [18]:
# SQLAlchemy table definition
class MetalPrice(Base):
    """ORM model for the ``metal_prices`` table: one row per (date, metal) holding the
    price and its MACD / RSI indicator values."""
    __tablename__ = 'metal_prices'

    # Composite primary key: each (Dates, Metal) pair may appear only once.
    Dates = Column(DateTime, primary_key=True)
    Metal = Column(String, primary_key=True)
    Price = Column(Float)
    # Indicator columns; they are nullable (stored as NULL when missing, e.g. RSI
    # for the first rows of each series, which read back as None).
    MACD_line = Column(Float)
    Signal_line = Column(Float)
    MACD_histogram = Column(Float)
    RSI = Column(Float)

In [19]:
# Logging decorator
def log_db_operation(func):
    """Decorator that prints when a database operation starts, finishes, or fails.

    Works on coroutine functions (the wrapper is then a coroutine function too and
    must be awaited) and on plain functions. The wrapped function's return value is
    passed through; exceptions are reported with a "Failed ..." line and re-raised
    unchanged, so a failure is never mistaken for a silent success.
    """
    import inspect

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            print(f"Executing {func.__name__}")
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                print(f"Failed {func.__name__}: {exc!r}")
                raise
            print(f"Finished {func.__name__}")
            return result
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            print(f"Executing {func.__name__}")
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                print(f"Failed {func.__name__}: {exc!r}")
                raise
            print(f"Finished {func.__name__}")
            return result
    return wrapper

In [20]:
# Insert DataFrame into the database
@log_db_operation
async def insert_data(df: pd.DataFrame):
    """Insert every row of ``df`` into ``metal_prices`` and commit once.

    ``df`` must provide the columns Dates, Metal, Price, MACD_line, Signal_line,
    MACD_histogram and RSI (a KeyError is raised otherwise). Missing numeric values
    (NaN, e.g. RSI for the first rows of each series) are written as SQL NULL
    explicitly rather than relying on the database driver's NaN handling.
    """
    def nullable(value):
        # Map pandas missing-value markers to None so the row stores NULL
        return None if pd.isna(value) else value

    # to_dict("records") avoids building a pandas Series per row as iterrows() does
    records = df.to_dict("records")

    async with async_session() as session:
        session.add_all(
            MetalPrice(
                Dates=pd.to_datetime(rec['Dates']),
                Metal=rec['Metal'],
                Price=nullable(rec['Price']),
                MACD_line=nullable(rec['MACD_line']),
                Signal_line=nullable(rec['Signal_line']),
                MACD_histogram=nullable(rec['MACD_histogram']),
                RSI=nullable(rec['RSI']),
            )
            for rec in records
        )
        await session.commit()

In [21]:
# Read from the database
@log_db_operation
async def read_data(task_id: int, limit: int = 5):
    """Read up to ``limit`` rows from ``metal_prices`` using a dedicated session.

    Each call opens its own AsyncSession, so several calls can be awaited
    concurrently (e.g. with asyncio.gather). ``task_id`` only labels the progress
    message. No ORDER BY is applied, so which rows come back is up to the database.

    Returns the list of MetalPrice objects that were read.
    """
    async with async_session() as session:
        result = await session.execute(select(MetalPrice).limit(limit))
        rows = result.scalars().all()
        print(f"Task {task_id} read {len(rows)} rows")
        return rows

In [26]:
# Main async runner
async def main():
    """(Re)create the schema, insert ``df_merged``, then run 5 concurrent reads.

    The tables are dropped before being created so that re-running this cell does
    not fail on duplicate (Dates, Metal) primary keys left over from a previous run.
    """
    async with engine.begin() as conn:
        # MetaData.drop_all/create_all are synchronous APIs; run_sync runs them on the async connection
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    # Insert data
    await insert_data(df_merged)

    # Read concurrently 5 times: each read_data call uses its own session
    await asyncio.gather(*(read_data(i) for i in range(5)))

In [29]:
# Run the event loop
# nest_asyncio is a small patch that allows you to re-enter the running event loop (which is what notebooks already have running under the hood).
nest_asyncio.apply()
await main()

Executing insert_data
Finished insert_data
Executing read_data
Executing read_data
Executing read_data
Executing read_data
Executing read_data
Task 0 read 5 rows
Finished read_data
Task 2 read 5 rows
Task 3 read 5 rows
Task 4 read 5 rows
Task 1 read 5 rows
Finished read_data
Finished read_data
Finished read_data
Finished read_data


In [31]:
# Let's print out the first 20 lines of the database created with async
async with async_session() as session:

    result = await session.execute(
        select(MetalPrice).limit(20)
    )

    rows = result.scalars().all()
    
    for row in rows:

        print(f"Dates: {row.Dates}, Metal: {row.Metal}, Price: {row.Price}, MACD line: {float(row.MACD_line):.2f}, Signal line: {float(row.Signal_line):.2f}, MACD histogram: {float(row.MACD_histogram):.2f}, RSI line: {row.RSI}")

Dates: 2020-01-01 00:00:00, Metal: LME COPPER    3MO ($) (LMCADS03 Comdty), Price: 6174.0, MACD line: 0.00, Signal line: 0.00, MACD histogram: 0.00, RSI line: None
Dates: 2020-01-02 00:00:00, Metal: LME COPPER    3MO ($) (LMCADS03 Comdty), Price: 6188.0, MACD line: 1.12, Signal line: 0.22, MACD histogram: 0.89, RSI line: None
Dates: 2020-01-03 00:00:00, Metal: LME COPPER    3MO ($) (LMCADS03 Comdty), Price: 6129.5, MACD line: -2.69, Signal line: -0.36, MACD histogram: -2.33, RSI line: None
Dates: 2020-01-06 00:00:00, Metal: LME COPPER    3MO ($) (LMCADS03 Comdty), Price: 6138.5, MACD line: -4.92, Signal line: -1.27, MACD histogram: -3.65, RSI line: None
Dates: 2020-01-07 00:00:00, Metal: LME COPPER    3MO ($) (LMCADS03 Comdty), Price: 6149.0, MACD line: -5.77, Signal line: -2.17, MACD histogram: -3.60, RSI line: None
Dates: 2020-01-08 00:00:00, Metal: LME COPPER    3MO ($) (LMCADS03 Comdty), Price: 6178.0, MACD line: -4.07, Signal line: -2.55, MACD histogram: -1.52, RSI line: None
Date