In [1]:
# !pip install --upgrade firebase-admin

## Imports

In [2]:
# To make a request to get the company tickers file
import requests
# To access local files
import os
from pathlib import Path

# For DataFrame
import pandas as pd

# For type hints
from typing import List

# For regular expression matching
import re
# Handle json tasks
import json

# For firebase access
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore

## Constants

In [3]:
# Directory that is searched for the quarterly data set folders
DATA_PATH = 'data'
# Directory that holds configuration files
CONFIG_PATH = 'config'

# Constant for Form 10-K
FORM_10K = '10-K'
# Constant for Form 10-Q
FORM_10Q = '10-Q'
# Forms in scope
FORMS_SCOPE = [FORM_10K, FORM_10Q]

# Submissions (SUB) fields we are interested in, mapped to the dtype used when reading them
SUB_DTYPES = {'adsh':str,'cik':'int32','name':str,'sic':str,'countryba':str,'fye':str,
              'form':str,'period':str,'fy':str,'fp':str,'filed':str,'accepted':str}
# Numbers (NUM) fields we are interested in, mapped to the dtype used when reading them
NUM_DTYPES = {'adsh':str,'tag':str,'version':str,'ddate':str,'qtrs':'int8','uom':str,'value':str}

# Ticker symbols we are interested in
SYMBOLS = ['GOOG','NVDA','ADBE', 'MSFT','AMZN','TSLA','WMT']

# Taxonomies we are considering
TAXONOMIES = ['dei', 'us-gaap']

# Values of the NUM 'qtrs' field we are interested in
QTRS_SCOPE = [0, 1, 4]
# Regular expression that recognises an in-scope taxonomy at the start of a
# 'version' value. The taxonomy name must be followed by a literal '/', so a
# value that merely starts with the same letters (e.g. 'deixyz') is rejected.
# Names are escaped so they are always matched literally.
TAX_RE = re.compile(f"({'|'.join(re.escape(taxonomy) for taxonomy in TAXONOMIES)})/")

# Number of documents written per Firestore batch commit
# NOTE(review): presumably chosen to stay under Firestore's per-batch write limit - confirm
BATCH_SIZE = 499

# Collection names
SUB_COLLECTION = 'sub'
NUM_COLLECTION = 'num'

## Get symbol, cik mapping
### Reference: __[Access Companies SEC Filings Using Python](https://medium.datadriveninvestor.com/access-companies-sec-filings-using-python-760e6075d3ad)__
### json file used in this method -> https://www.sec.gov/files/company_tickers.json

In [4]:
def get_company_tickers() -> pd.DataFrame:
    '''
    Downloads the SEC company tickers file and returns it as a DataFrame
    indexed by ticker symbol.

    Returns:
    pd.DataFrame: one row per ticker symbol (the index); the remaining columns
    are the other fields of each record in the file (e.g. 'cik_str', 'title')

    Raises:
    requests.RequestException: if the request fails, times out, or the server
    answers with an HTTP error status
    '''
    url = 'https://www.sec.gov/files/company_tickers.json'
    response = requests.get(
        url,
        headers={
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        },
        # Never let a stalled connection hang the notebook
        timeout=30,
    )
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON
    response.raise_for_status()

    # The payload is a mapping whose values are the individual company records;
    # the keys carry no information we need, so only the values are kept
    records = list(response.json().values())
    return pd.DataFrame(records).set_index('ticker')

In [5]:
# Fetch the complete ticker table once; it is indexed by ticker symbol
tickers_cik = get_company_tickers()
# Display the table as the cell output
tickers_cik

Unnamed: 0_level_0,cik_str,title
ticker,Unnamed: 1_level_1,Unnamed: 2_level_1
MSFT,789019,MICROSOFT CORP
AAPL,320193,Apple Inc.
NVDA,1045810,NVIDIA CORP
AMZN,1018724,AMAZON COM INC
GOOGL,1652044,Alphabet Inc.
...,...,...
BAYAR,1969475,Bayview Acquisition Corp
QETAR,1978528,Quetta Acquisition Corp
QETAU,1978528,Quetta Acquisition Corp
NETDW,1975218,Nabors Energy Transition Corp. II


## Maps CIKs to Tickers in SYMBOLS

In [6]:
# Lookup table CIK -> ticker, restricted to the tickers listed in SYMBOLS
cik_ticker_dict = {
    # The CIK is the only field needed from each ticker row
    tickers_cik.loc[symbol]['cik_str']: symbol
    for symbol in SYMBOLS
}
cik_ticker_dict

{1652044: 'GOOG',
 1045810: 'NVDA',
 796343: 'ADBE',
 789019: 'MSFT',
 1018724: 'AMZN',
 1318605: 'TSLA',
 104169: 'WMT'}

## Find CIK for Ticker

In [7]:
# def get_cik(ticker:str) -> str:
#     '''
#     Returns the CIK associated with the stock symbol
    
