In [0]:
import json
import time

import pandas as pd
import requests
from pyspark.sql import functions as F
from pyspark.sql.functions import col, isnull

In [0]:
# Destination passed to saveAsTable when the SIC-enriched listing is written
# as a Delta table in overwrite mode at the end of this notebook.
TABLE_NAME = "finance_catalog.db_landing.src_raw_index_keys"

In [0]:
# SEC ticker/exchange listing; the payload's columns are projected further down.
base_url = "https://www.sec.gov/files/company_tickers_exchange.json"

# Shared by this download and by the per-company requests made later on.
# NOTE(review): SEC asks automated clients to identify themselves in the
# User-Agent — confirm this address is the intended contact.
headers = {
    "User-Agent": "hello@hotmail.com",
    "Accept-Encoding": "gzip, deflate"
}

# Without a timeout a stalled connection would block the notebook forever.
response = requests.get(base_url, headers=headers, timeout=30)
# Surface HTTP errors (e.g. 403/429 from SEC) as such, instead of letting them
# show up as a JSON decoding failure or a missing-key error below.
response.raise_for_status()
data = response.json()

# Column names live under "fields", the row tuples under "data".
columns = data['fields']
values = data['data']

In [0]:
# Stage the raw rows in pandas so the SEC field names become column headers.
df = pd.DataFrame(data=values, columns=columns)

# CIK is left-padded with zeros to 10 characters, the form interpolated into
# the submissions URL later in the notebook.
cik_padded = F.lpad(F.col("cik"), 10, '0').alias("cik")
# Tickers have every "-" replaced with ".".
ticker_dotted = F.regexp_replace("ticker", "-", ".").alias("ticker")

spark_df = spark.createDataFrame(df).select(
    cik_padded,
    ticker_dotted,
    "name",
    "exchange",
)

In [0]:
# SEC's fair-access policy caps automated traffic at 10 requests per second;
# pausing this long after every call keeps the loop under that ceiling.
SEC_MIN_REQUEST_INTERVAL_SECONDS = 0.1
# Upper bound on how long a single submissions request may take.
SEC_REQUEST_TIMEOUT_SECONDS = 30


def _fetch_sic(session, cik):
    """Look up the SIC code of one company in the SEC submissions API.

    Args:
        session: ``requests.Session`` already carrying the SEC request headers.
        cik: Zero-padded CIK string as it appears in ``spark_df``.

    Returns:
        The value of the ``"sic"`` key of the response, or ``None`` when the
        request fails, returns a non-200 status, the body is not valid JSON,
        or the key is absent. Failures are deliberately best-effort so one bad
        company does not abort the whole run.
    """
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    try:
        sic_response = session.get(url, timeout=SEC_REQUEST_TIMEOUT_SECONDS)
        if sic_response.status_code != 200:
            return None
        return sic_response.json().get("sic", None)
    except (requests.RequestException, ValueError):
        # Network error, timeout or malformed JSON: treat like a non-200 reply.
        return None


# Several tickers can share one CIK, so remember each answer and only hit the
# API once per company.
sic_by_cik = {}
sic_data = []

# A session reuses the underlying connection across the many requests.
with requests.Session() as session:
    session.headers.update(headers)

    # distinct() keeps the join keys unique so the left join below cannot
    # multiply rows.
    for row in spark_df.select("cik", "ticker").distinct().collect():
        cik = row["cik"]
        ticker = row["ticker"]

        if cik not in sic_by_cik:
            sic_by_cik[cik] = _fetch_sic(session, cik)
            time.sleep(SEC_MIN_REQUEST_INTERVAL_SECONDS)

        sic_data.append((cik, ticker, sic_by_cik[cik]))

# An explicit schema avoids inference errors when the list is empty or when
# every lookup came back as None.
sic_df = spark.createDataFrame(sic_data, "cik string, ticker string, sic string")

# Attach the SIC code to every listing row and replace the landing table.
(
    spark_df
            .join(sic_df, on=["cik", "ticker"], how="left")
            .write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(TABLE_NAME)
)