In [139]:
# 1. Data Ingestion.

import yfinance as yf
from datetime import date

# Define the stock symbol and date range.
symbol = "AAPL"
start_date = "2022-09-08"
end_date = str(date.today())

# Download historical data.
data = yf.download(symbol, start=start_date, end=end_date)

# Save data to a CSV file.
data.to_csv(f"{symbol}_historical_data.csv")

[*********************100%%**********************]  1 of 1 completed


In [140]:
# 2. API Integration.

# Mock API is implemented using Django framework. See "api" folder.
# Endpoints:
# 1. POST http://{host}/api/stock-data
# payload: JSON
# 2. GET http://{host}/api/stock-data
# param: start_date. e.g. "2023-09-01" (optional)

import pandas as pd

df = pd.read_csv(f"{symbol}_historical_data.csv")
df.head()

# Add column "symbol" just for informative purpose in case we ingest multiple data for multiple symbols.
df["symbol"] = symbol

# Rename columns to be insync with API parameters.
df = df.rename(columns={"Date": "date", "Open": "open", "High": "high", "Low":"low", "Close":"close", "Adj Close": "adj_close", "Volume":"volume"})

# Convert to JSON.
stock_data = df.to_json(orient='records')

stock_data # Show data

'[{"date":"2022-09-08","open":154.6399993896,"high":156.3600006104,"low":152.6799926758,"close":154.4600067139,"adj_close":153.5490875244,"volume":84923800,"symbol":"AAPL"},{"date":"2022-09-09","open":155.4700012207,"high":157.8200073242,"low":154.75,"close":157.3699951172,"adj_close":156.4419250488,"volume":68028800,"symbol":"AAPL"},{"date":"2022-09-12","open":159.5899963379,"high":164.2599945068,"low":159.3000030518,"close":163.4299926758,"adj_close":162.4661865234,"volume":104956000,"symbol":"AAPL"},{"date":"2022-09-13","open":159.8999938965,"high":160.5399932861,"low":153.3699951172,"close":153.8399963379,"adj_close":152.9327392578,"volume":122656600,"symbol":"AAPL"},{"date":"2022-09-14","open":154.7899932861,"high":157.1000061035,"low":153.6100006104,"close":155.3099975586,"adj_close":154.3940582275,"volume":87965400,"symbol":"AAPL"},{"date":"2022-09-15","open":154.6499938965,"high":155.2400054932,"low":151.3800048828,"close":152.3699951172,"adj_close":151.4714202881,"volume":9048

In [182]:
# Try to upload to the API.
import requests
import json

url = "http://54.80.105.153:8000/api/stock-data"

payload = stock_data
headers = {
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text) # Show data

