In [0]:
# Azure storage account that holds the silver/gold containers read and written below.
BLOB_STORAGE_ACCOUNT_NAME = "pocspirelake"
# Blob endpoint host of that account; used as the host part of the wasbs:// URLs.
BLOB_CONTAINER_URL = BLOB_STORAGE_ACCOUNT_NAME + ".blob.core.windows.net"
# Spark configuration key under which the account key for this host is registered.
STORAGE_ACCOUNT_ACCESS_KEY_NAME = "fs.azure.account.key." + BLOB_CONTAINER_URL

# Fetch the account key from the Azure Key Vault–backed secret scope.
# NOTE(review): "dbstogare" looks like a typo, but it must match the secret's
# actual name in the scope — do not "fix" it here without renaming the secret.
ACCOUNT_KEY = dbutils.secrets.get(
    scope="spireitopsStorageSecret",
    key="dbstogare-acckey",
)

# Register the key with Spark so wasbs:// paths on this account can authenticate.
spark.conf.set(STORAGE_ACCOUNT_ACCESS_KEY_NAME, ACCOUNT_KEY)

In [0]:
# Input: ISO request log in the "silver" container.
# Output directory: "bimrailsdb" folder in the "gold" container.
isoRequestLogPath = f"wasbs://silver@{BLOB_CONTAINER_URL}/bimrailsdb/iso_request_log.parquet"
isoRequestLogOutputDirPath = f"wasbs://gold@{BLOB_CONTAINER_URL}/bimrailsdb"

# Read the Parquet data. No explicit schema is supplied; Spark uses the schema
# stored in the Parquet files themselves.
isoRequestLog_df = spark.read.parquet(isoRequestLogPath)

In [0]:
from databricks.sdk.runtime import dbutils
from pyspark.sql import DataFrame

def write_single_parquet_file(df: DataFrame, fileName: str, output_path: str) -> None:
    """Write ``df`` as a single Parquet file at ``{output_path}/{fileName}.parquet``.

    Spark writes a directory of ``part-*`` files. The DataFrame is therefore
    repartitioned to one partition and saved to the temporary directory
    ``{output_path}/temp_{fileName}.parquet``.

    The single ``part-*.parquet`` file found there is then moved to the final
    path. Whatever file or directory previously existed at that path is
    removed first. The temporary directory is removed afterwards, including
    when the write or move fails.

    Args:
        df: DataFrame to write.
        fileName: Output file name without the ``.parquet`` extension.
        output_path: Directory URI (for example a ``wasbs://`` path) that
            receives the file.

    Raises:
        FileNotFoundError: If the temporary directory contains no
            ``part-*.parquet`` file.
        Exception: Any ``dbutils.fs``/Spark error other than "path not found"
            is propagated unchanged.
    """

    def _path_exists(path: str) -> bool:
        # dbutils.fs.ls raises for a missing path. That case is recognised by
        # the Java exception name in the message; any other error is re-raised.
        try:
            dbutils.fs.ls(path)
            return True
        except Exception as e:
            if "java.io.FileNotFoundException" in str(e):
                return False
            raise

    print(f"Writing '{fileName}.parquet'")
    filepath = f"{output_path}/{fileName}.parquet"
    temp_filepath = f"{output_path}/temp_{fileName}.parquet"

    try:
        # Write a temporary folder containing one Parquet file
        print("Write a temporary folder containing one Parquet file")
        df.repartition(1).write.format("parquet").save(temp_filepath, mode="overwrite")

        # Pick the data file. Other entries Spark may write into the folder
        # (e.g. a _SUCCESS marker) do not match this pattern.
        output_file = next(
            (
                x
                for x in dbutils.fs.ls(temp_filepath)
                if x.name.startswith("part-") and x.name.endswith(".parquet")
            ),
            None,
        )
        if output_file is None:
            raise FileNotFoundError(f"No part-*.parquet file found in '{temp_filepath}'")

        # Delete filepath, if it exists. recurse=True so a directory at this
        # path is removed as well.
        print("Delete filepath, if it exists")
        if _path_exists(filepath):
            print(f"Deleting old {filepath}")
            dbutils.fs.rm(filepath, recurse=True)

        # Move the Parquet file to the final location
        dbutils.fs.mv(output_file.path, filepath)
    finally:
        # Delete the temporary folder even when an earlier step failed, so no
        # stray temp_* directory is left next to the outputs.
        print("Delete temporary folder")
        if _path_exists(temp_filepath):
            dbutils.fs.rm(temp_filepath, True)

    print(f"Wrote '{filepath}'")

In [0]:
from pyspark.sql.functions import col

# ElapseTime bucket boundaries. The output names ("seven_second" for 7000)
# imply milliseconds.
ELAPSE_TIME_7S = 7000
ELAPSE_TIME_10S = 10000

# Buckets are half-open, [..,7s), [7s,10s), [10s,..), so each row with a
# non-null ElapseTime lands in exactly one bucket file. Rows with a null
# ElapseTime appear only in the full "iso_request_log" output.
# NOTE: "less_then" is kept as-is because downstream readers may rely on
# the existing file name.
elapse_time = col("ElapseTime")
iso_request_log_outputs = {
    "iso_request_log": isoRequestLog_df,
    "iso_request_log_less_then_seven_second": isoRequestLog_df.filter(elapse_time < ELAPSE_TIME_7S),
    "iso_request_log_seven_to_ten_second": isoRequestLog_df.filter(
        (elapse_time >= ELAPSE_TIME_7S) & (elapse_time < ELAPSE_TIME_10S)
    ),
    "iso_request_log_more_than_ten_second": isoRequestLog_df.filter(elapse_time >= ELAPSE_TIME_10S),
}

for output_name, output_df in iso_request_log_outputs.items():
    write_single_parquet_file(output_df, output_name, isoRequestLogOutputDirPath)