In [None]:
# Default parameters that can be freely changed or overridden by a pipeline run.
# NOTE(review): keep this cell as plain `name = literal` assignments so pipeline parameter
# injection can presumably override them.

# Inputs: folders of JSON exports, each read with spark.read.json in the load cell
calendarPath = "abfss://mgdc@onastoreljdk5xxwtgp3e.dfs.core.windows.net/calendar_2022-06-01_to_2022-11-07/"
emailPath = "abfss://mgdc@onastoreljdk5xxwtgp3e.dfs.core.windows.net/email_2022-06-01_to_2022-11-07/"
teamsChatPath = "abfss://mgdc@onastoreljdk5xxwtgp3e.dfs.core.windows.net/teamschat_2022-06-01_to_2022-11-07/"
userPath = "abfss://mgdc@onastoreljdk5xxwtgp3e.dfs.core.windows.net/user_2022-06-01_to_2022-11-07/"

# Output format: can be "csv" or "parquet" (any other value raises in the write cells)
outputFormat = "csv"

# Output path of user vertices. The next cell strips the extension; csv output is finally
# written as "<path without extension>.<outputFormat>", parquet as a directory at the bare path.
# NOTE(review): the date range in this file name (2022-11-01) differs from the input folders and
# `period` (2022-11-07) — confirm which one is intended.
usersOutputPath = "abfss://output@onastoreljdk5xxwtgp3e.dfs.core.windows.net/users_2022-06-01_to_2022-11-01_weighted.csv"

# Output path of user-to-user edges (same extension handling as usersOutputPath)
interactionsOutputPath = "abfss://output@onastoreljdk5xxwtgp3e.dfs.core.windows.net/interactions_2022-06-01_to_2022-11-01_weighted.csv"

# Start/end date label for this run, denormalized into the users and interactions tables ("Period" column)
period = "2022-06-01 to 2022-11-07"

# Whether or not to md5-hash the (lower-cased) user email addresses in the outputs.
# NOTE(review): an unsalted MD5 of an email address can be reversed by dictionary lookup;
# treat this as pseudonymization rather than anonymization.
obfuscateEmails = True

# Leiden max cluster size: the maximum possible size for a detected community
leidenMaxClusterSize = 1000

In [None]:
# Strip the file extension (e.g. ".csv") from the configured output paths. The write cells use
# the bare path as Spark's output directory and, for csv, re-append ".<outputFormat>" to the
# final single file.
# posixpath.splitext only inspects the last path segment, so a path without an extension is left
# untouched. By contrast, rfind(".") returned -1 and dropped the last character, or cut at the "."
# in the storage-account host name. It also makes re-running this cell a no-op.
import posixpath

usersOutputPath = posixpath.splitext(usersOutputPath)[0]
interactionsOutputPath = posixpath.splitext(interactionsOutputPath)[0]

In [None]:
# Load data.
# Every dataset is required by the cells below, so a failed read stops the notebook. The
# underlying error is printed for the run log and chained as the cause of the raised exception.
def _loadJson(path, description):
    """Read the JSON dataset at ``path`` with Spark.

    Args:
        path: location passed to ``spark.read.json``.
        description: human-readable dataset name used in the error message.

    Returns:
        The Spark DataFrame read from ``path``.

    Raises:
        Exception: if the read fails; the original error is attached as ``__cause__``.
    """
    try:
        return spark.read.json(path)
    except Exception as error:
        print(error)
        raise Exception(description + " data not loaded. Check the file path.") from error

emailsRaw = _loadJson(emailPath, "Emails")
meetingsRaw = _loadJson(calendarPath, "Calendar")
teamsChatsRaw = _loadJson(teamsChatPath, "TeamsChats")
usersRaw = _loadJson(userPath, "Users")

In [None]:
# Drop duplicates: users are keyed by `puser`, messages and calendar events by `Id`.
# Only the columns the edge-building cells read are kept for messages and meetings.
usersDedup = usersRaw.dropDuplicates(subset=["puser"])
emailsDedup = emailsRaw.dropDuplicates(subset=["Id"]).select(["Sender", "ToRecipients"])
teamschatsDedup = teamsChatsRaw.dropDuplicates(subset=["Id"]).select(["Sender", "ToRecipients"])
meetingsDedup = meetingsRaw.dropDuplicates(subset=["Id"]).select(
    ["organizer", "attendees", "start", "end", "isAllDay", "isCancelled", "isOrganizer", "iCalUId"]
)