#     Parameters:
#     ticker (str): the ticker symbol for a stock
    
#     Returns:
#     str: CIK associated with the given ticker symbol, or None if the ticker is not part of the SYMBOLS constant
#     '''
#     if ((ticker not in SYMBOLS) or (ticker not in tickers_cik.index)):
#         return None
#     return tickers_cik.loc[ticker]['cik_str']

## Firestore DB access

In [8]:
# Use a service account.
cred = credentials.Certificate(os.path.join(CONFIG_PATH, 'keys.json'))
try:
    # Reuse the default app when this cell is re-run in a live kernel;
    # initialising it a second time would raise ValueError
    app = firebase_admin.get_app()
except ValueError:
    # No default app yet - first run in this kernel
    app = firebase_admin.initialize_app(cred)
db = firestore.client()

## Helper method to batch data
### Reference: __[Importing data into Firestore using Python](https://medium.com/@cbrannen/importing-data-into-firestore-using-python-dce2d6d3cd51)__

In [9]:
def batch_data(iterable, n=1):
    '''
    Yields consecutive slices of a sequence, each holding at most n items.

    Parameters:
    iterable: a sequence that supports len() and slicing
    n (int): maximum number of items per slice

    Yields:
    slices of the sequence in their original order; the last one may be shorter
    '''
    total = len(iterable)
    yield from (
        iterable[start:min(start + n, total)]
        for start in range(0, total, n)
    )

## Add a DF to firestore

In [10]:
def add_df(df:pd.DataFrame, name:str):
    '''
    Writes every row of a DataFrame to a Firestore collection, one new
    document per row.

    The rows are serialised to JSON records and parsed back, so each document
    is a plain dict. Writes are grouped into batches of at most BATCH_SIZE
    documents, and one progress line is printed per committed batch.

    Parameters:
    df (pd.DataFrame): the rows to store
    name (str): name of the target collection
    '''
    records = json.loads(df.to_json(orient='records'))

    for batch_no, chunk in enumerate(batch_data(records, BATCH_SIZE)):
        writer = db.batch()
        for record in chunk:
            # document() is called without an id, so each row gets a new document reference
            writer.set(db.collection(name).document(), record)
        writer.commit()
        print(f'Committed batch - {batch_no}')

## Load Submissions Data Set

In [11]:
def read_subs(filename:str, ciks:List) -> pd.DataFrame:
    '''
    Reads one tab-separated submissions (SUB) file and keeps only the rows in scope.

    Only the columns listed in SUB_DTYPES are read. A 'dataset' column holding
    the name of the folder that contains the file is added. If the file cannot
    be read, the error is printed and an empty frame with the same columns is
    used instead, so the caller can keep looping over the remaining files.

    Parameters:
    filename (str): path of the SUB file; its parent folder names the data set
    ciks (List): CIK values to keep

    Returns:
    pd.DataFrame: rows whose 'form' is in FORMS_SCOPE and whose 'cik' is in ciks,
    returned as an independent copy that is safe to modify
    '''
    # Data set name = folder directly containing the file, whatever the path depth
    dataset = Path(filename).parent.name
    try:
        # Read data with pandas
        df = pd.read_csv(filename, sep='\t', dtype=SUB_DTYPES, usecols=SUB_DTYPES.keys())
    except Exception as error:
        print("An error occurred:", error, filename)
        # Best effort: continue with an empty frame that has the same columns and
        # dtypes as good data (read_csv cannot build one from an empty buffer)
        df = pd.DataFrame({column: pd.Series(dtype=dtype) for column, dtype in SUB_DTYPES.items()})

    # Custom field - adds the dataset name
    df['dataset'] = dataset

    # Keep only the forms and companies in scope; copy so that later column
    # assignments by the caller do not act on a slice of the full frame
    return df.query('(form in @FORMS_SCOPE) & (cik in @ciks)').copy()

## Create SUB Data Set

In [12]:
%%time
# List of files we are going to load
files = [f for f in Path(DATA_PATH).glob(os.path.join('20*q*', 'sub.*'))]
# List of CIKs we are interested
ciks = list(cik_ticker_dict.keys())
# List of adsh values for SUB
adsh_list = []
# Loop through files
for file in files:
    df = read_subs(str(file), ciks)

    # Set columns as per SUBs specification
    for key in ['period','filed','accepted']:
        df[key] = df[key].astype('datetime64[ns]')
        
    df['sic'] = df['sic'].astype('Int16')
    df['fy']= df['fy'].astype('Int16')
    # Add adsh of the current df to the list of adsh
    [adsh_list.append(x) for x in df['adsh'].to_list()]
    add_df(df=df, name=SUB_COLLECTION)

Committed batch - 0
Committed batch - 0
Committed batch - 0
Committed batch - 0
Committed batch - 0
Committed batch - 0
Committed batch - 0
Committed batch - 0
CPU times: user 1.05 s, sys: 169 ms, total: 1.22 s
Wall time: 1.93 s


