In [1]:
from datetime import datetime, date, time

DB_NAME = "stockdata.sqlite3"

# A deliberately generic browser User-Agent: without one the site refuses the download
GENERIC_HEADER = {
	'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0',
}

YAHOO_API_URL = 'https://query1.finance.yahoo.com/v7/finance/download/'
ASX_LISTINGS_URL = "https://asx.api.markitdigital.com/asx-research/1.0/companies/directory/file"

# Bounds of the requested price history, as integer timestamps
# (naive datetimes, so they are interpreted in the machine's local time zone)
HISTORY_START = int(datetime(1988, 1, 29, 11, 0).timestamp())
TODAY = int(datetime.combine(date.today(), time(hour=0, minute=10)).timestamp())

# Query-string parameters; the fetch code joins them with '&'
PARAMS = [
	f'period1={HISTORY_START}',
	f'period2={TODAY}',
	'interval=1d',
	'events=history',
	'includeAdjustedClose=true',
]

### Get a listing of all the companies
Request the list of all currently listed companies from the ASX website. Historical listings are not covered here; we'll deal with them later.

In [2]:
import requests
from io import StringIO
import pandas as pd

# Download the ASX company directory. The timeout stops a stalled connection
# from hanging the notebook indefinitely (requests waits forever by default).
resp = requests.get(ASX_LISTINGS_URL, headers=GENERIC_HEADER, timeout=30)
if not resp.ok:
	raise ValueError("Failed to retrieve data!")

# Convert all the csv data into a dataframe
with StringIO(resp.text) as f:
	df = pd.read_csv(f)

# Make dataframe look nicer: shorter column names, indexed by ticker code.
# "Start Date" is left as the DD/MM/YYYY text supplied by the ASX.
companies = df.rename(columns={
	"ASX code": "Code",
	"Company name": "Name",
	"GICs industry group": "Sector",
	"Listing date": "Start Date",
	"Market Cap": "Value",
}).set_index("Code")

companies

Unnamed: 0_level_0,Name,Sector,Start Date,Value
Code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
14D,1414 DEGREES LIMITED,Capital Goods,12/09/2018,17624471
1AD,ADALTA LIMITED,"Pharmaceuticals, Biotechnology & Life Sciences",22/08/2016,14294964
1AE,AURORA ENERGY METALS LIMITED,Materials,18/05/2022,13608844
1AG,ALTERRA LIMITED,"Food, Beverage & Tobacco",16/05/2008,4310732
1AI,ALGORAE PHARMACEUTICALS LIMITED,"Pharmaceuticals, Biotechnology & Life Sciences",01/09/2004,16873947
...,...,...,...,...
ZLD,ZELIRA THERAPEUTICS LIMITED,"Pharmaceuticals, Biotechnology & Life Sciences",28/07/2003,8737309
ZMI,ZINC OF IRELAND NL,Materials,18/09/2007,2344587
ZMM,ZIMI LIMITED,Technology Hardware & Equipment,10/09/2007,1899971
ZNC,ZENITH MINERALS LIMITED,Materials,29/05/2007,19380949


In [3]:
# Function to manually fetch data
import requests
import pandas as pd
from io import StringIO

def data_fetcher(query: str) -> pd.DataFrame:
	"""Download the price history CSV for one symbol and parse it.

	Args:
		query: Symbol appended to ``YAHOO_API_URL`` (e.g. ``"14D.AX"``).

	Returns:
		The CSV response parsed into a DataFrame.

	Raises:
		requests.HTTPError: If the response status is not OK or the response
			is not served as ``text/csv``.
		requests.RequestException: If the request itself fails or times out.
	"""
	# Joined outside the f-string: reusing the enclosing quote character inside
	# a replacement field is a SyntaxError before Python 3.12.
	query_string = '&'.join(PARAMS)
	resp = requests.get(
		f'{YAHOO_API_URL}{query}?{query_string}',
		headers=GENERIC_HEADER,
		timeout=30,  # never wait forever on a stalled connection
	)

	# .get() so that a response without a content-type header is reported as
	# a failed download instead of raising KeyError
	if not resp.ok or 'text/csv' not in resp.headers.get('content-type', ''):
		raise requests.HTTPError("Getting response failed!")

	with StringIO(resp.text) as f:
		frame = pd.read_csv(f)

	return frame

