This repository has been archived by the owner on Jul 30, 2020. It is now read-only.

Commit

CPI expressions for SA evaluation
mschwarzer committed Jul 24, 2017
1 parent badef3e commit d171735
Showing 10 changed files with 242 additions and 211 deletions.
19 changes: 19 additions & 0 deletions src/main/java/org/wikipedia/citolytics/WikiSimAbstractJob.java
@@ -2,6 +2,7 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -198,4 +199,22 @@ public ParameterTool getParams() {
}
return params;
}

protected void summarize(int[] fields) {
enableSingleOutputFile();

if(fields.length > 0) {
AggregateOperator<T> tmp = null;

for (int i = 0; i < fields.length; i++) {

if (i == 0)
tmp = result.sum(fields[i]);
else
tmp = tmp.andSum(fields[i]);
}

result = tmp;
}
}
}
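
The new summarize(int[] fields) helper above enables single-file output and then chains Flink's sum()/andSum() aggregations over the given tuple field indices. The following standalone sketch (not part of this commit) shows the same chained-aggregation pattern on a made-up Tuple3 data set; the class name, field indices, and values are purely illustrative.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple3;

public class SummarizeSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<String, Integer, Integer>> results = env.fromElements(
                new Tuple3<>("a", 10, 3),
                new Tuple3<>("b", 20, 5));

        // Equivalent to summarize(new int[]{1, 2}): sum field 1, then chain field 2.
        int[] fields = {1, 2};
        AggregateOperator<Tuple3<String, Integer, Integer>> summary = null;
        for (int i = 0; i < fields.length; i++) {
            summary = (i == 0) ? results.sum(fields[i]) : summary.andSum(fields[i]);
        }

        // Prints a single tuple, e.g. (b,30,8); the non-aggregated String field keeps an arbitrary value.
        summary.print();
    }
}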
ClickStreamEvaluation.java
@@ -4,8 +4,8 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
@@ -17,7 +17,6 @@
import org.wikipedia.citolytics.clickstream.utils.ClickStreamHelper;
import org.wikipedia.citolytics.cpa.io.WikiOutputFormat;
import org.wikipedia.citolytics.cpa.io.WikiSimReader;
import org.wikipedia.citolytics.cpa.types.RecommendationPair;
import org.wikipedia.citolytics.cpa.types.RecommendationSet;
import org.wikipedia.citolytics.cpa.utils.WikiSimConfiguration;
import org.wikipedia.citolytics.seealso.operators.MLTInputMapper;
@@ -37,14 +36,7 @@ public class ClickStreamEvaluation extends WikiSimAbstractJob<ClickStreamResult>
private static String lang = null;
private static boolean summary = false;
private static String cpiExpr;
private static int fieldScore;
private static int fieldPageA;
private static int fieldPageB;
int fieldPageIdA;
int fieldPageIdB;


private int topK = 10;
private boolean mltResults = false;
public static DataSet<Tuple2<String, HashSet<String>>> links;

@@ -55,31 +47,22 @@ public static void main(String[] args) throws Exception {

public void init() {

ParameterTool params = ParameterTool.fromArgs(args);

wikiSimInputFilename = params.getRequired("wikisim");
clickStreamInputFilename = params.getRequired("gold");
outputFilename = params.getRequired("output");
topK = params.getInt("topk", WikiSimConfiguration.DEFAULT_TOP_K);

langLinksInputFilename = params.get("langlinks");
lang = params.get("lang");
summary = params.has("summary");
cpiExpr = params.get("cpi");
articleStatsFilename = params.get("article-stats");
idTitleMappingFilename = params.get("id-title-mapping");
topRecommendationsFilename = params.get("top-recommendations");

fieldScore = params.getInt("score", RecommendationPair.CPI_LIST_KEY);
fieldPageA = params.getInt("page-a", RecommendationPair.PAGE_A_KEY);
fieldPageB = params.getInt("page-b", RecommendationPair.PAGE_B_KEY);
fieldPageIdA = getParams().getInt("page-id-a", RecommendationPair.PAGE_A_ID_KEY);
fieldPageIdB = getParams().getInt("page-id-b", RecommendationPair.PAGE_B_ID_KEY);
wikiSimInputFilename = getParams().getRequired("wikisim");
clickStreamInputFilename = getParams().getRequired("gold");
outputFilename = getParams().getRequired("output");

langLinksInputFilename = getParams().get("langlinks");
lang = getParams().get("lang");
summary = getParams().has("summary");
cpiExpr = getParams().get("cpi");
articleStatsFilename = getParams().get("article-stats");
idTitleMappingFilename = getParams().get("id-title-mapping");
topRecommendationsFilename = getParams().get("top-recommendations");

}

public void plan() throws Exception {


// Name
setJobName("ClickStreamEvaluation");

@@ -90,27 +73,25 @@ public void plan() throws Exception {

// WikiSim
DataSet<RecommendationSet> recommendationSets;
Configuration config = WikiSimReader.getConfigFromArgs(getParams());

// CPA or MLT results?
if (fieldScore >= 0 && fieldPageA >= 0 && fieldPageB >= 0) {
// CPA
jobName += " CPA Score=" + fieldScore + "; Page=[" + fieldPageA + ";" + fieldPageB + "]";
int topK = getParams().getInt("topk", WikiSimConfiguration.DEFAULT_TOP_K);

recommendationSets = WikiSimReader.buildRecommendationSets(env,
WikiSimReader.readWikiSimOutput(env, wikiSimInputFilename, fieldPageA, fieldPageB, fieldScore,
fieldPageIdA, fieldPageIdB),
topK, cpiExpr, articleStatsFilename, false);

} else {
// CPA or MLT results?
if (getParams().has("mlt")) {
// MLT
jobName += " MLT";

Configuration config = new Configuration();
config.setInteger("topK", topK);

recommendationSets = env.readTextFile(wikiSimInputFilename)
.flatMap(new MLTInputMapper())
.withParameters(config);
} else {
// CPA
jobName += " CPA";

recommendationSets = WikiSimReader.buildRecommendationSets(env,
WikiSimReader.readWikiSimOutput(env, wikiSimInputFilename, config),
topK, cpiExpr, articleStatsFilename, false);
}

// Evaluation
@@ -123,71 +104,63 @@ public void plan() throws Exception {
// Top recommended articles (only #1 recommendations)
// TODO limit out
if(topRecommendationsFilename != null) {
DataSet<Tuple2<String, Long>> topRecommendations = result.flatMap(new FlatMapFunction<ClickStreamResult, Tuple2<String, Long>>() {
@Override
public void flatMap(ClickStreamResult r, Collector<Tuple2<String, Long>> out) throws Exception {
if (r.getRecommendationsCount() > 0) {
out.collect(new Tuple2<>(r.getRecommendations().get(0).getRecommendedArticle(),
1L));
}
}
})
.groupBy(0)
.sum(1)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) throws Exception {
// Keep article name
return a.f1 > b.f1 ? a : b;
}
});

// Distinct recommendations
DataSet<Tuple2<String, Long>> distinctRecommendations = result.flatMap(new FlatMapFunction<ClickStreamResult, Tuple2<String, Long>>() {
@Override
public void flatMap(ClickStreamResult clickStreamResult, Collector<Tuple2<String, Long>> out) throws Exception {
for(ClickStreamRecommendationResult r: clickStreamResult.getRecommendations()) {
out.collect(new Tuple2<>(r.getRecommendedArticle(), 1L));
}

}
}).distinct(0)
.sum(1)
.map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Tuple2<String, Long> in) throws Exception {
in.setField("Distinct recommendations", 0);
return in;
}
});

DataSet<Tuple2<String, Long>> count = env.fromElements(new Tuple2<String, Long>(
"Article count", result.count())
);

topRecommendations = topRecommendations
.union(distinctRecommendations)
.union(count);

topRecommendations
.write(new WikiOutputFormat<>(topRecommendationsFilename), topRecommendationsFilename, FileSystem.WriteMode.OVERWRITE)
.setParallelism(1);
saveTopRecommendations(env, result, topRecommendationsFilename);
}


// Summarize results if requested
if(summary) {
enableSingleOutputFile();

result = result.sum(ClickStreamResult.IMPRESSIONS_KEY)
.andSum(ClickStreamResult.CLICKS_KEY)
.andSum(ClickStreamResult.CLICKS_K1_KEY)
.andSum(ClickStreamResult.CLICKS_K2_KEY)
.andSum(ClickStreamResult.CLICKS_K3_KEY)
.andSum(ClickStreamResult.RECOMMENDATIONS_COUNT_KEY)
.andSum(ClickStreamResult.OPTIMAL_CLICKS);
summarize(ClickStreamResult.getSummaryFields());
}
}

private static void saveTopRecommendations(ExecutionEnvironment env, DataSet<ClickStreamResult> result, String topRecommendationsFilename) throws Exception {
DataSet<Tuple2<String, Long>> topRecommendations = result.flatMap(new FlatMapFunction<ClickStreamResult, Tuple2<String, Long>>() {
@Override
public void flatMap(ClickStreamResult t, Collector<Tuple2<String, Long>> out) throws Exception {
if (t.getRecommendationsCount() > 0) {
out.collect(new Tuple2<>(t.getTopRecommendations(), 1L));
}
}
})
.groupBy(0)
.sum(1)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) throws Exception {
// Keep article name
return a.f1 > b.f1 ? a : b;
}
});

// Distinct recommendations
DataSet<Tuple2<String, Long>> distinctRecommendations = result.flatMap(new FlatMapFunction<ClickStreamResult, Tuple2<String, Long>>() {
@Override
public void flatMap(ClickStreamResult clickStreamResult, Collector<Tuple2<String, Long>> out) throws Exception {
for (ClickStreamRecommendationResult r : clickStreamResult.getRecommendations()) {
out.collect(new Tuple2<>(r.getRecommendedArticle(), 1L));
}

}
}).distinct(0)
.sum(1)
.map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Tuple2<String, Long> in) throws Exception {
in.setField("Distinct recommendations", 0);
return in;
}
});

DataSet<Tuple2<String, Long>> count = env.fromElements(new Tuple2<String, Long>(
"Article count", result.count())
);

topRecommendations = topRecommendations
.union(distinctRecommendations)
.union(count);

topRecommendations
.write(new WikiOutputFormat<>(topRecommendationsFilename), topRecommendationsFilename, FileSystem.WriteMode.OVERWRITE)
.setParallelism(1);
}
}
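
The refactored init() above now pulls every option through getParams(), while the per-field index options (page-a, page-b, score, page-id-a, page-id-b, topk) move into WikiSimReader.getConfigFromArgs() further below. The following standalone sketch (not part of this commit) shows how the remaining options are read via Flink's ParameterTool; all argument values are placeholders, not paths or expressions from this repository.

import org.apache.flink.api.java.utils.ParameterTool;

public class InitParamsSketch {
    public static void main(String[] args) {
        ParameterTool params = ParameterTool.fromArgs(new String[]{
                "--wikisim", "/tmp/wikisim-results",   // placeholder path
                "--gold", "/tmp/clickstream.tsv",      // placeholder path
                "--output", "/tmp/evaluation-out",     // placeholder path
                "--cpi", "x",                          // placeholder CPI expression
                "--summary"
        });

        // Required options fail fast if missing, as in init().
        String wikiSimInput = params.getRequired("wikisim");
        String goldStandard = params.getRequired("gold");
        String output = params.getRequired("output");

        // Optional options fall back to null or a default.
        String cpiExpr = params.get("cpi");
        boolean summary = params.has("summary");
        int topK = params.getInt("topk", 10); // 10 stands in for WikiSimConfiguration.DEFAULT_TOP_K

        System.out.println(wikiSimInput + " " + goldStandard + " " + output
                + " cpi=" + cpiExpr + " summary=" + summary + " topK=" + topK);
    }
}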
src/main/java/org/wikipedia/citolytics/clickstream/types/ClickStreamResult.java
@@ -1,6 +1,7 @@
package org.wikipedia.citolytics.clickstream.types;

import org.apache.flink.api.java.tuple.Tuple9;
import org.wikipedia.citolytics.seealso.types.EvaluationResult;

import java.util.ArrayList;

@@ -12,7 +13,7 @@
*/
public class ClickStreamResult extends
// Tuple10<String, ArrayList<Tuple4<String, Double, Integer, Double>>, Integer, Integer, Integer, Double, Integer, Double, Integer, Double>
Tuple9<String, ArrayList<ClickStreamRecommendationResult>, Integer, Integer, Integer, Integer, Integer, Integer, Integer> {
Tuple9<String, ArrayList<ClickStreamRecommendationResult>, Integer, Integer, Integer, Integer, Integer, Integer, Integer> implements EvaluationResult {
public final static int ARTICLE_KEY = 0;
public final static int RECOMMENDATIONS_LIST_KEY = 1;
public final static int RECOMMENDATIONS_COUNT_KEY = 2;
@@ -47,6 +48,7 @@ public ArrayList<ClickStreamRecommendationResult> getRecommendations() {
return getField(RECOMMENDATIONS_LIST_KEY);
}

@Override
public int getRecommendationsCount() {
return getField(RECOMMENDATIONS_COUNT_KEY);
}
@@ -74,4 +76,25 @@ public int getClicks3() {
public int getOptimalClicks() {
return getField(OPTIMAL_CLICKS);
}

@Override
public String getTopRecommendations() {
if(getRecommendations().size() > 0) {
return getRecommendations().get(0).getRecommendedArticle();
} else {
return null;
}
}

public static int[] getSummaryFields() {
return new int[]{
ClickStreamResult.IMPRESSIONS_KEY,
ClickStreamResult.CLICKS_KEY,
ClickStreamResult.CLICKS_K1_KEY,
ClickStreamResult.CLICKS_K2_KEY,
ClickStreamResult.CLICKS_K3_KEY,
ClickStreamResult.RECOMMENDATIONS_COUNT_KEY,
ClickStreamResult.OPTIMAL_CLICKS
};
}
}
18 changes: 18 additions & 0 deletions src/main/java/org/wikipedia/citolytics/cpa/io/WikiSimReader.java
@@ -5,6 +5,7 @@
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.wikipedia.citolytics.cpa.operators.ComputeComplexCPI;
@@ -92,6 +93,23 @@ public static void collectRecommendationsFromPair(RecommendationPair pair, Colle

}

public static Configuration getConfigFromArgs(ParameterTool args) {

Configuration config = new Configuration();

config.setInteger("fieldPageA", args.getInt("page-a", RecommendationPair.PAGE_A_KEY));
config.setInteger("fieldPageIdA", args.getInt("page-id-a", RecommendationPair.PAGE_A_ID_KEY));

config.setInteger("fieldPageB", args.getInt("page-b", RecommendationPair.PAGE_B_KEY));
config.setInteger("fieldPageIdB", args.getInt("page-id-b", RecommendationPair.PAGE_B_ID_KEY));

config.setInteger("fieldScore", args.getInt("score", RecommendationPair.CPI_LIST_KEY));

config.setInteger("topK", args.getInt("topk", WikiSimConfiguration.DEFAULT_TOP_K));

return config;
}

public static DataSet<Recommendation> readWikiSimOutput(ExecutionEnvironment env, String filename, Configuration config) throws Exception {
Log.info("Reading WikiSim from " + filename);

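
The new getConfigFromArgs() helper above packs the field-index and topK options into a Flink Configuration instead of keeping them in static fields of the job, so readWikiSimOutput() and its operators can receive them (e.g. via withParameters(config)). The following standalone sketch (not part of this commit) shows that pattern; the key names mirror getConfigFromArgs(), while the numeric defaults are stand-ins for the RecommendationPair and WikiSimConfiguration constants.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class ReaderConfigSketch {
    public static void main(String[] args) {
        ParameterTool params = ParameterTool.fromArgs(new String[]{"--score", "8", "--topk", "20"});

        // Pack command-line options into a Configuration, as getConfigFromArgs() does.
        Configuration config = new Configuration();
        config.setInteger("fieldScore", params.getInt("score", 5)); // 5 stands in for RecommendationPair.CPI_LIST_KEY
        config.setInteger("topK", params.getInt("topk", 10));       // 10 stands in for WikiSimConfiguration.DEFAULT_TOP_K

        // Operators that receive the Configuration (e.g. through withParameters(config)
        // and RichFunction.open(Configuration)) read the values back with defaults.
        int fieldScore = config.getInteger("fieldScore", 5);
        int topK = config.getInteger("topK", 10);

        System.out.println("fieldScore=" + fieldScore + ", topK=" + topK);
    }
}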
