
Merge branch 'master' of github.com:reasonsolo/mmds_prj_repo

2 parents cf4b1e9 + 6b0bcaa · commit 8b8514ecd8dfc1fc4a461428959faef19fa28d25 · @hiprince committed May 27, 2012
@@ -1,6 +1,3 @@
-<<<<<<< HEAD
./kmean/bin/
-=======
-./kmean/bin/
->>>>>>> bd98474a42f869219673bba662de8010e566ebee
*.class
+*.txt
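
This hunk removes merge-conflict markers that had been committed into the .gitignore and adds *.txt, so the resolved file (reconstructed from the hunk above) reads:

    ./kmean/bin/
    *.class
    *.txt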
Large diffs for three files in this commit are not rendered by default.
@@ -1,4 +1,4 @@
-package Canopy;
+package canopy;
import java.io.DataInput;
import java.io.DataOutput;
@@ -1,4 +1,4 @@
-package Canopy;
+package canopy;
import java.io.IOException;
import java.net.URI;
@@ -40,6 +40,10 @@ public CanopyClusterer(DistanceMeasure dm, double t1, double t2) {
nextID = 0;
}
+ public CanopyClusterer(Configuration configuration) {
+ // TODO Auto-generated constructor stub
+ }
+
public void loadClusters(String clusterPath, Configuration conf) // need to
// be
// checked
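
The new Configuration-based constructor is still an auto-generated stub. Below is a minimal sketch of what it could do; the keys "canopy.t1" and "canopy.t2", their defaults, and the fields t1, t2 and measure are assumptions modeled on the existing (DistanceMeasure, double, double) constructor, and the reflective distance-measure lookup mirrors the one KmeansMapper uses later in this commit. It is not part of the commit.

    public CanopyClusterer(Configuration conf) {
        // "canopy.t1" / "canopy.t2" and their defaults are assumed names, not taken from the commit
        this.t1 = conf.getFloat("canopy.t1", 3.0f);   // loose threshold
        this.t2 = conf.getFloat("canopy.t2", 1.5f);   // tight threshold
        try {
            // same reflective lookup pattern as KmeansMapper.setup below
            this.measure = (DistanceMeasure) Class.forName(
                    "distanceMeasure." + conf.get(Constants.DISTANCE_MEASURE, "EuclideanDistance"))
                    .newInstance();
        } catch (Exception e) {
            this.measure = new EuclideanDistance();   // fall back to Euclidean distance
        }
        this.nextID = 0;
    }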
No changes.
@@ -6,7 +6,7 @@
import org.apache.hadoop.mapreduce.Reducer;
import utils.VectorDoubleWritable;
-import Canopy.Canopy;
+import canopy.Canopy;
public class CanopyCombiner
extends
@@ -0,0 +1,71 @@
+package mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+
+import canopy.*;
+import config.Constants;
+
+
+public class CanopyDriver {
+ public static void configure() {
+
+ }
+
+ public static void main(String[] args){
+
+ if (args.length < 5) {
+ System.out.println("Usage: " + args[0] + " <t1> <t2> <input> <output>");
+ System.exit(0);
+ }
+
+ Configuration conf = new Configuration();
+ Path in = new Path(args[3]);
+ Path out = new Path(args[4]);
+
+ Counter converge = null;
+ Counter total = null;
+
+ try {
+
+ while (converge != total) {
+ Job job = new Job(conf);
+ job.setNumReduceTasks(2);
+ job.setJobName("Canopy clustering");
+
+ job.setMapperClass(CanopyMapper.class);
+ job.setReducerClass(CanopyReducer.class);
+ job.setJarByClass(CanopyDriver.class);
+
+ SequenceFileInputFormat.addInputPath(job, in);
+ SequenceFileOutputFormat.setOutputPath(job, out);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(Canopy.class);
+
+ job.waitForCompletion(true);
+
+ converge = job.getCounters().getGroup(Constants.COUNTER_GROUP)
+ .findCounter(Constants.COUNTER_CONVERGED);
+ total = job.getCounters().getGroup(Constants.COUNTER_GROUP)
+ .findCounter(Constants.COUNTER_TOTAL);
+ }
+
+ } catch (Exception e) {
+ // TODO:
+ // a better error report routine
+ e.printStackTrace();
+ }
+
+ }
+
+}
+
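Two things to watch in this new driver. First, args[0] is printed as the program name in the usage message even though Java's main does not receive the program name in args. Second, the convergence check compares the two Counter references: both are null before the first iteration, so while (converge != total) never enters the loop body, and after a job has run it compares object identity rather than counts. A value-based sketch of the intended loop (not part of the commit; the Constants names are taken from the code above):

    long converged;
    long totalCount;
    do {
        Job job = new Job(conf);
        // ... configure the job exactly as in the committed code above ...
        job.waitForCompletion(true);
        converged = job.getCounters().getGroup(Constants.COUNTER_GROUP)
                .findCounter(Constants.COUNTER_CONVERGED).getValue();
        totalCount = job.getCounters().getGroup(Constants.COUNTER_GROUP)
                .findCounter(Constants.COUNTER_TOTAL).getValue();
    } while (converged != totalCount);   // stop once every point is marked converged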
@@ -8,9 +8,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
+import canopy.Canopy;
+import canopy.CanopyClusterer;
+
import utils.VectorDoubleWritable;
-import Canopy.Canopy;
-import Canopy.CanopyClusterer;
public class CanopyMapper extends
Mapper<Text, VectorDoubleWritable, IntWritable, VectorDoubleWritable> {
@@ -44,10 +45,10 @@ protected void setup(Context context) throws IOException,
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
- for (Canopy canopy : canopies) {
- context.write(new Text("centroid"),
- new VectorWritable(canopy.center()));
+ for (Canopy canopy : canopies) {
+ context.write(new IntWritable(canopy.getId()),
+ canopy.getCentroid());
}
- super.cleanup(contex);
+ super.cleanup(context);
}
}
@@ -2,50 +2,55 @@
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
-import clusterer.Cluster;
-import clusterer.Clusterer;
-import config.ConfigConstants;
+import canopy.*;
+
+import utils.VectorDoubleWritable;
+
+import config.Constants;
import distanceMeasure.DistanceMeasure;
import distanceMeasure.EuclideanDistance;
-
public class CanopyReducer extends
- Reducer<Text, VectorWritable, Text, Canopy> {
+ Reducer<IntWritable, VectorDoubleWritable, Text, Canopy> {
- private final Collection<Canopy> canopies = new ArrayList<Canopy>();
+ private final ArrayList<Canopy> canopies = new ArrayList<Canopy>();
- private CanopyCluster canopyCluster;
+ private CanopyClusterer canopyClusterer;
- CanopyCluster getCanopyClusterer() {
+ CanopyClusterer getCanopyClusterer()
+ {
return canopyClusterer;
- }
+ }
+
@Override
- protected void reduce (Text arg0, Iterable<VectorWritable> values, Context contex)
- throw IOException, InterruptedException {
+ public void reduce (IntWritable key, Iterable<VectorDoubleWritable> values, Context context)
+ throws IOException, InterruptedException {
- for(VectorWritable value : vakues) {
- canopyClusterer.addPointToCanopies(point, canopies);
+ for(VectorDoubleWritable value : values) {
+ canopyClusterer.addPointToCanopies(value, canopies);
}
for(Canopy canopy: canopies){
- context.write(new Text(canopy.getID() ) , canopy);
+ context.write(new Text(canopy.getId().toString() ) , canopy);
}
}
@Override
- protected void setup(Context context) throw IOException, InterruptedExcetion {
+ public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- canopyCLusterer = new CanopyCluster ( context.getConfiguration());
+ canopyClusterer = new CanopyClusterer ( context.getConfiguration());
}
}
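
With these signatures the map output is (IntWritable, VectorDoubleWritable) while the reducer writes (Text, Canopy), so the driver has to declare both pairs; as committed, CanopyDriver sets only setOutputKeyClass(IntWritable.class) and setOutputValueClass(Canopy.class), which matches neither the map output value nor the reducer's Text key. A sketch of the wiring implied by the classes in this commit (an observation, not part of the commit):

    // map output types differ from the final output types, so declare both pairs
    job.setMapOutputKeyClass(IntWritable.class);             // CanopyMapper emits (canopy id, centroid)
    job.setMapOutputValueClass(VectorDoubleWritable.class);
    job.setOutputKeyClass(Text.class);                        // CanopyReducer writes (id as text, Canopy)
    job.setOutputValueClass(Canopy.class);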
@@ -23,24 +23,31 @@ public static void main(String[] args) {
}
Configuration conf = new Configuration();
- Path in = new Path(args[1]);
- Path out = new Path(args[2]);
Counter converge = null;
Counter total = null;
+ Path in;
+ Path out;
+ int iterCounter = 0;
try {
-
- while (converge != total) {
+ while (iterCounter == 0 || converge != total) {
Job job = new Job(conf);
job.setNumReduceTasks(2);
job.setJobName("K-means clustering");
job.setMapperClass(KmeansMapper.class);
job.setReducerClass(KmeansReducer.class);
job.setJarByClass(KmeansDriver.class);
-
+
+ if (iterCounter == 0)
+ in = new Path(args[1]);
+ else
+ // load the output of last iteration
+ in = new Path(args[1] + ".part" + (iterCounter-1));
+ out = new Path(args[1] + ".part" + iterCounter);
SequenceFileInputFormat.addInputPath(job, in);
SequenceFileOutputFormat.setOutputPath(job, out);
+
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
@@ -53,6 +60,7 @@ public static void main(String[] args) {
.findCounter(Constants.COUNTER_CONVERGED);
total = job.getCounters().getGroup(Constants.COUNTER_GROUP)
.findCounter(Constants.COUNTER_TOTAL);
+ iterCounter++;
}
} catch (Exception e) {
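
The iterCounter == 0 guard makes the first k-means iteration run, but converge and total are two distinct Counter objects once a job has finished, so converge != total stays true and the loop only ever stops on an exception. Comparing the counter values is presumably what was intended; a sketch (assumes both counters are non-null after the first job, which the iterCounter == 0 guard guarantees):

    // compare counts rather than Counter references
    while (iterCounter == 0 || converge.getValue() != total.getValue()) {
        // ... run one k-means iteration as in the committed code above ...
        iterCounter++;
    }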
@@ -45,9 +45,8 @@ public void setup(Context context) throws IOException, InterruptedException {
DistanceMeasure dm = null;
try {
dm = (DistanceMeasure) Class.forName(
- "distanceMeasure."
- + conf.get(Constants.DISTANCE_MEASURE,
- "EuclideanDistance")).newInstance();
+ "distanceMeasure." + conf.get(Constants.DISTANCE_MEASURE,
+ "EuclideanDistance")).newInstance();
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
dm = new EuclideanDistance();
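
The distance measure is resolved by reflection from the distanceMeasure package, falling back to EuclideanDistance if the configured class cannot be loaded. A hypothetical driver-side usage (the value shown is the default; any class in the distanceMeasure package with a no-argument constructor would work):

    Configuration conf = new Configuration();
    // the mapper resolves this to distanceMeasure.EuclideanDistance via Class.forName
    conf.set(Constants.DISTANCE_MEASURE, "EuclideanDistance");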
