# Some variables

In [1]:
# Paths that point to the "Files" folder in the default lakehouse that contains the stored files.
# These paths must be the same as the paths that are used by the getData notebook.
#   filePathTimezones: folder from which timezoneOffsets.csv is loaded
#   filePath:          folder from which the json files (agenda, room, timetable, schedule) are read
filePathTimezones = "Files/PowerBIAndFabricSummit/"
filePath = "Files/PowerBIAndFabricSummit/2024/"

# A prefix that is prepended to the name of every delta table written by this notebook, in case
# there are a lot of tables in the default lakehouse. If a dedicated lakehouse is used, it's
# recommended to use an empty string like so: ""
deltaTablePrefix = "globalSummit_"

StatementMeta(, ab446f62-72fb-4c03-8930-fca41095ee95, 3, Finished, Available)

# Imports

In [2]:
import json
import requests

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import IntegerType, LongType, StringType, BooleanType, TimestampType, DateType, ArrayType, MapType

from pyspark.sql import Window

from pyspark.sql.functions import (col, explode, filter, isnull, lit, monotonically_increasing_id, 
    regexp_replace, row_number, split, trim, unix_timestamp, when)

StatementMeta(, ab446f62-72fb-4c03-8930-fca41095ee95, 4, Finished, Available)

# read json/transform/write dataframe
+ agenda, read json
+ agenda --> speaker
+ agenda --> sessionspeaker
+ agenda --> session
+ agenda --> trackMultipleValues (unique values)
+ agenda --> trackMultipleValues exploded
+ agenda --> session level
+ timetables and timezones
+ rooms
+ schedule

## agenda

### agenda read

In [3]:
# Read agenda.json from the configured folder (filePath) into a Spark DataFrame.
agenda_json_path = f"{filePath}agenda.json"
df_agenda = spark.read.format("json").load(agenda_json_path)
# display(df_agenda.head(10))  # uncomment to preview

StatementMeta(, ab446f62-72fb-4c03-8930-fca41095ee95, 5, Finished, Available)

In [None]:
# I consider the schema a valuable piece of information that helps to create the desired table
#print(df_agenda.schema)

### agenda --> speaker

In [5]:
# Speaker dimension: explode the sessionSpeakers column, flatten the nested
# `speaker` struct and keep one row per speakerId.
df_speakers = (
    df_agenda
    .select("sessionId", "sessionSpeakers")
    .withColumn("sessionSpeakers", explode(col("sessionSpeakers")))
    .select("sessionId", "sessionSpeakers.speaker.*")
    # strip every character that is not a space, comma, digit or ASCII letter
    .withColumn("designation", regexp_replace(col("designation"), "[^ ,0-9a-zA-Z]", ""))
    .drop("sessionId")
    .dropDuplicates(["speakerId"])
)
# display(df_speakers.head(10))  # uncomment to preview

StatementMeta(, ab446f62-72fb-4c03-8930-fca41095ee95, 7, Finished, Available)

In [10]:
# Save the speakers as a delta table, overwriting any existing contents.
(
    df_speakers.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_speakers")
)

StatementMeta(, da3f4b62-7b29-413a-952a-abdcebcbdc97, 12, Finished, Available)

### agenda --> sessionsspeaker

In [None]:
# Session/speaker pairs: explode the sessionSpeakers column and keep each
# (sessionId, speakerId) combination once.
df_sessions_and_speakers = (
    df_agenda
    .select("sessionId", "sessionSpeakers")
    .withColumn("sessionSpeakers", explode(col("sessionSpeakers")))
    .select("sessionId", "sessionSpeakers.speaker.speakerId")
    .dropDuplicates(["sessionId", "speakerId"])
)
# display(df_sessions_and_speakers.head(10))  # uncomment to preview

In [None]:
# Save the session/speaker pairs as a delta table, overwriting any existing contents.
(
    df_sessions_and_speakers.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_sessionsspeakers")
)

### agenda --> session

In [None]:
# Session table: every agenda column except sessionSpeakers, with a cleaned
# `sessionLevel` and an additional cleaned `tracks` column.
#
# `sessionLevel` uses the same cleaning pattern ("[^ 0-9a-zA-Z]") and the same
# label for missing values ("unknown Session Level") as the session-level
# dimension built from df_agenda in this notebook, so that every session row
# matches a row of that dimension. This mirrors what is done for `tracks`,
# where missing values get the label "no track assigned".
df_sessions = (
    df_agenda
    # strip every character that is not a space, digit or ASCII letter
    .withColumn("sessionLevel", regexp_replace(col("sessionLevel"), "[^ 0-9a-zA-Z]", ""))
    # regexp_replace keeps nulls as nulls, so label them explicitly
    .withColumn(
        "sessionLevel",
        when(isnull(col("sessionLevel")), "unknown Session Level").otherwise(col("sessionLevel")),
    )
    # for tracks the comma is kept: it separates multiple tracks in one cell
    .withColumn("tracks", regexp_replace(col("track"), "[^ ,0-9a-zA-Z]", ""))
    .withColumn(
        "tracks",
        when(isnull(col("tracks")), "no track assigned").otherwise(col("tracks")),
    )
    .drop("sessionSpeakers")
)
# display(df_sessions.head(10))  # uncomment to preview

