# Data Pipeline

This notebook documents how data from every source is pushed to Redis and then written, in batches, to a shared database by a single writer that consumes all streams.

### All sources push their data to Redis streams.

We use the following functions from our database.utils.py file:

In [None]:
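import os

import redis.asyncio as redis  # asyncio client, so xadd() can be awaited
from dotenv import load_dotenv

load_dotenv()
# decode_responses=True makes reads return str instead of bytes,
# so consumers can index fields as m["ticker"].
r = redis.from_url(os.getenv("REDIS_URL"), decode_responses=True)
PRICE_STREAM = "prices_stream"
NEWS_STREAM = "news_stream"

async def publish_price(ticker, price, quantity, timestamp):
    # Redis stream fields must be str, bytes, int or float,
    # so timestamp should already be a string (e.g. ISO 8601) or a number.
    await r.xadd(PRICE_STREAM, {
        "ticker": ticker,
        "price": float(price),
        "quantity": float(quantity),
        "timestamp": timestamp,
    })

async def publish_news(article_id, title, timestamp, tickers):
    # One "article" entry per article; the "table" field tells the
    # writer which database table the entry belongs to.
    await r.xadd(NEWS_STREAM, {
        "table": "article",
        "article_id": article_id,
        "title": title,
        "timestamp": timestamp.isoformat(),  # timestamp is a datetime here
    })

    # One "mention" entry per ticker referenced by the article.
    for ticker in tickers:
        await r.xadd(NEWS_STREAM, {
            "table": "mention",
            "article_id": article_id,
            "ticker": ticker,
        })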

With these functions, live data from every source is pushed to Redis instead of being written to the database directly. Prices and news go to two streams (`prices_stream` and `news_stream`) on the same Redis instance, and a single writer consumes both streams and writes their contents to the database in batches.
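The writer itself lives in `writer.py`, which is not shown here. As a minimal sketch, assuming redis-py's asyncio client with its default RESP2 reply shape, a single-writer loop could drain both streams with `XREAD` and hand each batch to a per-stream handler such as `process_prices` and `process_news` from the next cell. The names `dispatch` and `writer_loop`, the batch size, and starting from ID `0` are hypothetical choices, not the project's actual code.

```python
async def dispatch(response, handlers):
    """Route one XREAD response to the batch handler for each stream.

    Assumes the RESP2 shape redis-py returns from XREAD:
    [[stream_name, [(message_id, fields), ...]], ...]
    Returns {stream_name: last_message_id} so the caller can resume.
    """
    last_ids = {}
    for stream, messages in response:
        if isinstance(stream, bytes):  # bytes unless decode_responses=True
            stream = stream.decode()
        if messages:
            await handlers[stream](messages)  # one batched DB write per stream
            last_ids[stream] = messages[-1][0]
    return last_ids


async def writer_loop(r, handlers, batch_size=500, block_ms=1000):
    """Hypothetical single-writer loop: the only code that touches the database.

    `r` is an async Redis client; `handlers` maps stream names to coroutines,
    e.g. {"prices_stream": process_prices, "news_stream": process_news}.
    """
    # Start from the beginning of each stream. Replayed rows are skipped by
    # ON CONFLICT DO NOTHING, assuming the tables have primary keys.
    last_ids = {stream: "0" for stream in handlers}
    while True:
        # Wait up to block_ms for new entries; get at most batch_size per stream.
        response = await r.xread(last_ids, count=batch_size, block=block_ms)
        if response:
            last_ids.update(await dispatch(response, handlers))
```

Keeping all database writes inside one loop is what lets every source share the database without contending for it: producers only ever talk to Redis.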

In [None]:
import duckdb
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

DB_PATH = 'path_to_database'
con = duckdb.connect(DB_PATH)
analyzer = SentimentIntensityAnalyzer()

def get_sentiment(text):
    # VADER's compound score is normalized to the range [-1, 1].
    return analyzer.polarity_scores(text)["compound"]

# Both handlers receive a batch of (message_id, fields) pairs read from Redis.
# Indexing fields by str keys assumes the consumer's Redis client was
# created with decode_responses=True.
# ON CONFLICT DO NOTHING requires a primary key or unique constraint on each table.

async def process_prices(messages):
    # Building a set drops exact duplicate messages within the batch.
    rows = list({
        (m["ticker"], float(m["price"]), float(m["quantity"]), m["timestamp"])
        for _, m in messages
    })
    if not rows:
        return

    con.executemany("""
        INSERT INTO prices (ticker, price, quantity, timestamp) VALUES (?, ?, ?, ?)
        ON CONFLICT DO NOTHING
    """, rows)

async def process_news(messages):
    articles, mentions = [], []
    for _, m in messages:
        if m['table'] == 'article':
            sentiment = get_sentiment(m['title'])
            articles.append((m['article_id'], m['title'], m['timestamp'], sentiment))
        else:  # 'mention'
            mentions.append((m['article_id'], m['ticker']))
    if not articles and not mentions:
        return

    # Write articles and mentions in one transaction; roll back on failure
    # so the connection is not left inside an open transaction.
    con.execute("BEGIN")
    try:
        if articles:
            con.executemany("""
                INSERT INTO news_articles (article_id, title, timestamp, sentiment) VALUES (?, ?, ?, ?)
                ON CONFLICT DO NOTHING
            """, articles)
        if mentions:
            con.executemany("""
                INSERT INTO ticker_mentions (article_id, ticker) VALUES (?, ?)
                ON CONFLICT DO NOTHING
            """, mentions)
        con.execute("COMMIT")
    except Exception:
        con.execute("ROLLBACK")
        raise

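I use the VADER sentiment library (`vaderSentiment`) to give each news headline a sentiment score: the compound score, which ranges from -1 (most negative) to +1 (most positive). `process_prices` and `process_news` then insert each batch of messages with a single `executemany` call per table instead of one `INSERT` per message, which reduces per-statement overhead.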
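The `sentiment` column stores the raw compound score. If downstream code needs discrete labels, a hypothetical helper like `label_sentiment` below could apply the cutoffs the VADER README suggests (positive at or above 0.05, negative at or below -0.05, neutral in between). This is a sketch of one convention, not part of the pipeline's schema.

```python
def label_sentiment(compound, threshold=0.05):
    """Map a VADER compound score (range -1 to 1) to a coarse label.

    The default +/-0.05 cutoff follows the VADER README's suggestion;
    it is a rule of thumb and can be tuned for financial headlines.
    """
    if not -1.0 <= compound <= 1.0:
        raise ValueError(f"compound score out of range: {compound}")
    if compound >= threshold:
        return "positive"
    if compound <= -threshold:
        return "negative"
    return "neutral"
```

Keeping the numeric score in the database and labeling at read time means the threshold can change without rewriting stored rows.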

In [None]:
import asyncio
import os

TABLES = ["prices", "news_articles", "ticker_mentions"]

async def snapshot_to_parquet(interval=1.0):
    os.makedirs("data", exist_ok=True)
    while True:
        for table in TABLES:
            final_path = f"data/{table}.parquet"
            tmp_path = final_path + ".tmp"
            try:
                # Write to a temporary file, then swap it in with os.replace
                # (atomic on POSIX), so readers never open a half-written file.
                con.execute(f"COPY (SELECT * FROM {table}) TO '{tmp_path}' (FORMAT PARQUET)")
                os.replace(tmp_path, final_path)
            except Exception as e:
                print(f"Parquet export error ({table}):", e)
        await asyncio.sleep(interval)  # export roughly once per interval (default: every second)


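This coroutine periodically exports each table from the database to a Parquet file, a columnar format that compresses well and is fast to scan. The main reason for it is concurrency: the writer process (`writer.py`) keeps its DuckDB connection open for the whole run, and DuckDB does not let other processes open a database file while one process has it open for writing. Reading the Parquet snapshots gives those other processes a near-live view of the data, at most about one export interval old.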