In [None]:
from pyspark.sql.functions import coalesce, col, count, explode, format_number, isnull, lit, md5, rand, size, udf, unix_timestamp
import pyspark.sql.functions as F
from pyspark.sql import types as t

In [None]:
# Get the user email addresses and filter emails, teamschat, and meetings to only contain edges with those users.
# Addresses are lower-cased to match the lower-cased sender/recipient addresses. Nulls are dropped
# and duplicates removed: users are deduplicated on `puser`, not on mail. If two user records share
# an address, an inner join against this table would otherwise multiply edge rows instead of just
# filtering them.
usersEmailAddresses = usersDedup.selectExpr("lower(mail) as id") \
                                .where(col("id").isNotNull()) \
                                .distinct()

In [None]:
# Explode row with one sender -> N recipients into N rows.
# Filter to only keep emails with 1 to 8 recipients; each recipient edge gets weight 1/N.
# The lower bound guards the 1/N division (an empty recipient list produces no rows anyway).
# left_semi joins keep only edges whose sender and recipient are both known users, without
# duplicating rows if an address appears more than once in usersEmailAddresses.
emails = emailsDedup.where((size(col("ToRecipients")) >= 1) & (size(col("ToRecipients")) <= 8)) \
                    .withColumn("weight", 1.0/size(col("ToRecipients"))) \
                    .select(F.lower(col("Sender.EmailAddress.Address")).alias("src"), col("weight"), explode(col("ToRecipients")).alias("exploded")) \
                    .withColumn("dst", F.lower(col("exploded.EmailAddress.Address"))) \
                    .join(usersEmailAddresses, col("id") == col("src"), "left_semi") \
                    .join(usersEmailAddresses, col("id") == col("dst"), "left_semi") \
                    .select(col("src"), col("dst"), col("weight")) \
                    .where(col("src") != col("dst"))
if obfuscateEmails:
    # Replace both endpoints with their md5 hex digest; the column names stay src/dst.
    emails = emails.select(md5(col("src")).alias("src"), md5(col("dst")).alias("dst"), col("weight"))

In [None]:
# Explode row with one organizer -> N attendees into N rows.
# Filter to only keep meetings with at least 2 and at most 9 attendees (the attendee count includes the organizer).
# Filter out cancelled and all-day meetings.
# Filter to the meeting instance belonging to the organizer's calendar.
# Weight by meeting length in seconds divided by 400 (6.67 minutes) and divided by (attendees - 1).
# left_semi joins keep only edges whose organizer and attendee are both known users, without
# duplicating rows if an address appears more than once in usersEmailAddresses.
# NOTE(review): `attendees` is carried through unchanged, so it still holds raw addresses even
# when obfuscateEmails is set; it is not written by the output cells in this notebook.

dtFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS"
meetings = meetingsDedup.where((size(col("attendees")) <= 9) & (size(col("attendees")) >= 2)) \
                        .where((col("isAllDay") == False) & (col("isCancelled") == False) & (col("isOrganizer") == True)) \
                        .withColumn("meetingDurationInSeconds", unix_timestamp(col("end.dateTime"), dtFormat).cast("long") - unix_timestamp(col("start.dateTime"), dtFormat).cast("long")) \
                        .withColumn("weight", (col("meetingDurationInSeconds")/400.0) / (size(col("attendees")) - 1)) \
                        .select(F.lower(col("organizer.emailAddress.address")).alias("src"), col("weight"), col("meetingDurationInSeconds"), col("attendees"), col("iCalUId"), explode(col("attendees")).alias("exploded")) \
                        .withColumn("dst", F.lower(col("exploded.EmailAddress.Address"))) \
                        .join(usersEmailAddresses, col("id") == col("src"), "left_semi") \
                        .join(usersEmailAddresses, col("id") == col("dst"), "left_semi") \
                        .select(col("src"), col("dst"), col("weight"), col("meetingDurationInSeconds"), col("iCalUId"), col("attendees")) \
                        .where(col("src") != col("dst"))
