major fixes all around, turns out everything is parallelizable and there's no need for UDFs to compute similarities or centroids
commit 17f5ce8457b7b0370159948d1a77d56d3fc0753a 1 parent fa9ad21
@thedatachef authored
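The gist of the change, sketched as math rather than taken from the commit itself: the old script CROSSed every document vector against every center and called a similarity UDF on each pair. For sparse tf-idf vectors, cosine similarity depends only on the tokens the two vectors share, plus per-vector norms that can be computed once up front:

\[ \mathrm{sim}(c,d) = \frac{\sum_{t \in c \cap d} w_{c,t}\, w_{d,t}}{\sqrt{\lVert c \rVert^2 \, \lVert d \rVert^2}}, \qquad \lVert v \rVert^2 = \sum_{t} w_{v,t}^2 \]

Every piece of that formula maps onto a JOIN (shared tokens), a GROUP followed by SUM (the dot product), and a precomputed norm_sq column, all of which Pig parallelizes natively; that is why no similarity or centroid UDF is needed.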
scripts/document_clustering/cluster_documents.pig (78 lines changed)
@@ -19,62 +19,88 @@
register '../../target/varaha-1.0-SNAPSHOT.jar';
register '../../lib/lucene-core-3.1.0.jar';
-vectors = LOAD '$TFIDF' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});
-k_centers = LOAD '$CURR_CENTERS' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});
-
-
---
--- Compute the similarity between all document vectors and each of the K centers
---
+vectors = LOAD '$TFIDF' AS (doc_id:chararray, norm_sq:double, vector:bag {t:tuple (token:chararray, weight:double)});
+k_centers = LOAD '$CURR_CENTERS' AS (doc_id:chararray, norm_sq:double, vector:bag {t:tuple (token:chararray, weight:double)});
--
--- FIXME: this can be optimized for K large, cross is dangerous
+-- Generate similarities
--
-with_centers = CROSS k_centers, vectors;
-similarities = FOREACH with_centers GENERATE
- k_centers::doc_id AS center_id,
- k_centers::vector AS center,
- vectors::doc_id AS doc_id,
- vectors::vector AS vector,
- varaha.text.TermVectorSimilarity(k_centers::vector, vectors::vector) AS similarity
+vectors_flat = FOREACH vectors GENERATE doc_id, norm_sq, FLATTEN(vector) AS (token, weight);
+centers_flat = FOREACH k_centers GENERATE doc_id, norm_sq, FLATTEN(vector) AS (token, weight);
+common_token = JOIN centers_flat BY token, vectors_flat BY token;
+intersection = FOREACH common_token GENERATE
+ centers_flat::doc_id AS center_id,
+ centers_flat::norm_sq AS center_norm_sq,
+ vectors_flat::doc_id AS doc_id,
+ vectors_flat::norm_sq AS doc_norm_sq,
+ vectors_flat::token AS token,
+ centers_flat::weight*vectors_flat::weight AS product
;
+grouped_pairs = GROUP intersection BY (center_id, doc_id);
+similarities = FOREACH grouped_pairs {
+ divisor_sq = MAX(intersection.center_norm_sq)*MAX(intersection.doc_norm_sq);
+ similarity = ((double)SUM(intersection.product))/SQRT(divisor_sq);
+ GENERATE
+ FLATTEN(group) AS (center_id, doc_id),
+ similarity AS similarity
+ ;
+ };
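A tiny worked example of the pattern above (illustrative numbers, not from any real data): take a center c = {(a, 1), (b, 2)} with norm_sq = 5 and a document d = {(b, 1), (x, 3)} with norm_sq = 10. The JOIN on token keeps only the shared token b, producing the single product 2 * 1 = 2, and grouping by (center_id, doc_id) then sums:

\[ \mathrm{sim}(c,d) = \frac{2}{\sqrt{5 \cdot 10}} = \frac{2}{\sqrt{50}} \approx 0.28 \]

The MAX calls on the norm_sq columns are not doing real aggregation; norm_sq is constant within each group, so MAX merely collapses the duplicated column to a scalar.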
+
--
--- Foreach vector, find the nearest center
+-- Get the nearest center associated with each document and reattach the vectors
--
-finding_nearest = GROUP similarities BY doc_id;
+finding_nearest = COGROUP similarities BY doc_id INNER, vectors_flat BY doc_id INNER;
only_nearest = FOREACH finding_nearest {
- nearest_center = TOP(1, 4, similarities);
+ --
+ -- FIXME: Occasionally, TOP throws an NPE
+ -- see: http://issues.apache.org/jira/browse/PIG-2031
+ --
+ ordrd_centers = ORDER similarities BY similarity DESC;
+ nearest_center = LIMIT ordrd_centers 1;
GENERATE
- FLATTEN(nearest_center) AS (center_id, center, doc_id, vector, similarity)
+ FLATTEN(nearest_center) AS (center_id, doc_id, similarity),
+ vectors_flat.(token, weight) AS vector
;
};
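Two things worth spelling out here (my reading of the code, hedged accordingly): the ORDER ... LIMIT 1 pair is a drop-in replacement for the TOP call the old code used, which against the new three-column similarities relation would presumably look like

nearest_center = TOP(1, 2, similarities); -- column 2 = similarity; avoided because of PIG-2031

and the COGROUP does double duty: it picks each document's best center while also rebuilding the (token, weight) bag that was flattened away for the JOIN.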
--
--- Get the number of vectors associated with the center
+-- Count the number of documents associated with each center
--
center_grpd = GROUP only_nearest BY center_id;
-center_cnts = FOREACH center_grpd GENERATE FLATTEN(only_nearest) AS (center_id, center, doc_id, vector, similarity), COUNT(only_nearest) AS num_vectors;
+center_cnts = FOREACH center_grpd GENERATE FLATTEN(only_nearest.(center_id, vector)) AS (center_id, vector), COUNT(only_nearest) AS num_vectors;
--
--- Calculate the centroids in a distributed fashion
+-- Get new counts
--
cut_nearest = FOREACH center_cnts GENERATE center_id AS center_id, num_vectors AS num_vectors, FLATTEN(vector) AS (token:chararray, weight:double);
centroid_start = GROUP cut_nearest BY (center_id, token);
-weight_avgs = FOREACH centroid_start GENERATE FLATTEN(group) AS (center_id, token), (double)SUM(cut_nearest.weight)/(double)MAX(cut_nearest.num_vectors) AS weight_avg;
+weight_avgs = FOREACH centroid_start {
+ weight_avg = (double)SUM(cut_nearest.weight)/(double)MAX(cut_nearest.num_vectors);
+ weight_avg_sq = ((double)SUM(cut_nearest.weight)/(double)MAX(cut_nearest.num_vectors))*(double)SUM(cut_nearest.weight)/(double)MAX(cut_nearest.num_vectors);
+ GENERATE
+ FLATTEN(group) AS (center_id, token),
+ weight_avg AS tf_idf,
+ weight_avg_sq AS tf_idf_sq
+ ;
+ };
+
centroids_grp = GROUP weight_avgs BY center_id;
--
-- Need to keep from having humungous vectors
--
centroids = FOREACH centroids_grp {
- --
- -- FIXME: for some reason TOP($MAX_CENTER_SIZE, 2, weight_avgs) throws NPE
--
- ordrd = ORDER weight_avgs BY weight_avg DESC;
+ -- FIXME: Occasionally, TOP throws an NPE
+ -- see: http://issues.apache.org/jira/browse/PIG-2031
+ --
+ ordrd = ORDER weight_avgs BY tf_idf DESC;
shortened_vector = LIMIT ordrd $MAX_CENTER_SIZE;
+ norm_sq = SUM(shortened_vector.tf_idf_sq);
GENERATE
group AS center_id,
+ norm_sq AS norm_sq,
shortened_vector.($1, $2) AS vector
;
};
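For reference, the centroid update the block above implements, written out (a sketch of the standard k-means mean, not text from the commit): each new center weight is the per-token mean over the documents assigned to that center, and norm_sq is summed over the truncated vector so it stays consistent with what actually gets stored:

\[ w_{c,t} = \frac{1}{|S_c|} \sum_{d \in S_c} w_{d,t}, \qquad \lVert c \rVert^2 = \sum_{t \in \text{top-}M} w_{c,t}^2 \]

where S_c is the set of documents nearest to center c (num_vectors = |S_c|, so the MAX is again just collapsing a constant column) and M is $MAX_CENTER_SIZE.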
scripts/document_clustering/sample_k_centers.pig (2 lines changed)
@@ -19,7 +19,7 @@
--
-- Load the vectors from the tfidf process.
--
-vectors = LOAD '$TFIDF' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});
+vectors = LOAD '$TFIDF' AS (doc_id:chararray, norm_sq:double, vector:bag {t:tuple (token:chararray, weight:double)});
--
-- Choose K random centers. This is kind of a hacky process. Since we can't really use
scripts/document_clustering/tfidf.pig (24 lines changed)
@@ -60,22 +60,32 @@ token_usages = FOREACH term_usage_bag GENERATE
;
--
--- Generate the tf-idf for each (doc_id, token, pair)
+-- Generate the tf-idf and tf-idf squared for each (doc_id, token) pair
--
tfidf_all = FOREACH token_usages {
idf = LOG((double)$NDOCS/(double)num_docs_with_token);
tf_idf = (double)term_freq*idf;
- GENERATE
- doc_id AS doc_id,
- token AS token,
- tf_idf AS tf_idf
+ tf_idf_sq = (double)term_freq*idf*term_freq*idf;
+ GENERATE
+ doc_id AS doc_id,
+ token AS token,
+ tf_idf AS tf_idf,
+ tf_idf_sq AS tf_idf_sq
;
};
--
--- Finally generate term vectors for later processing
+-- Now create the term vectors, attaching attributes we would rather not
+-- recalculate later, such as the norm squared.
--
grouped = GROUP tfidf_all BY doc_id;
-vectors = FOREACH grouped GENERATE group AS doc_id, tfidf_all.(token, tf_idf) AS vector;
+vectors = FOREACH grouped {
+ norm_sq = (double)SUM(tfidf_all.tf_idf_sq);
+ GENERATE
+ group AS id,
+ norm_sq AS norm_sq,
+ tfidf_all.(token, tf_idf) AS vector
+ ;
+ };
STORE vectors INTO '$TFIDF';
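Spelled out, the stored quantities are (noting that Pig's LOG is the natural logarithm):

\[ \mathrm{tfidf}(t,d) = \mathrm{tf}(t,d)\,\ln\frac{N}{\mathrm{df}(t)}, \qquad \lVert d \rVert^2 = \sum_t \mathrm{tfidf}(t,d)^2 \]

with N = $NDOCS and df(t) = num_docs_with_token. Persisting norm_sq alongside each vector is what lets cluster_documents.pig assemble the cosine denominators without ever rescanning the vectors.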