In [6]:
# Save the sessions as a delta table, overwriting any existing contents.
(
    df_sessions.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_sessions")
)

StatementMeta(, da3f4b62-7b29-413a-952a-abdcebcbdc97, 8, Finished, Available)

### agenda --> Tracks multiple values per cell, but unique occurrences
This table contains the tracks that are assigned to a session, because multiple tracks can be assigned to a single session. There will be a bridge table containing the unique rows of all the combinations, and of course a single value as well when only a single track is assigned to a session. But there is also a dimension table that contains a column with the multiple values and a column that contains distinct values; this column will be used on visuals.

In [None]:
# Cleaned track strings: strip every character that is not a space, comma,
# digit or ASCII letter.
df_tracks = (
    df_agenda
    .select("track")
    .withColumn("tracks", regexp_replace(col("track"), "[^ ,0-9a-zA-Z]", ""))
)

# Bridge table: one row per distinct `tracks` string. Rows without a track get
# the id -1 and the label "no track assigned"; all other rows get an id from
# monotonically_increasing_id().
tracks_is_missing = col("tracks").isNull()
df_tracksBridgeUnique = (
    df_tracks
    .select("tracks")
    .dropDuplicates(["tracks"])
    .orderBy("tracks")
    .withColumn("tracksID", when(tracks_is_missing, -1).otherwise(monotonically_increasing_id()))
    .withColumn("tracks", when(tracks_is_missing, "no track assigned").otherwise(col("tracks")))
)
# display(df_tracksBridgeUnique.head(10))  # uncomment to preview

In [86]:
# Save the tracks bridge as a delta table, overwriting any existing contents.
(
    df_tracksBridgeUnique.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_tracks_bridge")
)

StatementMeta(, c9a4f7ed-b321-455e-b3d2-9b314c7370dc, 88, Finished, Available)

### agenda --> Tracks
The table containing the track column that will be used on slicers and axes

In [None]:
# Tracks dimension: for every distinct cleaned `tracks` string, emit one row
# per single track obtained by splitting the string on ", ".
df_tracks = (
    df_agenda
    .select("track")
    # strip every character that is not a space, comma, digit or ASCII letter
    .withColumn("tracks", regexp_replace(col("track"), "[^ ,0-9a-zA-Z]", ""))
    .select("tracks")
    .dropDuplicates(["tracks"])
    .withColumn("tracks", when(col("tracks").isNull(), "no track assigned").otherwise(col("tracks")))
    .withColumn("track", explode(split(col("tracks"), ", ")))
)
display(df_tracks.head(10))

In [None]:
# Save the tracks as a delta table, overwriting any existing contents.
tracks_writer = df_tracks.write.format("delta").mode("overwrite")
tracks_writer.option("mergeSchema", "true").saveAsTable(f"{deltaTablePrefix}dim_tracks")

### agenda --> SessionLevel

In [7]:
# Session-level dimension: one row per distinct cleaned session level.
# Be aware that monotonically_increasing_id() does not guarantee consecutive numbers.
level_is_missing = col("clean_name").isNull()
df_SessionLevel = (
    df_agenda
    # strip every character that is not a space, digit or ASCII letter
    .select(regexp_replace(col("sessionLevel"), "[^ 0-9a-zA-Z]", "").alias("clean_name"))
    .dropDuplicates(["clean_name"])
    .orderBy("clean_name")
    # missing levels get the id -1 and the label "unknown Session Level"
    .withColumn("sessionLevelId", when(level_is_missing, -1).otherwise(monotonically_increasing_id()))
    .withColumn("sessionLevel", when(level_is_missing, "unknown Session Level").otherwise(col("clean_name")))
    .drop("clean_name")
)
# display(df_SessionLevel.head(10))  # uncomment to preview

StatementMeta(, da3f4b62-7b29-413a-952a-abdcebcbdc97, 9, Finished, Available)

In [None]:
# Save the session levels as a delta table, overwriting any existing contents.
(
    df_SessionLevel.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_sessionLevels")
)

## room

In [None]:
# Read room.json from the configured folder (filePath) into a Spark DataFrame.
room_json_path = f"{filePath}room.json"
df_room = spark.read.format("json").load(room_json_path)
# display(df_room.head(10))  # uncomment to preview

