In [0]:
# Load the divisions bronze table; it is joined later to resolve each
# conversation's division name. Show the rows and the schema for inspection.
divisions_table_name = "kusha_solutions.sai.divisions_bronze"
df_divisionsraw_bronze = (
    spark.read
    .format("delta")
    .table(divisions_table_name)
)
display(df_divisionsraw_bronze)
df_divisionsraw_bronze.printSchema()

In [0]:
# Load the participant-attributes bronze table and show its rows and schema.
participants_table_name = "kusha_solutions.sai.participant_attributes_bronze"
df_participants_bronze = (
    spark.read
    .format("delta")
    .table(participants_table_name)
)
display(df_participants_bronze)
df_participants_bronze.printSchema()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_extract, col, when
# Participant attributes: one row per participantData entry that carries an
# NTLogin, keeping the conversation key, the leg id and the login.
exploded_df = df_participants_bronze.withColumn("participant", F.explode("participantData"))

result = (
    exploded_df
    .filter(F.col("participant.participantAttributes.NTLogin").isNotNull())
    .select(
        F.col("conversationId"),
        F.col("participant.participantAttributes.LegId").alias("LegId"),
        F.col("participant.participantAttributes.NTLogin").alias("NTLogin"),
    )
)

display(result)

# Derive leg attributes from LegId.
# Leg_Ordinal: trailing digits of LegId as an int (assumes a format such as
# "LEG_2" — TODO confirm). regexp_extract returns "" when there are no trailing
# digits; that case is mapped to null explicitly instead of casting "" to int,
# which raises when ANSI mode is enabled.
leg_digits = F.regexp_extract(F.col("LegId"), r"(\d+)$", 1)
# Alternate_Leg_Flag source: trailing letters of LegId ("" if none).
leg_letters = F.regexp_extract(F.col("LegId"), r"([A-Za-z]+)$", 1)

df_flattened = (
    result
    # Legs without a LegId are discarded. Filtering before the derivations gives
    # the same rows as filtering after them.
    .dropna(subset=["LegId"])
    .withColumn("Leg_Ordinal", F.when(leg_digits != "", leg_digits.cast("int")))
    # Transfer_leg = 1 when the leg ordinal is greater than 1, otherwise 0
    # (including when no ordinal could be extracted).
    .withColumn("Transfer_leg", F.when(F.col("Leg_Ordinal") > 1, 1).otherwise(0))
    # Alternate_Leg_Flag: trailing letter code when LegId does not end in "INT",
    # otherwise null.
    .withColumn(
        "Alternate_Leg_Flag",
        F.when(~F.col("LegId").rlike(r"INT$"), leg_letters),
    )
    # One row per leg *within a conversation*. A LegId value such as "LEG_2" can
    # presumably recur in many conversations; deduplicating on LegId alone would
    # keep only one conversation per LegId value.
    .dropDuplicates(["conversationId", "LegId"])
)
display(df_flattened)


## Conversation jobs

In [0]:
# Load the conversation-jobs bronze table and show its rows and schema.
conversations_table_name = "kusha_solutions.sai.conversation_jobs_bronze"
df_conversations_bronze = (
    spark.read
    .format("delta")
    .table(conversations_table_name)
)
display(df_conversations_bronze)
df_conversations_bronze.printSchema()

In [0]:
# Keep only conversations whose originatingDirection is "outbound", one row
# per conversationId. Every conversation-level extract below starts from this frame.
is_outbound = F.col("originatingDirection") == "outbound"
outbound_df = (
    df_conversations_bronze
    .filter(is_outbound)
    .dropDuplicates(["conversationId"])
)
display(outbound_df)



In [0]:
from pyspark.sql.functions import *
# Conversation-level timing: start/end cast to timestamps plus integer keys
# (yyyyMMdd date key, HHmmss time-of-day keys) derived from conversationStart
# and conversationEnd.
# NOTE(review): conversation_Date and Conversation_Date_Key hold the same value.
conversation_start = col("conversationStart")
conversation_end = col("conversationEnd")
start_date_key = date_format(conversation_start, "yyyyMMdd").cast("int")

df_selected = (
    outbound_df
    .dropDuplicates(["conversationId"])
    .select(
        "conversationId",
        "divisionIds",
        start_date_key.alias("conversation_Date"),
        conversation_start.cast("timestamp").alias("Conversation_Start_Time"),
        conversation_end.cast("timestamp").alias("Conversation_End_Time"),
        start_date_key.alias("Conversation_Date_Key"),
        date_format(conversation_start, "HHmmss").cast("int").alias("Conversation_Start_Time_Key"),
        date_format(conversation_end, "HHmmss").cast("int").alias("Conversation_End_Time_Key"),
    )
)
display(df_selected)