if obfuscateEmails:
    # Replace both endpoints with their md5 hex digest; the column names stay src/dst.
    meetings = meetings.select(md5(col("src")).alias("src"), md5(col("dst")).alias("dst"), col("weight"),
                               col("meetingDurationInSeconds"), col("iCalUId"), col("attendees"))

In [None]:
# Explode row with one sender -> N recipients into N rows.
# Filter to only keep teamschat messages with 1 to 8 recipients.
# Each recipient edge gets weight 1/N, the same per-message weighting as emails. The chat-to-email
# down-weighting (teamsChatToEmailRatio) is applied once, when the channel edges are combined;
# dividing by 8 here as well would apply that ratio twice.
# left_semi joins keep only edges whose sender and recipient are both known users, without
# duplicating rows if an address appears more than once in usersEmailAddresses.
teamschats = teamschatsDedup.where((size(col("ToRecipients")) >= 1) & (size(col("ToRecipients")) <= 8)) \
                            .withColumn("weight", 1.0/size(col("ToRecipients"))) \
                            .select(F.lower(col("Sender.EmailAddress.Address")).alias("src"), col("weight"), explode(col("ToRecipients")).alias("exploded")) \
                            .withColumn("dst", F.lower(col("exploded.EmailAddress.Address"))) \
                            .join(usersEmailAddresses, col("id") == col("src"), "left_semi") \
                            .join(usersEmailAddresses, col("id") == col("dst"), "left_semi") \
                            .select(col("src"), col("dst"), col("weight")) \
                            .where(col("src") != col("dst"))
if obfuscateEmails:
    # Replace both endpoints with their md5 hex digest; the column names stay src/dst.
    teamschats = teamschats.select(md5(col("src")).alias("src"), md5(col("dst")).alias("dst"), col("weight"))

In [None]:
# Join after counting and summing weights from emails, teams chats, and meetings.
# Each channel is reduced to one row per (src, dst) with an interaction count and summed weight.
# Key columns are suffixed 1/2/3 (email/meetings/teams chat) so they stay distinct after the joins.
emailEdges = emails.groupBy("src", "dst").agg(F.count(col("dst")).alias("InteractionsEmail"), F.sum(col("weight")).alias("EmailWeight")) \
                    .withColumnRenamed("src", "src1").withColumnRenamed("dst", "dst1")

meetingEdges = meetings.groupBy("src", "dst").agg(F.count(col("dst")).alias("InteractionsMeetings"), F.sum(col("weight")).alias("MeetingsWeight")) \
                           .withColumnRenamed("src", "src2").withColumnRenamed("dst", "dst2")

teamsChatEdges = teamschats.groupBy("src", "dst").agg(F.count(col("dst")).alias("InteractionsTeamsChat"), F.sum(col("weight")).alias("TeamsChatWeight")) \
                           .withColumnRenamed("src", "src3").withColumnRenamed("dst", "dst3")

# Full outer joins keep an edge seen in any channel. After the first join, a meeting-only edge has
# null src1/dst1, so the teams chat join matches on the coalesced email/meeting keys. Matching on
# src1/dst1 alone would emit a second, duplicate (Source, Target) row for an edge present in both
# meetings and teams chat but not in email.
allEdges = emailEdges.alias("e").join(meetingEdges.alias("m"), (col("src1") == col("src2")) & (col("dst1") == col("dst2")), "full") \
                                .join(teamsChatEdges.alias("t"),
                                      (coalesce(col("src1"), col("src2")) == col("src3")) & (coalesce(col("dst1"), col("dst2")) == col("dst3")),
                                      "full")
                              

