Permalink
Browse files

initial commit

  • Loading branch information...
0 parents commit 46b16811a0bf4585d9d97d070ffdb3c7283526b7 Pranab Ghosh committed Mar 23, 2012
5 .gitignore
@@ -0,0 +1,5 @@
+target/**/*
+.settings/
+.project
+.classpath
+
10 README
@@ -0,0 +1,10 @@
+Set of Hadoop based tools for web analytics. Currently includes the following
+
+- Bayesian discriminant analysis for visitor conversion prediction
+
+Will add the following
+
+- Extend Bayesian discriminant analysis for multiple variables
+- Simple visit statistics
+
+
3 manifest.mf
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+X-COMMENT: Main-Class will be added automatically by build
+
127 pom.xml
@@ -0,0 +1,127 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>mawazo</groupId>
  <artifactId>visitante</artifactId>
  <version>1.0</version>
  <name>visitante</name>
  <packaging>jar</packaging>

  <profiles>
    <profile>
      <id>compiler</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
        <log4j.version>1.2.16</log4j.version>
        <hadoop.version>0.20.2-cdh3u2</hadoop.version>
        <jackson.version>1.6.3</jackson.version>
        <jdk.level>1.6</jdk.level>
      </properties>
    </profile>
  </profiles>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.1</version>
        <configuration>
          <!-- keep in sync with the jdk.level property above -->
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.3.1</version>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
      <exclusions>
        <!-- jmxri/jmxtools are not in central and are unused at runtime -->
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <classifier>jdk15</classifier>
      <version>5.8</version>
      <scope>test</scope>
    </dependency>

    <!-- NOTE: the original artifactIds carried a trailing space
         ("jackson-core-lgpl ") which breaks artifact resolution -->
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-lgpl</artifactId>
      <version>${jackson.version}</version>
    </dependency>

    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-lgpl</artifactId>
      <version>${jackson.version}</version>
    </dependency>

    <dependency>
      <groupId>mawazo</groupId>
      <artifactId>chombo</artifactId>
      <version>1.0</version>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>central</id>
      <!-- repo1.maven.org requires https -->
      <url>https://repo1.maven.org/maven2</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>cloudera-releases</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

</project>
+
15 src/main/java/org/visitante/Main.java
@@ -0,0 +1,15 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.visitante;
+
+/**
+ *
+ * @author pranab
+ */
/**
 * Empty entry point left over from the IDE project template. All real
 * functionality lives in the map reduce driver classes under org.visitante.mr;
 * this class is kept only to preserve the generated project structure.
 *
 * @author pranab
 */
public class Main {
}
141 src/main/java/org/visitante/mr/bda/BayesDiscriminator.java
@@ -0,0 +1,141 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.visitante.mr.bda;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.visitante.util.Util;
+
+/**
+ *
+ * @author pranab
+ */
+public class BayesDiscriminator implements Tool {
+ private Configuration conf;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = new Job(getConf());
+ String jobName = "Bayesian discriminator MR";
+ job.setJobName(jobName);
+
+ job.setJarByClass(BayesDiscriminator.class);
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setMapperClass(BayesDiscriminator.BayesDiscriminatorMapper.class);
+ job.setReducerClass(BayesDiscriminator.BayesDiscriminatorReducer.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setNumReduceTasks(1);
+
+ int status = job.waitForCompletion(true) ? 0 : 1;
+ return status;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new BayesDiscriminator(), args);
+ System.exit(exitCode);
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public static class BayesDiscriminatorMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+ private Text keyHolder = new Text();
+ private IntWritable valueHolder = new IntWritable(1);
+ private Map<String, Integer> clickCount = new HashMap<String, Integer>();
+
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ for (String keyVal : clickCount.keySet()){
+ keyHolder.set(keyVal);
+ valueHolder.set(clickCount.get(keyVal));
+ context.write(keyHolder, valueHolder);
+ }
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] items = value.toString().split(",");
+ String keyVal = items[2] + "," + items[0];
+ Integer count = clickCount.get(keyVal);
+ if (null == count){
+ count = 0;
+ }
+ count = count + 1;
+ clickCount.put(keyVal, count);
+
+ }
+ }
+
+ public static class BayesDiscriminatorReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
+ private Text valueHolder = new Text();
+ private Map<String, Integer> totalCount = new HashMap<String, Integer>();
+
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ for (String classValue : totalCount.keySet()){
+ int count = totalCount.get(classValue);
+ valueHolder.set(classValue + "," + count);
+ context.write(NullWritable.get(), valueHolder);
+ }
+ }
+
+ protected void reduce(Text key, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int count = 0;
+ for (IntWritable value : values){
+ count += value.get();
+ }
+ String[] items = key.toString().split(",");
+ String classVal = items[0];
+ Integer classCount = totalCount.get(classVal);
+ if (null == classCount){
+ classCount = 0;
+ }
+ classCount = classCount + count;
+ totalCount.put(classVal, classCount);
+
+ valueHolder.set(key.toString() + "," + count);
+ context.write(NullWritable.get(), valueHolder);
+
+ }
+ }
+
+}
144 src/main/java/org/visitante/mr/bda/ClassBoundary.java
@@ -0,0 +1,144 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.visitante.mr.bda;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ * @author pranab
+ */
/**
 * Computes the class decision boundary from the count file produced by the
 * Bayesian discriminator MR job. The file mixes per-class total records
 * ("classVal,count") and histogram cell records ("classVal,value,count").
 * Builds one histogram per class and prints, for every value in the observed
 * range, the posterior probability of class 1.
 *
 * @author pranab
 */