In [0]:
from pyspark.sql import functions as F

# Step 1: one row per participant of each outbound conversation.
df_participants = outbound_df.select(
    "conversationId",
    "conversationStart",
    "conversationEnd",
    F.explode("participants").alias("participant"),
)

# Step 2: one row per session of each participant, keeping the participant purpose.
df_sessions = df_participants.select(
    "conversationId",
    "conversationStart",
    "conversationEnd",
    F.col("participant.purpose").alias("participantPurpose"),
    F.explode("participant.sessions").alias("session"),
)

# Step 3: session metrics to turn into columns.
metric_list = [
    "tTalk", "tHeld", "tAcw", "tHandle", "tMonitoring", "tVoicemail",
    "nTransferred", "nBlindTransferred", "nConsultTransferred", "nCobrowseSessions", "nConsult",
]


# Step 4: expression for a single metric's value.
def get_metric_value(metric_name):
    """Return a Column with the value of the first `session.metrics` entry named `metric_name`.

    try_element_at (1-based) yields null when no entry matches, or when the
    metrics array is null. A plain `[0]` subscript raises instead when ANSI mode
    is enabled. The nulls are replaced with 0 further down.
    """
    return F.expr(
        f"try_element_at(filter(session.metrics, x -> x.name = '{metric_name}'), 1).value"
    )


df_with_metrics = df_sessions
for metric in metric_list:
    df_with_metrics = df_with_metrics.withColumn(metric, get_metric_value(metric))

# A session that does not report a metric gets 0 for it.
df_silver = df_with_metrics.fillna(0, subset=metric_list)

# Rename raw metric names to the silver-layer column names.
# tDialing / tContacting / conference are not columns of this frame (they are
# not in metric_list), so they are not renamed here. Add them to metric_list
# if those metrics are wanted.
metric_renames = {
    "tTalk": "Talk_Time",
    "tHeld": "Held_Time",
    "tAcw": "ACW_Time",
    "tHandle": "Handle_Time",
    "tMonitoring": "Monitoring_Time",
    "tVoicemail": "Voice_mail_Time",
    "nTransferred": "Transferred",
    "nBlindTransferred": "Transferred_Blind",
    "nConsultTransferred": "Transferred_Consult",
    "nCobrowseSessions": "Cobrowse",
    "nConsult": "Consult",
}
for raw_name, silver_name in metric_renames.items():
    df_silver = df_silver.withColumnRenamed(raw_name, silver_name)

df_silver = df_silver.withColumn("ACD_OB_Attempt", F.lit("1"))

# Keep only the conversation key and the metric columns.
# NOTE(review): df_silver still has one row per participant session. When it is
# later joined on conversationId and deduplicated with
# dropDuplicates(["conversationId"]), an arbitrary session's metrics survive.
# Consider aggregating per conversation (and/or filtering on participantPurpose).
df_silver = df_silver.drop(
    "participantPurpose", "session", "conversationStart", "conversationEnd"
)
display(df_silver)


In [0]:
from pyspark.sql.functions import explode, col, row_number, count, max as spark_max, unix_timestamp, current_timestamp
from pyspark.sql.window import Window

# One row per segment of each outbound conversation, carrying the session's
# dnis, ani and direction plus the segment start time.
df_ani_contacts = (
    outbound_df
    .select(col("conversationId"), explode("participants").alias("participant"))
    .select(col("conversationId"), explode("participant.sessions").alias("session"))
    .select(
        col("conversationId"),
        col("session.dnis").alias("dnis"),
        col("session.ani").alias("ani"),
        col("session.direction").alias("direction"),
        explode("session.segments").alias("segment"),
    )
    .select(
        col("conversationId"),
        col("dnis"),
        col("ani"),
        col("direction"),
        col("segment.segmentStart").alias("startTime"),
    )
)

# Per (ani, dnis, conversation): number of segments and latest segment start.
# NOTE(review): conversationId is part of the grouping key, so these values
# describe a single conversation rather than a caller's history across
# contacts, despite the column names. This frame can also have several rows per
# conversationId (one per ani/dnis pair). Confirm the intended grain.
df_ani_counts = df_ani_contacts.groupBy("ani", "dnis", "conversationId").agg(
    count("*").alias("Customer_Contact_Sequence"),
    spark_max("startTime").alias("Previous_Contact_DateTime"),
)

# Rank conversations per (ani, dnis) by latest segment start, newest first.
window_spec = Window.partitionBy("ani", "dnis").orderBy(col("Previous_Contact_DateTime").desc())
df_ani_counts = df_ani_counts.withColumn("contact_rank", row_number().over(window_spec))

