Copyright (c) Microsoft Corporation.

Licensed under the MIT License.

In [None]:
# Notebook parameters. All are plain strings; the empty defaults are
# placeholders (presumably overridden by the caller/pipeline -- confirm).
data_lake_account_name = '' # Synapse Workspace ADLS account; becomes the host part of the abfss:// base path
file_system_name = ''  # file-system part of the abfss:// URL (the portion before '@')
subfolder_name = ''  # last segment of every input/output path; also stored in the LoadDateRange column
folder_name = 'messages'  # path segment placed directly under each o365data* root folder
user_group_name = ''  # path segment after folder_name; also stored in the UserGroup column
initialLoad = 'false'  # NOTE(review): a string, not a bool, and not referenced in the visible cells

In [None]:
# Root abfss:// URI of the data lake file system; every path used below is
# built by appending to this value (note the trailing slash).
base_path = "abfss://{}@{}.dfs.core.windows.net/".format(
    file_system_name, data_lake_account_name
)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

def _explode_recipients(messages_df, recipients_col, recipient_type):
    """Return one row per address found in ``recipients_col``.

    Args:
        messages_df: DataFrame holding the raw message columns, including the
            three recipient array columns and the nested ``Sender`` struct.
        recipients_col: Name of the recipient array column to explode
            ("ToRecipients", "CcRecipients" or "BccRecipients").
        recipient_type: Literal stored in the ``RType`` column of every
            produced row ('To', 'Cc' or 'Bcc').

    Returns:
        A DataFrame with the message columns, the three recipient counts,
        the flattened ``Sender`` / ``Recipient`` addresses and the
        ``RType``, ``LoadDateRange`` and ``UserGroup`` tag columns. The column
        order is identical for every ``recipients_col`` so results can be
        combined with the positional ``DataFrame.union``.
    """
    message_cols = ["Id", "puser", "ParentFolderId", "ConversationId", "Subject",
                    "CreatedDateTime", "LastModifiedDateTime", "SentDateTime"]
    count_cols = ["ToRecipients_cnt", "CcRecipients_cnt", "BccRecipients_cnt"]

    # Counts are taken before exploding so every output row carries the
    # per-message totals.
    with_counts = messages_df.select(
        '*',
        size('ToRecipients').alias('ToRecipients_cnt'),
        size('CcRecipients').alias('CcRecipients_cnt'),
        size('BccRecipients').alias('BccRecipients_cnt'),
    )
    exploded = with_counts.select(
        explode(col(recipients_col)).alias(recipients_col),
        *message_cols, "Sender", *count_cols
    )
    flattened = exploded.select(
        *message_cols, *count_cols,
        col("Sender.EmailAddress.Address").alias("Sender"),
        col(recipients_col + ".EmailAddress.Address").alias("Recipient"),
    )
    return flattened.withColumn("RType", lit(recipient_type)) \
                    .withColumn("LoadDateRange", lit(subfolder_name)) \
                    .withColumn("UserGroup", lit(user_group_name))


def load_messages_json_file(messagesPath):
    """Flatten one messages JSON file and append it to the unfiltered parquet set.

    The file is read with Spark, turned into one row per (message, recipient)
    for the To, Cc and Bcc recipient lists, normalised (timestamps, lower-cased
    addresses, domains, total recipient count) and appended as parquet under
    ``o365data_processed_unfiltered/<folder_name>/<user_group_name>/<subfolder_name>``.

    Args:
        messagesPath: Path of the JSON file to load.

    Returns:
        None. The result is written to storage.
    """
    messages_df = spark.read.load(messagesPath, format='json')
    messages_df = messages_df.select(
        "Id", "puser", "ParentFolderId", "ConversationId", "Subject",
        "CreatedDateTime", "LastModifiedDateTime", "SentDateTime", "Sender",
        "ToRecipients", "CcRecipients", "BccRecipients",
    )

    df = _explode_recipients(messages_df, "ToRecipients", "To")

    # Cc and Bcc are best-effort: a failure skips that recipient kind instead
    # of aborting the file (presumably for files where the inferred schema of
    # the array has no EmailAddress struct -- confirm). The failure is now
    # reported instead of being silently swallowed.
    for recipients_col, recipient_type in (("CcRecipients", "Cc"), ("BccRecipients", "Bcc")):
        try:
            df = df.union(_explode_recipients(messages_df, recipients_col, recipient_type))
        except Exception as exc:
            print(f"WARNING: skipping {recipients_col} for {messagesPath}: {exc}")

    try:
        df = df.withColumn('CreatedDateTime', to_timestamp('CreatedDateTime')) \
                .withColumn('LastModifiedDateTime', to_timestamp('LastModifiedDateTime')) \
                .withColumn('SentDateTime', to_timestamp('SentDateTime')) \
                .withColumn('SentDate', to_date('SentDateTime')) \
                .withColumn('Sender', lower(col('Sender'))) \
                .withColumn('Recipient', lower(col('Recipient'))) \
                .withColumn('Sender_Domain', reverse(split(lower(col('Sender')), '@'))[0]) \
                .withColumn('Recipient_Domain', reverse(split(lower(col('Recipient')), '@'))[0]) \
                .withColumn('Recipients_cnt', (col('ToRecipients_cnt') + col('CcRecipients_cnt') + col('BccRecipients_cnt'))) \
                .select('Id', 'puser', 'ParentFolderId', 'ConversationId', 'Subject', 'CreatedDateTime',
                        'LastModifiedDateTime', 'SentDateTime', 'SentDate', 'Sender', 'Recipient',
                        'RType', 'Sender_Domain', 'Recipient_Domain', 'ToRecipients_cnt',
                        'CcRecipients_cnt', 'BccRecipients_cnt', 'Recipients_cnt', 'LoadDateRange', 'UserGroup')
    except Exception as exc:
        # Kept non-fatal to match the previous behaviour, but made visible:
        # the rows below are then written without the normalised columns.
        print(f"WARNING: normalisation failed for {messagesPath}; writing raw columns: {exc}")

    processed_path = base_path + "o365data_processed_unfiltered/" + folder_name + "/" + user_group_name + "/" + subfolder_name
    df.write.format("parquet").mode("append").save(processed_path)
    
