# Forked from diehl/Email-Analytics
# File: berkeley_enron_import_redis.py (329 lines, 244 loc, 11.2 KB)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# import the modules
import cgi
import marshal
import MySQLdb
import redis
from calendar import timegm
from dateutil import tz
from dateutil.parser import parse
# Open one MySQL connection per source database. Both live on the local
# server and share the same credentials; only the schema name differs.
mysqlSettings = {'host': 'localhost', 'user': 'root', 'passwd': ''}
UCBdb = MySQLdb.connect(db='berkeley_enron', **mysqlSettings)
ISIdb = MySQLdb.connect(db='isi_enron', **mysqlSettings)

# Connect to the local redis instance that receives the imported data.
rdb = redis.Redis(host='localhost', port=6379, db=0)

# Start from a clean slate: drop every key this script populates so that a
# re-run does not mix fresh data with leftovers from an earlier import.
staleKeys = (
    'fully_observed_addresses',
    'messages',
    'social_relationships',
    'sorted_message_ids',
    'mids_per_directed_comm_relationship',
    'recipients_per_sender_address',
    'senders_per_recipient_address',
)
rdb.delete(*staleKeys)
# fetch message information from the UC Berkeley MySQL database
print "Pulling message data from the UC Berkeley MySQL database."
cursor = UCBdb.cursor()
cursor.execute("select t2.email, t1.date, t1.timezone, t1.smtpid, t1.subject, t1.body from msgs as t1, addresses as t2 where t1.eid = t2.eid;")
results = cursor.fetchall()
# fetch email addresses corresponding to the 151 email inboxes that make up the dataset
print "Pulling email address data from the USC/ISI MySQL database."
cursor = ISIdb.cursor()
cursor.execute("select Email_id as email from employeelist;")
addressTuples = cursor.fetchall()
fullyObservedAddresses = [e[0] for e in addressTuples]
# One address appears to be incorrectly represented in the ISI database: it
# has a space where the apostrophe belongs. Substitute the apostrophe form.
# Only an exact match is rewritten; when the malformed address is absent (or
# the list is empty) the addresses are left exactly as they were fetched.
malformedAddress = 'paul.y barbo@enron.com'
correctedAddress = "paul.y'barbo@enron.com"
fullyObservedAddresses = [
    correctedAddress if address == malformedAddress else address
    for address in fullyObservedAddresses
]

# add the fully observed addresses to the redis set of the same name
for address in fullyObservedAddresses:
    rdb.sadd('fully_observed_addresses', address)