In [None]:
# Save the rooms as a delta table, overwriting any existing contents.
(
    df_room.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_rooms")
)

## timetable

In [3]:
# Read timetable.json from the configured folder (filePath) and add three
# representations of `startAt`: as date, as timestamp and as string.
start_at = col("startAt")
df_timetable = (
    spark.read.format("json").load(f"{filePath}timetable.json")
    .withColumn("DateStart", start_at.cast("date"))
    .withColumn("DateStartTime", start_at.cast("timestamp"))
    .withColumn("DateStartTimeText", start_at.cast("string"))
)
# display(df_timetable.head(10))  # uncomment to preview

StatementMeta(, a7c5ebfe-a1e3-4d81-a02f-49a41afc1ea2, 5, Finished, Available)

In [None]:
# Save the timetable as a delta table, overwriting any existing contents.
(
    df_timetable.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_timetable")
)

### unique DateStartTimes

In [None]:
# Distinct session start timestamps taken from the timetable.
df_timetableDateStartTime = (
    df_timetable
    .select("DateStartTime")
    .dropDuplicates(["DateStartTime"])
)
# display(df_timetableDateStartTime.head(10))  # uncomment to preview

In [None]:
# Save the distinct start timestamps as a delta table, overwriting any existing contents.
(
    df_timetableDateStartTime.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_timetableDateStartTimeUnique")
)

### read timezoneOffsets.csv

In [None]:
# Schema describing the timezone offsets. It is defined here but not applied:
# the reader below is not given a schema.
# To apply it, add `.schema(timezonesSchema)` to the reader.
timezonesSchema = StructType(
    [
        StructField("UTCOffset", StringType(), True),
        StructField("OffsetSeconds", LongType(), True),
    ]
)

# Read the semicolon-delimited csv file with a header row.
# NOTE(review): with neither a schema nor inferSchema set, the columns are
# presumably read as strings - confirm that downstream arithmetic on
# OffsetSeconds relies on Spark's implicit cast.
timezones_csv_path = f"{filePathTimezones}timezoneOffsets.csv"
timezones_reader = (
    spark.read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
)

# Copy the "UTC Offset" column to "UTCOffset" (no blank in the name), then drop the original.
df_timezones = (
    timezones_reader.load(timezones_csv_path)
    .withColumn("UTCOffset", col("UTC Offset"))
    .drop("UTC Offset")
)
# display(df_timezones.head(10))  # uncomment to preview

### crossjoin starttime with timezones

In [None]:
# Combine every distinct start timestamp with every timezone row. For each pair
# the start time is converted to epoch seconds, a fixed 46800 seconds are
# subtracted and the row's OffsetSeconds is added; the result is stored as a
# timestamp (DateStartTimeOffset) and as a date (DateStartOffset).
# NOTE(review): 46800 s = 13 h; presumably the UTC offset of the source
# timestamps - TODO confirm.
source_offset_seconds = 46800
shifted_epoch_seconds = (
    unix_timestamp("DateStartTime") - source_offset_seconds + col("OffsetSeconds")
)
df_DateStartTimeTimezones = (
    df_timetableDateStartTime
    .crossJoin(df_timezones)
    .withColumn("DateStartTimeOffset", shifted_epoch_seconds.cast("timestamp"))
    .withColumn("DateStartOffset", col("DateStartTimeOffset").cast("date"))
)
# display(df_DateStartTimeTimezones.filter(col("UTCOffset") == "UTC+1").head(50))  # uncomment to preview

In [9]:
# Save the start time / timezone combinations as a delta table, overwriting any existing contents.
(
    df_DateStartTimeTimezones.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}dim_timetableDateStartTimeTimezones")
)

StatementMeta(, a7c5ebfe-a1e3-4d81-a02f-49a41afc1ea2, 11, Finished, Available)

## schedule

In [10]:
# filepath: Files/PowerBIAndFabricSummit/2024

# Read schedule.json from the configured folder (filePath) and remove the
# `room` and `timeTable` columns.
schedule_json_path = f"{filePath}schedule.json"
df_schedule = (
    spark.read.format("json").load(schedule_json_path)
    .drop("room", "timeTable")
)
# print(df_schedule.schema)       # uncomment to inspect the schema
# display(df_schedule.head(10))   # uncomment to preview

StatementMeta(, a7c5ebfe-a1e3-4d81-a02f-49a41afc1ea2, 12, Finished, Available)

In [11]:
# Save the schedule as a delta table, overwriting any existing contents.
(
    df_schedule.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{deltaTablePrefix}fact_schedule")
)

StatementMeta(, a7c5ebfe-a1e3-4d81-a02f-49a41afc1ea2, 13, Finished, Available)