In [4]:
import pandas as pd
import asyncio
import sqlalchemy
from databases import Database
from ta.trend import MACD
from ta.momentum import RSIIndicator

# ------------------------------------------
# 1. DATABASE SETUP
# ------------------------------------------
DATABASE_URL = "sqlite+aiosqlite:///metal_prices_async.db"
database = Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()

metal_prices = sqlalchemy.Table(
    "metalprices",
    metadata,
    sqlalchemy.Column("Id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("Date", sqlalchemy.Date, nullable=False),
    sqlalchemy.Column("Metal", sqlalchemy.String, nullable=False),
    sqlalchemy.Column("Indicator", sqlalchemy.String, nullable=False),
    sqlalchemy.Column("Value", sqlalchemy.Numeric(18, 4), nullable=False),
)

# Create the table synchronously
engine = sqlalchemy.create_engine(str(DATABASE_URL).replace("+aiosqlite", ""))
metadata.create_all(engine)

# ------------------------------------------
# 2. LOAD & FILTER CSV
# ------------------------------------------
df = pd.read_csv('../data/MarketData.csv', skiprows=6)
df.rename(columns={'Dates': 'Date', 'PX_SETTLE': 'Copper', 'PX_SETTLE.2': 'Zinc'}, inplace=True)
df = df[['Date', 'Copper', 'Zinc']]
df['Date'] = pd.to_datetime(df['Date'], dayfirst=True)
df = df[df['Date'].dt.year.isin([2020, 2021])].copy()

# ------------------------------------------
# 3. COMPUTE MACD (FAST, MEDIUM, SLOW) + RSI
# ------------------------------------------
def compute_indicators(df, metal):
    temp = df[['Date', metal]].dropna().copy()
    temp = temp.rename(columns={metal: 'Price'})

    # Compute MACD
    macd = MACD(temp['Price'], window_fast=12, window_slow=26, window_sign=9)

    temp['MACD'] = macd.macd()

    # Compute RSI
    rsi = RSIIndicator(temp['Price'], window=14)
    temp['RSI'] = rsi.rsi()

    temp.dropna(inplace=True)
    temp['Metal'] = metal

    return temp[['Date', 'Metal', 'MACD', 'RSI']]

# Combine Copper and Zinc
df_copper = compute_indicators(df, 'Copper')
df_zinc = compute_indicators(df, 'Zinc')
df_all = pd.concat([df_copper, df_zinc])

# ------------------------------------------
# 4. ASYNC INSERT
# ------------------------------------------
async def async_insert():
    await database.connect()
    for _, row in df_all.iterrows():
        for indicator in ['MACD', 'RSI']:
            value = row[indicator]
            query = metal_prices.insert().values(
                Date=row['Date'].date(),
                Metal=row['Metal'],
                Indicator=indicator,
                Value=round(float(value), 4)
            )
            await database.execute(query)
    await database.disconnect()
    print(" All indicator values inserted asynchronously.")

# ------------------------------------------
# 5. ASYNC READ TASK
# ------------------------------------------
async def async_read(i):
    await database.connect()
    rows = await database.fetch_all(query=metal_prices.select().limit(5))
    await database.disconnect()
    print(f"Read {i}: {len(rows)} rows")
    return rows

# ------------------------------------------
# 6. SANITY CHECKS
# ------------------------------------------
async def sanity_checks():
    await database.connect()

    # Row count
    total = await database.fetch_val("SELECT COUNT(*) FROM metalprices")
    print(f" Total rows in DB: {total}")

    # Unique metals
    metals = await database.fetch_all("SELECT DISTINCT Metal FROM metalprices")
    print("Metals:", [m[0] for m in metals])

    # Unique indicators
    indicators = await database.fetch_all("SELECT DISTINCT Indicator FROM metalprices")
    print(" Indicators:", [i[0] for i in indicators])

    # Sample rows
    sample = await database.fetch_all("SELECT * FROM metalprices LIMIT 5")
    print(" Sample rows:")
    for row in sample:
        print(dict(row))

    await database.disconnect()

# ------------------------------------------
# 7. MAIN PIPELINE: INSERT + READS + CHECKS
# ------------------------------------------
async def main_pipeline():
    print(" Inserting data...")
    await async_insert()

    print("\n Reading concurrently...")
    await asyncio.gather(
        async_read(1),
        async_read(2),
        async_read(3),
        async_read(4),
        async_read(5)
    )

    print("\nRunning sanity checks...")
    await sanity_checks()

# ------------------------------------------
# 8. RUN
# ------------------------------------------
await main_pipeline()


 Inserting data...
 All indicator values inserted asynchronously.

 Reading concurrently...
Read 5: 5 rows
Read 1: 5 rows
Read 2: 5 rows
Read 3: 5 rows
Read 4: 5 rows

Running sanity checks...
 Total rows in DB: 1992
Metals: ['Copper', 'Zinc']
 Indicators: ['MACD', 'RSI']
 Sample rows:
{'Id': 1, 'Date': '2020-02-05', 'Metal': 'Copper', 'Indicator': 'MACD', 'Value': -156.3104}
{'Id': 2, 'Date': '2020-02-05', 'Metal': 'Copper', 'Indicator': 'RSI', 'Value': 34.098}
{'Id': 3, 'Date': '2020-02-06', 'Metal': 'Copper', 'Indicator': 'MACD', 'Value': -147.126}
{'Id': 4, 'Date': '2020-02-06', 'Metal': 'Copper', 'Indicator': 'RSI', 'Value': 35.3924}
{'Id': 5, 'Date': '2020-02-07', 'Metal': 'Copper', 'Indicator': 'MACD', 'Value': -143.9972}
