
linear regression via gradient descent with pig; zip udf for element by element product of tuples (see ruby zip); ruby driver + sample script for illustrating gradient descent to fit a line
commit 4bd0bf98ccfbea80fb8261135f83d3f9f17664dc 1 parent bd18f5e
@thedatachef authored
10,001 scripts/analysis/linear_regression/data/data.tsv
10,001 additions, 0 deletions not shown
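
(The data itself is not shown; per the load statement in fit_line.pig below, each line is a tab-separated (x, y) pair of doubles. For illustration only, with invented values, a couple of rows would look like:

1.0	2.9
2.5	5.8
)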
15 scripts/analysis/linear_regression/fit_line.pig
@@ -0,0 +1,15 @@
+import 'linear_model.pig';
+
+--
+-- Linear regression via gradient descent for the stupid-simple case of fitting a line
+--
+weights = load '$weights' as (w0:double, w1:double);
+features = load 'data/data.tsv' as (x:double, y:double);
+
+weights = foreach weights generate TOTUPLE(w0,w1) as weights:tuple(w0:double, w1:double);
+features = foreach features generate y as response, TOTUPLE(1.0, x) as vector:tuple(f0:double, f1:double);
+
+new_weights = sounder_lm_gradient_descent(features, weights, 0.1);
+new_weights = foreach new_weights generate flatten($0);
+
+store new_weights into '$weights_new';
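
As a usage sketch, this script is invoked with the weight locations passed in as parameters, mirroring the command the Ruby driver below constructs (paths here are illustrative):

pig -x local -p weights=data/weight-0 -p weights_new=data/weight-1 fit_line.pig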
68 scripts/analysis/linear_regression/linear_model.pig
@@ -0,0 +1,68 @@
+register '../../../udf/target/sounder-1.0-SNAPSHOT.jar';
+
+define Zip sounder.pig.tuple.Zip();
+
+/*
+ * Run one iteration of gradient descent for the given weights and features
+ * with step size alpha. Returns the updated weights.
+ *
+ * features - Relation with the following schema:
+ * {response:double, vector:tuple(f0:double,f1:double,...,fN:double)}
+ *
+ * w - Relation with **exactly one tuple** with the following schema:
+ * {weights:tuple(w0:double,w1:double,...,wN:double)}
+ *
+ * alpha - Step size. Schema:
+ * alpha:double
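+ *
+ * The body below implements one standard batch gradient descent update;
+ * for each dimension j:
+ *
+ *   w_j <- w_j - (alpha/m) * SUM_i (w . x_i - y_i) * x_ij
+ *
+ * where m = COUNT($features), i ranges over the input examples, and
+ * (w . x_i) is the dot product of the current weights with the i-th
+ * feature vector.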
+ */
+define sounder_lm_gradient_descent(features, w, alpha) returns new_weights {
+
+ --
+ -- Use a scalar cast to zip weights with their appropriate features
+ --
+ weighted = foreach $features {
+ zipped = Zip($w.weights, vector);
+ generate
+ response as response,
+ vector as vector,
+ zipped as zipped:bag{t:tuple(weight:double, feature:double, dimension:int)};
+ };
+
+ --
+ -- Compute dot product of weights with feature vectors. First part of
+ -- step adjustment.
+ --
+ dot_prod = foreach weighted {
+ dots = foreach zipped generate weight*feature as product;
+ dot_product = SUM(dots.product);
+ diff = (dot_product-response);
+
+ generate
+ flatten(zipped) as (weight, feature, dimension), diff as diff;
+ };
+
+ scaled = foreach dot_prod generate dimension, weight, feature*diff as feature_diff;
+
+ --
+ -- Compute step diff along each dimension. Uses combiners
+ --
+ steps = foreach (group scaled by (dimension,weight)) {
+ factor = ($alpha/(double)COUNT(scaled));
+ weight_step = factor*SUM(scaled.feature_diff);
+ new_weight = group.weight - weight_step;
+ generate
+ group.dimension as dimension,
+ new_weight as weight;
+ };
+
+ --
+ -- A group all is acceptable here since the previous step reduces the
+ -- size down to the number of features.
+ --
+ $new_weights = foreach (group steps all) {
+ in_order = order steps by dimension asc;
+ as_tuple = BagToTuple(in_order.weight);
+ generate
+ as_tuple;
+ };
+};
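
Outside of Pig, the same update is easy to state. Here is a minimal plain-Ruby sketch of one batch step for the two-weight line-fitting case, useful for sanity-checking the macro on a handful of points; the name gradient_step and the [x, y] pair layout are this sketch's assumptions, not part of the commit:

# One batch gradient descent step for y ~ w0 + w1*x (illustrative only).
# features: array of [x, y] pairs; weights: [w0, w1]; alpha: step size.
def gradient_step(features, weights, alpha)
  m = features.length.to_f
  sums = [0.0, 0.0]
  features.each do |x, y|
    diff = (weights[0] + weights[1] * x) - y  # (w . x_i) - y_i
    sums[0] += diff        # bias dimension; its feature is the constant 1.0
    sums[1] += diff * x    # slope dimension
  end
  [weights[0] - (alpha / m) * sums[0],
   weights[1] - (alpha / m) * sums[1]]
end

Iterating gradient_step over the same points with alpha = 0.1 mirrors what the driver below does by launching one Pig job per step.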
45 scripts/analysis/linear_regression/ruby_driver.rb
@@ -0,0 +1,45 @@
+#!/usr/bin/env ruby
+
+require 'rubygems'
+require 'fileutils'
+
+#
+# 1. initialize weights
+# 2. descend
+# 3. check convergence criteria
+# 4. repeat 2,3 until converged
+# 5. profit
+#
+
+def cleanup data_dir
+ Dir["#{data_dir}/weight-*"].each do |f|
+ FileUtils.rm_r(f)
+ end
+end
+
+#
+# Initialize the weights randomly
+#
+def initialize_weights data_dir
+ weights = [rand,rand]
+ f = File.open(File.join(data_dir, 'weight-0'), 'w')
+ f.puts(weights.join("\t"))
+ f.close
+end
+
+def descend data_dir, iteration
+ input_weights = File.join(data_dir, "weight-#{iteration}")
+ output_weights = File.join(data_dir, "weight-#{iteration+1}")
+
+ pig = File.join(ENV['PIG_HOME'], 'bin', 'pig')
+  pig_cmd = "#{pig} -x local -p weights=#{input_weights} -p weights_new=#{output_weights} fit_line.pig"
+ %x{#{pig_cmd}}
+end
+
+data_dir = "data"
+
+cleanup(data_dir)
+initialize_weights(data_dir)
+1000.times do |i|
+ descend(data_dir, i)
+end
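
Note that the comment block lists a convergence check (step 3) which the committed loop skips in favor of a fixed 1000 iterations. A minimal sketch of such a check, assuming hypothetical read_weights/converged? helpers and that Pig writes each output as either a single file or a directory of part files:

# Hypothetical helper: read a tab-separated weight vector, whether it lives
# in a plain file (like weight-0) or a Pig output directory of part files.
def read_weights(path)
  file = File.file?(path) ? path : Dir["#{path}/part-*"].first
  File.read(file).split("\t").map(&:to_f)
end

# Hypothetical convergence test: stop once no weight moves more than tol.
def converged?(data_dir, iteration, tol = 1e-6)
  prev = read_weights(File.join(data_dir, "weight-#{iteration - 1}"))
  curr = read_weights(File.join(data_dir, "weight-#{iteration}"))
  prev.zip(curr).all? { |p, c| (p - c).abs < tol }
end

# The loop above would then become:
#   1000.times do |i|
#     descend(data_dir, i)
#     break if converged?(data_dir, i + 1)
#   end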
13 udf/pom.xml
@@ -54,9 +54,16 @@
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>1.6.1</version>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.16</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
96 udf/src/main/java/sounder/pig/tuple/Zip.java
@@ -0,0 +1,96 @@
+package sounder.pig.tuple;
+
+import java.util.List;
+import java.util.ArrayList; // maybe use something else; LinkedList?
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ (w1,w2,...,wN) * (f1,f2,...,fN) => {(w1,f1,0),(w2,f2,1),...,(wN,fN,N-1)}
+
+ The trailing int records each pair's position within the input tuples;
+ when the tuples differ in length, the shorter is padded with null.
+ */
+public class Zip extends EvalFunc<DataBag> {
+
+ protected static TupleFactory tf = TupleFactory.getInstance();
+ protected static BagFactory bf = BagFactory.getInstance();
+
+ @Override
+ public Schema outputSchema(Schema input) {
+
+ try {
+ List<Schema.FieldSchema> outputTupleFields = new ArrayList<Schema.FieldSchema>(input.size()+1);
+
+ for (Schema.FieldSchema fieldSchema : input.getFields()) {
+ // Assumption is that fieldSchema is itself a tuple schema and all the
+ // fields have the same type as the first field
+ Schema.FieldSchema s = fieldSchema.schema.getField(0);
+ outputTupleFields.add(s);
+ }
+ outputTupleFields.add(new Schema.FieldSchema("order", DataType.INTEGER)); // Important to maintain position
+
+ Schema.FieldSchema tupleSchema = new Schema.FieldSchema("t", new Schema(outputTupleFields), DataType.TUPLE);
+ Schema.FieldSchema bagSchema = new Schema.FieldSchema("b", new Schema(tupleSchema), DataType.BAG);
+ return new Schema(bagSchema);
+ } catch (FrontendException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ if (input == null || input.size() < 2)
+ return null;
+
+ Tuple first = (Tuple)input.get(0);
+
+ List<Tuple> results = initialize(first);
+
+ for (int i = 1; i < input.size(); i++) {
+ Tuple next = (Tuple)input.get(i);
+ concatTuple(results, next);
+ }
+
+ return toBag(results);
+ }
+
+ private List<Tuple> initialize(Tuple first) throws ExecException {
+ List<Tuple> results = new ArrayList<Tuple>(first.size());
+ for (int i = 0; i < first.size(); i++) {
+ results.add(tf.newTuple(first.get(i)));
+ }
+ return results;
+ }
+
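+    // Append input's i-th field to each accumulated tuple, padding with
+    // null when input is shorter than the first tuple.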
+ private void concatTuple(List<Tuple> tuples, Tuple input) throws ExecException {
+ for (int i = 0; i < tuples.size(); i++) {
+ Tuple t = tuples.get(i);
+ if (i < input.size()) {
+ t.append(input.get(i));
+ } else {
+ t.append(null);
+ }
+ }
+ }
+
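+    // Tag each tuple with its position index (the 'order' field in the
+    // output schema) and pack the tuples into a bag.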
+ private DataBag toBag(List<Tuple> tuples) {
+ DataBag result = bf.newDefaultBag();
+ int position = 0;
+ for (Tuple t : tuples) {
+ t.append(position);
+ result.add(t);
+ position++;
+ }
+ return result;
+ }
+
+}
42 udf/src/test/java/sounder/pig/tuple/ZipTest.java
@@ -0,0 +1,42 @@
+package sounder.pig.tuple;
+
+import static org.junit.Assert.*;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class ZipTest {
+
+ @Test
+ public void testUDF() throws Exception {
+ Zip udf = new Zip();
+
+ TupleFactory tf = TupleFactory.getInstance();
+
+ Tuple t1 = tf.newTuple(Arrays.asList("w1","w2","w3","w4"));
+ Tuple t2 = tf.newTuple(Arrays.asList("f1","f2","f3"));
+ Tuple input = tf.newTuple(Arrays.asList(t1,t2));
+
+ DataBag result = udf.exec(input);
+ // {(w1,f1,0),(w2,f2,1),(w3,f3,2),(w4,null,3)}
+
+ for (Tuple t : result) {
+      assertEquals(3, t.size());
+ assertFalse(t.isNull(0));
+ assertFalse(t.isNull(2));
+ if (t.get(2).equals(3)) { // (w4,null,3)
+ assertTrue(t.isNull(1));
+ }
+ }
+ }
+}