# Data Ingestion for SEC 10-K Filings

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Three-part name of the target table; writeStream_raw joins these as
# "<catalog_name>.<schema_name>.<table_name>".
catalog_name = "financial_data"
schema_name = "lakehouse"
table_name = "bronze_filings"

# Directory the raw filings are read from (readStream_raw loads "sec_filings/*" under it).
datasets_path = f"/Volumes/{catalog_name}/{schema_name}/datasets"
# Directory that holds the stream's schema location and checkpoint sub-directories.
files_path = f"/Volumes/{catalog_name}/{schema_name}/files"

In [0]:
def readStream_raw():
    """Build the streaming DataFrame that reads raw SEC filing JSON files.

    Files under ``{datasets_path}/sec_filings/*`` are read with the
    ``cloudFiles`` streaming source in JSON format using an explicit
    schema in which every column is a nullable string. Two columns are
    appended to each row:

    * ``source_file_path`` -- copied from ``_metadata.file_path``.
    * ``ingest_timestamp`` -- the value of ``current_timestamp()``.

    Relies on the notebook-level names ``spark``, ``datasets_path`` and
    ``files_path``.

    Returns:
        The (not yet started) streaming DataFrame.
    """
    # Imported here because the notebook's import cell only brings in
    # `col` and `current_timestamp`; without this the schema construction
    # below fails with NameError.
    from pyspark.sql.types import StringType, StructField, StructType

    # Column order matters: it defines the column order of the resulting
    # DataFrame and therefore of the bronze table.
    filing_columns = (
        # Filing-level metadata.
        "cik",
        "company",
        "filing_type",
        "filing_date",
        "period_of_report",
        "sic",
        "state_of_inc",
        "state_location",
        "fiscal_year_end",
        "filing_html_index",
        "htm_filing_link",
        "complete_text_filing_link",
        "filename",
        # One column per item section of the filing.
        "item_1",
        "item_1A",
        "item_1B",
        "item_2",
        "item_3",
        "item_4",
        "item_5",
        "item_6",
        "item_7",
        "item_7A",
        "item_8",
        "item_9",
        "item_9A",
        "item_9B",
        "item_10",
        "item_11",
        "item_12",
        "item_13",
        "item_14",
        "item_15",
    )
    filing_schema = StructType(
        [StructField(name, StringType(), True) for name in filing_columns]
    )

    raw_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{files_path}/schema/bronze_ingestion")
        .schema(filing_schema)
        .load(f"{datasets_path}/sec_filings/*")
        # Keep lineage: which file each row came from and when it was ingested.
        .withColumn("source_file_path", col("_metadata.file_path"))
        .withColumn("ingest_timestamp", current_timestamp())
    )

    return raw_df


In [0]:
def writeStream_raw(raw_df):
    """Start the streaming append of ``raw_df`` into the bronze table.

    The target is ``{catalog_name}.{schema_name}.{table_name}`` and the
    checkpoint lives under ``{files_path}/checkpoint/bronze_ingestion``.
    The stream is started with ``trigger(availableNow=True)`` in
    ``append`` output mode with the ``mergeSchema`` option enabled.

    Args:
        raw_df: Streaming DataFrame to persist (e.g. the result of
            ``readStream_raw()``).

    Returns:
        The query handle returned by ``toTable``. Callers may ignore it
        (as before) or use it to wait for / inspect the query, e.g.
        ``writeStream_raw(df).awaitTermination()``.
    """
    query = (
        raw_df.writeStream
        .option("checkpointLocation", f"{files_path}/checkpoint/bronze_ingestion")
        .option("mergeSchema", "true")
        .outputMode("append")
        .trigger(availableNow=True)
        .toTable(f"{catalog_name}.{schema_name}.{table_name}")
    )
    # Hand the query back instead of discarding it so the caller can
    # observe completion or failures.
    return query

In [0]:
# Run the ingestion: build the streaming read, then start the append into the bronze table.
df = readStream_raw()
writeStream_raw(df)