## Load NUM Dataset

In [13]:
def read_nums(filename:str, sub_adsh:List) -> pd.DataFrame:
    '''
    Reads one tab-separated numbers (NUM) file and keeps only the rows in scope.

    Only the columns listed in NUM_DTYPES are read. A 'dataset' column holding
    the name of the folder that contains the file is added. If the file cannot
    be read, the error is printed and an empty frame with the same columns is
    used instead, so the caller can keep looping over the remaining files.

    Parameters:
    filename (str): path of the NUM file; its parent folder names the data set
    sub_adsh (List): adsh values of the submissions whose numbers should be kept

    Returns:
    pd.DataFrame: rows whose 'qtrs' is in QTRS_SCOPE, whose 'adsh' is in sub_adsh
    and whose 'version' starts with a taxonomy matched by TAX_RE, returned as an
    independent copy that is safe to modify
    '''
    # Data set name = folder directly containing the file, whatever the path depth
    dataset = Path(filename).parent.name
    try:
        # Read data with pandas
        df = pd.read_csv(filename, sep='\t', dtype=NUM_DTYPES, usecols=NUM_DTYPES.keys())
    except Exception as error:
        print("An error occurred:", error, filename)
        # Best effort: continue with an empty frame that has the same columns and
        # dtypes as good data (read_csv cannot build one from an empty buffer)
        df = pd.DataFrame({column: pd.Series(dtype=dtype) for column, dtype in NUM_DTYPES.items()})

    # Custom field - adds the dataset name
    df['dataset'] = dataset

    # True where 'version' starts with an in-scope taxonomy; a missing version
    # counts as "no match" instead of raising
    has_taxonomy = df['version'].str.match(TAX_RE.pattern, na=False).astype(bool)
    # Keep the quarters in scope, the adsh belonging to the subs, and the taxonomies in scope
    in_scope = df['qtrs'].isin(QTRS_SCOPE) & df['adsh'].isin(sub_adsh) & has_taxonomy
    return df.loc[in_scope].copy()

## Create NUM Data Set

In [21]:
%%time
# List of files we are going to load
# 2022, 2023 done
files = [f for f in Path(DATA_PATH).glob(os.path.join('2022q*', 'num.*'))]
# Loop through files
for file in files:
    df = read_nums(str(file), adsh_list)
    # Convert value to float
    df['value'] = df['value'].astype(float)
    # Convert to date time format
    df['ddate'] = df['ddate'].astype('datetime64[ns]')
    add_df(df=df, name=NUM_COLLECTION)

Committed batch - 0
Committed batch - 1
Committed batch - 2
Committed batch - 3
Committed batch - 4
Committed batch - 0
Committed batch - 1
Committed batch - 2
Committed batch - 3
Committed batch - 4
Committed batch - 5
Committed batch - 6
Committed batch - 7
Committed batch - 0
Committed batch - 1
Committed batch - 2
Committed batch - 3
Committed batch - 4
Committed batch - 0
Committed batch - 1
Committed batch - 2
Committed batch - 3
CPU times: user 18.6 s, sys: 1.65 s, total: 20.2 s
Wall time: 38.9 s


## Find out the collection size

In [15]:
def collection_size(name:str) -> List:
    '''
    Runs a count aggregation over a Firestore collection.

    Parameters:
    name (str): name of the collection to count

    Returns:
    List: the raw result of the count query, exactly as returned by get()
    '''
    return db.collection(name).count().get()

In [22]:
# Report the document count of each collection, SUB first and then NUM
for coll_name in (SUB_COLLECTION, NUM_COLLECTION):
    res = collection_size(name=coll_name)
    # The count is read from the first entry of the first result row
    print(f'No of docs in {coll_name} collection: {res[0][0].value}')

No of docs in sub collection: 56
No of docs in num collection: 20036


## Helper method to delete a collection
### Reference: __[Delete data from Cloud Firestore](https://firebase.google.com/docs/firestore/manage-data/delete-data)__

In [17]:
def delete_collection(coll_ref, batch_size):
    '''
    Deletes the documents of a collection, pass by pass, until a pass removes
    fewer than batch_size documents.

    Implemented as a loop rather than recursion, so the number of passes is
    not limited by the interpreter's recursion depth.

    Parameters:
    coll_ref: collection reference providing list_documents(page_size=...)
    batch_size (int): page size requested per listing; 0 makes the call a no-op
    '''
    if batch_size == 0:
        return

    while True:
        deleted = 0
        for doc in coll_ref.list_documents(page_size=batch_size):
            # print(f"Deleting doc {doc.id} => {doc.get().to_dict()}")
            doc.delete()
            deleted += 1

        # A short pass means nothing is left to delete
        if deleted < batch_size:
            return

## Delete collections

In [18]:
# delete_collection(db.collection(SUB_COLLECTION), BATCH_SIZE)
# delete_collection(db.collection(NUM_COLLECTION), BATCH_SIZE)