Skip to content

Commit

Permalink
Working on getting writing the avro file on ec2, reading it in pig an…
Browse files Browse the repository at this point in the history
…d getting it into elasticsearch to work - at scale
  • Loading branch information
rjurney committed Mar 25, 2012
1 parent 5e9dce8 commit 7ac47b1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
25 changes: 25 additions & 0 deletions src/pig/search_emails.pig
@@ -0,0 +1,25 @@
/*
 * search_emails.pig
 *
 * Loads the email records found under /tmp/python3 with the piggybank
 * AvroStorage loader and stores them, unchanged, through Wonderdog's
 * ElasticSearchStorage into the es://email/email target.
 */

/* Wonderdog storage function and the ElasticSearch 0.18.6 jars registered with it */
REGISTER '/me/wonderdog/target/wonderdog-1.0-SNAPSHOT.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/elasticsearch-0.18.6.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/jline-0.9.94.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/jna-3.2.7.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/log4j-1.2.16.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/lucene-analyzers-3.5.0.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/lucene-core-3.5.0.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/lucene-highlighter-3.5.0.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/lucene-memory-3.5.0.jar';
REGISTER '/me/elasticsearch-0.18.6/lib/lucene-queries-3.5.0.jar';

/* Avro, piggybank (home of AvroStorage) and the other jars registered for Avro loading */
REGISTER '/me/pig/build/ivy/lib/Pig/avro-1.5.3.jar';
REGISTER '/me/pig/build/ivy/lib/Pig/json-simple-1.1.jar';
REGISTER '/me/pig/contrib/piggybank/java/piggybank.jar';
REGISTER '/me/pig/build/ivy/lib/Pig/jackson-core-asl-1.7.3.jar';
REGISTER '/me/pig/build/ivy/lib/Pig/jackson-mapper-asl-1.7.3.jar';
REGISTER '/me/pig/build/ivy/lib/Pig/joda-time-1.6.jar';

/* Short aliases so the LOAD/STORE statements below stay readable */
DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
DEFINE ElasticSearch com.infochimps.elasticsearch.pig.ElasticSearchStorage();

/* Input: Avro email records (presumably written by src/python/gmail.py -- confirm) */
emails = LOAD '/tmp/python3' USING AvroStorage();

/*
 * Output: every record goes to ElasticSearch untransformed.
 * NOTE(review): the URI looks like es://<index>/<type>, with json=false and
 * size=1000 as Wonderdog options -- confirm their meaning against Wonderdog's docs.
 * The two constructor arguments are the ElasticSearch config file and plugins directory.
 */
STORE emails INTO 'es://email/email?json=false&size=1000'
    USING ElasticSearch('/me/elasticsearch-0.18.6/config/elasticsearch.yml',
                        '/me/elasticsearch-0.18.6/plugins');
26 changes: 13 additions & 13 deletions src/pig/wonderdog.pig
Expand Up @@ -11,26 +11,26 @@ register '/me/elasticsearch-0.18.6/lib/lucene-memory-3.5.0.jar';
register '/me/elasticsearch-0.18.6/lib/lucene-queries-3.5.0.jar';

/* Load Avro jars */
register /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/pig/contrib/piggybank/java/piggybank.jar
register /me/pig/build/ivy/lib/Pig/jackson-core-asl-1.7.3.jar
register /me/pig/build/ivy/lib/Pig/jackson-mapper-asl-1.7.3.jar
register /me/pig/build/ivy/lib/Pig/joda-time-1.6.jar
register /me/newpig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/newpig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/newpig/contrib/piggybank/java/piggybank.jar
register /me/newpig/build/ivy/lib/Pig/jackson-core-asl-1.7.3.jar
register /me/newpig/build/ivy/lib/Pig/jackson-mapper-asl-1.7.3.jar
register /me/newpig/build/ivy/lib/Pig/joda-time-1.6.jar

define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
define ElasticSearch com.infochimps.elasticsearch.pig.ElasticSearchStorage();
-- define ElasticSearch com.infochimps.elasticsearch.pig.ElasticSearchStorage();

