
first commit, don't crucify me, not ready yet

0 parents commit 6de88b42ed7a95ed83e037506dd71f9291aa4bd6 @thedatachef committed Apr 25, 2011
45 .gitignore
@@ -0,0 +1,45 @@
+## OS
+.DS_Store
+Icon?
+nohup.out
+.bak
+
+## EDITORS
+\#*
+.\#*
+*~
+*.swp
+REVISION
+TAGS*
+tmtags
+*_flymake.*
+*_flymake
+*.tmproj
+.project
+.settings
+
+## COMPILED
+a.out
+*.o
+*.pyc
+*.so
+
+## OTHER SCM
+.bzr
+.hg
+.svn
+
+## PROJECT::GENERAL
+coverage
+rdoc
+doc
+pkg
+.yardoc
+*private*
+
+## PROJECT::SPECIFIC
+
+*.rdb
+target
+*.RData
+*.Rhistory
19 README.textile
@@ -0,0 +1,19 @@
+h1. Varaha
+
+A set of Apache Pig scripts and UDFs (User Defined Functions) for machine learning and natural language processing. Why should Mahout have all the fun?
+
+h2. Build
+
+You'll want to build the UDFs before doing anything else. To do that, simply run:
+
+<pre><code>
+mvn clean package
+</code></pre>
+
+h2. The rest
+
+See the individual readme files under the scripts directory for how to run each script.
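+
+As a rough sketch (the paths and parameter values below are made up for illustration; the parameter names come from @scripts/tfidf.pig@), the tf-idf script can be driven with Pig's parameter substitution:
+
+<pre><code>
+pig -param DOCS=/data/corpus.tsv -param NDOCS=1000 -param OUT=/data/tfidf-vectors scripts/tfidf.pig
+</code></pre>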
+
+h2. Why is it called Varaha?
+
+Evidently, Varaha is an avatar of the Hindu god Vishnu, in the form of a Boar.
51 pom.xml
@@ -0,0 +1,51 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>varaha</groupId>
+ <artifactId>varaha</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>varaha</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>0.8.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.5</source>
+ <target>1.5</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
51 scripts/cluster_documents.pig
@@ -0,0 +1,51 @@
+register 'target/varaha-1.0-SNAPSHOT.jar';
+
+vectors = LOAD '$TFIDF-vectors' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});
+
+--
+-- Choose K random centers. This is kind of a hacky process. Since we can't really use
+-- parameters for the sampler we have to precompute S. Here S=(K+5)/NDOCS. This way we're
+-- "guaranteed" to get more than K vectors (but not too many more). Then we limit it to K.
+--
+-- sampled = SAMPLE vectors $S;
+-- k_centers = LIMIT sampled $K;
+--
+-- STORE k_centers INTO '$TFIDF-centers-0';
+
+-- k_centers = LOAD '$TFIDF-centers-0' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});
+-- with_centers = CROSS k_centers, vectors;
+-- similarities = FOREACH with_centers GENERATE
+-- k_centers::doc_id AS center_id,
+-- k_centers::vector AS center,
+-- vectors::doc_id AS doc_id,
+-- vectors::vector AS vector,
+-- varaha.text.TermVectorSimilarity(k_centers::vector, vectors::vector) AS similarity;
+
+-- STORE similarities INTO '$TFIDF-similarities-0';
+-- similarities = LOAD '$TFIDF-similarities-0' AS (
+-- center_id:chararray,
+-- center:bag {t:tuple (token:chararray, weight:double)},
+-- doc_id:chararray,
+-- vector:bag {t:tuple (token:chararray, weight:double)},
+-- similarity:double
+-- );
+--
+-- finding_nearest = GROUP similarities BY doc_id;
+-- only_nearest = FOREACH finding_nearest {
+-- nearest_center = TOP(1, 4, similarities);
+-- GENERATE
+-- FLATTEN(nearest_center) AS (center_id, center, doc_id, vector, similarity)
+-- ;
+-- };
+-- cut_nearest = FOREACH only_nearest GENERATE center_id, vector;
+-- clusters = GROUP cut_nearest BY center_id; -- this gets worse as K/NDOCS gets smaller
+--
+-- cut_clusters = FOREACH clusters GENERATE group AS center_id, cut_nearest.vector AS vector_collection;
+-- STORE cut_clusters INTO '$TFIDF-clusters-0';
+
+clusters = LOAD '$TFIDF-clusters-0' AS (center_id:chararray, vectors:bag {t:tuple (vector:bag {s:tuple (token:chararray, weight:double)})});
+centroids = FOREACH clusters GENERATE
+ center_id,
+ varaha.text.TermVectorCentroid(vectors) -- implement this
+ ;
+STORE centroids INTO '$TFIDF-centroids-0';
16 scripts/sample_k_centers.pig
@@ -0,0 +1,16 @@
+--
+-- Load the vectors from the tfidf process.
+--
+vectors = LOAD '$TFIDF-vectors' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});
+
+--
+-- Choose K random centers. This is kind of a hacky process. Since we can't really use
+-- parameters for the sampler we have to precompute S. Here a good heuristic for choosing S
+-- is S=(K+10)/NDOCS, where NDOCS is the number of documents in the input corpus. This way
+-- we're "guaranteed" to get more than K vectors (but not too many more). Then we limit
+-- the sample down to K.
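+--
+-- Worked example (hypothetical numbers): with K=10 and NDOCS=2000 documents,
+-- S = (10+10)/2000 = 0.01, so the sample below keeps roughly 20 vectors and the
+-- LIMIT trims them down to K=10.
+--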
+
+sampled = SAMPLE vectors $S;
+k_centers = LIMIT sampled $K;
+
+STORE k_centers INTO '$TFIDF-centers-0';
65 scripts/tfidf.pig
@@ -0,0 +1,65 @@
+--
+-- Define a wukong tokenization script for breaking the raw text data into tokens
+--
+DEFINE tokenize_docs `ruby tokenize_documents.rb --id_field=0 --text_field=1 --map` SHIP('tokenize_documents.rb');
+
+--
+-- Load and tokenize the raw documents
+--
+raw_documents = LOAD '$DOCS' AS (doc_id:chararray, text:chararray);
+tokenized = STREAM raw_documents THROUGH tokenize_docs AS (doc_id:chararray, token:chararray);
+
+--
+-- Count the number of times each (doc_id,token) pair occurs. (term counts)
+--
+doc_tokens = GROUP tokenized BY (doc_id, token);
+doc_token_counts = FOREACH doc_tokens GENERATE FLATTEN(group) AS (doc_id, token), COUNT(tokenized) AS num_doc_tok_usages;
+
+--
+-- Attach the document size to each record
+--
+doc_usage_bag = GROUP doc_token_counts BY doc_id;
+doc_usage_bag_fg = FOREACH doc_usage_bag GENERATE
+ group AS doc_id,
+ FLATTEN(doc_token_counts.(token, num_doc_tok_usages)) AS (token, num_doc_tok_usages),
+ SUM(doc_token_counts.num_doc_tok_usages) AS doc_size
+ ;
+
+--
+-- Next, generate the term frequencies
+--
+term_freqs = FOREACH doc_usage_bag_fg GENERATE
+ doc_id AS doc_id,
+ token AS token,
+    ((double)num_doc_tok_usages / (double)doc_size) AS term_freq
+    ;
+
+--
+-- Then, find the number of documents that contain at least one occurrence of each term
+--
+term_usage_bag = GROUP term_freqs BY token;
+token_usages = FOREACH term_usage_bag GENERATE
+ FLATTEN(term_freqs) AS (doc_id, token, term_freq),
+ COUNT(term_freqs) AS num_docs_with_token
+ ;
+
+--
+-- Generate the tf-idf weight for each (doc_id, token) pair
+--
+tfidf_all = FOREACH token_usages GENERATE
+              doc_id AS doc_id,
+              token  AS token,
+              (double)term_freq*LOG((double)$NDOCS/(double)num_docs_with_token) AS tf_idf
+            ;
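+
+--
+-- Worked example (made-up numbers): a token appearing 5 times in a 100-token document has
+-- term_freq = 5/100 = 0.05; if it appears in 10 of NDOCS=1000 documents then
+-- idf = LOG(1000/10) = LOG(100) ~= 4.61 (natural log), giving tf_idf ~= 0.05*4.61 = 0.23.
+--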
+
+--
+-- Finally generate term vectors for later processing
+--
+grouped = GROUP tfidf_all BY doc_id;
+vectors = FOREACH grouped GENERATE group AS doc_id, tfidf_all.(token, tf_idf) AS vector;
+
+STORE vectors INTO '$OUT';
114 scripts/tokenize_documents.rb
@@ -0,0 +1,114 @@
+#!/usr/bin/env ruby
+# -*- coding: utf-8 -*-
+
+require 'rubygems'
+require 'wukong'
+require 'wukong/encoding'
+require 'configliere' ; Configliere.use(:commandline, :env_var, :define)
+require 'set'
+
+Settings.define :id_field, :type => Integer, :default => 0, :required => true, :description => "What field to use as the document id. (-1) to assign ids"
+Settings.define :text_field, :type => Integer, :default => 1, :required => true, :description => "Which field is the text field?"
+Settings.resolve!
+
+STOPWORDS = %w[
+a about above across after again against all almost alone along already also
+although always among an and another any anybody anyone anything anywhere apos
+are area areas around as ask asked asking asks at away
+
+back backed backing backs be became because become becomes been before began
+behind being beings best better between big both but by
+
+came can cannot case cases certain certainly clear clearly come could
+
+did differ different differently do does done down down downed downing downs
+during
+
+each early either end ended ending ends enough even evenly ever every everybody
+everyone everything everywhere
+
+face faces fact facts far felt few find finds first for four from full fully
+further furthered furthering furthers
+
+gave general generally get gets give given gives go going good goods got great
+greater greatest group grouped grouping groups
+
+had has have having he her here herself high high high higher highest him
+himself his how however i if important in interest interested interesting
+interests into is it its it's itself
+
+just
+
+keep keeps kind knew know known knows
+
+large largely last later latest least less let lets like likely long longer
+longest
+
+made make making man many may me member members men might more most mostly mr
+mrs much must my myself
+
+nbsp necessary need needed needing needs never new new newer newest next no
+nobody non noone not nothing now nowhere number numbers
+
+of off often old older oldest on once one only open opened opening opens or
+order ordered ordering orders other others our out over
+
+part parted parting parts per perhaps place places point pointed pointing points
+possible present presented presenting presents problem problems put puts
+
+quite quot
+
+rather really right right room rooms
+
+said same saw say says second seconds see seem seemed seeming seems sees several
+shall she should show showed showing shows side sides since small smaller
+smallest so some somebody someone something somewhere state states still still
+such sure
+
+take taken than that the their them then there therefore these they thing things
+think thinks this those though thought thoughts three through thus to today
+together too took toward turn turned turning turns two
+
+under until up upon us use used uses
+
+very
+
+want wanted wanting wants was way ways we well wells went were what when where
+whether which while who whole whose why will with within without work worked
+working works would
+
+year years yet you young younger youngest your yours
+].to_set
+
+class GeneralTextTokenizer < Wukong::Streamer::RecordStreamer
+ def tokenize text
+ return [] if text.blank?
+    # Collapse everything that isn't a letter, word character, or apostrophe into spaces
+    text = text.gsub(%r{[^[:alpha:]\w\']+}, " ")
+    # Protect the apostrophe in 's and 't contractions by swapping it for '!',
+    # turn all remaining apostrophes (and whitespace) into spaces, then restore the '!'
+    text.gsub!(%r{([[:alpha:]\w])\'([st])}, '\1!\2')
+    text.gsub!(%r{[\s\']}, " ")
+    text.gsub!(%r{!}, "'")
+    # words = text.strip.wukong_encode.split(/\s+/)
+    words = text.strip.split(/\s+/)
+    # Drop blank tokens and tokens shorter than 3 characters
+    words.reject!{|w| w.blank? || (w.length < 3) }
+    words
+ end
+
+ def tokenize_text_chunk text_chunk
+ return [] if text_chunk.blank?
+ text_chunk = text_chunk.wukong_decode.downcase
+ tokenize(text_chunk.strip)
+ end
+
+ def process *args
+ tokenize_text_chunk(args[Settings.text_field]).each do |token|
+ yield [document_id(args), token] unless STOPWORDS.include?(token)
+ end
+ end
+
+ def document_id fields
+ fields[Settings.id_field] unless Settings.id_field == -1
+ end
+
+end
+
+Wukong::Script.new(GeneralTextTokenizer, nil).run
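+
+# A local smoke test (a sketch; the file name and layout are assumptions -- tab-separated
+# lines with the document id in field 0 and the raw text in field 1):
+#
+#   cat documents.tsv | ./tokenize_documents.rb --map --id_field=0 --text_field=1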
84 src/main/java/varaha/text/TermVector.java
@@ -0,0 +1,84 @@
+package varaha.text;
+
+import java.util.Iterator;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+
+/**
+ * TermVector is a wrapper around a Pig DataBag
+ */
+public class TermVector implements Iterable<Tuple>{
+
+    // Per-instance state; these must not be static or every TermVector would share
+    // the same bag and cached norm
+    private DataBag vector;
+    private Double norm;
+
+ public TermVector() {
+ this(BagFactory.getInstance().newDefaultBag());
+ }
+
+ public TermVector(DataBag vector) {
+ this.vector = vector;
+ }
+
+ public Iterator<Tuple> iterator() {
+ return vector.iterator();
+ }
+
+ public DataBag toDataBag() {
+ return vector;
+ }
+
+ /**
+ Computes the cosine similarity between this and another term vector.
+
+ @param other: Another TermVector
+
+       @return the cosine similarity between this and the other term vector
+ */
+ public Double cosineSimilarity(TermVector other) throws ExecException {
+ return dotProduct(other)/(norm()*other.norm());
+ }
+
+ /**
+ Returns the scalar inner product of this and the other term vector by
+ multiplying each entry for the same term.
+ <p>
+ There are undoubtedly ways to optimize this. Please, enlighten me.
+
+ @param other: Another term vector
+
+ @return the dot product
+ */
+ public Double dotProduct(TermVector other) throws ExecException {
+ Double result = 0.0;
+ for (Tuple x_i : this) {
+ for (Tuple y_i : other) {
+ if (x_i.get(0).toString().equals(y_i.get(0).toString())) {
+ result += (Double)x_i.get(1)*(Double)y_i.get(1);
+ }
+ }
+ }
+ return result;
+ }
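+
+    /**
+       A possible optimization of dotProduct (a sketch, not part of the original commit;
+       the method name is hypothetical): index this vector's terms in a map so the
+       comparison is a single pass over the other vector rather than a nested loop.
+       Assumes each term appears at most once per vector, as the tf-idf script produces.
+
+       @param other: Another term vector
+
+       @return the dot product
+     */
+    public Double dotProductHashed(TermVector other) throws ExecException {
+        java.util.Map<String, Double> weights = new java.util.HashMap<String, Double>();
+        for (Tuple x_i : this) {
+            weights.put(x_i.get(0).toString(), (Double)x_i.get(1));
+        }
+        Double result = 0.0;
+        for (Tuple y_i : other) {
+            Double w = weights.get(y_i.get(0).toString());
+            if (w != null) {
+                result += w*(Double)y_i.get(1);
+            }
+        }
+        return result;
+    }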
+
+ /**
+ Computes the norm of this vector.
+
+ @return the norm of this vector
+ */
+ public Double norm() throws ExecException {
+ if (norm != null) {
+ return norm;
+ } else {
+ Double result = 0.0;
+ for (Tuple x_i : vector) {
+ result += (Double)x_i.get(1)*(Double)x_i.get(1);
+ }
+ this.norm = Math.sqrt(result);
+ return norm;
+ }
+ }
+}
74 src/main/java/varaha/text/TermVectorCentroid.java
@@ -0,0 +1,74 @@
+package varaha.text;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+
+/**
+ * TermVectorCentroid calculates the centroid of term vectors.
+ *
+ * <dt><b>Example:</b></dt>
+ * <dd><code>
+ * register varaha.jar;<br/>
+ * clusters = LOAD 'clusters' AS (center_id:chararray, vectors:bag {t:tuple (vector:bag {s:tuple (token:chararray, weight:double)})});
+ * centroids = FOREACH clusters GENERATE
+ * center_id AS center_id,
+ * varaha.text.TermVectorCentroid(vectors) AS centroid:bag {t:tuple (token:chararray, weight:double)}
+ * ;
+ * </code></dd>
+ * </dl>
+ *
+ * @see
+ * @author Jacob Perkins
+ *
+ */
+public class TermVectorCentroid extends EvalFunc<DataBag> {
+
+ private static TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ public DataBag exec(Tuple input) throws IOException {
+ if (input == null || input.size() < 1 || input.isNull(0))
+ return null;
+
+ DataBag bagOfVectors = (DataBag)input.get(0);
+ DataBag centroid = BagFactory.getInstance().newDefaultBag();
+        Map<String, Double> termSums = new HashMap<String, Double>();
+
+ //
+ // Add each unique term to a hashmap and sum the entries
+ //
+ for (Tuple t : bagOfVectors) {
+ DataBag v = (DataBag)t.get(0);
+ for (Tuple v_i : v) {
+ String term = v_i.get(0).toString();
+ Object currentValue = termSums.get(term);
+ if (currentValue == null) {
+                    termSums.put(term, (Double)v_i.get(1));
+ } else {
+ termSums.put(term, (Double)v_i.get(1) + (Double)currentValue);
+ }
+ }
+ }
+
+        //
+        // Average: divide each term's summed weight by the number of vectors
+        // (a term absent from a vector contributes zero to its average)
+        //
+        Iterator<Map.Entry<String, Double>> mapIterator = termSums.entrySet().iterator();
+        while (mapIterator.hasNext()) {
+            Map.Entry<String, Double> pair = mapIterator.next();
+ Tuple termWeightPair = tupleFactory.newTuple(2);
+ termWeightPair.set(0, pair.getKey());
+ termWeightPair.set(1, (Double)pair.getValue()/bagOfVectors.size());
+ centroid.add(termWeightPair);
+ }
+ return centroid;
+ }
+
+}
39 src/main/java/varaha/text/TermVectorSimilarity.java
@@ -0,0 +1,39 @@
+package varaha.text;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+
+/**
+ * TermVectorSimilarity calculates the cosine similarity between two
+ * term vectors represented as Pig bags.
+ *
+ * <dt><b>Example:</b></dt>
+ * <dd><code>
+ * register varaha.jar;<br/>
+ * vectors = LOAD 'vectors' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});<br/>
+ * k_centers = LOAD 'k_centers' AS (doc_id:chararray, vector:bag {t:tuple (token:chararray, weight:double)});<br/>
+ * with_centers = CROSS k_centers, vectors;
+ * similarities = FOREACH with_centers GENERATE varaha.text.TermVectorSimilarity(k_centers::vector, vectors::vector) AS (sim:double);<br/>
+ * </code></dd>
+ * </dl>
+ *
+ * @see
+ * @author Jacob Perkins
+ *
+ */
+public class TermVectorSimilarity extends EvalFunc<Double> {
+
+ public Double exec(Tuple input) throws IOException {
+ if (input == null || input.size() < 2 || input.isNull(0) || input.isNull(1))
+ return null;
+
+ TermVector t1 = new TermVector((DataBag)input.get(0));
+ TermVector t2 = new TermVector((DataBag)input.get(1));
+
+ return t1.cosineSimilarity(t2);
+ }
+
+}