In [None]:
# Coalesce the three channel-specific key columns into one Source/Target pair, zero-fill the
# metrics of channels an edge does not appear in, then derive the combined count and weight.
# Teams chat counts and weights are divided by teamsChatToEmailRatio: that many chat messages
# are treated as equivalent to one email.
teamsChatToEmailRatio = 8 # interaction ratio for teamschat to email
edgesCombined = (
    allEdges
    .select(
        coalesce(col("src1"), col("src2"), col("src3")).alias("Source"),
        coalesce(col("dst1"), col("dst2"), col("dst3")).alias("Target"),
        *[col(name) for name in ("InteractionsEmail", "InteractionsMeetings", "InteractionsTeamsChat",
                                 "EmailWeight", "MeetingsWeight", "TeamsChatWeight")]
    )
    .fillna(0)
    .withColumn(
        "Interactions",
        (col("InteractionsEmail") + col("InteractionsMeetings")
         + F.round(col("InteractionsTeamsChat") / teamsChatToEmailRatio)).cast("int"),
    )
    .withColumn(
        "InteractionsWeight",
        col("EmailWeight") + col("MeetingsWeight") + col("TeamsChatWeight") / teamsChatToEmailRatio,
    )
    .withColumn("Period", lit(period))
)

if outputFormat == "csv":
    # Spark writes a directory of part files; coalesce(1) produces a single part file, which is
    # then moved to "<interactionsOutputPath>.csv" and the temporary directory removed.
    edgesCombined.coalesce(1).write.option("header", True).mode("overwrite").csv(interactionsOutputPath)

    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    # get the part file generated by spark write
    fs = Path(interactionsOutputPath).getFileSystem(sc._jsc.hadoopConfiguration())
    part_file = fs.globStatus(Path(interactionsOutputPath + "/part*"))[0].getPath()
    # set final target path
    target_path_interactions = interactionsOutputPath + "." + outputFormat
    # Hadoop FileSystem.rename does not overwrite an existing destination (it can simply return
    # false). Remove the file from a previous run first, and stop if the move still fails.
    # Otherwise the directory delete below would silently discard the freshly written data.
    if fs.exists(Path(target_path_interactions)):
        fs.delete(Path(target_path_interactions), False)
    if not fs.rename(part_file, Path(target_path_interactions)):
        raise Exception("Could not move " + part_file.toString() + " to " + target_path_interactions)
    fs.delete(Path(interactionsOutputPath), True)
elif outputFormat == "parquet":
    edgesCombined.write.option("header", True).mode("overwrite").parquet(interactionsOutputPath)
else:
    raise Exception ("outputFormat should be csv or parquet")

In [None]:
# Per-user activity counts.
# `meetings` has one row per (organizer, attendee) pair, so events are counted by distinct iCalUId.
# Counting rows would count a meeting with k in-tenant attendees as k events organized.
# NOTE(review): rows with a null iCalUId are not counted by countDistinct.
eventsOrganized = meetings.groupBy("src").agg(F.countDistinct("iCalUId").alias("NumberOfEventsOrganized"))
eventsAttended = meetings.groupBy("dst").agg(F.countDistinct("iCalUId").alias("NumberOfEventsAttended"))
# `emails` / `teamschats` also have one row per (sender, recipient) pair but carry no message id,
# so these count sender->recipient deliveries rather than distinct messages.
emailsSent = emails.groupBy("src").count().withColumnRenamed("count", "NumberOfEmailsSent")
emailsReceived = emails.groupBy("dst").count().withColumnRenamed("count", "NumberOfEmailsReceived")
teamsChatsSent = teamschats.groupBy("src").count().withColumnRenamed("count", "NumberOfChatsSent")
teamsChatsReceived = teamschats.groupBy("dst").count().withColumnRenamed("count", "NumberOfChatsReceived")

In [None]:
# Select user properties for output and join all raw email/teamschat/meeting counts.
# EmailAddress is the lower-cased mail, md5-hashed when obfuscateEmails is set, so it lines up
# with the src/dst keys of the emails, meetings and teamschats tables.
if not obfuscateEmails:
    usersDedup = usersDedup.withColumn("EmailAddress", F.lower(col("mail")))
else:
    usersDedup = usersDedup.withColumn("EmailAddress",  md5(F.lower(col("mail"))))