# for each message...
numMsgs = len(results)
for i in xrange(numMsgs):
# get the message
msg = results[i]
# get the message datetime
msgDT = msg[1]
# get the message timezone
msgTZ = msg[2]
if msgDT != None:
# create new datetime object with time normalized to UTC
msgDT = parse(msgDT.strftime('%Y/%m/%d %H:%M:%S') + ' ' + msgTZ[:5]).astimezone(tz.tzoffset('',0))
# convert the datetime into seconds since the epoch
msgDTsecs = timegm(msgDT.utctimetuple())
# message datetime string
msgDTstring = str(msgDT)
else:
# message datetime string
msgDTstring = 'None'
# seconds since the epoch
msgDTsecs = 0
# get the message ID
msgID = msg[3]
# get the message subject
msgSubj = msg[4]
# get the message body
msgBody = msg[5]
# get the message sender's email address
sender = msg[0]
# decode the address from ISO-8859-1 and encode in ASCII preserving extended characters
sender = cgi.escape(sender.decode('iso-8859-1')).encode('ascii','xmlcharrefreplace')
# create a message dictionary
mDict = { 'message_id' : msgID, 'datetime' : msgDTstring, 'epoch_secs' : msgDTsecs,
'subject' : msgSubj, 'body' : msgBody, 'sender' : sender, 'to' : [],
'cc' : [], 'bcc' : [] }
# add message dictionary to the redis hash
rdb.hset('messages',msgID,marshal.dumps(mDict))
if (i+1) % 500 == 0:
print "%d messages processed." % (i+1)
# fetch recipient information from the MySQL database
print "Pulling recipient data from the MySQL database."
cursor = UCBdb.cursor()
cursor.execute("select t1.smtpid, t2.reciptype, t2.reciporder, t3.email from msgs as t1, recip_info as t2, addresses as t3 where t1.mid = t2.mid and t2.eid = t3.eid;")
results = cursor.fetchall()
# for each recipient...
numRecips = len(results)
for i in xrange(numRecips):
# get the recipient information
recip = results[i]
# get the message ID
msgID = recip[0]
# get the recipient type
recipType = recip[1]
# get the recipient order
recipOrder = int(recip[2])
# get the recipient email address
recipEmail = recip[3]
# decode the address from ISO-8859-1 and encode in ASCII preserving extended characters
recipEmail = cgi.escape(recipEmail.decode('iso-8859-1')).encode('ascii','xmlcharrefreplace')
# fetch message dictionary from redis
mDictStr = rdb.hget('messages',msgID)
mDict = marshal.loads(mDictStr)
# add recipient information to the message
rTup = (recipEmail,recipOrder)
if mDict.has_key(recipType):
mDict[recipType].append(rTup)
else:
mDict[recipType] = [rTup]
# push message dictionary back to redis
rdb.hset('messages',msgID,marshal.dumps(mDict))
if (i+1) % 5000 == 0:
print "%d recipients processed." % (i+1)
# generate ordered lists for the message recipients and a sorted set of message ids
i = 0
for msgID in rdb.hkeys('messages'):
# fetch message dictionary from redis
mDictStr = rdb.hget('messages',msgID)
mDict = marshal.loads(mDictStr)
# sort the recipients by their position info and reduce the lists down to the addresses
# only, now in the proper order
fields = ['to','cc','bcc']
for field in fields:
mDict[field].sort(key=lambda tup: tup[1])
mDict[field] = [tup[0] for tup in mDict[field]]
# push message dictionary back to redis
rdb.hset('messages',msgID,marshal.dumps(mDict))
# add message id to the sorted set
rdb.zadd('sorted_message_ids',mDict['message_id'],mDict['epoch_secs'])
i += 1
if i % 500 == 0:
print "%d messages processed." % i
# fetch manager-subordinate relationship information from the MySQL database
print "Pulling manager-subordinate relationship information from the MySQL database."
cursor = UCBdb.cursor()
cursor.execute("select * from sub_mgr_pairs_in_collection;")
results = cursor.fetchall()
# for each relationship pair
print "Adding manager-subordinate relationships to the redis database."
for rpair in results:
# create the relationship in the redis database
relDict = { 'type' : 'directly reported to', 'target' : rpair[1], 'evidence_type' : 'interval',
'start_time' : '2000-01-01 00:00:00 +00:00', 'end_time' : '2001-11-31 23:59:59 +00:00',
'provenance' : 'http://tinyurl.com/cfsooc' }
rdb.hset('social_relationships',rpair[0],marshal.dumps([relDict]))
# generate three hash tables:
# - one mapping (sender email address, recipient email address) tuples to lists of
# (message id, epoch secs) tuples
# - one mapping sender email addresses to sets of (recipient email address,
# (min epoch secs, max epoch secs)) tuples
# - one mapping recipient email addresses to sets of (sender email address,
# (min epoch secs, max epoch secs)) tuples
i = 0
for msgID in rdb.hkeys('messages'):
# fetch message dictionary from redis
mDictStr = rdb.hget('messages',msgID)
mDict = marshal.loads(mDictStr)
# aggregate all of the recipient addresses into a set to avoid duplicates. some email
# addresses are added to more than one recipient field (to/cc/bcc).
recips = set()
fields = ['to','cc','bcc']
for field in fields:
for recip in mDict[field]:
recips.add(recip)
for recip in recips:
# if the sender and recipient email addresses are the same, skip this relationship
if mDict['sender'] == recip:
continue
# relationship
rel = (mDict['sender'],recip)
# grab existing relationship information if it exists
if rdb.exists('mids_per_directed_comm_relationship'):
if rdb.hexists('mids_per_directed_comm_relationship',str(rel)):
# get the list of (message id, epoch secs) tuples
tList = marshal.loads(rdb.hget('mids_per_directed_comm_relationship',str(rel)))
else:
tList = []
else:
tList = []
# push (message id, epoch secs) tuple into list such that tuples are in ascending order
# relative to epoch secs
k = 0
numTups = len(tList)
while k < numTups and tList[k][1] < mDict['epoch_secs']:
k += 1
tList.insert(k,(mDict['message_id'],mDict['epoch_secs']))
# save the relationship list
rdb.hset('mids_per_directed_comm_relationship',str(rel),marshal.dumps(tList))
# grab the set of recipients for the sender email address if it exists
if rdb.hexists('recipients_per_sender_address',mDict['sender']):
# get the set of recipients
rSet = marshal.loads(rdb.hget('recipients_per_sender_address',mDict['sender']))
else:
rSet = set()
# check to see if there is an element for the recipient
el = next((tup for tup in rSet if tup[0] == recip),None)
# if no element exists
if el == None:
# add the recipient to the set
rSet.add((recip,(mDict['epoch_secs'],mDict['epoch_secs'])))
# otherwise, check to see if modifications to the relationship time interval are
# needed
else:
# remove the original element from the set
rSet.remove(el)
# modify the element as needed
minTime = el[1][0]
maxTime = el[1][1]
msgTime = mDict['epoch_secs']
if msgTime < minTime:
el = (el[0],(msgTime,maxTime))
if msgTime > maxTime:
el = (el[0],(minTime,msgTime))
# add the new element to the set
rSet.add(el)
# push the set back into redis
rdb.hset('recipients_per_sender_address',mDict['sender'],marshal.dumps(rSet))
# grab the set of senders for the recipient email address if it exists
if rdb.hexists('senders_per_recipient_address',recip):
# get the set of senders
sSet = marshal.loads(rdb.hget('senders_per_recipient_address',recip))
else:
sSet = set()
# check to see if there is an element for the sender
el = next((tup for tup in sSet if tup[0] == mDict['sender']),None)
# if no element exists
if el == None:
# add the recipient to the set
sSet.add((mDict['sender'],(mDict['epoch_secs'],mDict['epoch_secs'])))
# otherwise, check to see if modifications to the relationship time interval are
# needed
else:
# remove the original element from the set
sSet.remove(el)
# modify the element as needed
minTime = el[1][0]
maxTime = el[1][1]
msgTime = mDict['epoch_secs']
if msgTime < minTime:
el = (el[0],(msgTime,maxTime))
if msgTime > maxTime:
el = (el[0],(minTime,msgTime))
# add the new element to the set
sSet.add(el)
# push the set back into redis
rdb.hset('senders_per_recipient_address',recip,marshal.dumps(sSet))
i += 1
if i % 500 == 0:
print "%d messages processed." % i