messages = load '/tmp/python' using AvroStorage();
messages = load '/me/tmp/emails.avro' using AvroStorage();
messages = FILTER messages BY (from IS NOT NULL) AND (to IS NOT NULL);
smaller = FOREACH messages GENERATE FLATTEN(from) as from, FLATTEN(to) as to;
pairs = FOREACH smaller GENERATE LOWER(from) AS from, LOWER(to) AS to;

froms = GROUP pairs BY (from, to) PARALLEL 10;
STORE froms INTO 'es://by_froms/by_froms?json=false&size=1000' USING
ElasticSearch('/me/Downloads/elasticsearch-0.18.6/config/elasticsearch.yml', '/me/Downloads/elasticsearch-0.18.6/plugins');
sent_counts = FOREACH froms GENERATE FLATTEN(group) AS (from, to), COUNT(pairs) AS total;

-- sent_counts = FOREACH froms GENERATE FLATTEN(group) AS (from, to), COUNT(pairs) AS total;
STORE sent_counts INTO '/tmp/sent_counts';

-- STORE sent_counts INTO 'es://sent_counts/sent_counts?json=false&size=1000' USING
-- ElasticSearch('/me/Downloads/elasticsearch-0.18.6/config/elasticsearch.yml', '/me/Downloads/elasticsearch-0.18.6/plugins');
/*STORE sent_counts INTO 'es://sent_counts/sent_counts?json=false&size=1000' USING
com.infochimps.elasticsearch.pig.ElasticSearchStorage('/me/elasticsearch-0.18.6/config/elasticsearch.yml', '/me/elasticsearch-0.18.6/plugins');
*/
24 changes: 12 additions & 12 deletions src/python/gmail.py
Expand Up @@ -85,6 +85,10 @@ def parse_date(date_string):
return iso_time

def process_email(msg):

subject = msg['Subject']
body = get_body(msg)

avro_parts = {
'message_id': msg['Message-ID'],
'from': parse_addrs(msg['From']),
Expand All @@ -93,9 +97,9 @@ def process_email(msg):
'bcc': parse_addrs(msg['Bcc']),
'reply_to': parse_addrs(msg['Reply-To']),
'in_reply_to': parse_addrs(msg['In-Reply-To']),
'subject': msg['Subject'],
'subject': subject,
'date': parse_date(msg['Date']),
'body': get_body(msg)
'body': body
}
return avro_parts

Expand All @@ -104,7 +108,7 @@ def get_body(msg):
if msg:
for part in msg.walk():
if part.get_content_type() == 'text/plain':
body += unicode(part.get_payload().encode('utf-8'), errors='ignore')
body += unicode(part.get_payload().encode('utf-8'), errors='replace')
return body

class TimeoutException(Exception):
Expand Down Expand Up @@ -134,9 +138,10 @@ class TimeoutException(Exception):
status, email_hash = fetch_email(imap, str(id))
if(status == 'OK'):
try:
avro_writer.append(email_hash)
except UnicodeDecodeError:
sys.stderr.write('AVRO')
avro_writer.append(email_hash)
except UnicodeDecodeError, e:
sys.stderr.write('AVRO APPEND PROBLEM with id [' + str(id) + "]\n")
exit()
if email_hash['subject']:
print str(id) + ": " + email_hash['subject']
else:
Expand All @@ -146,16 +151,11 @@ class TimeoutException(Exception):
continue
elif (status == 'ABORT' or status == 'TIMEOUT'):
sys.stderr.write("resetting imap for " + status + "\n")
try:
imap.close()
imap.logout()
except:
pass
imap, count = init_imap(username, password, imap_folder)
# Report the completed reconnect on stderr, like the other diagnostics in this loop.
# The original referenced an undefined name `system`, which raised NameError
# immediately after every successful IMAP reset.
sys.stderr.write("IMAP RESET\n")
else:
continue

avro_writer.close()
imap.close()
imap.logout()

0 comments on commit 7ac47b1

Please sign in to comment.