# Load Test Results
This notebook takes the CSV files located in the raw folder, processes them, loads the test results into the TestResults table, and updates the ProjectInformation and Calendar tables. It also creates the Time table if it does not already exist.

## Parameters for this Notebook

In [19]:
# GUID of the Fabric workspace; used further down to build the OneLake URL and abfss path.
# NOTE(review): the value contains non-hex 'x' characters, so it looks like a redacted
# placeholder — confirm the real workspace GUID is supplied before running.
workspace_id = "x3dbac5b-48be-41eb-83a7-6c178940703x"
# GUID of the lakehouse inside that workspace (same placeholder caveat as above).
lakehouse_id = "x754c80f-5f13-40b1-ab93-e4368da923cx"


StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 21, Finished, Available, Finished)

## Import Libraries
* Imports the pyspark libraries to support data cleansing
* Imports the Delta table API and enables automatic schema merging
* Imports os for file and folder manipulations
* Imports notebook utilities to mount lakehouse for automation purposes

In [20]:
# Import Libraries
from pyspark.sql.functions import (
    col, to_date, to_timestamp, to_utc_timestamp, year, month, date_format, split,
    hour, minute, second, expr,
    lit, concat, row_number, when
)
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, IntegerType

# For merging tables
from delta.tables import *
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# For file movement
import os

# For data calculations
from datetime import timedelta
from math import ceil

# Mount Lakehouse programmatically
from notebookutils import mssparkutils

StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 22, Finished, Available, Finished)

## Setup Lakehouse Connections
* Setup variables to support storing and processing test results
* Mount Lakehouse

In [21]:
# Thanks to Sandeep Pawar
# https://fabric.guru/how-to-mount-a-lakehouse-and-identify-the-mounted-lakehouse-in-fabric-notebook

# Set Path Information
# HTTPS and abfss forms of the lakehouse root, both built from the notebook parameters.
folder_path = "https://onelake.dfs.fabric.microsoft.com/" + workspace_id + "/" + lakehouse_id + "/"
abfss_path = "abfss://" + workspace_id + "@onelake.dfs.fabric.microsoft.com/" + lakehouse_id
# Lakehouse-relative folders: incoming CSVs, files being loaded, and files already loaded.
# Note that processing_path and processed_path both end with a trailing slash.
relative_path = "Files/DQVTests/raw"
processing_path = relative_path + "/processing/"
processed_path = relative_path + "/processed/"
mount_name = "/lakehouse/default"

# Mount Lakehouse
mssparkutils.fs.mount(abfss_path, mount_name)

# Find the local file-system path that corresponds to the mount point we just created
mount_points = mssparkutils.fs.mounts()
local_path = next((mp["localPath"] for mp in mount_points if mp["mountPoint"] == mount_name), None)

# Fail with a clear message instead of a TypeError from `None + "/Tables"` below
if local_path is None:
    raise RuntimeError(
        "Lakehouse mount point '" + mount_name + "' was not found after mounting " + abfss_path
    )

print(local_path)

# Retrieve Tables
# Names of the entries under Tables/; later cells use this list to decide between
# creating a table and merging into an existing one.
tables = os.listdir(local_path + "/Tables")


StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a388ce29-fdd7-4b9b-bad2-12986ddf75c6)

SynapseWidget(Synapse.DataFrame, 94d0bed7-1df3-4976-97ba-100160c618eb)

/synfs/notebook/06114e8d-f378-47a7-b2c1-bdd22ea1b5f7/lakehouse/default


## Create Folders
* Bootstrapping code that creates the folders if they don't already exist

In [22]:
# Make sure the raw, processed and processing folders exist on the mounted lakehouse.
# The raw folder comes first because the other two live underneath it.
for lakehouse_folder in (relative_path, processed_path, processing_path):
    folder_on_mount = local_path + "/" + lakehouse_folder
    if os.path.exists(folder_on_mount):
        continue
    os.makedirs(folder_on_mount)

StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 24, Finished, Available, Finished)

## Identify Files for processing

In [23]:
# Everything that currently sits in the raw folder, listed through the local mount
file_names = os.listdir(local_path + "/" + relative_path)

# abfss paths before the move (source) and after it (target); the two lists are index-aligned
source_file_names = []
target_file_names = []

