Skip to content

Commit

Permalink
Graph: Refactor graph utilities
Browse files Browse the repository at this point in the history
There are some functions (such as validation and generating grouping-based query
strings) that are duplicated across various files. This commit consolidates
them into generic utility functions.

Closes apache#177
  • Loading branch information
orhankislal committed Aug 31, 2017
1 parent 64c12a4 commit 5b155c0
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 134 deletions.
4 changes: 2 additions & 2 deletions src/ports/postgres/modules/graph/apsp.py_in
Expand Up @@ -31,10 +31,10 @@
import plpy
from graph_utils import validate_graph_coding
from graph_utils import get_graph_usage
from graph_utils import _grp_from_table
from graph_utils import _check_groups
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import _grp_from_table
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
Expand Down
5 changes: 3 additions & 2 deletions src/ports/postgres/modules/graph/bfs.py_in
Expand Up @@ -30,10 +30,11 @@
import plpy
from graph_utils import validate_graph_coding
from graph_utils import get_graph_usage
from graph_utils import _grp_from_table
from graph_utils import _grp_null_checks, _check_groups
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import _grp_from_table
from utilities.utilities import _grp_null_checks
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
Expand Down
36 changes: 0 additions & 36 deletions src/ports/postgres/modules/graph/graph_utils.py_in
Expand Up @@ -34,42 +34,6 @@ from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty


def _grp_null_checks(grp_list):
"""
Helper function for generating NULL checks for grouping columns
to be used within a WHERE clause
Args:
@param grp_list The list of grouping columns
"""
return ' AND '.join([" {i} IS NOT NULL ".format(**locals())
for i in grp_list])


def _check_groups(tbl1, tbl2, grp_list):
"""
Helper function for joining tables with groups.
Args:
@param tbl1 Name of the first table
@param tbl2 Name of the second table
@param grp_list The list of grouping columns
"""

return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
for i in grp_list])


def _grp_from_table(tbl, grp_list):
"""
Helper function for selecting grouping columns of a table
Args:
@param tbl Name of the table
@param grp_list The list of grouping columns
"""
return ' , '.join([" {tbl}.{i} ".format(**locals())
for i in grp_list])


def validate_output_and_summary_tables(model_out_table, module_name,
out_table=None):
"""
Expand Down
128 changes: 68 additions & 60 deletions src/ports/postgres/modules/graph/pagerank.py_in
Expand Up @@ -30,6 +30,8 @@
import plpy
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import _grp_from_table
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
Expand Down Expand Up @@ -128,7 +130,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
edge_temp_table = unique_string(desp='temp_edge')
grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
distribution = ('' if is_platform_pg() else
"DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, dest))
"DISTRIBUTED BY ({0}{1})".format(
grouping_cols_comma, dest))
plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
plpy.execute("""
CREATE TEMP TABLE {edge_temp_table} AS
Expand All @@ -139,7 +142,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
# GPDB and HAWQ have distributed by clauses to help them with indexing.
# For Postgres we add the index explicitly.
if is_platform_pg():
plpy.execute("CREATE INDEX ON {0}({1})".format(edge_temp_table, src))
plpy.execute("CREATE INDEX ON {0}({1})".format(
edge_temp_table, src))

# Intermediate tables required.
cur = unique_string(desp='cur')
Expand All @@ -154,10 +158,14 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
cur_distribution = cnts_distribution = ''
else:
cur_distribution = cnts_distribution = \
"DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, vertex_id)
cur_join_clause = "{edge_temp_table}.{dest} = {cur}.{vertex_id}".format(**locals())
out_cnts_join_clause = "{out_cnts}.{vertex_id} = {edge_temp_table}.{src}".format(**locals())
v1_join_clause = "{v1}.{vertex_id} = {edge_temp_table}.{src}".format(**locals())
"DISTRIBUTED BY ({0}{1})".format(
grouping_cols_comma, vertex_id)
cur_join_clause = """{edge_temp_table}.{dest} = {cur}.{vertex_id}
""".format(**locals())
out_cnts_join_clause = """{out_cnts}.{vertex_id} =
{edge_temp_table}.{src} """.format(**locals())
v1_join_clause = """{v1}.{vertex_id} = {edge_temp_table}.{src}
""".format(**locals())

