# Imports

In [None]:
import pip
import json
import pandas as pd

import importlib
import subprocess
import sys


def _ensure_installed(module_name, pip_name=None):
    """Make sure `module_name` is importable, installing it with pip if it is not.

    Parameters
    ----------
    module_name : str
        Name used in the `import` statement.
    pip_name : str, optional
        Distribution name to hand to pip; defaults to `module_name`.

    Raises
    ------
    subprocess.CalledProcessError
        If the pip installation itself fails.
    """
    try:
        importlib.import_module(module_name)
    except ImportError:
        # `pip.main` is not part of pip's public API (removed in pip 10).
        # Running pip as a subprocess of the *current* interpreter installs
        # into the environment this kernel is actually using.
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", pip_name or module_name]
        )
        # Let the import system notice the freshly installed package.
        importlib.invalidate_caches()


_ensure_installed("psycopg2")
_ensure_installed("pyspark")

import psycopg2
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

# Load AWS Credentials

In [None]:
# Read the connection settings from the local JSON file. The `with` block
# closes the file handle as soon as the JSON has been parsed.
with open("./awscredentials.json") as credentials_file:
    aws_creds = json.load(credentials_file)

# Drop the Previous PageRank Results Table

In [20]:
# Drop any `pagerank_results` table left over from a previous run
# (presumably so the JDBC write at the end of the notebook can recreate it).
conn = None
try:
    conn = psycopg2.connect(host=aws_creds["ENDPOINT"], port=aws_creds["PORT"],
                            database=aws_creds["DBNAME"], user=aws_creds["USR"],
                            password=aws_creds["password"])
    # The cursor context manager closes the cursor even if execute() raises.
    with conn.cursor() as cur:
        cur.execute("DROP TABLE IF EXISTS pagerank_results")
    conn.commit()
except Exception as e:
    # Best-effort step: report the failure and let the notebook continue.
    print("Database connection failed due to {}".format(e))
finally:
    # Always release the connection, including when connect succeeded but
    # the DROP or the commit failed.
    if conn is not None:
        conn.close()

# Define PageRank Queries

In [4]:
# Queries
#
# SQL text used by the PageRank computation. Each constant is a plain string;
# nothing is executed in this cell.

# Builds the cleaned edge list in four steps:
#   1. keep links whose source OR dest appears in `urls`
#      (the CTE is called `non_existent_urls` but holds the rows that are KEPT);
#   2. drop self-loops (source = dest);
#   3. find sinks: destinations that never appear as a source;
#   4. for every edge pointing into a sink, add the reversed edge.
# The final UNION (not UNION ALL) also removes duplicate edges.
# NOTE(review): step 1 uses OR, so an edge survives when only one endpoint is
# in `urls` - confirm that AND was not intended.
# NOTE(review): no cell shown here executes this query; the load cell reads a
# relation named `cleaned_links_url` directly over JDBC.
cleaned_links_url_query = '''
    WITH non_existent_urls AS (
        SELECT links_url.source, links_url.dest
        FROM links_url
        WHERE links_url.source IN (SELECT url FROM urls) OR links_url.dest IN (SELECT url FROM urls)
    ), no_self_loops AS (
        SELECT non_existent_urls.source, non_existent_urls.dest
        FROM non_existent_urls
        WHERE non_existent_urls.source <> non_existent_urls.dest
    ), sinks AS (
        SELECT no_self_loops.dest AS sink
        FROM no_self_loops
        WHERE no_self_loops.dest NOT IN ( 
            SELECT DISTINCT no_self_loops_1.source
            FROM no_self_loops no_self_loops_1
        )
    ), back_edges AS (
        SELECT n.dest AS source,
        n.source AS dest
        FROM no_self_loops n JOIN sinks s ON n.dest = s.sink
    )
    SELECT back_edges.source, back_edges.dest
    FROM back_edges
        UNION
    SELECT no_self_loops.source, no_self_loops.dest
    FROM no_self_loops'''

# One row per source: out_weight = 1 / (number of outgoing edges).
# NOTE(review): like the query above, not executed by any cell shown here;
# `out_weights` is read over JDBC instead.
out_weights_query = '''
    SELECT cleaned_links_url.source, 1.0 / COUNT(*) AS out_weight
    FROM cleaned_links_url
    GROUP BY cleaned_links_url.source'''

# Starting ranks: every distinct source in the cleaned edge list gets rank 1.
initialize_ranks_query = '''
    SELECT DISTINCT(source) AS node, 1 AS rank
    FROM cleaned_links_url'''

# One PageRank step. For each destination node:
#   rank = 0.85 * SUM(out_weight(source) * previous_rank(source)) + 0.15
# Reads the `prev_iter_ranks` view, so that view must be registered first.
curr_iter_ranks_query = '''
    SELECT l.dest AS node, SUM(o.out_weight * p.rank) * 0.85 + 0.15 AS rank
    FROM cleaned_links_url l JOIN out_weights o ON l.source = o.source 
            JOIN prev_iter_ranks p ON l.source = p.node
    GROUP BY l.dest'''