for csv_name in file_names:
    # Only CSV files are staged; any other entry is left where it is
    if not csv_name.endswith('.csv'):
        continue

    staged_from = abfss_path + "/" + relative_path + "/" + csv_name
    staged_to = abfss_path + "/" + processing_path + csv_name

    # Record the paths first, then relocate the file into the processing folder
    source_file_names.append(staged_from)
    target_file_names.append(staged_to)
    mssparkutils.fs.mv(staged_from, staged_to, overwrite=True)

# Show what was staged in this run
print(source_file_names)
print(target_file_names)


StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 25, Finished, Available, Finished)

['abfss://c3dbac5b-48be-41eb-83a7-6c1789407037@onelake.dfs.fabric.microsoft.com/a754c80f-5f13-40b1-ab93-e4368da923c4/Files/DQVTests/raw/2024-06-25T13-53-03Z-5530fe89-403a-4a59-af46-83357d80839a.csv', 'abfss://c3dbac5b-48be-41eb-83a7-6c1789407037@onelake.dfs.fabric.microsoft.com/a754c80f-5f13-40b1-ab93-e4368da923c4/Files/DQVTests/raw/2024-06-25T13-55-07Z-c7518fe9-184b-4264-bb5d-04cd517ae28a.csv']
['abfss://c3dbac5b-48be-41eb-83a7-6c1789407037@onelake.dfs.fabric.microsoft.com/a754c80f-5f13-40b1-ab93-e4368da923c4/Files/DQVTests/raw/processing/2024-06-25T13-53-03Z-5530fe89-403a-4a59-af46-83357d80839a.csv', 'abfss://c3dbac5b-48be-41eb-83a7-6c1789407037@onelake.dfs.fabric.microsoft.com/a754c80f-5f13-40b1-ab93-e4368da923c4/Files/DQVTests/raw/processing/2024-06-25T13-55-07Z-c7518fe9-184b-4264-bb5d-04cd517ae28a.csv']


## Add additional columns to test files and set types

In [24]:
if target_file_names:
    # Load all staged CSV files in one read. The first line of each file is the header;
    # the double quote is configured as both the quote and the escape character.
    csv_reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("quote", '"')
        .option("escape", '"')
    )
    staged_df = csv_reader.load(target_file_names)

    # Key identifying the project a row belongs to: model, branch, repository and project
    project_key = concat(
        col("ModelName"), lit("|"),
        col("BranchName"), lit("|"),
        col("RepositoryName"), lit("|"),
        col("ProjectName"),
    )

    # Time of day re-assembled as a timestamp on the fixed date 1970-1-1
    run_time = to_timestamp(
        concat(
            lit("1970-1-1 "),
            col("RunHour"), lit(":"),
            col("RunMinute"), lit(":"),
            col("RunSeconds"), lit(" Z"),
        )
    )

    df = (
        staged_df
        # Merge key for test results: RunID and Order joined with '|'
        .withColumn("ConcatenatedKey", concat(col("RunID"), lit('|'), col("Order")))
        # Date part of the run date-time
        .withColumn("RunDate", to_date(col("RunDateTime")))
        .withColumn("ProjectConcatenatedKey", project_key)
        # Calendar components of the run date
        .withColumn("RunYear", year(col("RunDate")))
        .withColumn("RunMonth", month(col("RunDate")))
        .withColumn("RunDay", date_format(col("RunDate"), "d").cast('int'))
        # Everything after the 'T' is the time text; its parts are separated by '-'
        # and the seconds part is cut at 'Z'
        .withColumn("RunTimeStr", split(col("RunDateTime"), 'T')[1])
        .withColumn("RunHour", split(col("RunTimeStr"), '-')[0])
        .withColumn("RunMinute", split(col("RunTimeStr"), '-')[1])
        .withColumn("RunSeconds", split(split(col("RunTimeStr"), '-')[2], 'Z')[0])
        .withColumn("RunTime", run_time)
    )


StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 26, Finished, Available, Finished)

## Upsert to Project Information Table

