# Common functions
Contains the common helper functions and settings used by the script snippets below.

In [None]:
from pyspark.sql.functions import when, to_date, avg, to_timestamp, col, cast
from pyspark.sql.dataframe import DataFrame

# Storage account host and container used by the helpers and cells below.
storageAccountName = "dynamicsstagingsa.dfs.core.windows.net"
containerName = "stagingdata"

# Folder segment that writeToCsv places between the account and the CSV name.
outputPath = "output"

# Manifest (model.json) location, prefixed with the container name.
manifestPath = f"{containerName}/sample/CDS/model.json"

def readEntityFromLake(storageAccount, manifest, entityName):
    """Load one entity through the ``com.microsoft.cdm`` Spark data source.

    Uses the global ``spark`` object, which is not defined in the visible
    cells (presumably injected by the notebook runtime -- confirm).

    Args:
        storageAccount: Value passed as the reader's ``storage`` option.
        manifest: Value passed as the reader's ``manifestPath`` option.
        entityName: Value passed as the reader's ``entity`` option.

    Returns:
        The DataFrame returned by the reader's ``load()`` call.
    """
    # The read mode is always "permissive"; the rest comes from the caller.
    readerOptions = {
        "storage": storageAccount,
        "manifestPath": manifest,
        "entity": entityName,
        "mode": "permissive",
    }
    return spark.read.format("com.microsoft.cdm").options(**readerOptions).load()

def writeToCsv(dataFrame: DataFrame, csvName):
    """Write ``dataFrame`` as CSV with a header, overwriting existing output.

    The destination is
    ``abfss://<containerName>@<storageAccountName>/<outputPath>/<csvName>``,
    assembled from the module-level settings.

    Args:
        dataFrame: The DataFrame to write.
        csvName: Final path segment of the output location.
    """
    destination = f"abfss://{containerName}@{storageAccountName}/{outputPath}/{csvName}"
    dataFrame.write.mode('overwrite').option('header', 'true').csv(destination)


# Cases per day
This calculates the number of cases per day, by queue.

In [None]:
# Load the three entities this report is built from
queueDf = readEntityFromLake(storageAccountName, manifestPath, "queue")
queueItemsDf = readEntityFromLake(storageAccountName, manifestPath, "queueitem")
incidentDf = readEntityFromLake(storageAccountName, manifestPath, "incident")

# Columns kept from each source, and from the joined result
incidentColumns = ["createdon", "incidentid", "title"]
queueItemsColumns = ["queueid", "objectid"]
queueColumns = ["queueid", "name"]
joinedDfColumns = ["incidentid", "createdon", "name"]

# Incidents: keep only rows that have a creation time
filteredIncidentDf = (
    incidentDf
    .where(col("createdon").isNotNull())
    .select(*incidentColumns)
)

# Queue items: both join keys must be present; queueid is renamed so it
# does not collide with the queue entity's queueid after the join
filteredQueueItemDf = (
    queueItemsDf
    .where(col("queueid").isNotNull() & col("objectid").isNotNull())
    .select(*queueItemsColumns)
    .withColumnRenamed("queueid", "qi_queueid")
)

# Queues: substitute a placeholder for null or empty names
filteredQueueDf = (
    queueDf
    .withColumn(
        "name",
        when(col("name").isNull() | (col("name") == ""), "<Unnamed Queue>")
        .otherwise(col("name")),
    )
    .select(*queueColumns)
)

# Queue item -> queue (inner), then right outer join to incidents so that
# incidents without any queue item are still kept
joinedDf = (
    filteredQueueItemDf
    .join(
        filteredQueueDf,
        on=filteredQueueItemDf.qi_queueid == filteredQueueDf.queueid,
        how="inner",
    )
    .join(
        filteredIncidentDf,
        on=filteredQueueItemDf.objectid == filteredIncidentDf.incidentid,
        how="rightouter",
    )
    .select(*joinedDfColumns)
)