usersRenamed = usersDedup.selectExpr(
    "EmailAddress",
    "department as Department",
    "jobTitle as Title",
    "state as StateOrProvince",
    "country as Country",
    "preferredLanguage as Languages",
    "ptenant as TenantID",
)
# Left-join each count table on its key and drop the key right away, because the next table
# reuses the same key name; users with no activity end up with 0 after fillna.
usersJoined = usersRenamed
for countsTable, keyColumn in ((eventsOrganized, "src"), (eventsAttended, "dst"),
                               (emailsSent, "src"), (emailsReceived, "dst"),
                               (teamsChatsSent, "src"), (teamsChatsReceived, "dst")):
    usersJoined = usersJoined.join(countsTable, col(keyColumn) == col("EmailAddress"), "left").drop(keyColumn)
del countsTable, keyColumn
usersJoined = usersJoined.fillna(0)
numUsers = usersJoined.count()

In [None]:
# Calculate out-degrees and in-degrees: the number of edge rows with at least one interaction
# that leave (Source) or enter (Target) each user.
outDegreeEdges = (
    edgesCombined
    .where(col("Interactions") > 0)
    .groupBy("Source")
    .agg(F.count(F.lit(1)).alias("Out-DegreeIndex"))
)
inDegreeEdges = (
    edgesCombined
    .where(col("Interactions") > 0)
    .groupBy("Target")
    .agg(F.count(F.lit(1)).alias("In-DegreeIndex"))
)

In [None]:
# Construct networkx graph object.
# The edges are collected to the driver and loaded into a weighted directed graph. Edges whose
# combined weight lies outside [minEdgeWeight, maxEdgeWeight] are dropped first.
import networkx as nx
minEdgeWeight = 0.25
maxEdgeWeight = 2000
# Filter on the projected name `wgt`; InteractionsWeight is no longer a column of this projection.
edges = edgesCombined.selectExpr("Source as src", "Target as dst", "InteractionsWeight as wgt") \
                     .where((col("wgt") >= minEdgeWeight) & (col("wgt") <= maxEdgeWeight))
edgesList = [(e.src, e.dst, e.wgt) for e in edges.collect()]
graph = nx.DiGraph()
graph.add_weighted_edges_from(edgesList)

In [None]:
# Calculate Influence Index based on PageRank of the weighted directed interaction graph.
# Edge weights are read from the "weight" attribute written by add_weighted_edges_from.
graphPageRank = nx.pagerank(
    graph,
    alpha=0.85,
    personalization=None,
    max_iter=100,
    tol=0.001,
    weight="weight",
)

In [None]:
# Define udf for adding page rank to dataframe.
# InfluenceIndex is a user's PageRank divided by the largest PageRank in the graph. Users absent
# from the graph score 0.0. default=0.0 keeps max() from raising ValueError on an empty graph.
maxPageRank = max(graphPageRank.values(), default=0.0)
def getPageRank(x):
    """Return the PageRank of node ``x`` normalized by ``maxPageRank``.

    Returns 0.0 when ``x`` is not a graph node or when there is no positive maximum.
    Always returns a Python float: Spark does not coerce a Python int returned from a
    FloatType UDF and yields null instead.
    """
    pageRank = graphPageRank.get(x)
    if pageRank is None or maxPageRank <= 0:
        return 0.0
    return float(pageRank / maxPageRank)
influenceIndexUdf = udf(getPageRank, t.FloatType())

In [None]:
# Calculate Betweenness Index
# Commented out since the complexity is O(EV) where E = edges, V = vertices
# This will be slow for larger graphs, roughly above 10K users
# graphBetweenness = nx.betweenness_centrality(graph)

In [None]:
# Define udf for adding betweenness to dataframe
# def getBetweeness(x):
#     return graphBetweenness.get(x)
# betweenessIndexUdf = udf(getBetweeness, t.FloatType())

In [None]:
# Calculate Community Bridging Index based on Leiden community detection
import graspologic
from graspologic.partition import leiden

# Fixed seed so the community labels are reproducible between runs of the notebook.
leidenRandomSeed = 1234

# networkx DiGraph.to_undirected() with its default reciprocal=False keeps an edge that exists in
# EITHER direction. When both (u, v) and (v, u) exist, the attributes of only one direction
# (including "weight") survive.
# NOTE(review): use graph.to_undirected(reciprocal=True) if only bidirectional edges were intended.
undirectedGraph = graph.to_undirected()

