In [7]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import numpy as np 
import matplotlib.pyplot as plt 
import pandas as pd
import datetime 
from polygon import StocksClient
from env import POLYGON_API_KEY as api_key
import tqdm
from alive_progress import alive_bar
import os

# Polygon API client, shared by the rest of the notebook (the download loop
# calls client.get_aggregate_bars on it). The API key comes from the local
# `env` module (POLYGON_API_KEY).
# NOTE(review): connect_timeout=10 is presumably in seconds — confirm against the polygon client docs.
client = StocksClient(api_key, connect_timeout=10)

def create_database(cursor, db_name):
    """Create the PostgreSQL database ``db_name`` unless it already exists.

    The name is used exactly as given: it is bound as a parameter for the
    existence check and emitted as a double-quoted identifier for creation,
    so both statements agree on the same (case-sensitive) name and a
    malicious name cannot inject SQL.

    Args:
        cursor: Open DB-API cursor. Its connection should be in autocommit
            mode, because PostgreSQL rejects CREATE DATABASE inside a
            transaction block.
        db_name: Name of the database to create.

    Returns:
        None.
    """
    # Values can be bound as parameters; the driver handles the escaping.
    cursor.execute(
        "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s",
        (db_name,),
    )
    if cursor.fetchone():
        return

    # Identifiers cannot be bound as parameters, so quote the name by hand:
    # wrap it in double quotes and double any embedded double quote.
    quoted_name = '"' + db_name.replace('"', '""') + '"'
    cursor.execute(f"CREATE DATABASE {quoted_name}")

# Connect to the PostgreSQL server.
# The password is read from the standard libpq PGPASSWORD environment variable
# instead of being hardcoded in the notebook. When the variable is unset the
# value is None, which psycopg2 omits from the connection string so that
# libpq's own lookup (e.g. ~/.pgpass) still applies.
connection = psycopg2.connect(host='localhost',
                              user='postgres',
                              password=os.environ.get('PGPASSWORD'))

try:
    # CREATE DATABASE cannot run inside a transaction block, so switch the
    # connection to autocommit before issuing it.
    connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

    cursor = connection.cursor()
    try:
        # Create the target database if it doesn't exist yet
        create_database(cursor, 'test_1')
    finally:
        cursor.close()
finally:
    # Always release the connection, even when database creation failed
    connection.close()

In [12]:
# Connect to the TimescaleDB database.
# The password is read from the standard libpq PGPASSWORD environment variable
# instead of being hardcoded in the notebook. When the variable is unset the
# value is None, which psycopg2 omits from the connection string so that
# libpq's own lookup (e.g. ~/.pgpass) still applies.
connection = psycopg2.connect(host='localhost',
                              user='postgres',
                              password=os.environ.get('PGPASSWORD'),
                              dbname='test_1')

# Single table holding every downloaded bar, keyed by (date, symbol)
create_table_query = """
    CREATE TABLE IF NOT EXISTS StocksData (
        date TIMESTAMPTZ NOT NULL,
        id SERIAL,
        open_price FLOAT,
        close_price FLOAT,
        high_price FLOAT,
        low_price FLOAT,
        volume INT,
        symbol VARCHAR(10),
        granularity VARCHAR(10) NOT NULL,
        PRIMARY KEY(date, symbol)
    )
"""

try:
    # Cursor reused by the download loop further down
    cursor = connection.cursor()

    cursor.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")
    cursor.execute(create_table_query)

    # Turn the table into a hypertable partitioned on `date`; both flags make
    # the call safe to repeat on an existing, already populated table.
    cursor.execute("SELECT create_hypertable('StocksData', 'date', if_not_exists => TRUE, migrate_data => TRUE)")

    # Commit the extension/table/hypertable creation as one transaction
    connection.commit()
except Exception:
    # Schema setup failed: do not leave a half-open connection behind.
    # On success the connection deliberately stays open for the download loop.
    connection.close()
    raise

# Input list of (date, symbol) pairs, one download per row
days = pd.read_csv('matchDays.csv',index_col=False)

# Resume point: positional index into `days` of the next row to process.
# The download loop rewrites this file after each stored batch.
try:
    with open('last_successful_index.txt', 'r') as f:
        c = int(f.read())
except FileNotFoundError:
    # NOTE(review): without a checkpoint the run starts at row 50000, not 0 — confirm this is intended
    c = 50000
except ValueError:
    # An empty or corrupt checkpoint (e.g. the process died mid-write) must not
    # block a restart; fall back to the same default as a missing file.
    print("Checkpoint file 'last_successful_index.txt' is empty or corrupt; falling back to the default start index")
    c = 50000