public class ClassBoundary {
    private List<String> lines = new ArrayList<String>();
    private Histogram[] hists = new Histogram[2];

    /**
     * Loads the count file and prints value vs posterior probability.
     *
     * @param fileName path of the count file
     */
    public void process(String fileName) {
        BufferedReader in = null;
        try {
            in = new BufferedReader(new FileReader(fileName));
            String str;
            while ((str = in.readLine()) != null) {
                lines.add(str);
            }

            hists[0] = new Histogram();
            hists[1] = new Histogram();

            int totalCount = 0;
            int min = Integer.MAX_VALUE;
            int max = Integer.MIN_VALUE;
            for (String line : lines) {
                String[] items = line.split(",");
                if (items.length == 2) {
                    // class total record: classVal,count
                    int classVal = Integer.parseInt(items[0]);
                    int classCount = Integer.parseInt(items[1]);
                    hists[classVal].setTotal(classCount);
                    totalCount += classCount;
                } else {
                    // histogram cell record: classVal,value,count
                    int classVal = Integer.parseInt(items[0]);
                    int value = Integer.parseInt(items[1]);
                    int count = Integer.parseInt(items[2]);
                    hists[classVal].addCount(value, count);

                    if (value < min) {
                        min = value;
                    }
                    if (value > max) {
                        max = value;
                    }
                }
            }

            for (Histogram hist : hists) {
                hist.setSampleTotal(totalCount);
                hist.calculateProbability();
            }

            findBoundary(min, max);
        } catch (IOException e) {
            // report instead of silently swallowing the failure
            System.err.println("failed to process file " + fileName + ": " + e);
        } finally {
            // close the reader even when reading fails part way
            if (null != in) {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // nothing more we can do on close failure
                }
            }
        }
    }

    /**
     * Prints the posterior probability of class 1 for each value in [min, max].
     */
    private void findBoundary(int min, int max) {
        Histogram hist0 = hists[0];
        Histogram hist1 = hists[1];

        for (int value = min; value <= max; ++value) {
            double h0 = hist0.getProbability(value) * hist0.getClassProb();
            double h1 = hist1.getProbability(value) * hist1.getClassProb();
            double evidence = h0 + h1;
            // values never observed in either class carry no evidence; skip
            // them instead of printing NaN
            if (evidence > 0) {
                double postProb = h1 / evidence;
                System.out.println("" + value + "\t" + postProb);
            }
        }
    }

    /**
     * Discrete histogram for one class: raw counts, the derived probability
     * density and the class prior.
     */
    private static class Histogram {
        private Map<Integer, Integer> countDist = new HashMap<Integer, Integer>();
        private Map<Integer, Double> probDensity = new HashMap<Integer, Double>();
        // number of samples in this class
        private int total;
        // number of samples across all classes
        private int sampleTotal;
        // class prior: total / sampleTotal
        private double classProb;

        /**
         * @param total the per-class sample total to set
         */
        public void setTotal(int total) {
            this.total = total;
        }

        public void addCount(int value, int count) {
            countDist.put(value, count);
        }

        /** Derives the class prior and per-value probability density. */
        public void calculateProbability() {
            // guard against division by zero when no total record was seen
            if (total == 0 || sampleTotal == 0) {
                classProb = 0;
                return;
            }
            classProb = ((double) total) / sampleTotal;

            for (Map.Entry<Integer, Integer> entry : countDist.entrySet()) {
                double prob = ((double) entry.getValue()) / total;
                probDensity.put(entry.getKey(), prob);
            }
        }

        /**
         * Returns the probability density at value; 0 when the value was never
         * observed for this class (the original unboxed a null here and threw
         * NullPointerException).
         */
        public double getProbability(int value) {
            Double prob = probDensity.get(value);
            return null == prob ? 0.0 : prob;
        }

        /**
         * @param sampleTotal the all-class sample total to set
         */
        public void setSampleTotal(int sampleTotal) {
            this.sampleTotal = sampleTotal;
        }

        /**
         * @return the class prior probability
         */
        public double getClassProb() {
            return classProb;
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("usage: ClassBoundary <countFile>");
            System.exit(1);
        }
        new ClassBoundary().process(args[0]);
    }
}
91 src/main/java/org/visitante/mr/logit/LogitEstimator.java
@@ -0,0 +1,91 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.visitante.mr.logit;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ *
+ * @author pranab
+ */
+public class LogitEstimator implements Tool {
+ private Configuration conf;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = new Job(getConf());
+ String jobName = "Logit estimator MR";
+ job.setJobName(jobName);
+
+ job.setJarByClass(LogitEstimator.class);
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setMapperClass(LogitEstimator.LogitEstimatorMapper.class);
+ //job.setCombinerClass(DomainRange.DomainRangeReducer.class);
+ job.setReducerClass(LogitEstimator.LogitEstimatorReducer.class);
+
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setNumReduceTasks(1);
+
+ int status = job.waitForCompletion(true) ? 0 : 1;
+ return status;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new LogitEstimator(), args);
+ System.exit(exitCode);
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public static class LogitEstimatorMapper extends Mapper<LongWritable, Text, Text, Text> {
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ }
+ }
+
+ public static class LogitEstimatorReducer extends Reducer<Text, Text, NullWritable, Text> {
+ protected void reduce(Text key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ }
+ }
+
+}
68 src/main/java/org/visitante/util/Util.java
@@ -0,0 +1,68 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.visitante.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ *
+ * @author pranab
+ */
/**
 * Miscellaneous static helpers for splitting and joining delimited text and
 * for extracting sub lists.
 *
 * @author pranab
 */
