Permalink
Browse files

Topic sensitive page rank via node biasing

  • Loading branch information...
1 parent f3e8af4 commit b8dc70888611d5850b1cce5fa7e28cf76f92cfdb @matthayes matthayes committed Mar 30, 2012
@@ -48,13 +48,14 @@
private long nodeCount;
// the damping factor
- private static float ALPHA = 0.85f;
+ private float alpha = 0.85f;
// edge weights (which are doubles) are multiplied by this value so they can be stored as integers internally
private static float EDGE_WEIGHT_MULTIPLIER = 100000;
private final Int2IntOpenHashMap nodeIndices = new Int2IntOpenHashMap();
- private final FloatArrayList nodeData = new FloatArrayList(); // rank, total weight, contribution, (repeat)
+ private final FloatArrayList nodeData = new FloatArrayList(); // rank, total weight, contribution, bias(optional), (repeat)
+ private int nodeFieldCount = 3; // unless biasing is enabled
private final IntArrayList danglingNodes = new IntArrayList();
@@ -63,6 +64,7 @@
private boolean shouldHandleDanglingNodes = false;
private boolean shouldCacheEdgesOnDisk = false;
private long edgeCachingThreshold;
+ private boolean nodeBiasingEnabled = false;
private File edgesFile;
private DataOutputStream edgeDataOutputStream;
@@ -95,6 +97,42 @@ public void clear() throws IOException
}
/**
+ * Gets the PageRank alpha (damping factor) value.
+ * @return the current alpha value
+ */
+ public float getAlpha()
+ {
+ return alpha;
+ }
+
+ /**
+ * Sets the PageRank alpha (damping factor) value (default is 0.85).
+ * @param alpha the damping factor; typically a value in (0, 1)
+ */
+ public void setAlpha(float alpha)
+ {
+ this.alpha = alpha;
+ }
+
+ /**
+ * Gets whether node biasing (topic-sensitive PageRank) is enabled.
+ * @return true if node biasing is enabled
+ */
+ public boolean isNodeBiasingEnabled()
+ {
+ return this.nodeBiasingEnabled;
+ }
+
+ /**
+ * Enables node biasing.  Each node then carries a fourth field (bias) in nodeData,
+ * so the per-node field count becomes 4.
+ */
+ public void enableNodeBiasing()
+ {
+ this.nodeBiasingEnabled = true;
+ this.nodeFieldCount = 4;
+ }
+
+ /**
+ * Disables node biasing.  Nodes then carry only rank, total weight, and contribution
+ * (3 fields per node).
+ */
+ public void disableNodeBiasing()
+ {
+ this.nodeBiasingEnabled = false;
+ this.nodeFieldCount = 3;
+ }
+
+
+ /**
* Gets whether disk is being used to cache edges.
* @return True if the edges are cached on disk.
*/
@@ -201,18 +239,61 @@ private void maybeCreateNode(int nodeId)
this.nodeData.add(0.0f); // total weight
this.nodeData.add(0.0f); // contribution
+ if (this.nodeBiasingEnabled)
+ {
+ this.nodeData.add(0.0f); // bias
+ }
+
this.nodeIndices.put(nodeId, index);
this.nodeCount++;
}
}
- public void addEdges(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges) throws IOException
+ /**
+ * Gets the bias for a node.  Node biasing must be enabled first via enableNodeBiasing().
+ * @param nodeId the node ID
+ * @return the node's bias value (stored as the fourth per-node field)
+ * @throws IllegalArgumentException if node biasing is not enabled
+ */
+ public float getNodeBias(int nodeId)
+ {
+ if (!this.nodeBiasingEnabled)
+ {
+ throw new IllegalArgumentException("Node biasing not enabled");
+ }
+ // NOTE(review): an unknown nodeId maps to the hash map's default value (index 0),
+ // silently reading another node's data — TODO confirm callers guarantee the node exists
+ int nodeIndex = this.nodeIndices.get(nodeId);
+ return this.nodeData.get(nodeIndex+3);
+ }
+
+ /**
+ * Sets the bias for a node.  Node biasing must be enabled first via enableNodeBiasing().
+ * @param nodeId the node ID
+ * @param bias the bias value to store (normalized across all nodes during init)
+ * @throws IllegalArgumentException if node biasing is not enabled
+ */
+ public void setNodeBias(int nodeId, float bias)
+ {
+ if (!this.nodeBiasingEnabled)
+ {
+ throw new IllegalArgumentException("Node biasing not enabled");
+ }
+
+ // NOTE(review): an unknown nodeId maps to the hash map's default value (index 0),
+ // silently writing another node's data — TODO confirm callers guarantee the node exists
+ int nodeIndex = this.nodeIndices.get(nodeId);
+ this.nodeData.set(nodeIndex+3, bias);
+ }
+
+ /**
+ * Adds a node and its outgoing edges to the graph, using the default bias of 1.0
+ * (equal bias for all nodes, i.e. no bias).
+ * @param sourceId the source node ID
+ * @param sourceEdges the outgoing edges of the source node, one map per edge
+ * @throws IOException propagated from the overload when edges are cached on disk
+ */
+ public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges) throws IOException
+ {
+ // with bias of 1.0, all nodes have an equal bias (that is, no bias)
+ addNode(sourceId, sourceEdges, 1.0f);
+ }
+
+ public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges, float bias) throws IOException
{
int source = sourceId.intValue();
maybeCreateNode(source);
+ if (this.nodeBiasingEnabled)
+ {
+ setNodeBias(source, bias);
+ }
+ else if (bias != 1.0f)
+ {
+ // with node biasing disabled, all nodes implicitly have a bias of 1.0, which means no bias, so if anything else was specified
+ // it won't take effect.
+ throw new IllegalArgumentException("Bias was specified but node biasing not enabled");
+ }
+
if (this.shouldCacheEdgesOnDisk && !usingEdgeDiskCache && (sourceEdges.size() + this.edgeCount) >= this.edgeCachingThreshold)
{
writeEdgesToDisk();
@@ -263,12 +344,29 @@ public void init(ProgressIndicator progressIndicator) throws IOException
// initialize all nodes to an equal share of the total rank (1.0)
float nodeRank = 1.0f / this.nodeCount;
- for (int j=0; j<this.nodeData.size(); j+=3)
+ float totalBias = 0.0f;
+ for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
{
nodeData.set(j, nodeRank);
progressIndicator.progress();
+ if (this.nodeBiasingEnabled)
+ {
+ totalBias += nodeData.getFloat(j+3);
+ }
}
+ // if node biasing enabled, need to normalize the bias by the total bias across all nodes so it represents
+ // the share of bias.
+ if (this.nodeBiasingEnabled)
+ {
+ for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
+ {
+ float bias = nodeData.getFloat(j+3);
+ bias /= totalBias;
+ nodeData.set(j+3,bias);
+ }
+ }
+
Iterator<Integer> edgeData = getEdgeData();
while(edgeData.hasNext())
@@ -358,7 +456,7 @@ public void distribute(ProgressIndicator progressIndicator) throws IOException
// distribute the dangling node ranks to all the nodes in the graph
// note: the alpha factor is applied in the commit stage
float contributionIncrease = totalRank / this.nodeCount;
- for (int i=2; i<nodeData.size(); i += 3)
+ for (int i=2; i<nodeData.size(); i += this.nodeFieldCount)
{
float contribution = nodeData.getFloat(i);
contribution += contributionIncrease;
@@ -371,12 +469,23 @@ public void commit(ProgressIndicator progressIndicator)
{
this.totalRankChange = 0.0f;
- for (int id : nodeIndices.keySet())
- {
- int nodeIndex = this.nodeIndices.get(id);
+ float oneMinusAlpha = (1.0f - this.alpha);
+ float oneMinusAlphaOverNodeCount = oneMinusAlpha / nodeCount;
+
+ for (int nodeIndex=0; nodeIndex<this.nodeData.size(); nodeIndex += this.nodeFieldCount)
+ {
+ float oldRank = this.nodeData.get(nodeIndex+2);
+ float newRank;
- float alpha = datafu.linkanalysis.PageRank.ALPHA;
- float newRank = (1.0f - alpha)/nodeCount + alpha * this.nodeData.get(nodeIndex+2);
+ if (this.nodeBiasingEnabled)
+ {
+ float bias = this.nodeData.get(nodeIndex+3);
+ newRank = bias * oneMinusAlpha + alpha * oldRank;
+ }
+ else
+ {
+ newRank = oneMinusAlphaOverNodeCount + alpha * oldRank;
+ }
this.nodeData.set(nodeIndex+2, 0.0f);
@@ -78,7 +78,9 @@
private int maxIters = 150;
private boolean useEdgeDiskStorage = false;
private boolean enableDanglingNodeHandling = false;
+ private boolean enableNodeBiasing = false;
private boolean aborted = false;
+ private float alpha = 0.85f;
TupleFactory tupleFactory = TupleFactory.getInstance();
BagFactory bagFactory = BagFactory.getInstance();
@@ -123,18 +125,21 @@ else if (parameterName.equals("dangling_nodes"))
{
enableDanglingNodeHandling = Boolean.parseBoolean(value);
}
+ else if (parameterName.equals("node_biasing"))
+ {
+ enableNodeBiasing = Boolean.parseBoolean(value);
+ }
+ else if (parameterName.equals("alpha"))
+ {
+ alpha = Float.parseFloat(value);
+ }
}
initialize();
}
private void initialize()
{
- long heapSize = Runtime.getRuntime().totalMemory();
- long heapMaxSize = Runtime.getRuntime().maxMemory();
- long heapFreeSize = Runtime.getRuntime().freeMemory();
-// System.out.println(String.format("Heap size: %d, Max heap size: %d, Heap free size: %d", heapSize, heapMaxSize, heapFreeSize));
-
if (useEdgeDiskStorage)
{
this.graph.enableEdgeDiskCaching();
@@ -152,8 +157,18 @@ private void initialize()
{
this.graph.disableDanglingNodeHandling();
}
+
+ if (enableNodeBiasing)
+ {
+ this.graph.enableNodeBiasing();
+ }
+ else
+ {
+ this.graph.disableNodeBiasing();
+ }
this.graph.setEdgeCachingThreshold(maxEdgesInMemory);
+ this.graph.setAlpha(alpha);
}
@Override
@@ -172,6 +187,11 @@ public void accumulate(Tuple t) throws IOException
{
Integer sourceId = (Integer)sourceTuple.get(0);
DataBag edges = (DataBag)sourceTuple.get(1);
+ Double nodeBias = null;
+ if (enableNodeBiasing)
+ {
+ nodeBias = (Double)sourceTuple.get(2);
+ }
ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String, Object>>();
@@ -185,7 +205,14 @@ public void accumulate(Tuple t) throws IOException
edgesMapList.add(edgeMap);
}
- graph.addEdges(sourceId, edgesMapList);
+ if (enableNodeBiasing)
+ {
+ graph.addNode(sourceId, edgesMapList, nodeBias.floatValue());
+ }
+ else
+ {
+ graph.addNode(sourceId, edgesMapList);
+ }
if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
{
@@ -321,6 +348,21 @@ public Schema outputSchema(Schema input)
Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+ if (!this.enableNodeBiasing)
+ {
+ if (inputTupleSchema.size() != 2)
+ {
+ throw new RuntimeException("Expected two fields for the node data");
+ }
+ }
+ else
+ {
+ if (inputTupleSchema.size() != 3)
+ {
+ throw new RuntimeException("Expected three fields for the node data");
+ }
+ }
+
if (inputTupleSchema.getField(0).type != DataType.INTEGER)
{
throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
@@ -331,6 +373,12 @@ public Schema outputSchema(Schema input)
{
throw new RuntimeException(String.format("Expected edges to be represented with a BAG"));
}
+
+ if (this.enableNodeBiasing && inputTupleSchema.getField(2).type != DataType.DOUBLE)
+ {
+ throw new RuntimeException(String.format("Expected node bias to be a DOUBLE, but instead found %s",
+ DataType.findTypeName(inputTupleSchema.getField(2).type)));
+ }
Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);
@@ -342,6 +390,11 @@ public Schema outputSchema(Schema input)
Schema edgesTupleSchema = edgesFieldSchema.schema.getField(0).schema;
+ if (edgesTupleSchema.size() != 2)
+ {
+ throw new RuntimeException("Expected two fields for the edge data");
+ }
+
if (edgesTupleSchema.getField(0).type != DataType.INTEGER)
{
throw new RuntimeException(String.format("Expected destination edge ID to an INTEGER, but instead found %s",
@@ -72,15 +72,20 @@ protected PigTest createPigTest(String scriptPath) throws IOException
protected String getJarPath()
{
- String jarDir = "dist";
+ System.out.println("Getting jar path");
+
+ String jarDir = null;
+
if (System.getProperty("datafu.jar.dir") != null)
{
jarDir = System.getProperty("datafu.jar.dir");
}
+ else
+ {
+ jarDir = new File(System.getProperty("user.dir"), "dist").getAbsolutePath();
+ }
- String jarDirPath = new File(System.getProperty("user.dir"), jarDir).getAbsolutePath();
-
- File userDir = new File(jarDirPath);
+ File userDir = new File(jarDir);
String[] files = userDir.list(new FilenameFilter() {
@@ -92,7 +97,7 @@ public boolean accept(File dir, String name)
});
- if (files.length == 0)
+ if (files == null || files.length == 0)
{
throw new RuntimeException("Could not find JAR file");
}
@@ -193,7 +193,7 @@ public void hubAndSpokeDiskCacheTest() throws Exception {
for (Map.Entry<Integer, ArrayList<Map<String,Object>>> e : nodeEdgesMap.entrySet())
{
- graph.addEdges(e.getKey(), e.getValue());
+ graph.addNode(e.getKey(), e.getValue());
}
return nodeIdsMap;

0 comments on commit b8dc708

Please sign in to comment.