[{"id":1516,"symbol":"AAPL","date":"2022-09-08","open":"154.63999938960000000000","high":"156.36000061040000000000","low":"152.67999267580000000000","close":"154.46000671390000000000","adj_close":"153.54908752440000000000","volume":84923800,"cdate":"2023-09-12T04:42:02.667607Z"},{"id":1517,"symbol":"AAPL","date":"2022-09-09","open":"155.47000122070000000000","high":"157.82000732420000000000","low":"154.75000000000000000000","close":"157.36999511720000000000","adj_close":"156.44192504880000000000","volume":68028800,"cdate":"2023-09-12T04:42:02.674647Z"},{"id":1518,"symbol":"AAPL","date":"2022-09-12","open":"159.58999633790000000000","high":"164.25999450680000000000","low":"159.30000305180000000000","close":"163.42999267580000000000","adj_close":"162.46618652340000000000","volume":104956000,"cdate":"2023-09-12T04:42:02.680954Z"},{"id":1519,"symbol":"AAPL","date":"2022-09-13","open":"159.89999389650000000000","high":"160.53999328610000000000","low":"153.36999511720000000000","close":"153.

In [183]:
# 3. ETL Operation.

# Extracts the data points from API into DataFrame.

import requests
import json

url = "http://54.80.105.153:8000/api/stock-data"

headers = {
  'Content-Type': 'application/json'
}

response = requests.request("GET", url, headers=headers)

data = json.loads(response.text)
df = pd.DataFrame(data)
df # Show data.

Unnamed: 0,id,symbol,date,open,high,low,close,adj_close,volume,cdate
0,1516,AAPL,2022-09-08,154.63999938960000000000,156.36000061040000000000,152.67999267580000000000,154.46000671390000000000,153.54908752440000000000,84923800,2023-09-12T04:42:02.667607Z
1,1517,AAPL,2022-09-09,155.47000122070000000000,157.82000732420000000000,154.75000000000000000000,157.36999511720000000000,156.44192504880000000000,68028800,2023-09-12T04:42:02.674647Z
2,1518,AAPL,2022-09-12,159.58999633790000000000,164.25999450680000000000,159.30000305180000000000,163.42999267580000000000,162.46618652340000000000,104956000,2023-09-12T04:42:02.680954Z
3,1519,AAPL,2022-09-13,159.89999389650000000000,160.53999328610000000000,153.36999511720000000000,153.83999633790000000000,152.93273925780000000000,122656600,2023-09-12T04:42:02.687055Z
4,1520,AAPL,2022-09-14,154.78999328610000000000,157.10000610350000000000,153.61000061040000000000,155.30999755860000000000,154.39405822750000000000,87965400,2023-09-12T04:42:02.693525Z
...,...,...,...,...,...,...,...,...,...,...
247,1763,AAPL,2023-09-01,189.49000549320000000000,189.91999816890000000000,188.27999877930000000000,189.46000671390000000000,189.46000671390000000000,45732600,2023-09-12T04:42:04.617640Z
248,1764,AAPL,2023-09-05,188.27999877930000000000,189.97999572750000000000,187.61000061040000000000,189.69999694820000000000,189.69999694820000000000,45280000,2023-09-12T04:42:04.623193Z
249,1765,AAPL,2023-09-06,188.39999389650000000000,188.85000610350000000000,181.47000122070000000000,182.91000366210000000000,182.91000366210000000000,81755800,2023-09-12T04:42:04.627930Z
250,1766,AAPL,2023-09-07,175.17999267580000000000,178.21000671390000000000,173.53999328610000000000,177.55999755860000000000,177.55999755860000000000,112488800,2023-09-12T04:42:04.633127Z


In [184]:
# Transforms the data by cleaning, handling missing values, etc.

# 1. Remove rows is any null values detected.
df = df.dropna()

# 2. Remove duplicates.
df = df.drop_duplicates()

# Create new column "next_open" as the target variable.
df['next_open'] = df['open'].shift(-1)  # Shift opening prices by one day to get the next day's opening price.
df.dropna(inplace=True) # Remove the last row with NaN values.

#  Normalize some of the values.
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
df['open'] = scaler.fit_transform(df[['open']])
df['high'] = scaler.fit_transform(df[['high']])
df['low'] = scaler.fit_transform(df[['low']])
df['close'] = scaler.fit_transform(df[['close']])
df['adj_close'] = scaler.fit_transform(df[['adj_close']])
df['volume'] = scaler.fit_transform(df[['volume']])

# Drop some columns before inserting into PostgreSQL table.
df = df.drop(columns=['id', 'cdate'], errors='ignore')

df # Show data.

Unnamed: 0,symbol,date,open,high,low,close,adj_close,volume,next_open
0,AAPL,2022-09-08,0.407660,0.405762,0.400928,0.412152,0.405324,0.401080,155.47000122070000000000
1,AAPL,2022-09-09,0.419479,0.426483,0.430038,0.452891,0.445673,0.274339,159.58999633790000000000
2,AAPL,2022-09-12,0.478143,0.517882,0.494023,0.537729,0.529698,0.551354,159.89999389650000000000
3,AAPL,2022-09-13,0.482557,0.465087,0.410631,0.403472,0.396728,0.684137,154.78999328610000000000
4,AAPL,2022-09-14,0.409796,0.416265,0.414006,0.424052,0.417110,0.423897,154.64999389650000000000
...,...,...,...,...,...,...,...,...,...
246,AAPL,2023-08-31,0.880393,0.870707,0.890311,0.879882,0.884023,0.220070,189.49000549320000000000
247,AAPL,2023-09-01,0.903887,0.882061,0.901561,0.902142,0.906201,0.107081,188.27999877930000000000
248,AAPL,2023-09-05,0.886658,0.882912,0.892139,0.905502,0.909548,0.103686,188.39999389650000000000
249,AAPL,2023-09-06,0.888367,0.866875,0.805794,0.810444,0.814843,0.377314,175.17999267580000000000


In [185]:
# Loads the data into a PostgreSQL database.

# Table schema:
# CREATE TABLE stock_price (
#     id serial4 NOT NULL,
#     symbol varchar(50) NULL,
#     date date NULL,
#     open numeric(30,20) NULL,
#     next_open numeric(30,20) NULL,
#     high numeric(30,20) NULL,
#     low numeric(30,20) NULL,
#     close numeric(30,20) NULL,
#     adj_close numeric(30,20) NULL,
#     volume numeric(10,5) NULL,
#     cdate timestamptz NOT null default current_timestamp,
#     CONSTRAINT stock_price_pkey PRIMARY KEY (id)
# );

import psycopg2
from sqlalchemy import create_engine
import pandas as pd

# Assume PostgreSQL is installed locally and table has been created.
conn_string = 'postgresql://postgres:password123@localhost:5432/local'
 
# Create an engine instance.
db = create_engine(conn_string)

# Insert data.
df.to_sql('stock_price', db, if_exists='append', index=False)

# Disconnect.
db.dispose()

In [186]:
# 4. Data Pipeline.
# Create a real-time data pipeline that automatically pushes new stock data points into the machine learning model. 
# You can use simple timestamping for the demonstration of real-time data handling.

import os.path
import pandas as pd
from sqlalchemy import create_engine

# Try to read latest timestamp from file system. If exists then we will use the timestamp to query the data
# otherwise query all data for the first time.
last_timestamp_path = "last_timestamp.txt"
ts = None
if os.path.isfile(last_timestamp_path):
    last_timestamp = open(last_timestamp_path, "r")
    ts = last_timestamp.read()
    last_timestamp.close()

# Create a database connection.
conn_string = 'postgresql://postgres:password123@localhost:5432/local'
db = create_engine(conn_string)

# Read data from PostgreSQL table into a pandas DataFrame.
if not ts:
    query = "SELECT open, close, volume, next_open FROM stock_price"
else:
    query = f"SELECT open, close, volume, next_open FROM stock_price where cdate > '{ts}'"
df = pd.read_sql(query, db)

# Close the database connection.
db.dispose()

# Write new timestamp.
from datetime import datetime
now = datetime.utcnow() # Current date and time in UTC since Postgres compares in UTC by default.
date_time = now.strftime("%Y-%m-%d %H:%M:%S %z")
with open('last_timestamp.txt', 'w') as file:
    file.write(date_time)

# Now we can work with the DataFrame.
df.head()

Unnamed: 0,open,close,volume,next_open
0,0.40766,0.412152,0.40108,155.470001
1,0.419479,0.452891,0.27434,159.589996
2,0.478143,0.537729,0.55135,159.899994
3,0.482557,0.403472,0.68414,154.789993
4,0.409796,0.424052,0.4239,154.649994


In [188]:
# Step 2: Feature Engineering.

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Define features and target variable
features = ['open', 'close', 'volume']  # For this demonstration using only open, close, volume features.
target = 'next_open'

# Step 3: Data Splitting.
x = df[features]
y = df[target]
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

In [189]:
# Step 4: Model Training
model = LinearRegression()
model.fit(x_train, y_train)

LinearRegression()

In [190]:
# Step 5: Model Evaluation
y_pred = model.predict(x_test)
mse = mean_squared_error(y_test, y_pred)
print(f'Mean Squared Error: {model.intercept_}')
print(f'Coef: {model.coef_}')
print(f'Score: {model.score(x_test, y_test)}')

Mean Squared Error: 125.68689929421585
Coef: [ 2.47172331 68.16647909 -1.04217596]
Score: 0.9945134362914513


In [191]:
# Step 6: Prediction.
next_day_open = model.predict(x.tail(1))
print(f'Predicted Next Day Opening Price: {next_day_open[0]}')

Predicted Next Day Opening Price: 176.92345809198187