In [4]:
#import psycopg2 as sql
import sqlite3 as sql

db = sql.connect(DB_NAME, check_same_thread=False)
# check_same_thread=False lets this one connection be used from threads other
# than the one that created it (the download workers all receive `db`).
# sqlite3 then no longer guards against concurrent use, so callers have to
# serialise access themselves; the workers do so with a shared Lock.

In [5]:
# Schema of the two tables this notebook relies on
STOCK_DDL = (
	'CREATE TABLE stock (\n'
	'	sid INTEGER PRIMARY KEY,\n'
	'	ticker CHAR(5) NOT NULL,\n'
	'	name TEXT,\n'
	'	start DATE,\n'
	'	active BOOL NOT NULL\n'
	');'
)
STOCKPRICE_DDL = (
	'CREATE TABLE "stockprice" (\n'
	'	tid INTEGER PRIMARY KEY AUTOINCREMENT,\n'
	'   stock INTEGER NOT NULL,\n'
	'	rdate DATE NOT NULL,\n'
	'	open NUMERIC(16, 8),\n'
	'	close NUMERIC(16, 8),\n'
	'	adj_close NUMERIC(16, 8),\n'
	'	high NUMERIC(16, 8),\n'
	'	low NUMERIC(16, 8),\n'
	'	volume INTEGER,\n'
	'	FOREIGN KEY (stock) REFERENCES stock(sid)'
	');'
)

# Create whichever of the two tables is missing, inside a single transaction
with db:
	cur = db.cursor()

	# Names of the tables that already exist in the database file
	cur.execute(
		"SELECT name\n"
		"FROM sqlite_master\n"
		"WHERE type = 'table';"
	)
	tables = [row[0] for row in cur.fetchall()]

	for table_name, message, ddl in (
		('stock', "Creating stock table", STOCK_DDL),
		('stockprice', "Creating stockprice table", STOCKPRICE_DDL),
	):
		if table_name in tables:
			continue
		print(message)
		cur.execute(ddl)

Creating stock table
Creating stockprice table


In [6]:
from datetime import datetime