public class Util {

    /**
     * Splits comma delimited text into trimmed tokens.
     *
     * @param text delimited text
     * @return list of trimmed tokens
     */
    public static List<String> splitTokens(String text) {
        return splitTokens(text, ",");
    }

    /**
     * Splits text on a delimiter into trimmed tokens.
     *
     * @param text delimited text
     * @param delim delimiter, interpreted as a regular expression by String.split
     * @return list of trimmed tokens
     */
    public static List<String> splitTokens(String text, String delim) {
        List<String> tokens = new ArrayList<String>();
        for (String token : text.split(delim)) {
            tokens.add(token.trim());
        }
        return tokens;
    }

    /**
     * Joins tokens with commas.
     *
     * @param tokens values to join, rendered via toString
     * @return joined text, empty string for an empty collection
     */
    public static String concatTokens(Collection<?> tokens) {
        return concatTokens(tokens, ",");
    }

    /**
     * Joins tokens with a delimiter.
     *
     * @param tokens values to join, rendered via toString
     * @param delim literal delimiter placed between tokens
     * @return joined text, empty string for an empty collection
     */
    public static String concatTokens(Collection<?> tokens, String delim) {
        // empty input used to throw StringIndexOutOfBoundsException
        if (tokens.isEmpty()) {
            return "";
        }
        StringBuilder stBuilder = new StringBuilder();
        for (Object token : tokens) {
            stBuilder.append(token).append(delim);
        }
        // strip the whole trailing delimiter, not just its last character
        return stBuilder.substring(0, stBuilder.length() - delim.length());
    }

    /**
     * Returns a copy of the range [start, end) of a list; an invalid range
     * yields an empty list (kept for backward compatibility).
     *
     * @param list source list
     * @param start inclusive start index
     * @param end exclusive end index
     * @return copied sub list
     */
    public static <T> List<T> getSubList(List<T> list, int start, int end) {
        List<T> subList = new ArrayList<T>();
        if (start >= 0 && end > start && end <= list.size()) {
            for (int i = start; i < end; ++i) {
                subList.add(list.get(i));
            }
        }
        return subList;
    }

    /**
     * Returns the elements at the given indexes, in selector order.
     *
     * @param list source list
     * @param selectors indexes to pick
     * @return copied sub list
     * @throws IllegalArgumentException if an index is out of range
     */
    public static <T> List<T> getSubList(List<T> list, List<Integer> selectors) {
        List<T> subList = new ArrayList<T>();
        int len = list.size();
        for (int i : selectors) {
            // also reject negative indexes, which previously surfaced as a
            // raw IndexOutOfBoundsException from list.get
            if (i < 0 || i >= len) {
                throw new IllegalArgumentException("Index out of range of source list");
            }
            subList.add(list.get(i));
        }
        return subList;
    }
}
+
BIN target/visitante-1.0.jar
Binary file not shown.

0 comments on commit 46b1681

Please sign in to comment.