random_probability = (1.0 - damping_factor) / n_vertices
######################################################################
Expand Down Expand Up @@ -201,18 +209,18 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
random_prob = unique_string(desp='rand')
subq = unique_string(desp='subquery')
rand_damp = 1 - damping_factor
grouping_where_clause = ' AND '.join(
[distinct_grp_table + '.' + col + '=' + subq + '.' + col
for col in grouping_cols_list])
group_by_clause = ', '.join([distinct_grp_table + '.' + col
for col in grouping_cols_list])
grouping_where_clause = _check_groups(
distinct_grp_table, subq, grouping_cols_list)
group_by_clause = _grp_from_table(
distinct_grp_table, grouping_cols_list)
# Find number of vertices in each group, this is the normalizing factor
# for computing the random_prob
plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
SELECT {distinct_grp_table}.*,
1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
{rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
{rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION
AS {random_prob}
FROM {distinct_grp_table} INNER JOIN (
SELECT {grouping_cols}, {src} AS __vertices__
FROM {edge_temp_table}
Expand All @@ -223,15 +231,15 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
GROUP BY {group_by_clause}
""".format(**locals()))

grouping_where_clause = ' AND '.join(
[vertices_per_group + '.' + col + '=' + subq + '.' + col
for col in grouping_cols_list])
group_by_clause = ', '.join([vertices_per_group + '.' + col
for col in grouping_cols_list])
grouping_where_clause = _check_groups(
vertices_per_group, subq, grouping_cols_list)
group_by_clause = _grp_from_table(
vertices_per_group, grouping_cols_list)

plpy.execute("""
CREATE TEMP TABLE {cur} AS
SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
{init_pr} AS pagerank
SELECT {group_by_clause}, {subq}.__vertices__
AS {vertex_id}, {init_pr} AS pagerank
FROM {vertices_per_group} INNER JOIN (
SELECT {grouping_cols}, {src} AS __vertices__
FROM {edge_temp_table}
Expand All @@ -253,35 +261,32 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
""".format(grouping_cols_select=grouping_cols + ','
if grouping_cols else '', **locals()))

message_grp = ' AND '.join(
["{cur}.{col}={message}.{col}".format(**locals())
for col in grouping_cols_list])
cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
for col in grouping_cols_list])
out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
for col in grouping_cols_list])
v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
for col in grouping_cols_list])
vpg_join_clause = ' AND '.join(
["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
for col in grouping_cols_list])
vpg_t1_join_clause = ' AND '.join(
["__t1__.{col}={vpg}.{col}".format(**locals())
for col in grouping_cols_list])
message_grp = _check_groups(cur, message, grouping_cols_list)
cur_join_clause = cur_join_clause + ' AND ' + _check_groups(
cur, edge_temp_table, grouping_cols_list)

out_cnts_join_clause = out_cnts_join_clause + ' AND ' \
+ _check_groups( out_cnts, edge_temp_table, grouping_cols_list)

v1_join_clause = v1_join_clause + ' AND ' + _check_groups(
v1, edge_temp_table, grouping_cols_list)

vpg_join_clause = _check_groups(
vpg, edge_temp_table, grouping_cols_list)
vpg_t1_join_clause = _check_groups(
vpg, '__t1__', grouping_cols_list)

# join clause specific to populating random_prob for nodes without any
# incoming edges.
edge_grouping_cols_select = ', '.join(
["{edge_temp_table}.{col}".format(**locals())
for col in grouping_cols_list])
cur_grouping_cols_select = ', '.join(
["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
edge_grouping_cols_select = _grp_from_table(
edge_temp_table, grouping_cols_list)
cur_grouping_cols_select = _grp_from_table(cur, grouping_cols_list)

# Create output summary table:
cols_names_types = get_cols_and_types(edge_table)
grouping_cols_clause = ', '.join([c_name + " " + c_type
for (c_name, c_type) in cols_names_types
for (c_name, c_type)
in cols_names_types
if c_name in grouping_cols_list])
plpy.execute("""
CREATE TABLE {summary_table} (
Expand Down Expand Up @@ -310,12 +315,14 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
# Strings required for the main PageRank computation query
grouping_cols_select_pr = edge_grouping_cols_select + ', '
random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
vertices_per_group_inner_join_pr = """
INNER JOIN {vertices_per_group}
AS {vpg} ON {vpg_join_clause}""".format(**locals())
ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(summary_table,
edge_temp_table, grouping_cols_list)
ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(
summary_table, edge_temp_table, grouping_cols_list)
ignore_group_clause_ins_noincoming = ' WHERE ' + get_ignore_groups(
summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
summary_table, nodes_with_no_incoming_edges,
grouping_cols_list)
# Strings required for updating PageRank scores of vertices that have
# no incoming edges
grouping_cols_select_ins = cur_grouping_cols_select + ','
Expand All @@ -324,15 +331,15 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
**locals())
message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
ignore_group_clause_ins = ' AND ' + get_ignore_groups(summary_table,
cur, grouping_cols_list)
ignore_group_clause_ins = ' AND ' + get_ignore_groups(
summary_table, cur, grouping_cols_list)
# Strings required for convergence test query
grouping_cols_select_conv = cur_grouping_cols_select
group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
cur_grouping_cols_select)
message_grp_clause_conv = '{0} AND '.format(message_grp)
ignore_group_clause_conv = ' AND ' + get_ignore_groups(summary_table,
cur, grouping_cols_list)
ignore_group_clause_conv = ' AND ' + get_ignore_groups(
summary_table, cur, grouping_cols_list)
limit = ''

# Find all nodes, in each group, that have no incoming edges. The PageRank
Expand All @@ -350,11 +357,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
FROM {edge_temp_table} AS __t2__
WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
) {vpg_where_clause_ins}
""".format(select_group_cols=','.join(['__t1__.{0}'.format(col)
for col in grouping_cols_list]),
where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
for col in grouping_cols_list]),
**locals()))
""".format(
select_group_cols =
_grp_from_table('__t1__', grouping_cols_list),
where_group_clause=
_check_groups('__t1__', '__t2__', grouping_cols_list),
**locals()))
else:
# cur and out_cnts tables can be simpler when no grouping is involved.
init_value = 1.0 / n_vertices
Expand Down Expand Up @@ -416,7 +424,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
# that have converged.
plpy.execute("""
CREATE TABLE {message} AS
SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
SELECT {grouping_cols_select_pr}
{edge_temp_table}.{dest} AS {vertex_id},
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
FROM {edge_temp_table}
INNER JOIN {cur} ON {cur_join_clause}
Expand Down Expand Up @@ -576,8 +585,7 @@ def get_ignore_groups(first_table, second_table, grouping_cols_list):
This function generates the necessary clause to only select the
groups that appear in second_table and not in first_table.
"""
second_table_cols = ', '.join(["{0}.{1}".format(second_table, col)
for col in grouping_cols_list])
second_table_cols = _grp_from_table(second_table, grouping_cols_list)
grouping_cols = ', '.join([col for col in grouping_cols_list])
return """({second_table_cols}) NOT IN
(SELECT {grouping_cols}
Expand All @@ -601,7 +609,7 @@ def pagerank_help(schema_madlib, message, **kwargs):
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
help_string = get_graph_usage(schema_madlib, 'PageRank',
"""out_table TEXT, -- Name of the output table for PageRank
"""out_table TEXT, -- Name of the output table for PageRank
damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
-- (DEFAULT = 0.85)
max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)
Expand Down
4 changes: 2 additions & 2 deletions src/ports/postgres/modules/graph/sssp.py_in
Expand Up @@ -30,11 +30,11 @@
import plpy
from graph_utils import validate_graph_coding
from graph_utils import get_graph_usage
from graph_utils import _grp_from_table
from graph_utils import _check_groups
from utilities.control import MinWarning

from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import _grp_from_table
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
Expand Down

0 comments on commit 5b155c0

Please sign in to comment.