In [4]:
# Notebook parameters
# This is the Synapse "Parameters" cell: it defines and initializes the notebook's
# parameters. Read the Synapse documentation on how a Parameters cell should be used.
# NOTE(review): the empty defaults below are presumably overridden at run time by
# the calling pipeline -- confirm.

# Storage account name (used to build the abfss:// input path)
StorageAccountName = ""
# Main container/directory (file system) on the storage account
VivaInsightsDataFileSystem = ""

# Pipeline run identifier; used as the top-level folder in the input path
PipelineId = ""
# Folder under "<PipelineId>/raw/" that holds the meeting query export files
MeetingQueryDatasetFolder = ""
# Suffix of the "Organizer_<...>" column that is renamed to Organizer_EmployeeId
SecondaryEmployeeId = ""

# Database connection information (used to build the JDBC URL and credentials)
SQLServerEndpoint = ""
DBName = ""
DBUser = ""
DBPass = ""
DBPort = ""




StatementMeta(SparkPool2, 38, 4, Finished, Available)

In [5]:
import sys
import json


from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import *


# Constants: path templates for accessing the storage account for read and write.
# inputFilePath placeholders, in order: file system (container), storage account
# name, pipeline id, dataset folder.
inputFilePath = "abfss://{}@{}.dfs.core.windows.net/{}/raw/{}/*.txt"
# NOTE(review): storageAccount and outputFilePath are not referenced by the cells
# visible in this notebook -- confirm whether they are still needed.
storageAccount = "{}.dfs.core.windows.net"
outputFilePath = "https://{}.dfs.core.windows.net/{}/{}"

# Setting parameters
# File system (container) substituted into inputFilePath when reading
extractionFS = VivaInsightsDataFileSystem

# Log which pipeline id this run is processing
print("PipelineId is: ", PipelineId)


StatementMeta(SparkPool2, 38, 5, Finished, Available)

PipelineId is:  6f714f55-1b3c-4a32-ad47-deb4a3741f3f

In [6]:
from pyspark.sql.functions import split
from pyspark.sql.types import DateType

# Read the meeting query export (CSV content stored as *.txt) from the storage account
meetingSourcePath = inputFilePath.format(extractionFS, StorageAccountName, PipelineId, MeetingQueryDatasetFolder)
meetingDf = spark.read.csv(meetingSourcePath, header='true', inferSchema='true')

# Dataframe prep, expressed as a single transformation chain
meetingDf = (
    meetingDf
    # Clean MeetingId: keep only the part before the first ":" (drops the datetime portion)
    .withColumn("MeetingId", split(col("MeetingId"), ":").getItem(0))
    # Cast the date columns to DateType
    .withColumn("StartDate", col("StartDate").cast(DateType()))
    .withColumn("EndDate", col("EndDate").cast(DateType()))
    # Combine "<date> <time>" into proper timestamp columns
    .withColumn("StartTimestampUTC", to_timestamp(concat_ws(" ", col("StartDate"), col("StartTimeUTC"))))
    .withColumn("EndTimestampUTC", to_timestamp(concat_ws(" ", col("EndDate"), col("EndTimeUTC"))))
    # Normalize the organizer id column name, whose suffix is a notebook parameter
    .withColumnRenamed("Organizer_" + SecondaryEmployeeId, "Organizer_EmployeeId")
)

# Expose the prepared dataframe to Spark SQL
meetingDf.createOrReplaceTempView('meetingDf')



StatementMeta(SparkPool2, 38, 6, Finished, Available)

In [8]:
# Check the database for the newest meeting record it already holds.
# All connection settings come from the notebook parameters.
jdbcHostname, jdbcDatabase, jdbcPort = SQLServerEndpoint, DBName, DBPort
username, password = DBUser, DBPass

jdbcUrl = "jdbc:sqlserver://{host}:{port};database={db}".format(
    host=jdbcHostname, port=jdbcPort, db=jdbcDatabase
)
connectionProperties = dict(
    user=username,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# The aggregate is wrapped as an aliased subquery and passed in place of a table
# name, so a single row holding the max start timestamp is read back.
pushdown_query = "(Select max(StartTimestampUTC) as temp from viva_insights_meeting) tempTbl"
latestRow = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties).first()
latestExistingDate = latestRow["temp"]
print("Latest existing meeting date in DB is", latestExistingDate)



StatementMeta(SparkPool2, 38, 8, Finished, Available)

Latest existing meeting date in DB is 2021-11-13 20:58:42

In [9]:
# Prepare the dataframe that will be inserted into the database.
#
# Record selection:
#   * no max timestamp came back from the DB -> load everything   ("FullUpload")
#   * otherwise -> load only meetings that start strictly after the newest
#     meeting already stored                                       ("PartialUpload")
if latestExistingDate is None:
    outputStatus = "FullUpload"
    outputDf = meetingDf
else:
    outputStatus = "PartialUpload"
    # Compare against the full timestamp. Truncating the cutoff to its date
    # (i.e. midnight) would re-select meetings from the cutoff day that are
    # already stored, and the append-mode insert would then duplicate them.
    # latestExistingDate is intentionally NOT reassigned here, so this cell can be
    # re-run without failing on a value that is no longer a datetime.
    outputDf = meetingDf.filter(meetingDf.StartTimestampUTC > latestExistingDate)

# Attribute selection: the columns written to the target table, in order
columns = ["MeetingId","StartTimestampUTC","EndTimestampUTC", "Organizer_PersonId","Organizer_EmployeeId", "Organizer_Organization", "Organizer_LevelDesignation"
, "Organizer_IsInternal","Attendees","Attendees_with_conflicting_meetings", "Invitees", "Emails_sent_during_meetings"
, "Instant_messages_sent_during_meetings" , "Attendees_multitasking", "Attendee_meeting_hours", "Redundant_attendees"
, "Total_meeting_cost", "Total_redundant_hours" , "IsCancelled", "DurationHours", "IsRecurring","Subject" ,"TotalAccept"
, "TotalNoResponse", "TotalDecline" , "TotalNoEmailsDuringMeeting", "TotalNoDoubleBooked", "TotalNoAttendees"
, "MeetingResources", "BusinessProcesses"]

# Select by name directly; avoids a comprehension variable that reuses the name
# of the imported `col` function.
outputDf = outputDf.select(*columns)

# display(outputDf)
print("OutputStatus is: ", outputStatus)
# NOTE: count() runs a Spark job; the rows are only written by the next cell.
print("Number of records inserted is: ", outputDf.count())


StatementMeta(SparkPool2, 38, 9, Finished, Available)

OutputStatus is:  PartialUpload
Number of records inserted is:  9

In [10]:
# Insert the selected records into the database.
# NOTE: mode "append" adds the rows to the existing table contents; no
# upsert/merge is performed here, so record selection upstream must avoid
# re-sending rows that are already stored.
mode = "append"
# The target server/database come from jdbcUrl (built from the notebook
# parameters); no endpoint is hard-coded in this cell.
outputDf.write.jdbc(url=jdbcUrl, table="dbo.viva_insights_meeting", mode=mode, properties=connectionProperties)

StatementMeta(SparkPool2, 38, 10, Finished, Available)