Copyright (c) Microsoft Corporation.

Licensed under the MIT License.

In [None]:
# ADLS Gen2 location used to build the abfss:// URL further down:
#   abfss://<file_system_name>@<data_lake_account_name>.dfs.core.windows.net/o365data/...
# Both start empty; presumably they are filled in or overridden by pipeline
# parameters before the notebook runs -- confirm.
data_lake_account_name = ""  # Synapse Workspace ADLS account name (no .dfs.core.windows.net suffix)
file_system_name = ""        # container / file system that holds o365data/

In [None]:
# Identifier of the current run; every row loaded into messages_temp is tagged
# with this value in its LoadFolderPath column, and only rows carrying it are
# promoted to messagesdata.
inputFolderPath = "2021/11/2/2021-11-02T03:55:08"
# When True, the messages_temp and messagesdata tables are dropped before loading.
initialLoad = True

In [None]:
# Initial load: drop the staging table and the final table so the append
# writes below recreate them from scratch. The explicit `== True` is kept on
# purpose -- this guard drops tables, so only a value equal to True (not merely
# a truthy one such as the string "False") may trigger it.
if initialLoad == True:  # noqa: E712
    for drop_stmt in ('drop table if exists messages_temp',
                      'drop table if exists messagesdata'):
        spark.sql(drop_stmt)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

def _explode_recipients(counted_df, recipients_col, rtype, load_folder_path):
    """Return one row per entry of ``recipients_col``, tagged with ``rtype``.

    ``counted_df`` must already carry the ``*Recipients_cnt`` columns. The
    column names and order are identical for every recipient type because the
    caller combines the results with ``DataFrame.union``, which matches
    columns by position.
    """
    return counted_df \
        .select(explode(col(recipients_col)).alias(recipients_col),
                "Id", "ConversationId", "Subject", "SentDateTime", "Sender",
                "ToRecipients_cnt", "CcRecipients_cnt", "BccRecipients_cnt") \
        .select("Id", "ConversationId", "Subject", "SentDateTime",
                "ToRecipients_cnt", "CcRecipients_cnt", "BccRecipients_cnt",
                col("Sender.EmailAddress.Address").alias("Sender"),
                col(recipients_col + ".EmailAddress.Address").alias("Recipient")) \
        .withColumn("RType", lit(rtype)) \
        .withColumn("LoadFolderPath", lit(load_folder_path))


def load_messages_json_file(messagesPath):
    """Flatten one messages JSON file into per-recipient rows in ``messages_temp``.

    Each output row pairs a message with one of its To, Cc or Bcc recipients
    and carries: Id, ConversationId, Subject, SentDateTime, the three
    recipient-list sizes, the sender address (``Sender``), the recipient
    address (``Recipient``), the recipient type ``'To'``/``'Cc'``/``'Bcc'``
    (``RType``) and the global ``inputFolderPath`` (``LoadFolderPath``).
    ``explode`` emits nothing for an empty or null list, so a message only
    contributes rows for recipients it actually has.

    Args:
        messagesPath: path of a JSON file readable by ``spark.read.load``.

    Raises:
        AnalysisException: if the expected columns are missing or the To
            recipients cannot be flattened. Cc/Bcc failures are reported and
            skipped instead.
    """
    from pyspark.sql.utils import AnalysisException

    messages_df = spark.read.load(messagesPath, format='json') \
        .select("Id", "ConversationId", "Subject", "SentDateTime", "Sender",
                "ToRecipients", "CcRecipients", "BccRecipients")

    # Size each recipient list once, before exploding, so every output row
    # carries the message's full recipient counts.
    counted_df = messages_df.select(
        '*',
        size('ToRecipients').alias('ToRecipients_cnt'),
        size('CcRecipients').alias('CcRecipients_cnt'),
        size('BccRecipients').alias('BccRecipients_cnt'))

    # To recipients are mandatory: any failure here propagates to the caller.
    df = _explode_recipients(counted_df, "ToRecipients", 'To', inputFolderPath)

    # Cc/Bcc are best-effort. Spark resolves columns eagerly, so a list whose
    # elements lack EmailAddress.Address (presumably e.g. when every list in the
    # file is empty and JSON inference yields array<string>) raises
    # AnalysisException here; that branch is skipped and reported.
    for recipients_col, rtype in (("CcRecipients", 'Cc'), ("BccRecipients", 'Bcc')):
        try:
            df = df.union(_explode_recipients(counted_df, recipients_col, rtype, inputFolderPath))
        except AnalysisException as err:
            reason = str(err).split('\n', 1)[0]
            print(f"Skipping {rtype} recipients in {messagesPath}: {reason}")

    df.write.mode("append").saveAsTable("messages_temp")

def get_message_subfolder_files(folder):
    """Walk ``folder`` depth-first and load every file found beneath it.

    Entries named ``metadata`` are skipped at every level. Every other file is
    passed to ``load_messages_json_file``. A subdirectory is walked completely
    as soon as it is encountered, before its later siblings. Any error raised
    while loading a file propagates and stops the walk. Nothing is returned.
    """
    for entry in mssparkutils.fs.ls(folder):
        if entry.name == 'metadata':
            continue
        if not entry.isDir:
            load_messages_json_file(entry.path)
            continue
        get_message_subfolder_files(entry.path)

In [None]:
# NOTE(review): the walk starts at the root of o365data/messages, not at
# inputFolderPath, yet load_messages_json_file tags every row it loads with the
# current inputFolderPath. If each run is meant to load only its own folder,
# this path probably needs inputFolderPath appended -- confirm the storage layout.
messagesPath = (
    f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net"
    "/o365data/messages"
)
get_message_subfolder_files(messagesPath)

In [None]:
# Build messagesdata from the rows staged for the current load folder. Every
# (message, recipient) row is emitted twice: once as sender -> recipient and
# once with the two addresses swapped. Domains are the text after the last '@'.
# NOTE(review): size() of a null recipient list may be -1 depending on
# spark.sql.legacy.sizeOfNull, which would skew Recipients_cnt -- confirm.


def _edge_exprs(src, dst):
    """Return ``selectExpr`` expressions projecting ``src`` as Sender and ``dst`` as Recipient.

    Addresses and derived domains are lower-cased. All other columns pass
    through in the same order for either direction, so the two halves can be
    combined with a positional ``union``.
    """
    return [
        "Id", "ConversationId", "Subject",
        "to_timestamp(SentDateTime) as SentDateTime",
        "to_date(SentDateTime) as SentDate",
        f"lower({src}) as Sender",
        f"lower({dst}) as Recipient",
        "RType",
        f"reverse(split(lower({src}),'@'))[0] as Sender_Domain",
        f"reverse(split(lower({dst}),'@'))[0] as Recipient_Domain",
        "ToRecipients_cnt", "CcRecipients_cnt", "BccRecipients_cnt",
        "(ToRecipients_cnt + CcRecipients_cnt + BccRecipients_cnt) as Recipients_cnt",
    ]


# Filter with a Column comparison instead of pasting inputFolderPath into SQL
# text: a quote in the folder name can no longer break or rewrite the query.
current_load_df = spark.table("messages_temp").where(col("LoadFolderPath") == inputFolderPath)

# DataFrame.union is UNION ALL (no de-duplication) and matches columns by position.
messagesdata_df = current_load_df.selectExpr(*_edge_exprs("Sender", "Recipient")) \
    .union(current_load_df.selectExpr(*_edge_exprs("Recipient", "Sender")))
messagesdata_df.write.mode("append").saveAsTable("messagesdata")