
Add total pairs output

1 parent 7807f7d commit 64376ec0b2d28083108728136e7fbac6312d2124 @jonahgeorge committed Feb 17, 2017
@@ -3,3 +3,4 @@ output
target
*.iml
dependency-reduced-pom.xml
+intermediate_output
@@ -1,24 +1,30 @@
# Purchase Network
```sh
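+# FileOutputFormat refuses to write into an existing directory, so clear both output paths first.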
+rm -rf output intermediate_output
mvn package
-
hadoop jar target/cs499-1.0-SNAPSHOT.jar PurchaseNetwork input output
```
## Description
-3.2 [60 pts] Map-Reduce Task 2: BeaverMart a big supermarket chain wants to find out what
+3.2 [60 pts] Map-Reduce Task 2:
+
+BeaverMart, a big supermarket chain, wants to find out what
items in its stores are bought together so it can optimize the store layouts. It has records of customer purchases tracked
using store reward cards. Each record is a tuple of items that are bought in a single transaction (e.g., {item1, item2, item3, …}).
Write a Map-Reduce program to compute how many times a pair of items are bought together.
Input File: Will be a .txt file containing records, one per line. The length of the file need not be fixed.
Example:
-Whitey Toothpaste, Best Bread, Fluffy Pizza, BeavMoo MilkApples, BeavMoo Milk, Bananas, Best Bread
+```
+Whitey Toothpaste, Best Bread, Fluffy Pizza, BeavMoo Milk
+Apples, BeavMoo Milk, Bananas, Best Bread
+```
Output File: Pairs of items along with the number of times they have been purchased together
Example:
+```
(Whitey Toothpaste, Best Bread) 1
(Whitey Toothpaste, Fluffy Pizza) 1
(Whitey Toothpaste, BeavMoo Milk) 1
@@ -35,8 +41,9 @@ Example:
(BeavMoo Milk, Bananas) 1
(Apples, Bananas) 1
Total Pairs: 15
+```
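+
+To inspect the results after the run (assuming the default reducer output file name):
+```sh
+cat output/part-r-00000
+```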
Grading:
-Program compiles and runs without failure: 10 points
-Passes two test cases: 20 points each
-Design (how fast and scalable is your design): 10 points
+- Program compiles and runs without failure: 10 points
+- Passes two test cases: 20 points each
+- Design (how fast and scalable is your design): 10 points
@@ -1,102 +1,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.paukov.combinatorics3.Generator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
public class PurchaseNetwork {
-
- static class PurchaseNetworkKey implements WritableComparable<PurchaseNetworkKey> {
- Text first;
- Text second;
-
- PurchaseNetworkKey() {
- this.first = new Text();
- this.second = new Text();
- }
-
- PurchaseNetworkKey(String first, String second) {
- if (first.compareTo(second) == 0) {
- this.first = new Text(first);
- this.second = new Text(second);
- } else {
- this.first = new Text(second);
- this.second = new Text(first);
- }
- }
-
- @Override
- public int compareTo(PurchaseNetworkKey o) {
- if (first.compareTo(o.first) == 0) {
- return second.compareTo(o.second);
- } else {
- return first.compareTo(o.first);
- }
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- first.write(dataOutput);
- second.write(dataOutput);
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- first.readFields(dataInput);
- second.readFields(dataInput);
- }
-
- public String toString() {
- return "(" + first.toString() + ", " + second.toString() + ")";
- }
- }
-
- static class PurchaseNetworkMapper extends Mapper<Object, Text, PurchaseNetworkKey, IntWritable> {
- protected void map(Object _, Text value, Context context)
- throws IOException, InterruptedException {
-
- List<String> items = Arrays.asList(value.toString().split(", "));
-
- List<PurchaseNetworkKey> keys = Generator.combination(items)
- .simple(2)
- .stream()
- .map((pair) -> new PurchaseNetworkKey(pair.get(0), pair.get(1)))
- .collect(Collectors.toList());
-
- for (PurchaseNetworkKey key : keys) {
- context.write(key, new IntWritable(1));
- }
- }
- }
-
- static class PurchaseNetworkReducer extends Reducer<PurchaseNetworkKey, IntWritable, Text, IntWritable> {
- IntWritable result = new IntWritable();
-
- protected void reduce(PurchaseNetworkKey key, Iterable<IntWritable> values, Context context)
- throws IOException, InterruptedException {
- int sum = 0;
- for (IntWritable val : values) {
- sum += val.get();
- }
- result.set(sum);
- context.write(new Text(key.toString()), result);
- }
- }
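+ // Staging directory between the two jobs: job 1 writes its pair counts here and job 2 reads them back.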
+ private static final String INTERMEDIATE_PATH = "intermediate_output";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@@ -107,17 +20,24 @@ public static void main(String[] args) throws Exception {
System.exit(2);
}
- Job job = new Job(conf, "PurchaseNetwork");
- job.setJarByClass(PurchaseNetwork.class);
- job.setMapperClass(PurchaseNetworkMapper.class);
- job.setReducerClass(PurchaseNetworkReducer.class);
-
- job.setOutputKeyClass(PurchaseNetworkKey.class);
- job.setOutputValueClass(IntWritable.class);
-
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-
- System.exit(job.waitForCompletion(true) ? 0 : 1);
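+ // Job 1: sum how many times each item pair is bought together, writing results to INTERMEDIATE_PATH.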
+ Job job1 = new Job(conf, "PurchaseNetwork");
+ job1.setJarByClass(PurchaseNetwork.class);
+ job1.setMapperClass(PurchaseNetworkMapper.class);
+ job1.setReducerClass(PurchaseNetworkReducer.class);
+ job1.setOutputKeyClass(PurchaseNetworkKey.class);
+ job1.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
+ FileOutputFormat.setOutputPath(job1, new Path(INTERMEDIATE_PATH));
+ if (!job1.waitForCompletion(true)) {
+ System.exit(1);
+ }
+
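+ // Job 2: funnel every pair line through a single reducer so a "Total Pairs" line can be appended.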
+ Job job2 = new Job(conf, "PurchaseNetworkCounter");
+ job2.setJarByClass(PurchaseNetwork.class);
+ job2.setMapperClass(PurchaseNetworkCounterMapper.class);
+ job2.setReducerClass(PurchaseNetworkCounterReducer.class);
+ job2.setOutputKeyClass(NullWritable.class);
+ job2.setOutputValueClass(Text.class);
+ FileInputFormat.addInputPath(job2, new Path(INTERMEDIATE_PATH));
+ FileOutputFormat.setOutputPath(job2, new Path(otherArgs[1]));
+ System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}
@@ -0,0 +1,12 @@
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class PurchaseNetworkCounterMapper extends Mapper<Object, Text, NullWritable, Text> {
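+ // Pass-through mapper: emits each pair/count line from job 1 under the single NullWritable key.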
+ protected void map(Object _, Text value, Context context)
+ throws IOException, InterruptedException {
+ context.write(NullWritable.get(), value);
+ }
+}
@@ -0,0 +1,20 @@
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class PurchaseNetworkCounterReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
+ private int count = 0;
+
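+ // All records share the NullWritable key, so this one reduce call sees every pair.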
+ protected void reduce(NullWritable _, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ for (Text value : values) {
+ count += 1;
+ context.write(NullWritable.get(), value);
+ }
+
+ context.write(NullWritable.get(), new Text("Total Pairs: " + count));
+ }
+}
@@ -0,0 +1,51 @@
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class PurchaseNetworkKey implements WritableComparable<PurchaseNetworkKey> {
+ private Text first;
+ private Text second;
+
+ PurchaseNetworkKey() {
+ this.first = new Text();
+ this.second = new Text();
+ }
+
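+ // Store the two items in sorted order so (A, B) and (B, A) collapse into the same key.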
+ PurchaseNetworkKey(String first, String second) {
+ if (first.compareTo(second) <= 0) {
+ this.first = new Text(first);
+ this.second = new Text(second);
+ } else {
+ this.first = new Text(second);
+ this.second = new Text(first);
+ }
+ }
+
+ @Override
+ public int compareTo(PurchaseNetworkKey o) {
+ if (first.compareTo(o.first) == 0) {
+ return second.compareTo(o.second);
+ } else {
+ return first.compareTo(o.first);
+ }
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ first.write(dataOutput);
+ second.write(dataOutput);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ first.readFields(dataInput);
+ second.readFields(dataInput);
+ }
+
+ public String toString() {
+ return "(" + first.toString() + ", " + second.toString() + ")";
+ }
+}
@@ -0,0 +1,27 @@
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.paukov.combinatorics3.Generator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PurchaseNetworkMapper extends Mapper<Object, Text, PurchaseNetworkKey, IntWritable> {
+ protected void map(Object _, Text value, Context context)
+ throws IOException, InterruptedException {
+
+ List<String> items = Arrays.asList(value.toString().split(", "));
+
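+ // Enumerate all C(n, 2) unordered item pairs in this transaction.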
+ List<PurchaseNetworkKey> keys = Generator.combination(items)
+ .simple(2)
+ .stream()
+ .map((pair) -> new PurchaseNetworkKey(pair.get(0), pair.get(1)))
+ .collect(Collectors.toList());
+
+ for (PurchaseNetworkKey key : keys) {
+ context.write(key, new IntWritable(1));
+ }
+ }
+}
@@ -0,0 +1,19 @@
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class PurchaseNetworkReducer extends Reducer<PurchaseNetworkKey, IntWritable, Text, IntWritable> {
+ IntWritable result = new IntWritable();
+
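+ // Sum the 1s emitted for this pair across every transaction that contained it.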
+ protected void reduce(PurchaseNetworkKey key, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+ result.set(sum);
+ context.write(new Text(key.toString()), result);
+ }
+}
@@ -0,0 +1,30 @@
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+public class PurchaseNetworkKeyTest {
+
+ @Test
+ public void testAlphabetical() {
+ PurchaseNetworkKey key = null;
+
+ key = new PurchaseNetworkKey("Apple", "Banana");
+ assertEquals("(Apple, Banana)", key.toString());
+
+ key = new PurchaseNetworkKey("Banana", "Apple");
+ assertEquals("(Apple, Banana)", key.toString());
+ }
+}
@@ -0,0 +1,24 @@
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class PurchaseNetworkMapperTest {
+ @Test
+ public void testMapper() throws InterruptedException, IOException {
+ PurchaseNetworkMapper mapper = new PurchaseNetworkMapper();
+ Mapper.Context context = mock(Mapper.Context.class);
+
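+ // Three items yield C(3, 2) = 3 pairs, each written once with a count of 1.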
+ mapper.map(NullWritable.get(), new Text("Whitey Toothpaste, Best Bread, Fluffy Pizza"), context);
+ verify(context, Mockito.times(3)).write(any(PurchaseNetworkKey.class), eq(new IntWritable(1)));
+ }
+}