In [None]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
import time

# Obtain the notebook-wide Spark session (getOrCreate hands back an existing
# session when there is one instead of always building a new one).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [None]:
# Root directory of the input dataset. cruncher() appends "/person*.csv*",
# "/interest*.csv*" and "/knows*.csv*" to this path when loading its tables.
datadir = "/opt/lsde/dataset-sf100-bidirectional/"

In [None]:
def reorg(datadir):
    """Placeholder for a data reorganisation step that reports its duration.

    No reorganisation work is implemented yet, so ``datadir`` is currently
    unused. The elapsed wall-clock time (monotonic clock) is printed in
    milliseconds; nothing is returned.
    """
    started = time.monotonic()

    # nothing here (yet)

    elapsed_ms = 1000 * (time.monotonic() - started)
    print(f"Reorg time: {elapsed_ms:.4f} ms")

In [None]:
def cruncher(datadir, a1, a2, a3, a4, d1, d2):
    """Build the query that finds same-city (p1, p2, p3) triangles and scores them.

    Parameters
    ----------
    datadir
        Directory holding the pipe-delimited, headered ``person*.csv*``,
        ``interest*.csv*`` and ``knows*.csv*`` files.
    a1
        Interest value that p1 must have and that p2 / p3 must not have.
    a2, a3, a4
        Interest values; p2 and p3 must each have at least two of them.
    d1, d2
        Inclusive bounds for p1's birthday, compared against
        ``month(birthday) * 100 + dayofmonth(birthday)``.

    Returns
    -------
    A Spark DataFrame with columns ``score``, ``p1``, ``p2``, ``p3`` where
    ``score`` is ``p2score + p3score``, ordered by ``score`` descending and
    then ``p1``, ``p2``, ``p3`` ascending.

    NOTE(review): no Spark action (show/collect/...) is called in here, so the
    printed "Cruncher time" presumably covers loading/schema inference and
    plan construction rather than the execution of the query itself --
    confirm before using it as a benchmark figure.
    """
    started = time.monotonic()

    def _read_table(pattern):
        # all three inputs share the same CSV reader configuration
        reader = spark.read.format("csv")
        reader = reader.option("header", "true")
        reader = reader.option("delimiter", "|")
        reader = reader.option("inferschema", "true")
        return reader.load(datadir + pattern)

    person = _read_table("/person*.csv*")
    interest = _read_table("/interest*.csv*")
    knows = _read_table("/knows*.csv*")

    # p1 candidates: persons with interest a1 whose birthday (as MMDD) lies in [d1, d2]
    birthday_mmdd = month(col("birthday")) * 100 + dayofmonth(col("birthday"))
    p1 = (
        interest
        .where(col("interest") == a1)
        .select(col("personId"))
        .join(person, on="personId")
        .withColumn("bday", birthday_mmdd)
        .filter((col("bday") >= d1) & (col("bday") <= d2))
    )

    # p2/p3 scores: number of a2/a3/a4 interests per person, kept only when
    # at least 2 and only for persons that do NOT have interest a1
    has_a1 = interest.filter(col("interest") == a1)
    scores = (
        interest
        .where(col("interest").isin(a2, a3, a4))
        .groupBy("personId")
        .agg(count("interest").alias("score"))
        .filter(col("score") >= 2)
        .join(has_a1, on="personId", how="left_anti")
        .select("personId", "score")
    )

    # knows edges renamed to pA -> pB (person A / person B) so they cannot be
    # confused with person.csv's header or with p1/p2/p3; each endpoint is
    # annotated with its city and only same-city edges are kept
    edges = knows.select(col("personId").alias("pA"), col("friendId").alias("pB"))
    edges_with_a_city = (
        edges
        .join(person, on=(col("pA") == col("personId")))
        .select("pA", "pB", col("locatedIn").alias("pAcity"))
    )
    edges_with_both_cities = (
        edges_with_a_city
        .join(person, on=(col("pB") == col("personId")))
        .select("pA", "pB", "pAcity", col("locatedIn").alias("pBcity"))
    )
    same_city_knows = edges_with_both_cities.filter(col("pAcity") == col("pBcity"))

    # p1-p2 candidates: same-city edges leaving a p1 and arriving at a scored person
    p1p2 = (
        p1
        .join(same_city_knows, col("personId") == col("pA"))
        .select(col("pA").alias("p1"), col("pB").alias("p2"))
        .join(scores, col("p2") == col("personId"))
        .select("p1", "p2", col("score").alias("p2score"))
    )

    # p1-p2-p3 triangles: extend along p2 -> p3, keep only p2 < p3 so a pair is
    # reported in a single order, require p3 to be scored, and finally require
    # a same-city p1 -> p3 edge to close the triangle
    p2_to_p3 = (
        p1p2
        .join(same_city_knows, (col("p2") == col("pA")))
        .select("p1", "p2", "p2score", col("pB").alias("p3"))
        .filter(col("p2") < col("p3"))
    )
    p1p2p3 = (
        p2_to_p3
        .join(scores, col("p3") == col("personId"))
        .select("p1", "p2", "p3", "p2score", col("score").alias("p3score"))
        .join(same_city_knows, (col("p1") == col("pA")) & (col("p3") == col("pB")))
    )

    # final projection and ordering
    result = (
        p1p2p3
        .selectExpr("p2score + p3score AS score", "p1", "p2", "p3")
        .orderBy(desc("score"), asc("p1"), asc("p2"), asc("p3"))
    )

    elapsed_ms = 1000 * (time.monotonic() - started)
    print(f"Cruncher time: {elapsed_ms:.4f} ms")

    return result

In [None]:
# Query parameter sets, one row per query: (q, a1, a2, a3, a4, d1, d2).
# q is the query number; the remaining values are the arguments handed to
# cruncher(), with d1/d2 compared against month * 100 + day of the birthday.
params = [
    dict(q=q, a1=a1, a2=a2, a3=a3, a4=a4, d1=d1, d2=d2)
    for q, a1, a2, a3, a4, d1, d2 in [
        ( 1, 1989,  1990, 5183, 1749,  409,  509),
        ( 2, 2788,   568, 2820, 6945,  730,  829),
        ( 3,  775,  2008, 1022,    8,  827,  926),
        ( 4, 2788,  1989, 1023, 7380,  924, 1024),
        ( 5,  139,  2837,  808, 7509,  423,  523),
        ( 6,  468,  2812, 9474, 2803,  819,  918),
        ( 7,  780, 12002, 1180, 2870, 1126, 1226),
        ( 8, 1993,  9141, 6704, 2800,  910, 1010),
        ( 9, 2794,  1989, 1023, 7380,  924, 1024),
        (10, 1178,  2059, 1179, 1985,  806,  905),
    ]
]

# by default only the first parameter set is run; drop the slice to run them all
params = params[:1]

for p in params:
    # positional order expected by cruncher: a1, a2, a3, a4, d1, d2
    df = cruncher(datadir, *(p[key] for key in ("a1", "a2", "a3", "a4", "d1", "d2")))
    # prefix every result row with its query number
    df = df.select(lit(p["q"]).alias("q"), "p1", "p2", "p3", "score")
    df.show()