# Largest absolute rank change between the previous and current iteration,
# taken over nodes present in both views (inner join). Returns a single row
# with column `max_diff`.
max_diff_query = '''
    SELECT MAX(ABS(p.rank - c.rank)) AS max_diff
    FROM prev_iter_ranks p JOIN curr_iter_ranks c ON p.node = c.node'''

# Min-max rescales the current ranks, (rank - min) / (max - min), and replaces
# each node URL by its `urls.id` (nodes without a row in `urls` are dropped by
# the inner join).
# NOTE(review): the denominator is zero when all ranks are equal - confirm how
# the executing engine handles that case.
# NOTE(review): not executed by any cell shown here.
id_normalization_query = '''
    WITH min_rank AS (
        SELECT MIN(rank) AS min_rank
        FROM curr_iter_ranks
    ), max_minus_min AS (
        SELECT MAX(rank) - (SELECT * FROM min_rank) AS max_min_diff
        FROM curr_iter_ranks
    ) 
    SELECT u.id AS id, (c.rank - (SELECT * FROM min_rank)) / ((SELECT * FROM max_minus_min)) AS rank
    FROM curr_iter_ranks c JOIN urls u ON u.url=c.node'''

# Initialize Spark

In [5]:
from urllib.parse import quote

# JDBC URL for the PostgreSQL instance described by `aws_creds`.
# * `format` accepts PORT whether the JSON stores it as a string or a number
#   (plain `+` concatenation raises TypeError for a number).
# * user and password are percent-encoded so that characters such as
#   `&`, `=`, `%` or `+` cannot break or alter the query string; the
#   PostgreSQL JDBC driver URL-decodes parameter values when parsing the URL.
#   For purely alphanumeric credentials the result is unchanged.
jdbcUrl = "jdbc:postgresql://{host}:{port}/{db}?user={user}&password={password}".format(
    host=aws_creds["ENDPOINT"],
    port=aws_creds["PORT"],
    db=aws_creds["DBNAME"],
    user=quote(str(aws_creds["USR"]), safe=""),
    password=quote(str(aws_creds["password"]), safe=""),
)

# Local Spark session using every available core; the PostgreSQL JDBC driver
# jar is expected in the working directory.
spark = (
    SparkSession.builder
    .appName('PageRank')
    .master('local[*]')
    .config("spark.driver.extraClassPath", "postgresql-42.2.20.jar")
    .getOrCreate()
)

# Actual PageRank

In [6]:
# Load data from RDS
def _read_rds_table(table_name):
    """Return a Spark DataFrame backed by `table_name`, read over JDBC.

    Uses the notebook-level `spark` session and `jdbcUrl`.
    """
    return (
        spark.read.format("jdbc")
        .option("url", jdbcUrl)
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", table_name)
        .load()
    )


# Edge list: registered as a temp view so the SQL queries can reference it.
cleaned_links_url_sdf = _read_rds_table("cleaned_links_url")
cleaned_links_url_sdf.createOrReplaceTempView("cleaned_links_url")

# Per-source out-weights, registered the same way.
out_weights_sdf = _read_rds_table("out_weights")
out_weights_sdf.createOrReplaceTempView("out_weights")

In [7]:
# Actual PageRank
#
# Iterate the rank update until the largest per-node change drops below
# CONVERGENCE_THRESHOLD, or until MAX_ITERATIONS passes have been made.
MAX_ITERATIONS = 10000
CONVERGENCE_THRESHOLD = 0.1

prev_iter_ranks_sdf = spark.sql(initialize_ranks_query)
prev_iter_ranks_sdf.createOrReplaceTempView("prev_iter_ranks")

convergence = False
i = 0
# Stop on convergence OR when the cap is reached. The previous condition
# (`not convergence or i == 10000`) never enforced the cap and would keep
# looping after convergence whenever i happened to equal 10000.
while not convergence and i < MAX_ITERATIONS:
    curr_iter_ranks_sdf = spark.sql(curr_iter_ranks_query)
    curr_iter_ranks_sdf.createOrReplaceTempView("curr_iter_ranks")

    # The query returns exactly one row; read the value directly instead of
    # round-tripping the frame through JSON.
    max_diff_df = spark.sql(max_diff_query).toPandas()
    max_diff = max_diff_df["max_diff"].iloc[0]

    # max_diff is NULL when the two views share no node; treat that as
    # "not converged" rather than comparing against a missing value.
    if pd.notna(max_diff) and float(max_diff) < CONVERGENCE_THRESHOLD:
        convergence = True
        # Default save mode fails if the table already exists; an earlier
        # cell drops `pagerank_results` before this point.
        curr_iter_ranks_sdf.write.format("jdbc") \
                .option("url", jdbcUrl) \
                .option("driver", "org.postgresql.Driver") \
                .option("dbtable", "pagerank_results") \
                .save()
    else:
        # The ranks just computed become the input of the next pass.
        # NOTE(review): nothing is cached, so each pass appears to build on
        # the previous DataFrame's plan - consider caching/checkpointing if
        # iterations get slow.
        curr_iter_ranks_sdf.createOrReplaceTempView("prev_iter_ranks")
        i += 1

if not convergence:
    print("PageRank did not converge within {} iterations; no results were written".format(MAX_ITERATIONS))