Async Data Pipeline

Tasks:
- Modify Question 3 to write data to the database asynchronously.
- Read from the database 5 times concurrently using async (hint: asyncio.gather())

# Task 1: Modify Question 3 to write data to the database asynchronously.

For continuity, SQLAlchemy is used for both synchronous and asynchronous database operations. This keeps the codebase unified and easier to maintain.

To ensure the subsequent database interactions don't interfere with the database from Question 3, a copy of that database, named `metal_commodity_Q4.db`, is used.
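The copy can be made with the standard library before running the cells below. A minimal sketch, assuming the Question 3 database is a single file named `metal_commodity_Q3.db` (a hypothetical name; the original filename is not given here) and that no other process is writing to it during the copy. `copy_database()` is likewise a hypothetical helper:

```python
import shutil
from pathlib import Path

def copy_database(src, dst):
    """Copy an SQLite database file so later writes leave the original untouched."""
    src_path, dst_path = Path(src), Path(dst)
    if not src_path.exists():
        raise FileNotFoundError(f"Source database not found: {src_path}")
    # copy2 copies the file contents plus metadata such as modification time
    shutil.copy2(src_path, dst_path)
    return dst_path

# Hypothetical source filename for the Question 3 database:
# copy_database("metal_commodity_Q3.db", "metal_commodity_Q4.db")
```

A plain file copy is only safe while the database is idle; for a database that may be in use, the standard library's `sqlite3.Connection.backup()` is the safer choice.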

In [1]:
import asyncio
import logging
from datetime import datetime
from functools import wraps

import pandas as pd
from sqlalchemy import select, Column, Integer, String, Float, Date
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase    # replaces the deprecated ext.declarative.declarative_base

Then, we configure logging.

In [2]:
# Configure logging
logging.basicConfig(filename='sql_inserts_Q4.log', level=logging.INFO,
                    format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

Next, we create an SQLAlchemy engine for asynchronous operations, using `aiosqlite` (installed separately) as the async SQLite driver.

In [3]:
engine = create_async_engine('sqlite+aiosqlite:///metal_commodity_Q4.db')  # 'sqlite+aiosqlite' selects the aiosqlite async driver

Next, the ORM class, the logging decorator, and the functions that calculate MACD and RSI are defined largely as in Question 3.
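For reference, `calculate_macd()` and `calculate_rsi()` in the cell below compute the following, with $x_t$ the price on row $t$ and $s$ the EMA span. The EMA expression assumes pandas' default `ewm(span=s, adjust=True)` weighting; the RSI averages gains and losses with a plain 14-row rolling mean (`rolling(window=14).mean()`), not Wilder's smoothing.

```latex
\alpha_s = \frac{2}{s+1}, \qquad
\mathrm{EMA}^{(s)}_t(x) = \frac{\sum_{i=0}^{t} (1-\alpha_s)^i \, x_{t-i}}{\sum_{i=0}^{t} (1-\alpha_s)^i}

\mathrm{MACD}_t = \mathrm{EMA}^{(12)}_t(x) - \mathrm{EMA}^{(26)}_t(x), \qquad
\mathrm{Signal}_t = \mathrm{EMA}^{(9)}_t(\mathrm{MACD})

\Delta_t = x_t - x_{t-1}, \qquad
G_t = \frac{1}{14} \sum_{k=0}^{13} \max(\Delta_{t-k}, 0), \qquad
L_t = \frac{1}{14} \sum_{k=0}^{13} \max(-\Delta_{t-k}, 0)

\mathrm{RSI}_t = 100 - \frac{100}{1 + G_t / L_t}
```

At $t=0$ both EMAs equal $x_0$, so MACD and its signal start at 0, consistent with `MACD: 0.0` for ID 1 in the query output of Task 2.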

In [4]:
# Define Base class for declarative ORM
class Base(DeclarativeBase):
    pass

# Define MetalPrice ORM class
class MetalPrice(Base):
    __tablename__ = 'metal_prices'

    id = Column(Integer, primary_key=True)
    date = Column(Date)
    metal = Column(String)
    price = Column(Float)
    macd = Column(Float)
    macd_signal = Column(Float)
    rsi = Column(Float)

# Define decorator to log SQL operations.
# populate_sql_table() is a coroutine function, so the wrapper must itself be async
# and await the wrapped call; a plain wrapper would only time coroutine creation.
def log_sql(func):
    @wraps(func)    # Preserve metadata of original function, helps debugging
    async def wrapper(*args, **kwargs):
        start_time = datetime.now()
        result = await func(*args, **kwargs)
        end_time = datetime.now()
        execution_time = end_time - start_time
        logging.info(f"SQL operation {func.__name__} executed in {execution_time.total_seconds()} seconds (asynchronous)")
        return result
    return wrapper

# Function to calculate MACD for a series of prices
def calculate_macd(prices, slow_period=26, fast_period=12, signal_period=9):
    slow_ema = prices.ewm(span=slow_period).mean()
    fast_ema = prices.ewm(span=fast_period).mean()
    macd_line = fast_ema - slow_ema
    signal_line = macd_line.ewm(span=signal_period).mean()
    return macd_line, signal_line

# Function to calculate RSI for a series of prices
def calculate_rsi(prices, window=14):
    delta = prices.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

# Function to read CSV file and calculate MACD and RSI
def calculate_macd_rsi(csv_file):
    # Read CSV file into DataFrame
    df = pd.read_csv(csv_file)

    # Convert 'Dates' column to datetime
    df['Dates'] = pd.to_datetime(df['Dates'])

    # Extract metal column names
    metals = df.columns[1:]

    # Iterate over metal columns and calculate MACD, RSI
    for metal in metals:
        prices = df[metal]
        macd_line, macd_signal = calculate_macd(prices)
        rsi = calculate_rsi(prices)
        df[f'{metal}_macd'] = macd_line
        df[f'{metal}_macd_signal'] = macd_signal
        df[f'{metal}_rsi'] = rsi

    return (df, metals)
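Because `populate_sql_table()` is an `async def` function, a timing decorator applied to it has to be async-aware: a plain synchronous wrapper returns as soon as the coroutine object is created, before any database work happens. The sketch below contrasts the two wrapper styles using `time.perf_counter()` and a hypothetical `fake_insert()` coroutine that stands in for a database write with `asyncio.sleep()`:

```python
import asyncio
import time
from functools import wraps

timings = {}  # label -> last measured duration in seconds

def timed_sync(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)  # for an async def, this only creates the coroutine
        timings["sync"] = time.perf_counter() - start
        return result
    return wrapper

def timed_async(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await func(*args, **kwargs)  # runs the coroutine to completion
        timings["async"] = time.perf_counter() - start
        return result
    return wrapper

async def fake_insert(delay):
    # Hypothetical stand-in for a database write that takes `delay` seconds
    await asyncio.sleep(delay)
    return "committed"

async def main():
    first = await timed_sync(fake_insert)(0.2)   # still awaitable: the caller awaits the coroutine
    second = await timed_async(fake_insert)(0.2)
    return first, second

results = asyncio.run(main())
print(f"sync wrapper:  {timings['sync']:.4f} s")
print(f"async wrapper: {timings['async']:.4f} s")
```

Both calls complete the "write", but only the async wrapper's measurement includes the 0.2 s it took.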

Next, the async `populate_sql_table()` function is created. It uses `await` to suspend the coroutine until the commit completes, handing control back to the event loop in the meantime.

In [5]:
# Function to populate SQL table with calculated data
@log_sql
async def populate_sql_table(df, metals):
    # Create session
    async_session = async_sessionmaker(bind=engine, expire_on_commit=False)    # async_sessionmaker requires SQLAlchemy >= 2.0
    async with async_session() as session:

        # Iterate over DataFrame rows and insert into SQL table
        for index, row in df.iterrows():
            date = row['Dates']
            for metal in metals:
                price = row[metal]
                macd = row[f'{metal}_macd']
                macd_signal = row[f'{metal}_macd_signal']
                rsi = row[f'{metal}_rsi']

                metal_price = MetalPrice(date=date, metal=metal, price=price,
                                        macd=macd, macd_signal=macd_signal, rsi=rsi)
                session.add(metal_price)

        # Commit changes
        await session.commit()
        
    # Close and clean-up pooled connections.
    # await engine.dispose()
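`populate_sql_table()` adds one ORM object per (date, metal) pair and only awaits at the final commit. As a point of comparison, here is a sketch of the same wide-to-long write using only the standard library: `sqlite3.executemany()` runs in a worker thread via `asyncio.to_thread()` (Python 3.9+), so the event loop stays free while the insert runs. This swaps SQLAlchemy for `sqlite3`; the table layout mirrors `MetalPrice`, while the function names and the input format (rows as dictionaries keyed like the DataFrame columns) are hypothetical:

```python
import asyncio
import sqlite3

def to_records(rows, metals):
    """Flatten wide rows (one column group per metal) into one tuple per (date, metal)."""
    return [
        (row["Dates"], metal, row[metal], row[f"{metal}_macd"],
         row[f"{metal}_macd_signal"], row[f"{metal}_rsi"])
        for row in rows
        for metal in metals
    ]

def write_records(db_path, records):
    """Blocking sqlite3 insert; meant to run in a worker thread."""
    con = sqlite3.connect(db_path)  # created in the worker thread that uses it
    try:
        with con:  # commits on success, rolls back on error
            con.execute(
                "CREATE TABLE IF NOT EXISTS metal_prices ("
                "id INTEGER PRIMARY KEY, date TEXT, metal TEXT, price REAL, "
                "macd REAL, macd_signal REAL, rsi REAL)"
            )
            con.executemany(
                "INSERT INTO metal_prices (date, metal, price, macd, macd_signal, rsi) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                records,
            )
    finally:
        con.close()
    return len(records)

async def populate_async(db_path, rows, metals):
    """Build the records, then await the blocking write without stalling the event loop."""
    records = to_records(rows, metals)
    return await asyncio.to_thread(write_records, db_path, records)
```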

Next, we define `main_write()`, which reads the CSV file, calculates MACD and RSI, and then calls `populate_sql_table()` and awaits its completion.

In [6]:
# Run the async processing function
async def main_write():
    # Read CSV file and calculate MACD, RSI
    csv_file = 'MarketData_filtered.csv'
    df, metals = calculate_macd_rsi(csv_file)
    
    # Populate SQL table with calculated data
    await populate_sql_table(df, metals)

Lastly, we execute `main_write()` on the asyncio event loop. The syntax differs between a Jupyter notebook and a script:
- A Jupyter kernel already runs an event loop and supports top-level `await`, so we call `await main_write()` directly. Calling `asyncio.run()` there raises a `RuntimeError`, because it cannot be called from a running event loop.
- A Python script has no running event loop, so `asyncio.run(main_write())` creates one, runs the coroutine to completion, and closes the loop.
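The difference between the notebook and script cases can be reproduced with the standard library alone. In this sketch `main_write()` is a hypothetical stand-in for the notebook's function, and `notebook_like_cell()` simulates a Jupyter cell by calling `asyncio.run()` while a loop is already running:

```python
import asyncio

async def main_write():
    # Hypothetical stand-in for the notebook's main_write()
    await asyncio.sleep(0)
    return "written"

# Script style: no loop is running yet, so asyncio.run() creates one,
# runs the coroutine to completion, and closes the loop.
script_result = asyncio.run(main_write())

async def notebook_like_cell():
    # Inside a running loop (as in a Jupyter cell), asyncio.run() refuses to start
    coro = main_write()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a "never awaited" warning
        return f"RuntimeError: {exc}"

notebook_result = asyncio.run(notebook_like_cell())
print(script_result)
print(notebook_result)
```

Inside an actual notebook cell, the fix is simply `await main_write()`, which schedules the coroutine on the kernel's existing loop.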

Note that this new write creates a duplicate of each existing row in `metal_commodity_Q4.db`, since the copied database already contains the rows written in Question 3.
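A quick way to confirm the duplication is to group on every column except the auto-incremented `id` and keep the groups with more than one row. Below is a self-contained sketch using the standard-library `sqlite3` module on an in-memory table shaped like `metal_prices`; the sample rows are made up and `find_duplicates()` is a hypothetical helper. SQLite treats NULLs as equal for grouping, which matters here because the earliest RSI values are stored as NULL:

```python
import sqlite3

DUPLICATES_SQL = """
SELECT date, metal, COUNT(*) AS n
FROM metal_prices
GROUP BY date, metal, price, macd, macd_signal, rsi
HAVING COUNT(*) > 1
ORDER BY date, metal
"""

def find_duplicates(con):
    """Return (date, metal, count) for every row that appears more than once."""
    return con.execute(DUPLICATES_SQL).fetchall()

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE metal_prices (id INTEGER PRIMARY KEY, date TEXT, metal TEXT, "
            "price REAL, macd REAL, macd_signal REAL, rsi REAL)")
rows = [("2021-01-01", "COPPER", 1.0, 0.1, 0.2, 50.0),
        ("2021-01-01", "ZINC", 2.0, 0.3, 0.4, None)]
# Insert the same rows twice to mimic a second write
con.executemany("INSERT INTO metal_prices (date, metal, price, macd, macd_signal, rsi) "
                "VALUES (?, ?, ?, ?, ?, ?)", rows * 2)
print(find_duplicates(con))
```

Against the real file, `sqlite3.connect('metal_commodity_Q4.db')` would take the place of the in-memory connection.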

In [7]:
# Run main_write() with top-level await in a Jupyter notebook
await main_write()

# Run main_write() with asyncio.run() from a .py script
# asyncio.run(main_write())

# Task 2: Read from the database 5 times concurrently using async.

First, we define an async session factory.

In [8]:
# Define async session
async_session = async_sessionmaker(bind=engine, expire_on_commit=False)    

Next, we define an async function ```read_data()``` to asynchronously execute a database query.

In [9]:
# Async function to read data from the database
async def read_data(query):
    async with async_session() as session:
        result = await session.execute(query)
        data = result.fetchall()        
        return data
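For comparison with `read_data()`, here is a stdlib-only analogue that swaps SQLAlchemy for the `sqlite3` module. Since `sqlite3` calls block, each query runs in a worker thread via `asyncio.to_thread()`; like each `async with async_session()` block, every call gets its own connection. The names `read_rows()` and `_query()` are hypothetical:

```python
import asyncio
import sqlite3

def _query(db_path, sql, params):
    # sqlite3 connections are bound to the thread that created them by default,
    # so each worker-thread call opens (and closes) its own connection
    con = sqlite3.connect(db_path)
    try:
        return con.execute(sql, params).fetchall()
    finally:
        con.close()

async def read_rows(db_path, sql, params=()):
    """Run one blocking query in a worker thread without blocking the event loop."""
    return await asyncio.to_thread(_query, db_path, sql, params)
```

Several `read_rows()` calls can then be passed to `asyncio.gather()` in the same way as `read_data()`.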

Next, we create the ```concurrent_reads()``` function, which builds a list of queries and uses ```asyncio.gather()``` to run them concurrently on the event loop. To make the query outputs easy to validate manually, we choose queries expected to return only a small number of rows.

In [16]:
# Async function to perform concurrent database reads 5 times
async def concurrent_reads():
    queries = [
        select(MetalPrice).where(MetalPrice.date == '2021-01-01'),      # Should return 4 rows: COPPER and ZINC, each duplicated by Task 1
        select(MetalPrice).where(MetalPrice.id == 1),                   # Should return 1 row
        select(MetalPrice).where(MetalPrice.metal == 'TIN'),            # Should return 0 rows (no TIN data)
        select(MetalPrice).where(MetalPrice.id == 76),                  # Should return 1 row
        select(MetalPrice).where(MetalPrice.id == 16),                  # Should return 1 row
    ]

    # Create one (not yet started) coroutine per query
    tasks = [read_data(query) for query in queries]

    # Concurrently execute all tasks
    results = await asyncio.gather(*tasks)
    return results
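Two properties of `asyncio.gather()` matter here: it runs the awaitables concurrently, and it returns their results in the order they were passed, regardless of which finishes first. That ordering is what lets `main_read()` label each result by its position. A stdlib sketch with a hypothetical `fake_query()` that sleeps instead of querying:

```python
import asyncio
import time

async def fake_query(query_id, delay):
    # Hypothetical stand-in for read_data(): waits `delay` seconds, then returns its id
    await asyncio.sleep(delay)
    return query_id

async def main():
    delays = [0.3, 0.1, 0.2, 0.05, 0.15]   # deliberately finish out of order
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_query(i, d) for i, d in enumerate(delays)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)             # [0, 1, 2, 3, 4]: input order, not completion order
print(f"{elapsed:.2f} s")  # close to the longest delay (0.3 s), not the sum (0.8 s)
```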

Finally, we create the ```main_read()``` function, which calls ```concurrent_reads()```, awaits its completion, and prints the results.

In [17]:
# Main function to run concurrent reads
async def main_read():
    results = await concurrent_reads()
    # results holds one list of rows per query, in the order the queries were listed
    for idx, data in enumerate(results):
        print(f"Results for query id = {idx}:")
        for row in data:
            print(f"ID: {row[0].id}, Metal: {row[0].metal}, Date: {row[0].date}, Price: {row[0].price}, MACD: {row[0].macd}, MACD_signal: {row[0].macd_signal}, RSI: {row[0].rsi}")            
        print()

Here, the query results returned from ```concurrent_reads()``` are structured differently from the synchronous read in Question 3:

- `results` is a list of length 5, with each item holding the results of one of the concurrently executed queries, in the order the queries were listed.
- `data` is an item within `results`. It is a list of all the rows returned by a single database query.
- `row` is an item within `data`. It represents a single row returned by the query. Because each query selects the whole `MetalPrice` entity, each `row` is a tuple-like SQLAlchemy `Row` with one element, and `row[0]` is the `MetalPrice` object. (Calling `result.scalars().all()` in `read_data()` would return the `MetalPrice` objects directly.)

In [18]:
# Run main_read() with top-level await in a Jupyter notebook
await main_read()

# Run main_read() with asyncio.run() from a .py script
# asyncio.run(main_read())

Results for query id = 0:
ID: 525, Metal: COPPER, Date: 2021-01-01, Price: 7766.0, MACD: 262.12690234565434, MACD_signal: 267.8027105160701, RSI: 50.94816687737041
ID: 526, Metal: ZINC, Date: 2021-01-01, Price: 2751.0, MACD: 66.30855054413951, MACD_signal: 73.89282576880589, RSI: 38.84514435695538
ID: 1571, Metal: COPPER, Date: 2021-01-01, Price: 7766.0, MACD: 262.12690234565434, MACD_signal: 267.8027105160701, RSI: 50.94816687737041
ID: 1572, Metal: ZINC, Date: 2021-01-01, Price: 2751.0, MACD: 66.30855054413951, MACD_signal: 73.89282576880589, RSI: 38.84514435695538

Results for query id = 1:
ID: 1, Metal: COPPER, Date: 2020-01-01, Price: 6174.0, MACD: 0.0, MACD_signal: 0.0, RSI: None

Results for query id = 2:

Results for query id = 3:
ID: 76, Metal: ZINC, Date: 2020-02-20, Price: 2112.0, MACD: -27.50577707624916, MACD_signal: -10.99218094590205, RSI: 45.83508624316366

Results for query id = 4:
ID: 16, Metal: ZINC, Date: 2020-01-12, Price: 2770.0, MACD: 48.427135470621124, MACD_sig

In [19]:
# Close and clean up pooled connections; AsyncEngine.dispose() is a coroutine and must be awaited
await engine.dispose()

# Improvements

[-] Consider using a native async database driver directly. While SQLAlchemy provides async support, its asyncio extension wraps the underlying driver (here `aiosqlite`) in an adaptation layer, so performance may be lower than calling a native async driver directly.

[-] Benchmark to identify any performance bottlenecks.
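A starting point for benchmarking is a small async timing harness. This sketch assumes wall-clock timing with `time.perf_counter()` is precise enough; `benchmark()`, `summarize()` and the two sleep-based workloads are hypothetical and only illustrate sequential versus gathered execution:

```python
import asyncio
import statistics
import time

async def benchmark(make_coro, repeats=5):
    """Await a fresh coroutine from make_coro() `repeats` times; return wall-clock seconds per run."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        await make_coro()
        durations.append(time.perf_counter() - start)
    return durations

def summarize(durations):
    return {"min": min(durations), "mean": statistics.mean(durations), "max": max(durations)}

# Hypothetical workloads: five 20 ms waits run one after another vs. gathered
async def one_wait():
    await asyncio.sleep(0.02)

async def sequential_waits():
    for _ in range(5):
        await one_wait()

async def gathered_waits():
    await asyncio.gather(*(one_wait() for _ in range(5)))

async def main():
    return (summarize(await benchmark(sequential_waits)),
            summarize(await benchmark(gathered_waits)))

seq, conc = asyncio.run(main())  # in Jupyter: seq, conc = await main()
print(f"sequential mean: {seq['mean']:.3f} s, gathered mean: {conc['mean']:.3f} s")
```

For the notebook's own reads, `await benchmark(concurrent_reads)` would time the five concurrent queries; comparing it against a version that awaits `read_data()` for each query in turn would show how much the concurrency actually helps with SQLite.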