In [1]:
# get_stock_prices.py

import os
from io import StringIO

import pandas as pd
import psycopg2
import yfinance as yf


def download_prices(ticker, period='2y', interval='60m', progress=False):
    """Download stock prices to a Pandas DataFrame"""
    
    df = yf.download(
        tickers=ticker,
        period=period,
        interval=interval, 
        progress=progress
    )
    
    df = df.reset_index() # remove the index
    df['ticker'] = ticker # add a column for the ticker
    
    # Rename columns to match our database table
    df = df.rename(columns={
        "Datetime": "time",
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Adj Close": "close_adj",
        "Volume": "volume",
    })
    
    return df


def upload_to_aws_efficiently(df, table_name='stock_prices'):
    """
    Upload the stock price data to AWS as quickly and efficiently as possible
    by truncating (i.e. removing) the existing data and copying all-new data
    """
    
    with psycopg2.connect(
        host='localhost',
        #host=os.getenv('POSTGRES_HOST'),
        port=os.getenv("POSTGRES_PORT"), 
        dbname=os.getenv("POSTGRES_DB"), 
        user=os.getenv("POSTGRES_USER"), 
        password=os.getenv("POSTGRES_PASSWORD"), 
        connect_timeout=5
    ) as conn:
        with conn.cursor() as cursor:
            # Truncate the existing table (i.e. remove all existing rows)
            cursor.execute(f"TRUNCATE {table_name}")
            conn.commit()
            
            # Now insert the brand-new data
            # Initialize a string buffer
            sio = StringIO()
            # Write the Pandas DataFrame as a CSV file to the buffer
            sio.write(df.to_csv(index=None, header=None))
            # Be sure to reset the position to the start of the stream
            sio.seek(0)
            cursor.copy_from(
                file=sio, 
                table=table_name, 
                sep=",", 
                null="", 
                size=8192, 
                columns=df.columns
            )
            conn.commit()
            print("DataFrame uploaded to TimescaleDB")



In [2]:
from dotenv import load_dotenv

load_dotenv()

True

In [3]:
# Download prices for the four stocks in which we're interested
msft = download_prices("MSFT")
tsla = download_prices("TSLA")
cvx = download_prices("CVX")
xom = download_prices("XOM")

# Append the four tables to each-other, one on top of the other
df_all = pd.concat([msft, tsla, cvx, xom])

# Erase existing data and upload all-new data to TimescaleDB
upload_to_aws_efficiently(df_all)

print("All done!")    


DataFrame uploaded to TimescaleDB
All done!


## TimescaleDB functions
- `time_bucket()`:
- `first()`: The `first` aggregate allows you to get the value of one column as ordered by another.
- `last()`: The last aggregate allows you to get the value of one column as ordered by another.

Important: The `last` and `first` commands do not use indexes, they perform a sequential scan through the group. They are primarily used for ordered selection within a `GROUP BY` aggregate, and not as an alternative to an `ORDER BY` time `DESC LIMIT 1` clause to find the latest value, which uses indexes.

In [107]:
queries=["SELECT * FROM stock_prices WHERE time > (now() - interval '14 days') ORDER BY time, ticker;"
        ,"SELECT time_bucket('1 day', time) AS period,ticker,first(open,time) as open,max(high,time) AS high,min(low,time) AS low,last(close,time) AS close,last(close_adj,time) as close_adj,sum(volume) AS volume FROM stock_prices GROUP BY ticker,time_bucket('1 day', time) ORDER BY ticker,time_bucket('1 day', time);"
        ,"SELECT time_bucket('1 day', time) AS period, AVG(close) AS avg_close,last(close, time) AS last_close --the latest value FROM stock_prices GROUP BY period;"
        ,"SELECT time_bucket('1 day', time) AS period,t2.name, --from the second metadata table t2.industry, --from the second metadata table AVG(close) AS avg_close,last(close, time) AS last_close --the latest value FROM stock_prices t1 INNER JOIN stock_tickers t2 on t1.ticker = t2.ticker GROUP BY period,t2.name,t2.industry;"
        ]

In [78]:
def get_data(query:str):
    with psycopg2.connect(
        host='localhost',
        #host=os.getenv('POSTGRES_HOST'),
        port=os.getenv("POSTGRES_PORT"), 
        dbname=os.getenv("POSTGRES_DB"), 
        user=os.getenv("POSTGRES_USER"), 
        password=os.getenv("POSTGRES_PASSWORD"), 
        connect_timeout=5
        ) as conn:
            with conn.cursor() as cursor:
                cursor.execute(query)
                return cursor.fetchall()


In [59]:
queries[:2]

["SELECT * FROM stock_prices WHERE time > (now() - interval '14 days') ORDER BY time, ticker;",
 "SELECT ticker,time_bucket('1 day', time) AS period,AVG(open) as open, AVG(high) AS high, AVG(low) AS low,AVG(close) AS close, AVG(close_adj) as close_adj AVG(volume) AS volume FROM stock_prices GROUP BY ticker,time_bucket('1 day', time) ORDER BY ticker,time_bucket('1 day', time);"]

In [108]:
result=get_data(queries[1])

UndefinedFunction: function max(numeric, timestamp without time zone) does not exist
LINE 1: ..., time) AS period,ticker,first(open,time) as open,max(high,t...
                                                             ^
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.


In [99]:
cols:list[str]=['time','ticker','open','high','low','close','close_adj','volume']
fm={x:lambda v:f'{v:.2f}' for x in cols[2:]}

In [106]:
r=pd.DataFrame(result, columns=cols)
r.head().style.format(fm)

Unnamed: 0,time,ticker,open,high,low,close,close_adj,volume
0,2021-07-01 00:00:00,CVX,106.95,106.68,106.09,106.21,106.21,9024287.0
1,2021-07-02 00:00:00,CVX,105.7,106.06,105.58,106.07,106.07,5812730.0
2,2021-07-06 00:00:00,CVX,105.97,104.44,103.78,103.99,103.99,8561964.0
3,2021-07-07 00:00:00,CVX,103.96,103.43,102.58,102.94,102.94,7430728.0
4,2021-07-08 00:00:00,CVX,101.5,102.71,102.02,102.62,102.62,7684330.0