# Seconds between that latest segment start and now.
# Parse with a plain timestamp cast, which accepts ISO-8601 values with or
# without fractional seconds and honours a trailing "Z" as UTC. A format
# pattern with a quoted literal 'Z' would interpret the instant in the session
# time zone and reject values without milliseconds. The cast is a no-op if the
# column is already a timestamp.
df_ani_counts = df_ani_counts.withColumn(
    "Delta_Contact_DateTime",
    unix_timestamp(current_timestamp())
    - unix_timestamp(col("Previous_Contact_DateTime").cast("timestamp")),
)

display(df_ani_counts.drop(col("contact_rank")))



In [0]:
# Segment-level disconnectType and conference, reduced to one row per conversation.
# NOTE(review): dropDuplicates keeps an arbitrary segment per conversationId, so
# which segment's values survive is not deterministic.
df_segments = outbound_df.select("conversationId", F.explode("participants").alias("participant"))
for array_path, element_alias in (("participant.sessions", "session"), ("session.segments", "segment")):
    df_segments = df_segments.select("conversationId", F.explode(array_path).alias(element_alias))

df_segments = (
    df_segments
    .select(
        F.col("conversationId"),
        F.col("segment.disconnectType"),
        F.col("segment.conference"),
    )
    .dropDuplicates(["conversationId"])
)
display(df_segments)

In [0]:
from pyspark.sql.functions import  col



# One destination address per conversation: the first element of a session's
# destinationAddresses array. try_element_at (1-based) returns null for an
# empty or null array instead of failing. Sessions without an address are
# skipped so the per-conversation dedup does not keep a null when another
# session of the same conversation has one. Conversations with no address at
# all still get null later through the left join.
df_dest = (
    df_sessions
    .select(
        col("conversationId"),
        F.expr("try_element_at(session.destinationAddresses, 1)").alias("destinationAddress"),
    )
    .filter(col("destinationAddress").isNotNull())
    .dropDuplicates(["conversationId"])
)
display(df_dest)




## Time zone

In [0]:


# First division of each outbound conversation. try_element_at is 1-based and
# returns null for an empty/null divisionIds array instead of failing.
df_conversation_divisions = outbound_df.select(
    F.col("conversationId"),
    F.expr("try_element_at(divisionIds, 1)").alias("divisionId"),
)

# Look up that division in df_divisionsraw_bronze by id.
# NOTE(review): this is an inner join, and df_result is the base of the
# left-join chain that builds the final conversation frame. Conversations whose
# division is not found are therefore dropped from the silver table. Confirm
# this is intended.
df_joined = df_conversation_divisions.join(
    df_divisionsraw_bronze,
    df_conversation_divisions.divisionId == df_divisionsraw_bronze.id,
    "inner",
)

# TimeZone = last two characters of the division name (assumes division names
# end with a 2-character zone code — TODO confirm naming convention).
df_result = (
    df_joined
    .select(
        F.col("conversationId"),
        F.substring(F.col("name"), -2, 2).alias("TimeZone"),
    )
    .dropDuplicates(["conversationId"])
)
display(df_result)

In [0]:
# Build the conversation-level frame. Start from the time-zone frame (df_result)
# and left-join, on conversationId and in this order:
#   segments (disconnect/conference), session metrics (df_silver),
#   conversation timing keys (df_selected), ANI contact counts (df_ani_counts),
#   destination address (df_dest).
# NOTE(review): df_silver and df_ani_counts can hold several rows per
# conversationId, so these joins may fan out rows.
joined_frame = df_result
for right_frame in (df_segments, df_silver, df_selected, df_ani_counts, df_dest):
    joined_frame = joined_frame.join(right_frame, on="conversationId", how="left")
df_join_5 = joined_frame

df_join_5.printSchema()


In [0]:
# Final conversation-level frame: remove helper/raw columns, then keep a single
# row per conversationId. If the joins fanned out, which row survives is arbitrary.
helper_columns = ("contact_rank", "ani", "divisionIds")
df_final = (
    df_join_5
    .drop(*helper_columns)
    .dropDuplicates(["conversationId"])
)
df_final.display(truncate=False)
df_final.printSchema()

In [0]:
# Exploratory check: full outer join of the conversation-level frame with the
# participant-attribute legs, to inspect how the two line up on conversationId.
join_key = "conversationId"
df_test = df_final.join(df_flattened, on=join_key, how="outer")
df_test.display(truncate=False)
df_test.printSchema()

In [0]:
# Persist both silver outputs as tables, replacing their existing contents.
silver_outputs = [
    (df_flattened, "kusha_solutions.sai.participant_attributes_silver"),
    (df_final, "kusha_solutions.sai.conversation_jobs_silver"),
]
for output_frame, target_table in silver_outputs:
    output_frame.write.mode("overwrite").saveAsTable(target_table)