# Label incidents that matched no queue and derive the calendar date
joinedDf = (
    joinedDf
    .withColumn(
        "name",
        when(col("name").isNull(), "<No_Queue_Assigned>").otherwise(col("name")),
    )
    .withColumn("createdon_date", to_date(col("createdon")))
)

# Count rows per (created date, queue name), sorted by the same keys
groupedDf = (
    joinedDf
    .groupBy("createdon_date", "name")
    .count()
    .orderBy("createdon_date", "name")
    .withColumnRenamed("createdon_date", "date")
    .withColumnRenamed("name", "queue_name")
)

groupedDf.show(truncate=False)
writeToCsv(groupedDf, "IncidentsPerDay")

# Participants by session
This calculates the number of participants per session, per day.

In [None]:
sessionParticipantDf = readEntityFromLake(storageAccountName, manifestPath, "msdyn_sessionparticipant")

# Columns kept from the session participant entity
sessionParticipantColumns = ["createdon","msdyn_omnichannelsession", "msdyn_omnichannelsessionname", "msdyn_sessionparticipantid"]

# Keep rows that have a creation time and derive the calendar date
filteredSessionDf = (
    sessionParticipantDf
    .where(col("createdon").isNotNull())
    .select(*sessionParticipantColumns)
    .withColumn("createdon_date", to_date(col("createdon")))
)

# Count participant rows per (created date, session, session name),
# sorted by date and then session
groupedDf = (
    filteredSessionDf
    .groupBy("createdon_date", "msdyn_omnichannelsession", "msdyn_omnichannelsessionname")
    .count()
    .orderBy("createdon_date", "msdyn_omnichannelsession")
    .withColumnRenamed("createdon_date", "date")
)

groupedDf.show(truncate=False)
writeToCsv(groupedDf, "ParticipantsPerSession")

# Average scheduled duration, actual duration and on-hold time for sessions

In [None]:
sessionDf = readEntityFromLake(storageAccountName, manifestPath, "msdyn_ocsession")

# Columns kept from the session entity
sessionColumns = ["createdon","msdyn_sessionid", "actualdurationminutes", "scheduleddurationminutes","onholdtime"]

# Keep rows that have a creation time and derive the calendar date
filteredSessionDf = (
    sessionDf
    .where(col("createdon").isNotNull())
    .select(*sessionColumns)
    .withColumn("createdon_date", to_date(col("createdon")))
)

# Per created date: average actual duration, scheduled duration and on-hold time
groupedDf = (
    filteredSessionDf
    .groupBy("createdon_date")
    .agg(
        avg(col("actualdurationminutes")),
        avg(col("scheduleddurationminutes")),
        avg(col("onholdtime")),
    )
    .orderBy("createdon_date")
    .withColumnRenamed("createdon_date", "date")
)

groupedDf.show(truncate=False)
writeToCsv(groupedDf, "SessionMetrics")

# Time to assign live work item

In [None]:
workItemDf = readEntityFromLake(storageAccountName, manifestPath, "msdyn_ocliveworkitem")

# Columns kept from the live work item entity
workItemColumns = ["createdon","msdyn_activeagentassignedon"]

# Keep rows that have a creation time, parse both instants as timestamps and
# take their difference after casting each to long
filteredDf = (
    workItemDf
    .where(col("createdon").isNotNull())
    .select(*workItemColumns)
    .withColumn("createdon_date", to_date(col("createdon")))
    .withColumn("createdon_timestamp", to_timestamp(col("createdon")))
    .withColumn("assignedon_timestamp", to_timestamp(col("msdyn_activeagentassignedon")))
    .withColumn(
        "timetoassigninseconds",
        col("assignedon_timestamp").cast("long") - col("createdon_timestamp").cast("long"),
    )
)


# Average time-to-assign per created date
groupedDf = (
    filteredDf
    .groupBy("createdon_date")
    .agg(avg(col("timetoassigninseconds")))
    .orderBy("createdon_date")
    .withColumnRenamed("createdon_date", "date")
)

groupedDf.show(truncate=False)
writeToCsv(groupedDf, "TimeToAssignWorkItem")

# Average wrap up time

