Skip to content

Commit

Permalink
transaction freq and recency fixes
Browse files: browse the repository at this point in the history
  • Loading branch information
Pranab Ghosh committed Feb 12, 2016
1 parent e1ad3c7 commit 19807a1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 82 deletions.
86 changes: 23 additions & 63 deletions pom.xml
Expand Up @@ -10,6 +10,20 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma

<packaging>jar</packaging>

<!--
  Build profiles. The single "compiler" profile is active by default and
  defines version properties in one place; dependency declarations in this
  POM reference them as ${log4j.version}, ${hadoop.version} and
  ${jackson.version}.
-->
<profiles>
<profile>
<id>compiler</id>
<activation>
<!-- Applied when no profile is selected explicitly. -->
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<!-- Version used by the log4j:log4j dependency. -->
<log4j.version>1.2.16</log4j.version>
<!-- Version used by the org.apache.hadoop:hadoop-core dependency. -->
<hadoop.version>0.20.2-cdh3u2</hadoop.version>
<!-- Shared version for jackson-core-asl and jackson-mapper-asl. -->
<jackson.version>1.9.13</jackson.version>
<!-- NOTE(review): no use of jdk.level is visible in this excerpt; presumably
     it feeds the compiler plugin source/target level - confirm. -->
<jdk.level>1.6</jdk.level>
</properties>
</profile>
</profiles>

<build>
<pluginManagement>
Expand Down Expand Up @@ -43,7 +57,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
<version>${log4j.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
Expand All @@ -55,6 +69,12 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
</exclusion>
</exclusions>
</dependency>

<!-- Hadoop core library; its version is taken from the hadoop.version property. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.testng</groupId>
Expand All @@ -67,13 +87,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -154,67 +174,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>

</repositories>

<profiles>
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.5.0-mr1-cdh5.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.0-mr1-cdh5.3.1</version>
</dependency>

</dependencies>
</profile>

<profile>
<id>yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.5.0-cdh5.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.5.0-cdh5.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<version>2.5.0-cdh5.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.5.0-cdh5.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.0-cdh5.3.1</version>
</dependency>

</dependencies>
</profile>

</profiles>
</project>

Expand Up @@ -32,6 +32,8 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.chombo.mr.TimeGapSequenceGenerator;
import org.chombo.util.SecondarySort;
import org.chombo.util.Tuple;
Expand All @@ -45,6 +47,7 @@
*/
public class TransactionFrequencyRecencyValue extends Configured implements Tool {
private static String configDelim = ",";
private static final Logger LOG = Logger.getLogger(TransactionFrequencyRecencyValue.class);

@Override
public int run(String[] args) throws Exception {
Expand Down Expand Up @@ -114,6 +117,9 @@ public static class TransactionReducer extends Reducer<Tuple, Tuple, NullWritab
*/
protected void setup(Context context) throws IOException, InterruptedException {
Configuration config = context.getConfiguration();
if (config.getBoolean("debug.on", false)) {
LOG.setLevel(Level.DEBUG);
}
fieldDelim = config.get("field.delim.out", ",");
timeGapUnit = config.get("trf.time.gap.unit");
numIDFields = Utility.intArrayFromString(config.get("trf.id.field.ordinals"), configDelim).length;
Expand Down Expand Up @@ -147,7 +153,7 @@ protected void cleanup(Context context) throws IOException, InterruptedException
stBld.append("globalStdDevTimeGap=").append(Utility.formatDouble(globalStdDevTimeGap, 3)).append('\n');

Configuration config = context.getConfiguration();
Utility.appendToFile(config, "trf.xaction.stats.file.path", stBld.toString());
Utility.writeToFile(config, "trf.xaction.stats.file.path", stBld.toString());
}

/* (non-Javadoc)
Expand All @@ -168,25 +174,31 @@ protected void reduce(Tuple key, Iterable<Tuple> values, Context context)
xactionValues.add(Double.parseDouble(val.getString(1)));
}

//average time gap
averageTimeGap();

//average value
averageValue();

//recency
recency = now - lastTimeStamp;
recency = convertTimeUnit(recency);

stBld.delete(0, stBld.length());
for (int i = 0; i < numIDFields; ++i) {
stBld.append(key.getString(i)).append(fieldDelim);
//continue only if there are enough samples
if (xactionTimeGaps.size() >= 2) {
//average time gap
averageTimeGap();

//average value
averageValue();

//recency
recency = now - lastTimeStamp;
recency = convertTimeUnit(recency);

stBld.delete(0, stBld.length());
for (int i = 0; i < numIDFields; ++i) {
stBld.append(key.getString(i)).append(fieldDelim);
}
stBld.append(avTimeGap).append(fieldDelim);
stBld.append(recency).append(fieldDelim);
stBld.append(Utility.formatDouble(avValue, 2));
outVal.set(stBld.toString());
context.write(NullWritable.get(), outVal);
} else {
LOG.debug("skipping not enough samples id:" + key.toString(0, numIDFields));
context.getCounter("Entity","Not enough samples ").increment(1);
}
stBld.append(avTimeGap).append(fieldDelim);
stBld.append(recency).append(fieldDelim);
stBld.append(Utility.formatDouble(avValue, 2));
outVal.set(stBld.toString());
context.write(NullWritable.get(), outVal);
}

/**
Expand Down

0 comments on commit 19807a1

Please sign in to comment.