In [None]:
import sempy.fabric          as f
import pyspark.sql.types     as st
import pyspark.sql.functions as sf
import time                  as t

In [None]:
# Verbosity switch: when True, cells print intermediate values and display the tables they write.
is_debug: bool = True

# Lakehouses

## Create lakehouses

In [None]:
# Create the lakehouses this solution needs, skipping any that already exist
# in the current workspace.

# Get current WorkspaceID
workspace_id = f.get_workspace_id()
if is_debug: print(f"workspace_id: {workspace_id}")

# Lakehouses required by the solution (created in this order when missing)
list_required_lakehouse = [
    "lh_cfg"
    , "lh_log"
    , "lh_bronze"
    , "lh_silver"
    , "lh_AdventureWorks"
    , "lh_dataverse_tablename_workspace_abc"
]

# Names of the lakehouses that already exist in the current workspace.
# Lakehouses are listed directly with the same API used to read the ABFS paths;
# f.list_datasets() returns semantic models and could only match a lakehouse
# through a same-named default semantic model.
set_existing_lakehouse = {
    v.displayName for v in mssparkutils.lakehouse.list(workspaceId = workspace_id)
}
if is_debug: print(f"set_existing_lakehouse: {sorted(set_existing_lakehouse)}")

# Plain set difference; no Spark job is needed for a handful of names
list_missing_lakehouse = [
    name for name in list_required_lakehouse if name not in set_existing_lakehouse
]
if is_debug: print(f"list_missing_lakehouse: {list_missing_lakehouse}")

# Create the missing lakehouses
for lakehouse_name in list_missing_lakehouse:
    lakehouse_id = f.create_lakehouse(lakehouse_name)
    print(f"Lakehouse \"{lakehouse_name}\" with id \"{lakehouse_id}\" is created successfully")

# Fixed wait so newly created lakehouses are provisioned before their ABFS paths
# are listed; skipped entirely when nothing was created.
LAKEHOUSE_CREATE_WAIT_SECONDS = 60
if list_missing_lakehouse: t.sleep(LAKEHOUSE_CREATE_WAIT_SECONDS)

## Getting the ABFS paths 

In [None]:
# Create one notebook variable per lakehouse holding its ABFS path
# (abfs_path_<lakehouse_name>); later cells read e.g. abfs_path_lh_cfg.
list_lakehouse = mssparkutils.lakehouse.list(workspaceId = workspace_id)

for v in list_lakehouse:
    lakehouse_name = v.displayName
    abfs_path      = v.properties["abfsPath"]

    if is_debug: print(f"abfs_path_{lakehouse_name}: {abfs_path}")
    # globals() is the supported way to add module-level names dynamically;
    # writing into locals() only worked because notebook top-level locals
    # happen to be the globals dict, and Python documents that dict as read-only.
    globals()[f"abfs_path_{lakehouse_name}"] = abfs_path

## Create Tables

### Extract Object

One table per source technology. Each table is keyed by `frequency` plus the columns that identify the object, and holds all the attributes needed to extract that object.

#### lh_cfg.eo_sqlserver

