Configuration, and reading in all the available CSV files from the raw data storage

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import to_timestamp, col, year, month, when

# --- Storage and identity settings -----------------------------------------
container = "databricks"
storage_account = "experimentcidt"
tenant_id = "80a5cb6b-ae21-4ea8-bd3f-25e005cefc5b"
managed_identity_client_id = "33f7c0cd-4fa6-432d-9be6-225da9c1768b"

# The storage account key is never hard-coded: it is fetched from the
# 'Experiments3' secret scope and registered for the blob endpoint.
storage_account_key = dbutils.secrets.get(scope="Experiments3", key="storage-account-key")
blob_host = f"{storage_account}.blob.core.windows.net"
spark.conf.set(f"fs.azure.account.key.{blob_host}", str(storage_account_key))

# Session-level Spark settings, applied in order.
# - The LEGACY time parser policy is what allows the 'EEE ...' pattern used
#   later when parsing the `_time` column.
# - NOTE(review): the OAuth settings below name no client id/secret, and
#   `managed_identity_client_id` is not referenced in this cell. Confirm
#   whether they are still needed next to the account key.
for conf_key, conf_value in (
    ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
    ("fs.azure.account.auth.type", "OAuth"),
    ("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.endpoint", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"),
):
    spark.conf.set(conf_key, conf_value)

# Read every CSV in the container root into one dataframe. Quote and escape are
# both '"' and multiLine is on, so quoted fields may contain doubled quotes and
# line breaks. Each row records the file it came from in `source_file`.
source_glob = f"wasbs://{container}@{blob_host}/*.csv"
csv_reader = spark.read.options(
    header="true",
    inferSchema="true",
    quote="\"",
    escape="\"",
    multiLine="true",
)
df_with_source = csv_reader.csv(source_glob).withColumn("source_file", input_file_name())

# Alternatively, create a dataframe from all CSV files in: /Volumes/experiments3/raw_logging/vol1/csvs/




Some initial data cleaning

In [0]:
# Row-level cleaning. Every filter keeps only rows for which the condition is
# TRUE; a comparison against a real SQL NULL evaluates to NULL, so rows whose
# user / cpd_name / logger / message is NULL are dropped by these filters too.
df_with_source = (
    df_with_source
    # Rows without a usable identity: the literal string 'NULL' marks a missing value.
    .filter(col("user") != "NULL")
    .filter(col("cpd_name") != "NULL")
    # CPDs that are excluded from the analysis.
    .filter(~col("cpd_name").isin(["DIVEIN", "TSTCPD", "CCBB"]))
    # Users that are excluded from the analysis.
    .filter(~col("user").isin(["rameer", "kwijk", "rwithage", "simoncox"]))
    # 'linuxFileSystemWatcher' has a serious error that clutters the logging.
    .filter(col("logger") != "linuxFileSystemWatcher")
    # The DDF checker pollutes the messages; this message has to become DEBUG level.
    .filter(~col("message").contains("Checker found an error in DDF"))
)

# Deduplicate LAST. The CSV exports overlap, so the same event can appear in
# several files. dropDuplicates keeps an arbitrary row per key; running it
# before the noise filters (as was done previously) could keep a noise row that
# is then filtered away, silently losing the real event sharing that key.
# NOTE(review): the key does not include logger/message, so distinct events of
# one user on one CPD with the same `_time` value collapse into a single row.
# Confirm that this is intended.
df_with_source = df_with_source.dropDuplicates(["_time", "user", "cpd_name"])

# Print the total number of rows
print(f"Total number of rows: {df_with_source.count()}")


For later partitioning, we need a year and a month column. Also add an iso_timestamp column holding an ISO-formatted timestamp.

In [0]:
# The _time column looks like 'Sun Jan 19 14:20:55 2025'; parse it into a real
# timestamp column. The 'EEE' (day-of-week text) pattern letter is only accepted
# for parsing under the LEGACY time parser policy set in the configuration cell.
df_with_source = df_with_source.withColumn("iso_timestamp", to_timestamp(col("_time"), "EEE MMM dd HH:mm:ss yyyy"))

# Add a year and month column, for later partitioning of the delta table
df_with_source = (
    df_with_source
    .withColumn("year", year(col("iso_timestamp")))
    .withColumn("month", month(col("iso_timestamp")))
)

# Print the min and max timestamp in the dataframe. Both bounds come from a
# single aggregation, so the (uncached) pipeline is evaluated once, not twice.
timestamp_bounds = df_with_source.selectExpr("min(iso_timestamp)", "max(iso_timestamp)").first()
print(f"Min timestamp: {timestamp_bounds[0]}")
print(f"Max timestamp: {timestamp_bounds[1]}")

Create a feature column holding the feature name, derived from the logger and/or the message

In [0]:
#####
# Features
#####

# Ordered rule table: (substring of `logger`, substring of `message` or None, feature label).
# A rule matches when `logger` contains the first value and, if given, `message`
# contains the second. Rules are evaluated top to bottom and a LATER match
# overrides an earlier one. Rows matching no rule get the literal string 'NULL'.
feature_rules = [
    ("devbenchExtension", "Debugger attached successfully", "Python remote debugging"),
    ("reportPreviewExtension", None, "Report preview"),
    ("ddfDefinitionProvider", None, "DDF jump around"),
    ("aspectsExtension", None, "CPD aspects overview"),
    ("ddfFileTreeExtension", None, "Required interfaces viewer"),
    ("devbenchExtension", "Adding filewatcher", "Devbench sync, automatic file upload"),
    ("devbenchExtension", "successfully created", "Devbench integration"),
    ("reportEditor", None, "Report editor"),
    ("ddfCheckerExtension", None, "DDF syntax checking"),
    ("flowEditor", None, "Flow editor"),
    ("devbenchExtension", "on ER/ER_event_log", "ER eventlog viewer"),
    ("devbenchExtension", "Live sync enabled for", "Live sync"),
    ("swipeExtension", None, "Swipe integration"),
]

# Fold the rules into one nested when/otherwise expression. Each new rule wraps
# the expression built so far, so the last rule in the table is checked first.
feature_expr = None
for logger_part, message_part, feature_label in feature_rules:
    rule_matches = col("logger").contains(logger_part)
    if message_part is not None:
        rule_matches = rule_matches & col("message").contains(message_part)
    fallback = "NULL" if feature_expr is None else feature_expr
    feature_expr = when(rule_matches, feature_label).otherwise(fallback)

df_with_source = df_with_source.withColumn("feature", feature_expr)

Determine which CPDs are classic CPDs. Put True/False in the classic column.

In [0]:
# Reference table listing the CPD names that count as "classic".
classic_cpd_names = spark.table("experiments3.logging.classic_cpd_names")
classic_cpd_names.show(200, False)

# Bring the names to the driver through the DataFrame API. The previous
# `.rdd.flatMap(lambda ...)` route depends on the RDD API, which is not
# available on Unity Catalog standard/shared or serverless compute.
classic_names = [row["cpd_name"] for row in classic_cpd_names.select("cpd_name").collect()]

# Flag each row: True when its cpd_name exactly equals one of the classic names
# (isin is an equality test, not a substring match), otherwise False. This
# includes rows whose cpd_name is NULL.
df_with_source = df_with_source.withColumn("classic", when(col("cpd_name").isin(classic_names), True).otherwise(False))


In [0]:
# Write out the combined table, partitioned by year and month. Overwrite mode
# replaces whatever the table held before.
target_table = "experiments3.logging.combined"
df_with_source.write.partitionBy("year", "month").mode("overwrite").saveAsTable(target_table)

# Report the number of rows actually stored. Counting the saved table avoids
# re-executing the whole pipeline from the CSV files (the dataframe is not
# cached) and guarantees the number reflects what was written.
rows_written = spark.table(target_table).count()
print(f"Total number of rows written: {rows_written} to table: experiments3.logging.combined")