leidenResult = graspologic.partition.hierarchical_leiden(
    undirectedGraph,
    max_cluster_size=leidenMaxClusterSize,
    random_seed=leidenRandomSeed,
)
# Mapping node -> community id at the final level of the hierarchy.
leidenClusters = leidenResult.final_level_hierarchical_clustering()

In [None]:
# Construct udf for mapping users to community label.
def getLabel(x):
    """Return the Leiden community id of node ``x``, or None when the node has no label."""
    if x in leidenClusters:
        return leidenClusters[x]
    return None

labelUdf = udf(getLabel, t.StringType())

In [None]:
# Counts how many distinct communities C a user is connected to with an out edge, normalized by
# the number of communities.
# For all users, compute C / (num of Communities)
# 1 = they are connected to all communities
# 0 = they have no out edges in the graph (filled in when joined to users)

# Map each edge's target node to its community and keep one row per (src, Community).
# `edges` also carries `wgt`, so it is projected away before distinct(). Otherwise two edges into
# the same community with different weights would be counted twice.
# Targets without a community label (null) are not counted as a community.
edgesLabelled = edges.withColumn("Community", labelUdf(col("dst"))) \
                     .select("src", "Community") \
                     .where(col("Community").isNotNull()) \
                     .distinct()

# group on src and count how many distinct communities its targets belong to
numCommunities = len(set(leidenClusters.values()))
# max(..., 1) avoids dividing by zero when Leiden returned no communities.
communityBridging = edgesLabelled.groupBy("src").count() \
                                 .withColumn("CommunityBridgeIndex", col("count") / float(max(numCommunities, 1))).drop("count")

In [None]:
# Join all indexes to users and output.
# DegreeIndex = (in-degree + out-degree) / (2 * numUsers); Community and InfluenceIndex come from
# the Leiden-label and PageRank UDFs; CommunityBridgeIndex from communityBridging.
# Missing numeric values are filled with 0 (before DegreeIndex and again after the last join).
usersEnriched = (
    usersJoined
    .join(outDegreeEdges, col("Source") == col("EmailAddress"), "left")
    .drop("Source")
    .join(inDegreeEdges, col("Target") == col("EmailAddress"), "left")
    .drop("Target")
    .fillna(0)
    .withColumn("DegreeIndex", (col("In-DegreeIndex") + col("Out-DegreeIndex")) / (2 * numUsers))
    .withColumn("Community", labelUdf(col("EmailAddress")))
    .join(communityBridging, col("src") == col("EmailAddress"), "left")
    .drop("src")
    .withColumn("InfluenceIndex", influenceIndexUdf(col("EmailAddress")))
    .fillna(0)
    .withColumn("Period", lit(period))
)

if outputFormat == "csv":
    # Spark writes a directory of part files; coalesce(1) produces a single part file, which is
    # then moved to "<usersOutputPath>.csv" and the temporary directory removed.
    usersEnriched.coalesce(1).write.option("header", True).mode("overwrite").csv(usersOutputPath)

    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    # get the part file generated by spark write
    fs = Path(usersOutputPath).getFileSystem(sc._jsc.hadoopConfiguration())
    part_file = fs.globStatus(Path(usersOutputPath + "/part*"))[0].getPath()
    # set final target path
    target_path_users = usersOutputPath + "." + outputFormat
    # Hadoop FileSystem.rename does not overwrite an existing destination (it can simply return
    # false). Remove the file from a previous run first, and stop if the move still fails.
    # Otherwise the directory delete below would silently discard the freshly written data.
    if fs.exists(Path(target_path_users)):
        fs.delete(Path(target_path_users), False)
    if not fs.rename(part_file, Path(target_path_users)):
        raise Exception("Could not move " + part_file.toString() + " to " + target_path_users)
    fs.delete(Path(usersOutputPath), True)
elif outputFormat == "parquet":
    usersEnriched.write.option("header", True).mode("overwrite").parquet(usersOutputPath)
else:
    raise Exception ("outputFormat should be csv or parquet")