In [None]:
# Empty extract-object table lh_cfg.eo_sqlserver; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"                   , st.StringType(), nullable=False) # PK
    .add("frequency"                    , st.StringType(), nullable=False) # PK
    .add("server_name"                  , st.StringType(), nullable=False) # PK
    .add("database_name"                , st.StringType(), nullable=False) # PK
    .add("schema_name"                  , st.StringType(), nullable=False) # PK
    .add("table_name"                   , st.StringType(), nullable=False) # PK
    .add("keyvault_url"                 , st.StringType())
    .add("keyvault_secret_name_user"    , st.StringType())
    .add("keyvault_secret_name_password", st.StringType())
    .add("extract_type"                 , st.StringType())
    .add("sequence"                     , st.LongType())
    .add("extraction_timeframe"         , st.StringType())
    .add("from"                         , st.StringType())
    .add("where"                        , st.StringType())
    .add("is_pk"                        , st.BooleanType())
    .add("prefix_select"                , st.StringType())
    .add("column_name"                  , st.StringType())
    .add("is_extracted"                 , st.BooleanType())
    .add("data_type"                    , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/eo_sqlserver"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.eo_lakehouse

In [None]:
# Empty extract-object table lh_cfg.eo_lakehouse; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"          , st.StringType(), nullable=False) # PK
    .add("frequency"           , st.StringType(), nullable=False) # PK
    .add("lakehouse_name"      , st.StringType(), nullable=False) # PK
    .add("table_name"          , st.StringType(), nullable=False) # PK
    .add("extract_type"        , st.StringType())
    .add("sequence"            , st.LongType())
    .add("extraction_timeframe", st.StringType())
    .add("from"                , st.StringType())
    .add("where"               , st.StringType())
    .add("is_pk"               , st.BooleanType())
    .add("prefix_select"       , st.StringType())
    .add("column_name"         , st.StringType())
    .add("is_extracted"        , st.BooleanType())
    .add("data_type"           , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/eo_lakehouse"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.eo_excel

In [None]:
# Empty extract-object table lh_cfg.eo_excel; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"    , st.StringType(), nullable=False) # PK
    .add("frequency"     , st.StringType(), nullable=False) # PK
    .add("folder_name"   , st.StringType(), nullable=False) # PK
    .add("file_name"     , st.StringType(), nullable=False) # PK
    .add("worksheet_name", st.StringType(), nullable=False) # PK
    .add("extract_type"  , st.StringType())
    .add("first_row"     , st.ShortType())
    .add("sequence"      , st.LongType())
    .add("is_pk"         , st.BooleanType())
    .add("column_name"   , st.StringType())
    .add("is_extracted"  , st.BooleanType())
    .add("data_type"     , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/eo_excel"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.eo_csv

In [None]:
# Empty extract-object table lh_cfg.eo_csv; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"   , st.StringType(), nullable=False) # PK
    .add("frequency"    , st.StringType(), nullable=False) # PK
    .add("folder_name"  , st.StringType(), nullable=False) # PK
    .add("file_name"    , st.StringType(), nullable=False) # PK
    .add("has_header"   , st.BooleanType())
    .add("delimiter"    , st.StringType())
    .add("row_separator", st.StringType())
    .add("quote"        , st.StringType())
    .add("extract_type" , st.StringType())
    .add("sequence"     , st.LongType())
    .add("is_pk"        , st.BooleanType())
    .add("column_name"  , st.StringType())
    .add("is_extracted" , st.BooleanType())
    .add("data_type"    , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/eo_csv"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.eo_json

In [None]:
# Empty extract-object table lh_cfg.eo_json; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"  , st.StringType(), nullable=False) # PK
    .add("frequency"   , st.StringType(), nullable=False) # PK
    .add("folder_name" , st.StringType(), nullable=False) # PK
    .add("file_name"   , st.StringType(), nullable=False) # PK
    .add("extract_type", st.StringType())
    .add("sequence"    , st.LongType())
    .add("is_pk"       , st.BooleanType())
    .add("column_name" , st.StringType())
    .add("is_extracted", st.BooleanType())
    .add("data_type"   , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/eo_json"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.eo_api

In [None]:
# Empty extract-object table lh_cfg.eo_api; "PK" columns are declared NOT NULL.
# Several columns mirror arguments of requests.get/post (see the inline notes).
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"         , st.StringType(), nullable=False) # PK
    .add("frequency"          , st.StringType(), nullable=False) # PK
    .add("table_name"         , st.StringType(), nullable=False) # PK
    .add("url"                , st.StringType(), nullable=False) # PK requests.get/post(url)
    .add("method"             , st.StringType())
    .add("headers"            , st.StringType()) # requests.get/post(headers)
    .add("data"               , st.StringType()) # requests.post(data)
    .add("root_element_name"  , st.StringType())
    .add("pagination_value"   , st.StringType())
    .add("params"             , st.StringType()) # requests.get(params)
    .add("authorization_type" , st.StringType())
    .add("authorization_value", st.StringType())
    .add("request_timeout"    , st.IntegerType())
    .add("response_format"    , st.IntegerType()) # NOTE(review): integer-typed; confirm this is intended rather than a format name string
    .add("is_extracted"       , st.BooleanType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/eo_api"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

### Parameter

#### lh_cfg.global_parameter

The global parameters are applicable to the entire process (bronze, silver, gold)

In [None]:
# Global parameters: one active row per lakehouse in the workspace, holding its ABFS path.
sch = st.StructType([
  st.StructField("name",         st.StringType(),  False) # PK
  , st.StructField("value",      st.StringType(),  False) # PK
  , st.StructField("note",       st.StringType(),  True)
  , st.StructField("is_active" , st.BooleanType(), False)
])

# Collect existing lakehouses and build the parameter rows directly as tuples
# (no generated code / eval). Rows are built regardless of is_debug, and each
# tuple carries all four schema fields, because is_active is non-nullable and
# createDataFrame rejects rows whose length differs from the schema.
list_lakehouse = mssparkutils.lakehouse.list(workspaceId = workspace_id)

list_global_parameter = [
    (
        f"abfs_path_{v.displayName}"                  # name
        , v.properties["abfsPath"].strip()            # value
        , f"abfs path for lakehouse {v.displayName}"  # note
        , True                                        # is_active (NOTE(review): assumed active by default)
    )
    for v in list_lakehouse
]

# Create sdf_global_parameter
sdf_global_parameter = spark.createDataFrame(list_global_parameter, sch)

sdf_global_parameter.write\
  .format("delta")\
  .mode("overwrite")\
  .option("overwriteSchema", "True")\
  .save(f"{abfs_path_lh_cfg}/Tables/global_parameter")

if is_debug:
  sdf_global_parameter.printSchema()
  display(sdf_global_parameter)

#### lh_cfg.ep_sqlserver

Parameters per technology, created and used dynamically during the process run.

In [None]:
# Empty name/value parameter table lh_cfg.ep_sqlserver; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"   , st.StringType(), nullable=False) # PK
    .add("frequency"    , st.StringType(), nullable=False) # PK
    .add("server_name"  , st.StringType(), nullable=False) # PK
    .add("database_name", st.StringType(), nullable=False) # PK
    .add("schema_name"  , st.StringType(), nullable=False) # PK
    .add("table_name"   , st.StringType(), nullable=False) # PK
    .add("name"         , st.StringType())
    .add("value"        , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/ep_sqlserver"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.ep_lakehouse

In [None]:
# Empty name/value parameter table lh_cfg.ep_lakehouse; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"    , st.StringType(), nullable=False) # PK
    .add("frequency"     , st.StringType(), nullable=False) # PK
    .add("lakehouse_name", st.StringType(), nullable=False) # PK (Technology = Lakehouse)
    .add("name"          , st.StringType())
    .add("value"         , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/ep_lakehouse"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_cfg.ep_api

In [None]:
# Empty name/value parameter table lh_cfg.ep_api; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology", st.StringType(), nullable=False) # PK
    .add("frequency" , st.StringType(), nullable=False) # PK
    .add("table_name", st.StringType(), nullable=False) # PK
    .add("base_url"  , st.StringType(), nullable=False) # PK
    .add("name"      , st.StringType())
    .add("value"     , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/ep_api"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

### Power BI

#### lh_cfg.pbi_refresh

In [None]:
# Empty Power BI refresh configuration table lh_cfg.pbi_refresh; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("frequency"             , st.StringType(), nullable=False) # PK
    .add("environment"           , st.StringType())
    .add("workspace_id"          , st.StringType(), nullable=False) # PK
    .add("semantic_model_id"     , st.StringType(), nullable=False) # PK
    .add("semantic_model_name"   , st.StringType())
    .add("request_timeout"       , st.IntegerType())
    .add("refresh_timeout"       , st.IntegerType())
    .add("loop_wait_time_seconds", st.IntegerType())
    .add("is_active"             , st.BooleanType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/pbi_refresh"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

### Metadata

#### lh_cfg.md_column

The metadata columns that are added automatically to all bronze tables

In [None]:
# Empty metadata-column table lh_cfg.md_column; "PK" columns are declared NOT NULL.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology", st.StringType(), nullable=False) # PK
    .add("frequency" , st.StringType(), nullable=False) # PK
    .add("name"      , st.StringType(), nullable=False) # PK
    .add("note"      , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/md_column"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

### Log

#### lh_log.log

In [None]:
# Empty log table lh_log.log; every column is nullable.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("process_timestamp", st.StringType())
    .add("medallion_name"   , st.StringType())
    .add("source_name"      , st.StringType())
    .add("locals"           , st.StringType())
    .add("alert"            , st.StringType())
    .add("alert_description", st.StringType())
    .add("logged_datetime"  , st.TimestampType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_log}/Tables/log"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

#### lh_log.extract

In [None]:
# PL: Ignore this!
# NOTE(review): presumably refers to the lh_log.extract table created in the next cell; confirm whether that table is still needed.

In [None]:
# Empty extraction-log table lh_log.extract; every column is nullable.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("technology"           , st.StringType())
    .add("frequency"            , st.StringType())
    .add("lakehouse_name"       , st.StringType())
    .add("server_name"          , st.StringType())
    .add("database_name"        , st.StringType())
    .add("schema_name"          , st.StringType())
    .add("table_name"           , st.StringType())
    .add("folder_name"          , st.StringType())
    .add("file_name"            , st.StringType())
    .add("worksheet_name"       , st.StringType())
    .add("extraction_date_start", st.TimestampType())
    .add("extraction_date_end"  , st.TimestampType())
    .add("process_timestamp"    , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_log}/Tables/extract"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

### Schedule

#### lh_cfg.schedule

In [None]:
# Empty schedule table lh_cfg.schedule; every column is nullable.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("name"          , st.StringType())
    .add("frequency"     , st.StringType())
    .add("run_hour"      , st.StringType())
    .add("run_minute"    , st.StringType())
    .add("run_am_pm"     , st.StringType())
    .add("start_datetime", st.TimestampType())
    .add("end_datetime"  , st.TimestampType())
    .add("days_of_week"  , st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/schedule"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)

### Data Validation

#### lh_cfg.data_validation

In [None]:
# TODO: Create logic for Data Validation

# Placeholder table lh_cfg.data_validation with two generic nullable string columns.
# overwrite + overwriteSchema: re-running this cell replaces existing rows and schema.
sch = (
    st.StructType()
    .add("column_1", st.StringType())
    .add("column_2", st.StringType())
)

sdf = spark.createDataFrame([], sch)

sdf.write.save(
    f"{abfs_path_lh_cfg}/Tables/data_validation"
    , format = "delta"
    , mode = "overwrite"
    , overwriteSchema = "True"
)

if is_debug:
    sdf.printSchema()
    display(sdf)