Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 158 lines (133 sloc) 5.55 kB
2bd250d Added pagerank example
Gabor Szabo authored
1 #
2 # Copyright 2011 Twitter, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15
93b2573 Cleaned up pagerank
Gabor Szabo authored
16 """Calculates PageRank for a given graph.
2bd250d Added pagerank example
Gabor Szabo authored
17
93b2573 Cleaned up pagerank
Gabor Szabo authored
18 We assume that there are no dangling pages, i.e. every page has at least one outgoing link.
19 """
2bd250d Added pagerank example
Gabor Szabo authored
20
93b2573 Cleaned up pagerank
Gabor Szabo authored
21 import os
22 from pycascading.helpers import *
2bd250d Added pagerank example
Gabor Szabo authored
23
24
25 def test(graph_file, d, iterations):
898ad13 Examples rewritten using the new operators and decorators
Gabor Szabo authored
26 """This is the Python implementation of PageRank."""
2bd250d Added pagerank example
Gabor Szabo authored
27 in_links = {}
28 out_degree = {}
29 pagerank = {}
30 file = open(graph_file)
31 for line in file:
32 (source, dest) = line.rstrip().split()
33 try:
34 in_links[dest].add(source)
35 except KeyError:
36 in_links[dest] = set(source)
37 try:
38 out_degree[source] += 1
39 except KeyError:
40 out_degree[source] = 1
41 pagerank[source] = 1.0
42 pagerank[dest] = 1.0
43 file.close()
44 old_pr = pagerank
45 new_pr = {}
46 for iteration in xrange(0, iterations):
47 for node in old_pr:
93b2573 Cleaned up pagerank
Gabor Szabo authored
48 new_pr[node] = (1 - d)
2bd250d Added pagerank example
Gabor Szabo authored
49 try:
93b2573 Cleaned up pagerank
Gabor Szabo authored
50 new_pr[node] += \
51 d * sum([old_pr[n] / out_degree[n] for n in in_links[node]])
2bd250d Added pagerank example
Gabor Szabo authored
52 except KeyError:
53 pass
54 tmp = old_pr
55 old_pr = new_pr
56 new_pr = tmp
93b2573 Cleaned up pagerank
Gabor Szabo authored
57 return old_pr
58
2bd250d Added pagerank example
Gabor Szabo authored
59
60 def main():
64d0ecd Added comments to pagerank
Gabor Szabo authored
61 """The PyCascading job."""
93b2573 Cleaned up pagerank
Gabor Szabo authored
62 # The damping factor
63 d = 0.85
64 # The number of iterations
2bd250d Added pagerank example
Gabor Szabo authored
65 iterations = 5
93b2573 Cleaned up pagerank
Gabor Szabo authored
66
67 # The directed, unweighted graph in a space-separated file, in
68 # <source_node> <destination_node> format
69 graph_file = 'pycascading_data/graph.txt'
2bd250d Added pagerank example
Gabor Szabo authored
70
71 graph_source = Hfs(TextDelimited(Fields(['from', 'to']), ' ',
93b2573 Cleaned up pagerank
Gabor Szabo authored
72 [String, String]), graph_file)
2bd250d Added pagerank example
Gabor Szabo authored
73
74 out_links_file = 'pycascading_data/out/pagerank/out_links'
75 pr_values_1 = 'pycascading_data/out/pagerank/iter1'
76 pr_values_2 = 'pycascading_data/out/pagerank/iter2'
77
78 # Some setup here: we'll need the ougoing degree of nodes, and we will
79 # initialize the pageranks of nodes to 1.0
80 flow = Flow()
81 graph = flow.source(graph_source)
82
83 # Count the number of outgoing links for every node that is a source,
84 # and store it in a field called 'out_degree'
543bf90 Changed count in pagerank to native count
Gabor Szabo authored
85 graph | group_by('from') | native.count('out_degree') | \
2bd250d Added pagerank example
Gabor Szabo authored
86 flow.binary_sink(out_links_file)
87
88 # Initialize the pageranks of all nodes to 1.0
89 # This file has fields 'node' and 'pagerank', and is stored to pr_values_1
90 @udf
91 def constant(tuple, c):
92 """Just a field with a constant value c."""
93 yield [c]
94 @udf
95 def both_nodes(tuple):
96 """For each link returns both endpoints."""
97 yield [tuple.get(0)]
98 yield [tuple.get(1)]
99 graph | map_replace(both_nodes, 'node') | \
100 native.unique(Fields.ALL) | map_add(constant(1.0), 'pagerank') | \
101 flow.binary_sink(pr_values_1)
102
103 flow.run(num_reducers=1)
104
105 pr_input = pr_values_1
106 pr_output = pr_values_2
107 for iteration in xrange(0, iterations):
108 flow = Flow()
109
110 graph = flow.source(graph_source)
111 pageranks = flow.meta_source(pr_input)
112 out_links = flow.meta_source(out_links_file)
113
114 # Decorate the graph's source nodes with their pageranks and the
115 # number of their outgoing links
116 # We could have joined graph & out_links outside of the loop, but
117 # in order to demonstrate joins with multiple streams, we do it here
118 p = (graph & pageranks & (out_links | rename('from', 'from_out'))) | \
119 inner_join(['from', 'node', 'from_out']) | \
120 rename(['pagerank', 'out_degree'], ['from_pagerank', 'from_out_degree']) | \
121 retain('from', 'from_pagerank', 'from_out_degree', 'to')
122
123 # Distribute the sources' pageranks to their out-neighbors equally
124 @udf
125 def incremental_pagerank(tuple, d):
126 yield [d * tuple.get('from_pagerank') / tuple.get('from_out_degree')]
898ad13 Examples rewritten using the new operators and decorators
Gabor Szabo authored
127 p = p | map_replace(['from', 'from_pagerank', 'from_out_degree'],
128 incremental_pagerank(d), 'incr_pagerank') | \
2bd250d Added pagerank example
Gabor Szabo authored
129 rename('to', 'node') | retain('node', 'incr_pagerank')
130
131 # Add the constant jump probability to all the pageranks that come
132 # from the in-links
133 p = (p & (pageranks | map_replace('pagerank', constant(1.0 - d), 'incr_pagerank'))) | group_by()
134 p = p | group_by('node', 'incr_pagerank', native.sum('pagerank'))
135
136 if iteration == iterations - 1:
137 # Only store the final result in a TSV file
138 p | flow.tsv_sink(pr_output)
139 else:
140 # Store intermediate results in a binary format for faster IO
141 p | flow.binary_sink(pr_output)
142
143 # Swap the input and output folders for the next iteration
144 tmp = pr_input
145 pr_input = pr_output
146 pr_output = tmp
147
148 flow.run(num_reducers=1)
149
93b2573 Cleaned up pagerank
Gabor Szabo authored
150 print 'Results from PyCascading:', pr_input
151 os.system('cat %s/.pycascading_header %s/part*' % (pr_input, pr_input))
152
153 print 'The test values:'
23e16db Small change in pagerank
Gabor Szabo authored
154 test_pr = test(graph_file, d, iterations)
93b2573 Cleaned up pagerank
Gabor Szabo authored
155 print 'node\tpagerank'
156 for n in sorted(test_pr.iterkeys()):
157 print '%s\t%g' % (n, test_pr[n])
Something went wrong with that request. Please try again.