In [25]:
if len(target_file_names) > 0:
    # Define schema for Project Information
    schema_proj_info = StructType([
        StructField("ProjectConcatenatedKey", StringType(), True),
        StructField("ProjectName", StringType(), True),
        StructField("RepositoryName", StringType(), True),
        StructField("BranchName", StringType(), True),
        StructField("ArtifactName", StringType(), True),
        StructField("ArtifactType", StringType(), True),
        StructField("ProjectInformationID", IntegerType(), True),
    ])

    # Select relevant columns and filter out rows where 'ProjectConcatenatedKey' is null
    prelim_project_df = df.select(
        "ProjectConcatenatedKey",
        "ProjectName",
        "RepositoryName",
        "BranchName",
        "ModelName",
    ).where(col('ProjectConcatenatedKey').isNotNull())

    # The model name is stored as the artifact name; every artifact loaded here is
    # labelled "Semantic Model"
    prelim_project_df = prelim_project_df.withColumnRenamed("ModelName", "ArtifactName")\
                                         .withColumn("ArtifactType", lit("Semantic Model"))

    # Determine the maximum ProjectInformationID to generate unique identifiers
    if "ProjectInformation" in tables:
        max_project_df = spark.sql("SELECT MAX(ProjectInformationID) as max FROM ProjectInformation")
        # MAX() is NULL when the table holds no IDs; fall back to 0 so the addition
        # below does not turn every new identifier into NULL.
        max_id = max_project_df.first()['max'] or 0
    else:
        max_id = 0

    # Generate unique identifiers using window function and update prelim_project_df.
    # Order by the key column itself (not a string literal) so that, once duplicates
    # are dropped, each key receives a deterministic row number.
    w = Window.orderBy(col('ProjectConcatenatedKey'))
    prelim_project_df = prelim_project_df.dropDuplicates(['ProjectConcatenatedKey'])
    prelim_project_df = prelim_project_df.withColumn("ProjectInformationID", lit(max_id) + row_number().over(w))

    # Enforce the schema when creating DataFrame.
    # toPandas() collects the rows to the driver; columns are matched to the schema by
    # position, so the select/rename order above must stay aligned with schema_proj_info.
    project_df = spark.createDataFrame(prelim_project_df.toPandas(), schema_proj_info)

    # Load or overwrite project information based on table existence
    if "ProjectInformation" in tables:
        project_delta_df = DeltaTable.forPath(spark, "Tables/ProjectInformation")
        condition = "target.ProjectConcatenatedKey = source.ProjectConcatenatedKey"
        # Matched rows only have their key re-assigned to the same value, so projects that
        # already exist keep their ProjectInformationID; unmatched keys are inserted.
        project_delta_df.alias("target").merge(project_df.alias("source"), condition)\
            .whenMatchedUpdate(set={"ProjectConcatenatedKey": "source.ProjectConcatenatedKey"})\
            .whenNotMatchedInsert(values={
                "ProjectConcatenatedKey": "source.ProjectConcatenatedKey",
                "ProjectName": "source.ProjectName",
                "RepositoryName": "source.RepositoryName",
                "ArtifactName": "source.ArtifactName",
                "ArtifactType": "source.ArtifactType",
                "BranchName": "source.BranchName",
                "ProjectInformationID": "source.ProjectInformationID"
            }).execute()
    else:
        project_df.write.format("delta").mode("overwrite").save("Tables/ProjectInformation")


StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 27, Finished, Available, Finished)

## Upsert Test Results

