# Variables

In [3]:
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")

# reading the JSON document - FabricMonitoring_Variables.json
df = spark.read.option("multiline", "true").json("Files/FabricMonitoring_Variables.json")
filepathTenantSettings = df.first()['filepathTenantSettings']
theLakehouse = "theLakehouse"
deltaTablePrefix = "fabricmonitoring_"

# .first()[...] helps to return a string instead of a dataframe
currentTimeZone = spark.sql("SELECT current_timezone();").first()['current_timezone()']
#print(currentTimeZone)

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 7, Finished, Available)

# All the imports

In [4]:
# all the stuff I need
from notebookutils import mssparkutils

from pyspark.sql.types import (StructType, StructField)
from pyspark.sql.types import (ArrayType, BooleanType, DateType, IntegerType, LongType, StringType, TimestampType)
from pyspark.sql.functions import (col, concat, from_json, get_json_object, input_file_name, lit, regexp_extract, split, udf, when)

from delta.tables import *

import json
import requests
import azure.identity as ai
from azure.identity import ClientSecretCredential
from datetime import datetime

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 8, Finished, Available)

# Prepare the tables

## tenantsettings_bronze

In [5]:
if not spark.catalog.tableExists(f"{theLakehouse}.{deltaTablePrefix}tenantsettings_bronze"):

    #create empty delta table from schema
    schemaTenantSettings = StructType([
        StructField('canSpecifySecurityGroups', BooleanType(), True), 
        StructField('enabled', BooleanType(), True), 
        StructField('enabledSecurityGroups', 
        ArrayType(StructType([
            StructField('graphId', StringType(), True), 
            StructField('name', StringType(), True)]), True), True), 
        StructField('properties', 
            ArrayType(StructType([
                StructField('name', StringType(), True), 
                StructField('type', StringType(), True), 
                StructField('value', StringType(), True)]), True), True), 
        StructField('settingName', StringType(), True),
        StructField('tenantSettingGroup', StringType(), True), 
        StructField('title', StringType(), True),
        StructField('fileName', StringType(), True),
        StructField('fullFileName', StringType(), True),
        StructField('fileDate', DateType(), True),
        ])
    df_tenantsettings_bronze = spark.createDataFrame([], schemaTenantSettings)
    df_tenantsettings_bronze.write \
        .option("mergeSchema", "true") \
        .mode("overwrite") \
        .format("delta") \
        .saveAsTable(f"{theLakehouse}.{deltaTablePrefix}tenantsettings_bronze")


StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 9, Finished, Available)

## tenantsettings_silver

In [6]:
if not spark.catalog.tableExists(f"{theLakehouse}.{deltaTablePrefix}tenantsettings_silver"):

    #create empty delta table from schema
    schemaTenantSettings = StructType([
        StructField('tenantId', StringType(), True),
        StructField('canSpecifySecurityGroups', BooleanType(), True), 
        StructField('enabled', BooleanType(), True), 
        StructField('enabledSecurityGroups', 
        ArrayType(StructType([
            StructField('graphId', StringType(), True), 
            StructField('name', StringType(), True)]), True), True), 
        StructField('properties', 
            ArrayType(StructType([
                StructField('name', StringType(), True), 
                StructField('type', StringType(), True), 
                StructField('value', StringType(), True)]), True), True), 
        StructField('settingName', StringType(), True),
        StructField('tenantSettingGroup', StringType(), True), 
        StructField('title', StringType(), True),
        StructField('hasSecurityGroups', StringType(), True),
        StructField('fullFileName', StringType(), True),
        StructField('fileName', StringType(), True),
        StructField('fileDate', StringType(), True),
        StructField('eventDateTime', TimestampType(), False),
        StructField('eventTimezone', StringType(), False),
        ])
    df_tenantsettings_silver = spark.createDataFrame([], schemaTenantSettings)
    df_tenantsettings_silver.write \
        .option("mergeSchema", "true") \
        .mode("overwrite") \
        .format("delta") \
        .saveAsTable(f"{theLakehouse}.{deltaTablePrefix}tenantsettings_silver")

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 10, Finished, Available)

# Read the JSON files, transform the data