# Add or update active companies in the database
with db:
	cur = db.cursor()

	# First find all active companies that are currently in the database,
	# as (sid, ticker) rows
	existing_stock = cur.execute(
		'SELECT sid, ticker\n'
		'FROM stock\n'
		'WHERE active = true'
	).fetchall()

	new_active = set(companies.index)

	# Mark stock that is no longer in the current listing as inactive
	# (`sid` rather than `id`, so the builtin is not shadowed notebook-wide)
	for sid, ticker in existing_stock:
		if ticker not in new_active:
			print(f"Setting {ticker} to inactive")
			cur.execute(
				'UPDATE stock\n'
				'SET active = false\n'
				'WHERE sid = ?',
				(sid,)
			)

	# Add any new stock into the table. Columns are selected by name so the
	# loop does not depend on how many columns the listing file has.
	existing_stock = { ticker for _, ticker in existing_stock }
	for ticker, name, start in companies[["Name", "Start Date"]].itertuples(name=None):
		if ticker in existing_stock:
			continue

		# Listing dates are DD/MM/YYYY text. The month directive is %m; %M means
		# *minutes* and silently stored every listing date as a day in January.
		# A missing date (NaN, not a str) is stored as NULL.
		# The date is bound as ISO text because sqlite3's implicit date adapter
		# is deprecated since Python 3.12; the stored value is the same.
		start_date = (
			datetime.strptime(start, "%d/%m/%Y").date().isoformat()
			if isinstance(start, str) else None
		)

		cur.execute(
			'INSERT INTO stock (ticker, name, start, active)\n'
			'VALUES (?, ?, ?, ?)',
			(ticker, name, start_date, True)
		)


  cur.execute(


In [7]:
# Manual function to submit data to the database (not really used, but there for testing)
import numpy as np

def load_history(db:sql.Connection, ticker:str, table:pd.DataFrame):
	"""Bulk-insert a price history for one stock into ``stockprice``.

	The whole load runs in one transaction: it is committed on success and
	rolled back if anything raises.

	Args:
		db: SQLite connection holding the ``stock`` and ``stockprice`` tables.
		ticker: Ticker to look up in ``stock.ticker``.
		table: Price history. Columns are read by position and must be, in
			order: date, open, high, low, close, adjusted close, volume.
			A missing (NaN) volume is stored as 0.

	Raises:
		ValueError: If ``ticker`` has no row in ``stock``, or if ``stockprice``
			already contains rows for it.
	"""
	with db:
		cur = db.cursor()

		# Ensure stock actually exists in the database
		row = cur.execute(
			'SELECT sid\n'
			'FROM stock\n'
			'WHERE ticker = ?',
			(ticker,)
		).fetchone()
		if row is None:
			raise ValueError(f"{ticker} stock entry does not exist")
		sid, = row

		# stockprice references the stock through its `stock` column (it has no
		# ticker/sid columns), and sqlite3 placeholders are `?`, not `%s`
		existing, = cur.execute(
			'SELECT COUNT(*)\n'
			'FROM stockprice\n'
			'WHERE stock = ?',
			(sid,)
		).fetchone()
		if existing:
			raise ValueError("History for this stock already exists!")

		print(f"Adding entries for {ticker}")

		# The column list follows the order of the value tuple, so each price
		# lands in its own column
		cur.executemany(
			'INSERT INTO stockprice (stock, rdate, open, high, low, close, adj_close, volume)\n'
			'VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
			[
				(sid, rdate, open_, high, low, close, aclose, int(vol if not np.isnan(vol) else 0))
				for rdate, open_, high, low, close, aclose, vol in table.itertuples(index=False, name=None)
			]
		)

def load_single(db:sql.Connection, ticker, rdate, open, high, low, close, aclose, vol):
	"""Insert a single price record for one stock into ``stockprice``.

	Args:
		db: SQLite connection holding the ``stock`` and ``stockprice`` tables.
		ticker: Ticker to look up in ``stock.ticker``.
		rdate: Date of the record.
		open, high, low, close, aclose: Prices for that date (``aclose`` is
			stored in ``adj_close``).
		vol: Traded volume.

	Raises:
		ValueError: If ``ticker`` has no row in ``stock``.
	"""
	with db:
		cur = db.cursor()

		# Ensure stock actually exists in the database
		row = cur.execute(
			'SELECT sid\n'
			'FROM stock\n'
			'WHERE ticker = ?',
			(ticker,)
		).fetchone()
		if row is None:
			raise ValueError(f"{ticker} stock entry does not exist")
		sid, = row

		# Eight columns need eight `?` placeholders (sqlite3 does not accept
		# `%s`), and the column list follows the order of the value tuple
		cur.execute(
			'INSERT INTO stockprice (stock, rdate, open, high, low, close, adj_close, volume)\n'
			'VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
			(sid, rdate, open, high, low, close, aclose, vol)
		)

def fetch_and_store_table(db:sql.Connection, tick):
	"""Fetch the history for ``tick`` (requested as ``<tick>.AX``) and load it into ``db``."""
	history = data_fetcher(f'{tick}.AX')
	load_history(db, tick, history)

### Download and store price history for all listed companies

In [8]:
from queue import Queue
from threading import Thread, Lock
from time import sleep
from tqdm import tqdm

def worker(q: Queue, db:sql.Connection, db_sync:Lock, bar:tqdm):
	"""Queue consumer: download each queued ticker's price history and store it.

	Tickers are taken from ``q`` until a ``None`` sentinel arrives. Each ticker
	is requested (as ``<ticker>.AX``) up to three times, parsed as CSV and
	inserted into ``stockprice``. Tickers that cannot be downloaded, do not
	have the expected columns, or have no row in ``stock`` are reported and
	skipped. On exit the sentinel is put back so the next worker stops too.

	Args:
		q: Work queue of ticker strings, terminated by ``None``.
		db: SQLite connection shared by all workers.
		db_sync: Lock that serialises use of ``db`` between workers.
		bar: Progress bar; its description is used for status messages.
	"""
	# CSV columns, in the order they are bound to the INSERT below. All seven
	# are required, including 'Adj Close'.
	csv_columns = ('Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume')

	# Joined outside the f-string: reusing the enclosing quote character inside
	# a replacement field is a SyntaxError before Python 3.12.
	query_string = '&'.join(PARAMS)

	with requests.session() as conn:
		cur = db.cursor()
		conn.headers.update(GENERIC_HEADER)

		def fetch(ticker):
			# The timeout keeps one stalled connection from blocking this worker forever
			return conn.get(f'{YAHOO_API_URL}{ticker}.AX?{query_string}', timeout=30)

		while (query := q.get()) is not None:
			# try/finally so task_done() runs exactly once per item on every path
			try:
				for _ in range(3):
					try:
						resp = fetch(query)
					except requests.RequestException:
						# Timeouts / connection errors count as a failed attempt
						bar.set_description(f"{query}: Request failed!")
						sleep(3)
						continue

					if resp.ok:
						break

					if not resp.status_code == 404:
						bar.set_description(f"{query}: Warning, server denied permission!")
						sleep(30)
					else:
						bar.set_description(f"{query}: Not Found!")
						sleep(3)
				else:
					print(f"Retries for {query} expired, skipping...")
					continue

				with StringIO(resp.text) as f:
					df = pd.read_csv(f)

				if any(col not in df.columns for col in csv_columns):
					bar.set_description(f"{query}: format didn't match!")
					continue
				bar.set_description(f"{query}: Adding to DB...")

				with db_sync, db:
					# Get the id of the stock from the database. fetchone() returns
					# None when there is no match, so test for that explicitly
					# (unpacking None raises TypeError, not ValueError).
					row = cur.execute(
						'SELECT sid\n'
						'FROM stock\n'
						'WHERE ticker = ?',
						(query,)
					).fetchone()
					if row is None:
						print("Failed to find stock in database!")
						continue
					sid, = row

					# Columns are picked by name, and the INSERT column list follows
					# the order of the value tuple, so each price lands in its own column
					cur.executemany(
						'INSERT INTO stockprice (stock, rdate, open, high, low, close, adj_close, volume)\n'
						'VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
						[
							(sid, rdate, open_, high, low, close, aclose, int(vol if not np.isnan(vol) else 0))
							for rdate, open_, high, low, close, aclose, vol
							in df[list(csv_columns)].itertuples(index=False, name=None)
						]
					)
			finally:
				q.task_done()

		# Hand the stop signal on to the next worker
		q.put(None)
		return

In [9]:
from string import ascii_uppercase as uppers
from itertools import product

NUM_THREADS = 10
q = Queue(maxsize=NUM_THREADS)

# Tickers that already have price rows are skipped, so re-running this cell
# does not insert the same history a second time.
already_loaded = {
	ticker for ticker, in db.execute(
		'SELECT DISTINCT stock.ticker\n'
		'FROM stock\n'
		'JOIN stockprice ON stockprice.stock = stock.sid'
	)
}
pending = [ticker for ticker in companies.index if ticker not in already_loaded]

# Create workers, progress bar and threads
work = tqdm(pending)
lock = Lock()
workers = [Thread(target=worker, args=(q, db, lock, work), daemon=True) for _ in range(NUM_THREADS)]
# The loop variable must not be called `worker`: that would rebind the worker
# function to a Thread object and break any re-run of this cell.
for t in workers:
	t.start()

# Feed work items through to the threads; iterating `work` advances the bar
for item in work:
	q.put(item)

# Finally send the stop signal, and wait for all the workers to die
q.put(None)
for t in workers:
	t.join()

ICE: Adding to DB...:  46%|████▌     | 914/1978 [00:42<02:02,  8.72it/s]                   

Retries for FRXN expired, skipping...


MNB: Adding to DB...:  60%|██████    | 1195/1978 [01:12<01:14, 10.47it/s]                   

Retries for M2RN expired, skipping...


TEE: Adding to DB...:  90%|████████▉ | 1778/1978 [02:07<00:13, 15.25it/s]                   

Retries for HIO expired, skipping...
Retries for HIQ expired, skipping...
Retries for HLS expired, skipping...
Retries for HLX expired, skipping...


TG6: Adding to DB...:  90%|█████████ | 1783/1978 [02:07<00:12, 15.22it/s]                   

Retries for HM1 expired, skipping...
Retries for HMD expired, skipping...


Z2U: Adding to DB...: 100%|██████████| 1978/1978 [02:26<00:00, 13.51it/s]                   


Retries for TEG expired, skipping...
Retries for TEK expired, skipping...
Retries for TEM expired, skipping...
Retries for TFL expired, skipping...
Retries for TGH expired, skipping...
Retries for TGP expired, skipping...
