Permalink
Browse files

Macro is ready

  • Loading branch information...
1 parent 532f306 commit 9cc018700d4da68ee47c599cb14f1cbe89c9ca9c @rjurney committed Oct 1, 2012
Showing with 74 additions and 0 deletions.
  1. +18 −0 tf_idf_macro_test.pig
  2. +18 −0 tf_idf_macro_test.pig.expanded
  3. +38 −0 tfidf.macro
View
@@ -0,0 +1,18 @@
+/*
+ * Driver script for the tf_idf macro imported from tfidf.macro. It loads
+ * emails from an Avro file, keeps up to 100 of them, runs the macro over
+ * (message_id, body) and stores the result under /tmp/macro_test.
+ */
+
+/* Jars registered for AvroStorage: avro, json-simple and piggybank (the AvroStorage class defined below lives in the piggybank package) */
+register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
+register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
+register /me/Software/pig/contrib/piggybank/java/piggybank.jar
+
+/* Short alias so the load statement can say AvroStorage() instead of the full class name */
+define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
+
+/* Brings the tf_idf macro into scope */
+import 'tfidf.macro';
+
+/* Load the emails and drop any record without a message_id, since that field is used as the document id below */
+emails = load '/me/Data/enron.avro' using AvroStorage();
+emails = filter emails by message_id is not null;
+/* Limit to 100 documents for local mode, or go bake a cake in the meanwhile */
+emails = limit emails 100;
+/* Keep only the two fields handed to the macro: document id and document text */
+id_body = foreach emails generate message_id, body;
+
+/* Remove output from a previous run - presumably so the store below can be re-run without manual cleanup */
+rmf /tmp/macro_test
+/* Field names are passed to the macro as quoted strings and substituted into its body */
+my_tf_idf_scores = tf_idf(id_body, 'message_id', 'body');
+store my_tf_idf_scores into '/tmp/macro_test';
@@ -0,0 +1,18 @@
+/*
+ * Expanded form of tf_idf_macro_test.pig: the call to the tf_idf macro is
+ * replaced by the macro body, with each macro-local alias renamed to
+ * macro_tf_idf_ALIAS_0 and the macro parameters substituted with id_body,
+ * message_id and body. The final statement of the macro is assigned to
+ * my_tf_idf_scores, the alias used at the call site.
+ * NOTE(review): this looks like generated output (for example from a Pig
+ * dry run) - confirm before editing by hand, and regenerate it whenever
+ * tfidf.macro or the test script changes.
+ */
+register '/me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar';
+register '/me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar';
+register '/me/Software/pig/contrib/piggybank/java/piggybank.jar';
+define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
+emails = load '/me/Data/enron.avro' USING AvroStorage();
+emails = filter emails BY (message_id IS not null);
+emails = limit emails 100;
+id_body = foreach emails generate message_id, body;
+rmf /tmp/macro_test
+macro_tf_idf_token_records_0 = foreach id_body generate message_id, FLATTEN(TOKENIZE(body)) AS tokens;
+macro_tf_idf_doc_word_totals_0 = foreach (group macro_tf_idf_token_records_0 by (message_id, tokens)) generate flatten(group) AS (message_id, token), COUNT_STAR(macro_tf_idf_token_records_0) AS doc_total;
+macro_tf_idf_pre_term_counts_0 = foreach (group macro_tf_idf_doc_word_totals_0 by (message_id)) generate group AS message_id, FLATTEN(macro_tf_idf_doc_word_totals_0.(token, doc_total)) AS (token, doc_total), SUM(macro_tf_idf_doc_word_totals_0.(doc_total)) AS doc_size;
+macro_tf_idf_term_freqs_0 = foreach macro_tf_idf_pre_term_counts_0 generate message_id AS message_id, token AS token, ((double)doc_total / (double)doc_size) AS term_freq;
+macro_tf_idf_token_usages_0 = foreach (group macro_tf_idf_term_freqs_0 by (token)) generate FLATTEN(macro_tf_idf_term_freqs_0) AS (message_id, token, term_freq), COUNT_STAR(macro_tf_idf_term_freqs_0) AS num_docs_with_token;
+macro_tf_idf_just_ids_0 = foreach id_body generate message_id;
+macro_tf_idf_ndocs_0 = foreach (group macro_tf_idf_just_ids_0 all) generate COUNT_STAR(macro_tf_idf_just_ids_0) AS total_docs;
+my_tf_idf_scores = foreach macro_tf_idf_token_usages_0 { idf = LOG((double)macro_tf_idf_ndocs_0.(total_docs) / (double)num_docs_with_token); tf_idf = (double)term_freq * idf; generate message_id AS message_id, token AS score, (chararray)tf_idf AS value:chararray; } ;
+store my_tf_idf_scores INTO '/tmp/macro_test';
View
@@ -0,0 +1,38 @@
+/*
+ * tf_idf: computes a TF-IDF weight for every (document, token) pair.
+ *
+ * Parameters:
+ *   in_relation - relation containing the documents
+ *   id_field    - name of the document id field in in_relation
+ *   text_field  - name of the text field, which is split with TOKENIZE
+ *
+ * Returns out_relation with three fields per (document, token) pair:
+ *   1. the document id, under the name given by id_field
+ *   2. the token, under the output alias score
+ *   3. the tf-idf weight cast to chararray, under the output alias value
+ *
+ * How the weight is built:
+ *   term_freq = occurrences of the token in the document / total token occurrences in the document
+ *   idf       = LOG(total_docs / num_docs_with_token)
+ *   tf_idf    = term_freq * idf
+ *
+ * NOTE(review): the output aliases label the token as score and the numeric
+ * weight as value - confirm this naming is what downstream consumers expect.
+ * NOTE(review): total_docs counts rows of in_relation, not distinct ids, while
+ * num_docs_with_token counts one row per distinct (id, token). This assumes
+ * in_relation holds exactly one row per document - confirm against callers.
+ */
+DEFINE tf_idf(in_relation, id_field, text_field) RETURNS out_relation {
+
+ /* One row per token occurrence, tagged with the id of the document it came from */
+ token_records = foreach $in_relation generate $id_field, FLATTEN(TOKENIZE($text_field)) as tokens;
+
+ /* Calculate the term count per document: number of occurrences of each token within each document */
+ doc_word_totals = foreach (group token_records by ($id_field, tokens)) generate
+ flatten(group) as ($id_field, token),
+ COUNT_STAR(token_records) as doc_total;
+
+ /* Calculate the document size: sum of all token counts in the document, repeated on every (token, doc_total) row of that document */
+ pre_term_counts = foreach (group doc_word_totals by $id_field) generate
+ group AS $id_field,
+ FLATTEN(doc_word_totals.(token, doc_total)) as (token, doc_total),
+ SUM(doc_word_totals.doc_total) as doc_size;
+
+ /* Calculate the TF: the share of the document taken up by this token (casts force floating point division) */
+ term_freqs = foreach pre_term_counts generate $id_field as $id_field,
+ token as token,
+ ((double)doc_total / (double)doc_size) AS term_freq;
+
+ /* Get count of documents using each token, for idf. term_freqs has one row per (document, token), so the group size is the number of documents containing the token */
+ token_usages = foreach (group term_freqs by token) generate
+ FLATTEN(term_freqs) as ($id_field, token, term_freq),
+ COUNT_STAR(term_freqs) as num_docs_with_token;
+
+ /* Get document count: number of rows in the input relation, reduced to a single-row relation */
+ just_ids = foreach $in_relation generate $id_field;
+ ndocs = foreach (group just_ids all) generate COUNT_STAR(just_ids) as total_docs;
+
+ /* Note the use of Pig Scalars to calculate idf: ndocs.total_docs reads the single row of ndocs as a scalar value inside this foreach */
+ $out_relation = foreach token_usages {
+ idf = LOG((double)ndocs.total_docs/(double)num_docs_with_token);
+ tf_idf = (double)term_freq * idf;
+ generate $id_field as $id_field,
+ token as score,
+ (chararray)tf_idf as value:chararray;
+ };
+};

0 comments on commit 9cc0187

Please sign in to comment.