In [27]:
if len(target_file_names) > 0:
    # Key -> ID lookup, read back from the ProjectInformation table
    updated_project_df = spark.sql("SELECT DISTINCT ProjectInformationID, ProjectConcatenatedKey FROM ProjectInformation")

    # Schema for Test Results
    schema_test_results = StructType([
        StructField("ProjectConcatenatedKey", StringType(), True),
        StructField("ConcatenatedKey", StringType(), True),
        StructField("RunID", StringType(), True),
        StructField("Message", StringType(), True),
        StructField("HasPassed", IntegerType(), True),
        StructField("RunDate", DateType(), True),
        StructField("RunTime", TimestampType(), True),
        StructField("ProjectInformationID", IntegerType(), True),
        # Note in a star schema this column may be better served as a dimension
        # but depends on what additional columns may be helpful about the tester
        StructField("Tester", StringType(), True),
        StructField("Order", IntegerType(), True)
    ])

    # Filter to just test results
    prelim_test_results_df = df.filter(col("IsTestResult") == 'True')
    # Update columns: a LogType of "Error" or "Failure" counts as not passed (0), anything else as passed (1)
    prelim_test_results_df = prelim_test_results_df.withColumn(
        "HasPassed", when(col("LogType").isin("Error", "Failure"), 0).otherwise(1)
    )
    prelim_test_results_df = prelim_test_results_df.withColumnRenamed("UserName", "Tester")
    # Left join to get project id; the duplicate key column from the lookup side is dropped
    prelim_test_results_df = prelim_test_results_df.join(
        updated_project_df,
        prelim_test_results_df.ProjectConcatenatedKey == updated_project_df.ProjectConcatenatedKey,
        how='left',
    ).drop(updated_project_df.ProjectConcatenatedKey)

    # Enforce the schema by selecting exactly the schema's columns, in order, and casting
    # each to its declared type. This stays inside Spark: the results are not collected to
    # the driver through pandas, and NULLs in the integer columns (ProjectInformationID
    # from the left join, Order) remain NULL instead of passing through a pandas conversion.
    test_results_df = prelim_test_results_df.select(
        *[
            col(field.name).cast(field.dataType).alias(field.name)
            for field in schema_test_results.fields
        ]
    )

    # Create TestResults Table with this schema
    if "TestResults" not in tables:
        # Save Table
        test_results_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true")\
                                             .partitionBy("RunDate", "ProjectInformationID")\
                                             .save("Tables/TestResults")
    else:
        # Upsert: rows whose ConcatenatedKey already exists in the table are left untouched,
        # only new keys are inserted
        testresults_delta_df = DeltaTable.forPath(spark, "Tables/TestResults")

        condition = 'target.ConcatenatedKey = source.ConcatenatedKey'
        testresults_delta_df.alias('target').merge(test_results_df.alias('source'), condition)\
            .whenNotMatchedInsert(values={
                "ProjectConcatenatedKey": "source.ProjectConcatenatedKey",
                "ConcatenatedKey": "source.ConcatenatedKey",
                "RunID": "source.RunID",
                "Message": "source.Message",
                "HasPassed": "source.HasPassed",
                "RunDate": "source.RunDate",
                "RunTime": "source.RunTime",
                "ProjectInformationID": "source.ProjectInformationID",
                "Tester": "source.Tester",
                "Order": "source.Order"
            }).execute()



StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 29, Finished, Available, Finished)

## Move files to Processed folder

In [28]:
# Move files to processed
for staged_file in target_file_names:
    if not staged_file.endswith('.csv'):
        continue
    # Get file name (the part of the staged path after the processing folder)
    file_name = staged_file.split(processing_path)[1]
    # processed_path already ends with "/", so no extra separator is added before the
    # file name. overwrite=True matches the staging move into the processing folder, so a
    # file name that was already archived does not make this move fail.
    mssparkutils.fs.mv(staged_file, abfss_path + "/" + processed_path + file_name, overwrite=True)


StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 30, Finished, Available, Finished)

## Setup Date Dimension

In [29]:
if len(target_file_names) > 0:
    # Load data from TestResults table to find min and max dates
    testresults_stored_df = spark.sql("SELECT MAX(RunDate) as max, MIN(RunDate) as min FROM TestResults")

    # Extract start and end dates from test results.
    # The aggregate row is fetched once so the query is not executed twice.
    date_bounds = testresults_stored_df.first()
    start_date = date_bounds['min']
    end_date = date_bounds['max']

    # List to store dates
    dates = []

    # Define schema for dates DataFrame
    schema = StructType([
        StructField("Date", DateType(), True),
        StructField("DayofMonth", IntegerType(), True),
        StructField("DayName", StringType(), True),
        StructField("Month", IntegerType(), True),
        StructField("MonthName", StringType(), True),
        StructField("Quarter", IntegerType(), True),
        StructField("Year", IntegerType(), True),
    ])

    if start_date is None or end_date is None:
        # MIN/MAX are NULL when TestResults has no RunDate values; comparing None with
        # None would raise a TypeError, so the Calendar table is left as it is.
        print("TestResults contains no RunDate values; Calendar table was not rebuilt.")
    else:
        # One row per day from start_date to end_date, both inclusive
        day_count = (end_date - start_date).days + 1
        for day_offset in range(day_count):
            loop_date = start_date + timedelta(days=day_offset)
            dates.append({
                "Date": loop_date,
                "DayofMonth": loop_date.day,
                "DayName": loop_date.strftime('%A'),
                "Month": loop_date.month,
                "MonthName": loop_date.strftime('%B'),
                # Months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
                "Quarter": ceil(loop_date.month / 3),
                "Year": loop_date.year,
            })

        # Create DataFrame from list of dates with enforced schema
        dates_df = spark.createDataFrame(data=dates, schema=schema)

        # Write DataFrame to Delta format, overwriting existing data in "Tables/Calendar"
        dates_df.write.format("delta").mode("overwrite").save("Tables/Calendar")



StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 31, Finished, Available, Finished)

## Setup Time Dimension

In [30]:
# The Time table is built only when it does not exist yet; an existing table is not
# rebuilt by this cell.
if "Time" not in tables:
    # Step 1: Define Time Dimension Schema with Hour and Minute Bins
    schema_time = StructType([
        StructField("Time", TimestampType(), True),
        StructField("Hour", IntegerType(), True),
        StructField("Minute", IntegerType(), True),
        StructField("Second", IntegerType(), True),
        StructField("AMPM", StringType(), True),
        StructField("HourBin12", IntegerType(), True),
        StructField("HourBin8", IntegerType(), True),
        StructField("HourBin6", IntegerType(), True),
        StructField("HourBin4", IntegerType(), True),
        StructField("HourBin3", IntegerType(), True),
        StructField("HourBin2", IntegerType(), True),
        StructField("MinuteBin30", IntegerType(), True),
        StructField("MinuteBin15", IntegerType(), True),
        StructField("MinuteBin10", IntegerType(), True),
    ])

    # Convert 'id' column to timestamp: one row per second of a day (24 * 60 * 60 rows)
    time_df = spark.range(0, 24 * 60 * 60).selectExpr("timestamp(id) as Time")

    # Extract hour, minute, and second from the timestamp
    time_df = time_df.withColumn("Hour", hour("Time"))
    time_df = time_df.withColumn("Minute", minute("Time"))
    time_df = time_df.withColumn("Second", second("Time"))
    # Hours 0-11 are "AM", hours 12-23 are "PM" (previously every row was labelled "AM")
    time_df = time_df.withColumn("AMPM", when(col("Hour") < 12, lit("AM")).otherwise(lit("PM")))

    # Calculate Hour Bins
    # NOTE(review): these use the remainder (Hour % n), which is the position inside an
    # n-hour block rather than the block itself — confirm whether floor(Hour / n) was intended.
    time_df = time_df.withColumn("HourBin12", expr("Hour % 12"))
    time_df = time_df.withColumn("HourBin8", expr("Hour % 8"))
    time_df = time_df.withColumn("HourBin6", expr("Hour % 6"))
    time_df = time_df.withColumn("HourBin4", expr("Hour % 4"))
    time_df = time_df.withColumn("HourBin3", expr("Hour % 3"))
    time_df = time_df.withColumn("HourBin2", expr("Hour % 2"))

    # Calculate Minute Bins (same remainder-based caveat as the hour bins)
    time_df = time_df.withColumn("MinuteBin30", expr("Minute % 30"))
    time_df = time_df.withColumn("MinuteBin15", expr("Minute % 15"))
    time_df = time_df.withColumn("MinuteBin10", expr("Minute % 10"))

    # Select Columns as per Schema
    prelim_time_df = time_df.select("Time", "Hour", "Minute", "Second", "AMPM",
                                    "HourBin12", "HourBin8", "HourBin6", "HourBin4", "HourBin3", "HourBin2",
                                    "MinuteBin30", "MinuteBin15", "MinuteBin10")

    # Enforce the schema (columns are matched to schema_time by position)
    time_df = spark.createDataFrame(prelim_time_df.toPandas(), schema_time)

    # Save
    time_df.write.format("delta").mode("overwrite").save("Tables/Time")

StatementMeta(, 06114e8d-f378-47a7-b2c1-bdd22ea1b5f7, 32, Finished, Available, Finished)