In [None]:
workItemDf = readEntityFromLake(storageAccountName, manifestPath, "msdyn_ocliveworkitem")
# Columns kept from the live work item entity
workItemColumns = ["createdon","msdyn_activeagentassignedon","msdyn_wrapupinitiatedon","actualend"]
# Keep rows that have a creation time and parse the wrap-up start / end instants
filteredDf = (
    workItemDf
    .filter(workItemDf.createdon.isNotNull())
    .select(*workItemColumns)
    .withColumn("createdon_date", to_date(workItemDf.createdon))
    .withColumn("wrapupstart_timestamp", to_timestamp(workItemDf.msdyn_wrapupinitiatedon))
    .withColumn("end_timestamp", to_timestamp(workItemDf.actualend))
    # Subtract the parsed timestamp columns (not the raw source columns), in
    # the same way the time-to-assign cell does: casting a timestamp to long
    # gives whole seconds, whereas casting a raw date string would give NULL.
    .withColumn(
        "timetowrapinseconds",
        col("end_timestamp").cast("long") - col("wrapupstart_timestamp").cast("long"),
    )
)

# Average wrap-up time per created date
groupedDf = (
    filteredDf
    .groupBy(filteredDf.createdon_date)
    .agg(avg(filteredDf.timetowrapinseconds))
    .orderBy(filteredDf.createdon_date)
    .withColumnRenamed("createdon_date", "date")
)
groupedDf.show(truncate=False)
writeToCsv(groupedDf, "WrapUpTime")

# Average idle time for agent in a session

In [None]:
sesionParticipantDf = readEntityFromLake(storageAccountName, manifestPath, "msdyn_sessionparticipant")
# Columns kept from the session participant entity
workItemColumns = ["createdon","msdyn_idletime","msdyn_agentidname"]
# Keep rows that have a creation time and derive the calendar date.
# Parentheses replace the backslash continuations: the original chain ended
# with a dangling "\" that ran into the following comment line.
filteredSessionDf = (
    sesionParticipantDf
    .filter(sesionParticipantDf.createdon.isNotNull())
    .select(*workItemColumns)
    .withColumn("createdon_date", to_date(sesionParticipantDf.createdon))
)
# Average idle time per (created date, agent name). Sorting on both grouping
# keys keeps the row order stable within a day.
groupedDf = (
    filteredSessionDf
    .groupBy(filteredSessionDf.createdon_date, filteredSessionDf.msdyn_agentidname)
    .agg(avg(filteredSessionDf.msdyn_idletime))
    .orderBy(filteredSessionDf.createdon_date, filteredSessionDf.msdyn_agentidname)
    .withColumnRenamed("createdon_date", "date")
)
groupedDf.show(truncate=False)
writeToCsv(groupedDf, "IdleTimeSession")

# Average session duration

In [None]:
sessionDf = readEntityFromLake(storageAccountName, manifestPath, "msdyn_ocsession")
# Columns kept from the session entity
workItemColumns = ["createdon","actualstart","actualend"]
# Keep rows that have a creation time and parse the start / end instants
filteredDf = (
    sessionDf
    .filter(sessionDf.createdon.isNotNull())
    .select(*workItemColumns)
    .withColumn("createdon_date", to_date(sessionDf.createdon))
    .withColumn("start_timestamp", to_timestamp(sessionDf.actualstart))
    .withColumn("end_timestamp", to_timestamp(sessionDf.actualend))
    # Both operands are the parsed timestamp columns. The original subtracted
    # start_timestamp from the raw actualend column and left end_timestamp
    # unused, so the two sides were not guaranteed to be the same type.
    .withColumn(
        "timeinseconds",
        col("end_timestamp").cast("long") - col("start_timestamp").cast("long"),
    )
)

# Average session duration per created date
groupedDf = (
    filteredDf
    .groupBy(filteredDf.createdon_date)
    .agg(avg(filteredDf.timeinseconds))
    .orderBy(filteredDf.createdon_date)
    .withColumnRenamed("createdon_date", "date")
)
groupedDf.show(truncate=False)
writeToCsv(groupedDf, "SessionDuration")