In [None]:
!pip install psycopg2

In [54]:
import csv
import gzip
import io
import json
import logging
import os
import pprint
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote

import psycopg2
import psycopg2.extras

#### Setting up TimescaleDB

In [21]:
# Connection settings come from the standard libpq environment variables so that
# real credentials never need to be committed with the notebook. The defaults
# reproduce the original local development setup.
DB_USER = os.environ.get("PGUSER", "postgres")
DB_PASSWORD = os.environ.get("PGPASSWORD", "password")
DB_HOST = os.environ.get("PGHOST", "localhost")
DB_NAME = os.environ.get("PGDATABASE", "postgres")

# User and password are percent-encoded so that characters such as "@", ":" or
# "/" in them cannot corrupt the connection URI.
CONNECTION_URL = (
    f"postgres://{quote(DB_USER, safe='')}:{quote(DB_PASSWORD, safe='')}"
    f"@{DB_HOST}/{DB_NAME}"
)

In [24]:
def db_exec(sql, log="SQL executed successfully.", params=None):
    """Execute one SQL statement in its own transaction and report the outcome.

    Errors are not re-raised: they are logged with a traceback and a failure
    line is printed, so later notebook cells still run. Result rows, if any,
    are not fetched.

    Args:
        sql: SQL statement to execute.
        log: Message printed when the statement succeeds.
        params: Optional query parameters forwarded to ``cursor.execute``.
            ``None`` (the default) sends ``sql`` as-is, exactly like a plain
            ``cursor.execute(sql)``.
    """
    conn = None
    try:
        conn = psycopg2.connect(CONNECTION_URL)
        # `with conn` commits on success and rolls back on error, but it does
        # not close the connection; that happens in `finally`.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
        print(log)
    except Exception:
        logging.exception("Failed to execute SQL:\n%s", sql)
        print("\nSQL executed unsuccessfully")
    finally:
        if conn is not None:
            conn.close()

In [59]:
# Reset the schema before (re)loading by dropping every table this notebook manages.
# CASCADE allows a table to be dropped even when other objects (e.g. foreign keys) depend on it.
tables = ["industry", "symbol", "ohlc"]
for table in tables:
    drop_sql = f"DROP TABLE IF EXISTS {table} CASCADE"
    success_message = f"{table.upper()} table dropped successfully."
    db_exec(drop_sql, success_message)


INDUSTRY table dropped successfully.
SYMBOL table dropped successfully.
OHLC table dropped successfully.


In [61]:
# Workload 1 schema: one denormalised table in which every row carries its
# symbol and industry as plain text next to the OHLC price columns.
# NOTE(review): DECIMAL(10,2) keeps only 2 decimal places -- confirm that this
# precision is sufficient for the source prices.
create_ohlc_table = """CREATE TABLE IF NOT EXISTS ohlc (
    symbol VARCHAR(10) NOT NULL,
    industry VARCHAR(50),
    timestamp TIMESTAMP NOT NULL,
    open DECIMAL(10,2),
    high DECIMAL(10,2),
    low DECIMAL(10,2),
    close DECIMAL(10,2)
);
"""
db_exec(create_ohlc_table, "OHLC table created successfully")

OHLC table created successfully


### Load Data

In [None]:
# Root directory of all raw input files, relative to the notebook's working directory.
BASE_DIR = "./dataset"

In [28]:
# Company-profile JSON: maps each symbol to a list of profile records (each with an "industry").
PROFILE_PATH = os.path.join(BASE_DIR, "profile_estimate", "profile.json")

def extract_symbol_industry(profile_path=None):
    """Map every symbol in the company-profile file to its industry name.

    Only the first profile record of each symbol is consulted.

    Args:
        profile_path: Path of the profile JSON file; defaults to ``PROFILE_PATH``.

    Returns:
        A ``defaultdict(str)`` of symbol -> industry. Symbols with an empty
        profile list, or whose first record has a missing/empty/null
        ``"industry"``, map to ``""``. Looking up an unknown symbol with ``[]``
        also yields ``""``.
    """
    if profile_path is None:
        profile_path = PROFILE_PATH
    symbol_industry = defaultdict(str)
    # JSON is UTF-8; don't depend on the platform's default text encoding.
    with open(profile_path, "r", encoding="utf-8") as file:
        company_profiles = json.load(file)
    for symbol, profiles in company_profiles.items():
        first_profile = profiles[0] if profiles else {}
        symbol_industry[symbol] = first_profile.get("industry") or ""
    return symbol_industry

In [63]:
# Root of the per-symbol OHLC data: OHLC_DIR/<symbol>/<gzipped CSV files>.
OHLC_DIR = os.path.join(BASE_DIR, "OHLC")

# symbol -> industry lookup, computed once and read by insert_ohlc (including from worker threads).
symbol_industry = extract_symbol_industry()

def _ohlc_rows(symbol):
    """Yield one (symbol, industry, timestamp, open, high, low, close) tuple per CSV row of ``symbol``."""
    symbol_path = os.path.join(OHLC_DIR, symbol)
    # .get() reads the shared lookup without inserting into it from worker
    # threads; a missing or empty industry becomes None, which COPY loads as NULL.
    industry = symbol_industry.get(symbol) or None
    for file_name in sorted(os.listdir(symbol_path)):
        file_path = os.path.join(symbol_path, file_name)
        with gzip.open(file_path, "rt", newline="") as file:
            for entry in csv.DictReader(file, delimiter=","):
                # Epoch seconds -> naive UTC datetime, so the stored TIMESTAMP does
                # not depend on the local timezone of the machine running the notebook.
                timestamp = datetime.fromtimestamp(
                    int(entry["timestamp"]), tz=timezone.utc
                ).replace(tzinfo=None)
                yield (
                    symbol,
                    industry,
                    timestamp,
                    float(entry["open"]),
                    float(entry["high"]),
                    float(entry["low"]),
                    float(entry["close"]),
                )


def insert_ohlc(symbol):
    """Bulk-load all OHLC rows of one symbol into the ``ohlc`` table with a single COPY.

    Every gzipped CSV file in ``OHLC_DIR/<symbol>`` is read into an in-memory
    buffer first. The rows are serialised with ``csv.writer`` and loaded with
    ``COPY ... (FORMAT csv)``. As a result, a value containing a comma (e.g. an
    industry name) is quoted instead of being split into extra columns, and
    ``None`` is loaded as NULL. Timestamps are stored as naive UTC.

    Args:
        symbol: Name of a sub-directory of ``OHLC_DIR``.

    Raises:
        Exceptions from file reading, value parsing (``int``/``float``) or
        psycopg2 propagate to the caller. The COPY is rolled back on error.
    """
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(_ohlc_rows(symbol))
    buffer.seek(0)

    conn = psycopg2.connect(CONNECTION_URL)
    try:
        # `with conn` commits on success / rolls back on error but does not close.
        with conn:
            with conn.cursor() as cursor:
                cursor.copy_expert(
                    "COPY ohlc (symbol, industry, timestamp, open, high, low, close) "
                    "FROM STDIN WITH (FORMAT csv)",
                    buffer,
                )
    finally:
        conn.close()


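# Slower alternative kept for reference: this row-by-row execute_batch version also works,
# but it takes significantly longer than the COPY-based insert_ohlc above (around 1 min for AAPL).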
# OHLC_DIR = os.path.join(BASE_DIR, "OHLC")

# symbol_industry = extract_symbol_industry()

# def insert_ohlc(symbol):
#     symbol_path = os.path.join(OHLC_DIR, symbol)
#     new_keys = {"symbol": symbol, "industry": symbol_industry[symbol]}
#     for date in os.listdir(symbol_path):
#         ohlc_data = []
#         file_path = os.path.join(symbol_path, date)
#         with gzip.open(file_path, "rt") as file:
#             ohlc_data.extend(csv.DictReader(file, delimiter=","))
        
#         ohlc_data = [{**entry, **new_keys} for entry in ohlc_data]
        
#         with psycopg2.connect(CONNECTION_URL) as conn:
#             cursor = conn.cursor()
#             psycopg2.extras.execute_batch(
#                 cursor,
#                 "INSERT INTO ohlc (symbol, industry, timestamp, open, high, low, close) VALUES (%s, %s, TO_TIMESTAMP(%s), %s, %s, %s, %s)",
#                 [(entry["symbol"], entry["industry"], int(entry["timestamp"]), float(entry["open"]), float(entry["high"]), float(entry["low"]), float(entry["close"])) for entry in ohlc_data],
#                 page_size=20000  # Inserts in batches of 20000
#             )
#         conn.commit()

# insert_ohlc("AAPL") # page_size makes little to no diff, takes around 1 min

In [None]:
def mp_insert_ohlc(max_workers=None):
    """Load every symbol directory under ``OHLC_DIR`` concurrently with ``insert_ohlc``.

    Uses threads (``ThreadPoolExecutor``), not processes. Each ``insert_ohlc``
    call opens its own database connection. A failure for one symbol is logged
    with its traceback and does not stop the others. A summary of the failed
    symbols is logged at the end.

    Args:
        max_workers: Thread-pool size; ``None`` uses ThreadPoolExecutor's default.
    """
    # insert_ohlc lists files inside OHLC_DIR/<symbol>, so only directories qualify
    # (stray files such as .DS_Store would otherwise fail).
    ohlc_symbols = sorted(
        name for name in os.listdir(OHLC_DIR)
        if os.path.isdir(os.path.join(OHLC_DIR, name))
    )

    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {symbol: executor.submit(insert_ohlc, symbol) for symbol in ohlc_symbols}
        # Calling result() re-raises worker exceptions; the previous bare
        # executor.map(...) never consumed its results, so failures were lost.
        for symbol, future in futures.items():
            try:
                future.result()
            except Exception:
                logging.exception("Failed to load OHLC data for %s", symbol)
                failed.append(symbol)

    if failed:
        logging.error(
            "OHLC load failed for %d of %d symbols: %s",
            len(failed), len(ohlc_symbols), ", ".join(failed),
        )

mp_insert_ohlc()

In [66]:
def create_ohlc_hypertable():
    """Convert the ``ohlc`` table into a TimescaleDB hypertable partitioned on ``timestamp``.

    ``migrate_data => TRUE`` is needed because rows are already loaded into
    ``ohlc`` before this runs. ``if_not_exists => TRUE`` makes re-running the
    cell a no-op instead of an error.
    """
    db_exec(
        """SELECT create_hypertable('ohlc', by_range('timestamp'), if_not_exists => TRUE, migrate_data => TRUE);"""
    )

create_ohlc_hypertable()

SQL executed successfully.


### Workload 1

In [None]:
def fetch_moving_averages(window_size='1 week', symbol='AAPL', ratio_threshold=5, start_date='2014-01-01'):
    """Return time buckets where a symbol's average close moved by at least ``ratio_threshold`` percent.

    Closing prices are averaged per ``time_bucket(window_size)`` from
    ``start_date`` onward. Each bucket is compared with the previous bucket of
    the same symbol. Buckets whose previous average is 0 are skipped rather than
    raising a division-by-zero error.

    Args:
        window_size: Bucket width, cast to an interval (e.g. '1 week', '2 weeks').
        symbol: Ticker to analyse.
        ratio_threshold: Minimum absolute percentage change to report (e.g. 5, 10, 15).
        start_date: Earliest ``timestamp`` included.

    Returns:
        List of ``(symbol, time_window, avg_close, prev_avg_close, percent_change)``
        tuples, ordered by symbol and time_window.
    """
    query = """
    WITH symbol_data AS (
        SELECT symbol, time_bucket(%s::interval, timestamp) AS time_window, AVG(close) AS avg_close
        FROM ohlc
        WHERE timestamp >= %s AND symbol = %s
        GROUP BY symbol, time_window
    ),
    moving_avg AS (
        SELECT symbol, time_window, avg_close, LAG(avg_close) OVER (PARTITION BY symbol ORDER BY time_window) AS prev_avg_close
        FROM symbol_data
    )
    SELECT
        symbol,
        time_window,
        avg_close,
        prev_avg_close,
        (avg_close - prev_avg_close) / NULLIF(prev_avg_close, 0) * 100 AS percent_change
    FROM moving_avg
    WHERE prev_avg_close IS NOT NULL
    AND ABS((avg_close - prev_avg_close) / NULLIF(prev_avg_close, 0) * 100) >= %s
    ORDER BY symbol, time_window;
    """

    conn = psycopg2.connect(CONNECTION_URL)
    try:
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(query, (window_size, start_date, symbol, ratio_threshold))
                return cursor.fetchall()
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        conn.close()


results = fetch_moving_averages(window_size='1 week', symbol='AAPL', ratio_threshold=5)


In [97]:
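# Equivalent query without TimescaleDB: plain PostgreSQL DATE_TRUNC instead of time_bucket.
# NOTE: as written, the `window` argument is ignored (buckets are always one calendar week), and
# with group_by_industry=True the `WHERE symbol = %s` filter still restricts rows to a single symbol.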
# def get_moving_averages(symbol, window=1, ratio=5, group_by_industry=False):
#     """
#     Calculates moving averages for a given symbol with an optional industry grouping.

#     Args:
#         symbol (str): Stock symbol to analyze.
#         window (int): Window size in weeks (1 or 2).
#         ratio (int): Significant price change threshold in percentage (±5, ±10, ±15).
#         group_by_industry (bool): If True, groups by industry instead of symbol.

#     Returns:
#         List of tuples: (symbol/industry, window_start, avg_close, prev_avg_close, price_change_percentage)
#     """
#     group_column = "industry" if group_by_industry else "symbol, industry"
#     partition_column = "industry" if group_by_industry else "symbol"
    
#     sql = f"""
#     WITH ohlc_window AS (
#         SELECT 
#             {group_column},
#             DATE_TRUNC('week', timestamp) AS window_start,
#             AVG(close) AS avg_close
#         FROM ohlc
#         WHERE symbol = %s AND timestamp >= '2014-01-01'
#         GROUP BY {group_column}, window_start
#     ),
#     price_changes AS (
#         SELECT 
#             {group_column},
#             window_start,
#             avg_close,
#             LAG(avg_close) OVER (PARTITION BY {partition_column} ORDER BY window_start) AS prev_avg_close
#         FROM ohlc_window
#     )
#     SELECT 
#         {group_column},
#         window_start,
#         avg_close,
#         prev_avg_close,
#         (avg_close - prev_avg_close) / prev_avg_close * 100 AS price_change_percentage
#     FROM price_changes
#     WHERE prev_avg_close IS NOT NULL
#     AND ABS((avg_close - prev_avg_close) / prev_avg_close * 100) >= %s;
#     """

#     with psycopg2.connect(CONNECTION_URL) as conn:
#         with conn.cursor() as cursor:
#             cursor.execute(sql, (symbol, ratio))
#             result = cursor.fetchall()
    
#     return result

# # Example Usage:
# symbol = "AAPL"
# window = 2  # 2-week window
# ratio = 10  # 10% price change threshold
# group_by_industry = False  # Set to True if you want industry-level aggregation

# result = get_moving_averages(symbol, window, ratio, group_by_industry)
# print(result)


### Workload 2

In [None]:
# Built from BASE_DIR like the other dataset paths, so relocating the dataset needs one change.
EARNINGS_ESTIMATE_PATH = os.path.join(BASE_DIR, "profile_estimate", "historical_earning_estimates.json")

# JSON is UTF-8; don't depend on the platform's default text encoding.
with open(EARNINGS_ESTIMATE_PATH, "r", encoding="utf-8") as file:
    earnings_estimate_data = json.load(file)

In [None]:
# Weekly average close per symbol and its percentage change versus the previous week,
# on the normalised schema (ohlc.symbol_id -> symbol.id, symbol.industry_id -> industry.id).
# Rows whose absolute change is at least 5% are kept. `threshold` reports the largest of
# the 5 / 10 / 15 % thresholds the change reaches. Exact equality (IN (5, 10, 15)) on a
# computed percentage would almost never match.
# NOTE(review): the symbol/industry tables and ohlc.symbol_id are created in a later cell;
# run this only after the normalised tables exist and are populated.
calc_moving_averge = """
WITH ohlc_windows AS (
    SELECT
        time_bucket('1 week', o.timestamp) AS window_start,
        s.name AS symbol,
        i.name AS industry,
        AVG(o.close) AS moving_avg,
        LAG(AVG(o.close)) OVER (PARTITION BY s.id ORDER BY time_bucket('1 week', o.timestamp)) AS prev_moving_avg
    FROM ohlc o
    JOIN symbol s ON o.symbol_id = s.id
    LEFT JOIN industry i ON s.industry_id = i.id
    WHERE o.timestamp >= '2014-01-01'
    GROUP BY s.id, s.name, i.name, time_bucket('1 week', o.timestamp)
),
changes AS (
    SELECT
        window_start,
        symbol,
        industry,
        moving_avg,
        prev_moving_avg,
        (moving_avg - prev_moving_avg) / NULLIF(prev_moving_avg, 0) * 100 AS percent_change
    FROM ohlc_windows
)
SELECT
    window_start,
    symbol,
    industry,
    moving_avg,
    prev_moving_avg,
    percent_change,
    CASE
        WHEN ABS(percent_change) >= 15 THEN 15
        WHEN ABS(percent_change) >= 10 THEN 10
        ELSE 5
    END AS threshold
FROM changes
WHERE ABS(percent_change) >= 5
ORDER BY symbol, window_start;
"""
# NOTE: db_exec only executes the statement; it does not fetch or return the result rows.
db_exec(calc_moving_averge)

In [None]:
# Workload 2: normalised schema. `industry` and `symbol` become lookup tables and
# `ohlc` references `symbol` by id. The tables are created in foreign-key order:
# industry <- symbol <- ohlc.
# NOTE(review): CREATE TABLE IF NOT EXISTS keeps an existing `ohlc` unchanged. If the
# Workload 1 `ohlc` (text symbol/industry columns, no symbol_id) still exists, this new
# definition is silently not applied -- drop the old table first if that is intended.
# NOTE(review): TimescaleDB requires unique constraints on a hypertable to include the
# partitioning column; `id SERIAL PRIMARY KEY` alone would block create_hypertable on
# this table -- confirm whether it is meant to become a hypertable.
create_industry_table = """CREATE TABLE IF NOT EXISTS industry (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50) UNIQUE NOT NULL
);
"""

create_symbol_table = """CREATE TABLE IF NOT EXISTS symbol  (
    id SERIAL PRIMARY KEY,
    name VARCHAR(10) UNIQUE NOT NULL,
    industry_id INTEGER references industry(id) ON DELETE SET NULL
);
"""

create_ohlc_table = """CREATE TABLE IF NOT EXISTS ohlc (
    id SERIAL PRIMARY KEY,
    symbol_id INTEGER REFERENCES symbol(id) ON DELETE CASCADE,
    timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    open DECIMAL(10,2),
    high DECIMAL(10,2),
    low DECIMAL(10,2),
    close DECIMAL(10,2),
    volume BIGINT
);
"""

for ddl, success_message in (
    (create_industry_table, "INDUSTRY table created successfully"),
    (create_symbol_table, "SYMBOL table created successfully"),
    (create_ohlc_table, "OHLC table created successfully"),
):
    db_exec(ddl, success_message)

In [None]:

def extract_industries(profile_path=None):
    """Return the set of distinct, non-empty industry names in the company-profile file.

    Only each symbol's first profile record is consulted. Symbols with an empty
    profile list, or with a missing/empty/null ``"industry"``, are skipped.

    Args:
        profile_path: Path of the profile JSON file; defaults to ``PROFILE_PATH``.
    """
    if profile_path is None:
        profile_path = PROFILE_PATH
    # JSON is UTF-8; don't depend on the platform's default text encoding.
    with open(profile_path, "r", encoding="utf-8") as file:
        company_profiles = json.load(file)
    return {
        profiles[0]["industry"]
        for profiles in company_profiles.values()
        if profiles and profiles[0].get("industry")
    }

def insert_industries():
    """Insert every industry found in the profile file into ``industry``.

    Names that already exist are left untouched (``ON CONFLICT (name) DO NOTHING``).
    Does nothing when the profile file yields no industries.
    """
    industries = extract_industries()
    if not industries:
        # An empty VALUES list would be a SQL syntax error.
        return
    # Sorted for a reproducible insertion order (set iteration order varies between runs).
    rows = [(name,) for name in sorted(industries)]

    conn = psycopg2.connect(CONNECTION_URL)
    try:
        # `with conn` commits on success / rolls back on error but does not close.
        with conn:
            with conn.cursor() as cursor:
                psycopg2.extras.execute_values(
                    cursor,
                    "INSERT INTO industry (name) VALUES %s ON CONFLICT (name) DO NOTHING",
                    rows,
                )
    finally:
        conn.close()

def extract_symbols(profile_path=None):
    """Return the set of all symbols (the top-level keys) in the company-profile file.

    Args:
        profile_path: Path of the profile JSON file; defaults to ``PROFILE_PATH``.
    """
    if profile_path is None:
        profile_path = PROFILE_PATH
    # JSON is UTF-8; don't depend on the platform's default text encoding.
    with open(profile_path, "r", encoding="utf-8") as file:
        return set(json.load(file))

def insert_symbols():
    """Insert every profiled symbol into ``symbol`` together with its ``industry_id``.

    Each symbol's industry name comes from the profile file (via
    ``extract_symbol_industry``). The name is resolved to an id using the rows
    currently in ``industry``, so ``insert_industries()`` should run first.
    A symbol whose industry is unknown is inserted with a NULL ``industry_id``
    instead of being dropped. Existing symbol names are left untouched
    (``ON CONFLICT (name) DO NOTHING``).

    The previous version indexed plain symbol strings with ``["industry"]`` and
    used one placeholder per row for two columns, so it could not succeed.
    """
    symbol_to_industry = extract_symbol_industry()
    if not symbol_to_industry:
        # An empty VALUES list would be a SQL syntax error.
        return

    conn = psycopg2.connect(CONNECTION_URL)
    try:
        # `with conn` commits on success / rolls back on error but does not close.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT id, name FROM industry;")
                industry_ids = {name: industry_id for industry_id, name in cursor.fetchall()}

                rows = [
                    (symbol, industry_ids.get(symbol_to_industry.get(symbol)))
                    for symbol in sorted(symbol_to_industry)
                ]
                psycopg2.extras.execute_values(
                    cursor,
                    "INSERT INTO symbol (name, industry_id) VALUES %s ON CONFLICT (name) DO NOTHING;",
                    rows,
                )
    finally:
        conn.close()