In [0]:
%sql
-- Preview the target table before running the ingest.
-- NOTE(review): if nothing outside this notebook has created workspace.raw.accession,
-- this query fails with a table-not-found error on a fresh workspace (in this notebook
-- the table is only created by the first ingest's saveAsTable append).
SELECT * FROM workspace.raw.accession

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

# Notebook-level configuration:
#   volumes_path -- directory (under /Volumes) whose *.txt files hold the raw accession numbers
#   uc_table     -- fully qualified catalog.schema.table that the ingest appends to
volumes_path, uc_table = (
    "/Volumes/workspace/raw/accession",
    "workspace.raw.accession",
)

def ingest_accession_numbers(source_path=None, table_name=None):
    """Append accession numbers from ``*.txt`` files that are not yet in a table.

    Each line of each ``*.txt`` file directly under ``source_path`` is read as
    one accession number. Leading/trailing spaces are trimmed, blank lines are
    dropped, and duplicates are removed. Only ids absent from the table's
    ``id`` column are appended; if the table does not exist yet, the
    ``saveAsTable`` append creates it with a single string column ``id``.

    Args:
        source_path: Directory to read ``*.txt`` files from. Defaults to the
            notebook-level ``volumes_path``, looked up at call time.
        table_name: Target table name. Defaults to the notebook-level
            ``uc_table``, looked up at call time.

    Raises:
        Exception: Any failure while reading or writing (e.g. no matching
            files, write errors) is printed and then re-raised, so a failed
            ingest is not mistaken for a successful run.
    """
    # Resolve defaults at call time so reassigning the notebook-level variables
    # is still honored, exactly like the original global lookups.
    source_path = volumes_path if source_path is None else source_path
    table_name = uc_table if table_name is None else table_name

    try:
        spark = SparkSession.builder.getOrCreate()

        # spark.read.text yields one row per line in a single "value" column.
        # Trim padding and drop blank lines so they are never stored as ids.
        incoming_df = (
            spark.read.text(source_path + "/*.txt")
            .selectExpr("trim(value) AS id")
            .where(col("id") != "")
            .dropDuplicates(["id"])
        )

        if spark.catalog.tableExists(table_name):
            # Keep only ids not already stored; the anti-join needs just the key column.
            existing_ids_df = spark.table(table_name).select("id")
            new_df = incoming_df.join(existing_ids_df, on="id", how="left_anti")
        else:
            # First run: nothing to compare against; the append below creates the table.
            new_df = incoming_df

        # count() and the write below each evaluate the plan separately.
        new_df_count = new_df.count()
        print(f"New accession numbers found: {new_df_count}")
        if new_df_count > 0:
            new_df.write.mode("append").saveAsTable(table_name)
            print(f"Successfully ingested {new_df_count} new accession numbers in {table_name}")
        else:
            print(f"All accession numbers already exist in {table_name}")

    except Exception as e:
        print(f"Failed to ingest accession numbers with Error: {e}")
        # Re-raise so notebook runs and scheduled jobs are marked as failed
        # instead of silently reporting success.
        raise

In [0]:
# Run the incremental ingest: only accession numbers not already in the table are appended.
ingest_accession_numbers()

In [0]:
%sql
-- Inspect the table after the ingest to confirm the new accession numbers were appended.
SELECT * FROM workspace.raw.accession

In [0]:
%sql
-- WARNING: deletes EVERY row from workspace.raw.accession (there is no WHERE clause).
-- Running this notebook top to bottom ("Run All") therefore ends with an empty table,
-- undoing the ingest above.
-- NOTE(review): presumably a manual reset for testing -- consider moving it to a separate
-- notebook or removing it before scheduling this notebook as a job.
DELETE FROM workspace.raw.accession