## json to raw

In [7]:
df_TenantSettings_json = spark.read.json(f"{filepathTenantSettings}*.json")
# adding housekeeping columns
df_TenantSettings_json = df_TenantSettings_json.select("*") \
    .withColumn("fullFileName", input_file_name()) \
    .withColumn("fileName", split(input_file_name(), "/")[7]) \
    .withColumn("fileDate", regexp_extract(split(input_file_name(), "/")[7], '(?<=\_)(.*?)(?=\.)', 0).cast("date") )
#display(df_TenantSettings_json.select("fullfilename","fileName","fileDate"))

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 11, Finished, Available)

In [8]:
df_TenantSettings_json.write \
   .option("mergeSchema", "true") \
   .mode("overwrite") \
   .format("delta")\
   .saveAsTable(f"{theLakehouse}.{deltaTablePrefix}TenantSettings_raw")

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 12, Finished, Available)

## raw to bronze

In [9]:
event_DateTime = datetime.now()

targetTable = DeltaTable.forName(spark, f"{theLakehouse}.{deltaTablePrefix}tenantsettings_bronze")

sourceTable = spark.read.table(f"{theLakehouse}.{deltaTablePrefix}tenantsettings_raw")
    
targetTable.alias("t") \
   .merge(
     sourceTable.alias("s"),
     't.fileName = s.fileName AND t.settingName = s.SettingName'
   ) \
    .whenMatchedUpdate(set =
     {
          
     }
   ) \
  .whenNotMatchedInsert(values =
     {
        "canSpecifySecurityGroups": "s.canSpecifySecurityGroups",
        "enabled": "s.enabled",
        "enabledSecurityGroups": "s.enabledSecurityGroups",
        "properties": "s.properties",
        "settingName": "s.settingName",
        "tenantSettingGroup": "s.tenantSettingGroup",
        "title": "s.title",
        "fileName": "s.fileName",
        "fullFileName": "s.fullFileName",
        "fileDate": "s.fileDate"
     }
   ) \
   .execute()

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 13, Finished, Available)

## bronze to silver

In [10]:
event_DateTime = datetime.now()

targetTable = DeltaTable.forName(spark, f"{theLakehouse}.{deltaTablePrefix}tenantsettings_silver")

sourceTable = spark.read.table(f"{theLakehouse}.{deltaTablePrefix}tenantsettings_bronze")
# adding housekeeping columns
sourceTable = sourceTable.select("*") \
    .withColumn("hasSecurityGroups", \
        when( (col("canSpecifySecurityGroups") == True) & (col("enabled") == True) & col("enabledSecurityGroups").isNotNull() , "cool").otherwise( \
        when( (col("canSpecifySecurityGroups") == True) & (col("enabled") == True) & col("enabledSecurityGroups").isNull() , "not cool").otherwise( \
        when( (col("canSpecifySecurityGroups") == True) & (col("enabled") == False), "not required").otherwise("not applicable")))) \
    .withColumn("eventDateTime", lit(event_DateTime)) \
    .withColumn("eventTimezone", lit(currentTimeZone))
    
targetTable.alias("t") \
   .merge(
     sourceTable.alias("s"),
     't.fileName = s.fileName AND t.settingName = s.SettingName'
   ) \
    .whenMatchedUpdate(set =
     {
          
     }
   ) \
  .whenNotMatchedInsert(values =
     {
        "canSpecifySecurityGroups": "s.canSpecifySecurityGroups",
        "enabled": "s.enabled",
        "enabledSecurityGroups": "s.enabledSecurityGroups",
        "properties": "s.properties",
        "settingName": "s.settingName",
        "tenantSettingGroup": "s.tenantSettingGroup",
        "title": "s.title",
        "hasSecurityGroups": "s.hasSecurityGroups",
        "fileName": "s.fileName",
        "fullFileName": "s.fullFileName",
        "fileDate": "s.fileDate",
        "eventDateTime": "s.eventDateTime",
        "eventTimezone": "s.eventTimezone"
     }
   ) \
   .execute()

StatementMeta(, c4e330e1-c37a-42ce-aba3-d78e7e2a86e4, 14, Finished, Available)