-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
adding pagerank implementation with convergence criteria
- Loading branch information
1 parent
9ea5e42
commit 54c77c1
Showing 1 changed file with 58 additions and 0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -0,0 +1,58 @@ | |||
#!/usr/bin/python | |||
from org.apache.pig.scripting import * | |||
|
|||
# Pig Latin template for one PageRank iteration. It is compiled once here and
# bound to concrete parameter values by the driver loop.
#
# Parameters substituted at bind time:
#   $docs_in   - input location; rows are (url, pagerank, links)
#   $docs_out  - output location for the recomputed ranks
#   $max_diff  - output location for the largest absolute rank change
#   $d         - damping factor
#
# Every page divides its current rank evenly among its outbound links; the
# new rank of a page is (1 - d) + d * (sum of the shares it receives).
#
# Pig's SUM yields null for an empty bag, so a page that nothing links to
# would otherwise be stored with a null rank. The IsEmpty guard treats the
# received share as 0.0 in that case, giving the page the base rank (1 - d).
P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;
new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * ( IsEmpty ( outbound_pagerank ) ? 0.0 : SUM ( outbound_pagerank.pagerank ) ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;
pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );
max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );
STORE new_pagerank
    INTO '$docs_out';
STORE max_diff
    INTO '$max_diff';
""")
|
|||
# Damping factor substituted for $d in the compiled Pig script.
d = 0.5
# Input of the first iteration; each later iteration reads the previous output.
docs_in = "data/pagerank_data_simple"

# Run at most 10 PageRank iterations, stopping early once the largest
# per-page rank change reported by the script drops below 0.01.
for i in range(10):
    # 1-based iteration number, used for both the output paths and messages
    # so that the reported iteration matches the directory that holds it.
    iteration = i + 1
    docs_out = "out/pagerank_data_" + str(iteration)
    max_diff = "out/max_diff_" + str(iteration)

    # Remove any output left behind by an earlier run.
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)

    # Bind the four script parameters explicitly instead of relying on
    # bind() picking up whatever variables happen to be in scope.
    stats = P.bind({
        "d": str(d),
        "docs_in": docs_in,
        "docs_out": docs_out,
        "max_diff": max_diff,
    }).runSingle()
    if not stats.isSuccessful():
        # String exceptions ("raise 'failed'") are rejected by Python 2.6+;
        # raise a real exception type instead.
        raise RuntimeError("PageRank iteration " + str(iteration) + " failed")

    # The max_diff alias is expected to hold a single one-field row.
    rows = stats.result("max_diff").iterator()
    if not rows.hasNext():
        raise RuntimeError(
            "PageRank iteration " + str(iteration) + " produced no max_diff row"
        )
    max_diff_value = float(str(rows.next().get(0)))
    print(" max_diff_value = " + str(max_diff_value))

    if max_diff_value < 0.01:
        print("done at iteration " + str(iteration))
        break

    # Feed this iteration's ranks into the next one.
    docs_in = docs_out
|
|||
|