def get_message_subfolder_files(folder):
    """Recursively process every file found beneath ``folder``.

    Entries named 'metadata' are skipped. Directories are descended into and
    every other entry is handed to ``load_messages_json_file``.

    Args:
        folder: Path to list with ``mssparkutils.fs.ls``.
    """
    for entry in mssparkutils.fs.ls(folder):
        if entry.name != 'metadata':
            # Directories recurse; anything else is treated as a messages file.
            handler = get_message_subfolder_files if entry.isDir else load_messages_json_file
            handler(entry.path)
            

In [None]:
# Input root for this run: o365data/<folder_name>/<user_group_name>/<subfolder_name>.
messagesPath = f"{base_path}o365data/{folder_name}/{user_group_name}/{subfolder_name}"

# Walk that folder tree and load every file found in it.
get_message_subfolder_files(messagesPath)

In [None]:
# Read back everything the loader appended for this user group / subfolder.
processed_path = f"{base_path}o365data_processed_unfiltered/{folder_name}/{user_group_name}/{subfolder_name}"
df_processed = spark.read.format('parquet').load(processed_path, header=True)

# The folder filter is read from a single shared location (it is not scoped
# by user_group_name).
folder_filter_path = f"{base_path}o365data_processed/folderfiltersdata/"
df_folder_filter = spark.read.format('parquet').load(folder_filter_path, header=True)

# Rename the filter's key columns so they do not clash with df_processed's
# column names when the join result is projected by name.
df_folder_filter = (
    df_folder_filter
    .withColumnRenamed('id', 'FolderId')
    .withColumnRenamed('puser', 'puser1')
    .withColumnRenamed('ParentFolderId', 'ParentFolderId1')
)

# Keep only messages whose (puser, ParentFolderId) appears in the filter.
# NOTE(review): this is an inner join, so duplicate (puser1, FolderId) rows in
# the filter would duplicate messages -- confirm the filter data is unique.
same_user = df_processed['puser'] == df_folder_filter['puser1']
same_folder = df_processed['ParentFolderId'] == df_folder_filter['FolderId']
df_filtered = df_processed.join(df_folder_filter, same_user & same_folder) \
                          .select(df_processed.columns)

filtered_path = f"{base_path}o365data_processed/{folder_name}/{user_group_name}/{subfolder_name}"
df_filtered.write.format("parquet").mode("overwrite").save(filtered_path)


In [None]:
from pyspark.sql.types import *
from pyspark.sql import Row

# Load statistics for this run: how many message rows were loaded, how many
# survived the folder filter and how many were discarded.
load_stats_schema = StructType([StructField('all_messages_count', IntegerType()),
                     StructField('filtered_messages_count', IntegerType()),
                     StructField('discarded_messages_count', IntegerType()),
                     ])

# Each count() launches a Spark job, so count every DataFrame exactly once
# and derive the difference from those values. This halves the work and keeps
# the three figures consistent with each other.
all_messages_count = df_processed.count()
filtered_messages_count = df_filtered.count()

load_stats = [Row(all_messages_count=all_messages_count,
            filtered_messages_count=filtered_messages_count,
            discarded_messages_count=(all_messages_count - filtered_messages_count)
            )]
df_load_stats = spark.createDataFrame(load_stats, load_stats_schema)

# Tag the single stats row with the same run identifiers used on the data.
df_load_stats = df_load_stats.withColumn("LoadDateRange", lit(subfolder_name)) \
                        .withColumn("UserGroup", lit(user_group_name))

load_stats_path = base_path + "o365data_loadstats/" + folder_name + "/" + user_group_name + "/" + subfolder_name
df_load_stats.write.format("parquet").mode("overwrite").save(load_stats_path)

In [None]:
# Load the folder-filtered messages and keep only the columns needed for the
# messagesdata table.
processed_path = base_path + "o365data_processed/" + folder_name + "/" + user_group_name + "/" + subfolder_name
df_messages = spark.read.format("parquet").load(processed_path, header=True)
df_messages = df_messages.select('Id', 'Sender', 'Recipient', 'RType', 'SentDate', 'SentDateTime', 'Sender_Domain', 'Recipient_Domain')

# Original direction: sender -> recipient.
df_messages = df_messages.withColumn('IsReversed_Row', lit(0))

# Reversed direction: one extra row per message with sender and recipient
# (and their domains) swapped. The swap must happen in a single select:
# chained withColumn calls run one after another, so overwriting 'Sender'
# first would make the following 'Recipient' assignment read the already
# replaced value and leave both columns equal. The column order matches
# df_messages because union() combines columns by position.
df_messages_copy = df_messages.select(
    'Id',
    col('Recipient').alias('Sender'),
    col('Sender').alias('Recipient'),
    'RType',
    'SentDate',
    'SentDateTime',
    col('Recipient_Domain').alias('Sender_Domain'),
    col('Sender_Domain').alias('Recipient_Domain'),
    lit(1).alias('IsReversed_Row'),
)
df_messages = df_messages.union(df_messages_copy)

df_messages.write.mode("append").saveAsTable("messagesdata")