data = []  # not referenced in the visible cells
days.columns = ['date','symbol']
# Times of day combined with each date to build the request window
hour_4 = datetime.time(hour=4)
hour_9 = datetime.time(hour=9)
hour_16 = datetime.time(hour=16)
h=0  # not referenced in the visible cells

# Rows whose symbol is NaN / not a string would interrupt the download,
# so they are collected here and saved to a csv afterwards
error_df = pd.DataFrame(columns=['date', 'symbol'])

def _bar_granularity(moment):
    """Label a bar by how its timestamp is aligned.

    Checks run from the finest unit to the coarsest; the first unit whose
    component is non-zero names the granularity.

    Args:
        moment: datetime of the bar.

    Returns:
        One of 'Seconds', 'Minutes', 'Hours', 'Days', 'Weeks', 'Months'.
    """
    if moment.second != 0:
        return 'Seconds'
    if moment.minute != 0:
        return 'Minutes'
    if moment.hour != 0:
        return 'Hours'
    if moment.weekday() != 0:
        return 'Days'
    # Parentheses matter: without them `1 // 7` is evaluated first (== 0) and
    # the test degenerates to `day != 0`, which is always true.
    if (moment.day - 1) // 7 != 0:
        return 'Weeks'
    return 'Months'


# Parameterized INSERT, built once; psycopg2 binds the values on execute
query = "INSERT INTO StocksData (date, open_price, close_price, high_price, low_price, volume, symbol, granularity) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"

# Input rows that could not be processed; merged into error_df once at the end
# instead of growing the DataFrame row by row.
error_rows = []

try:
    with alive_bar(len(days)-c,force_tty=True) as bar:
        for _, entry in days[c:].iterrows():
            c += 1
            # Tick for every row, including skipped ones, so the bar can reach its total
            bar()

            raw_date, symbol = entry['date'], entry['symbol']

            if not isinstance(symbol, str):
                print(f"Expected a string for the symbol, but got {symbol} of type {type(symbol)}")
                error_rows.append([raw_date, symbol])
                continue

            try:
                # The request targets the day AFTER the date listed in the csv
                day = datetime.datetime.strptime(raw_date, "%m/%d/%Y") + datetime.timedelta(days=1)
            except (TypeError, ValueError) as e:
                # NaN or malformed date: record it and keep going instead of aborting the run
                print(f"Could not parse date {raw_date!r}: {e}")
                error_rows.append([raw_date, symbol])
                continue

            # NOTE(review): these are naive datetimes; how the polygon client
            # interprets them (local vs. UTC) is not visible here — confirm.
            day_4 = datetime.datetime.combine(day, hour_4)
            day_16 = datetime.datetime.combine(day, hour_16)

            try:
                aux = client.get_aggregate_bars(symbol, day_4, day_16, timespan='min')
            except Exception as e:
                print(f"Error when making API request: {e}")
                continue

            # .get() so a response without these keys is skipped rather than raising KeyError
            if aux.get('status') == 'OK' and aux.get('queryCount', 0) != 0:
                df = pd.DataFrame(aux.get('results', []))
                df['symbol'] = symbol
                for index, row in df.iterrows():
                    # 't' is a Unix timestamp in milliseconds. Build a
                    # timezone-aware UTC datetime so the value stored in the
                    # TIMESTAMPTZ column does not depend on the local or
                    # session time zone.
                    timestamp_s = row['t'] / 1000.0
                    date = datetime.datetime.fromtimestamp(timestamp_s, tz=datetime.timezone.utc)

                    time_granularity = _bar_granularity(date)

                    values = (date, row['o'], row['c'], row['h'], row['l'], row['v'], row['symbol'], time_granularity)

                    try:
                        cursor.execute(query, values)
                        connection.commit()
                    except Exception as e:
                        print(f"Error when executing SQL command: {e}")
                        # A failed statement aborts the open transaction; without
                        # a rollback every later INSERT would be rejected too.
                        connection.rollback()

                # Save the index of the next row to resume from in case of
                # interruption. Write to a temp file and rename so a crash
                # mid-write cannot leave a truncated checkpoint.
                with open('last_successful_index.txt.tmp', 'w') as f:
                    f.write(str(c))
                os.replace('last_successful_index.txt.tmp', 'last_successful_index.txt')
finally:
    try:
        # Persist the rejected rows even if the loop was interrupted
        if error_rows:
            new_errors = pd.DataFrame(error_rows, columns=error_df.columns)
            if error_df.empty:
                error_df = new_errors
            else:
                error_df = pd.concat([error_df, new_errors], ignore_index=True)
        error_df.to_csv('errors.csv', index=False)
    finally:
        # Close the cursor and connection no matter how the loop ended
        cursor.close()
        connection.close()

|████████████████████████████████████████| 84/84 [100%] in 20.7s (4.09/s)       
