In [None]:
# df.repartition(2).select(F.spark_partition_id().alias("pid")).collect()

# Подготовка

## Настройка Spark

In [None]:
import sys

from collections import namedtuple
from typing import NamedTuple, Callable
from enum import StrEnum, verify, UNIQUE
from dataclasses import dataclass
from random import randrange

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame, Column

In [None]:
# A single local Spark session on 4 cores, shared by every cell below.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("graphs")
    .config("spark.sql.warehouse.dir", "/tmp/warehouse")
    .getOrCreate()
)
sc = spark.sparkContext
# Needed by the DataFrame.checkpoint() calls in the iterative algorithms below.
sc.setCheckpointDir("/tmp/plan/checkpoint")

## Настройка графов

In [None]:
class Graph(NamedTuple):
    """A graph as a pair of Spark DataFrames; unpacks as ``edges_df, vertices_df``."""

    # edge list; graph()/small_graph() produce columns src, dst, weight
    edges: DataFrame
    # vertex list; graph()/small_graph() produce a single id column
    vertices: DataFrame

In [None]:
def graph(
    num_chunks: int = 1000,
    chunk_size: int = 500,
    chunk_stride: int = 1000,
    random_edges_per_chunk: int = 1000,
    max_weight: int = 10,
    seed: int | None = None,
) -> Graph:
    """Build a large synthetic weighted graph as Spark DataFrames.

    The id space is cut into ``num_chunks`` chunks; chunk ``c`` starts at
    ``start = c * chunk_stride``. Inside a chunk the vertices form a chain
    ``v -> v + 1`` for ``v`` in ``[start, start + chunk_size)``, plus
    ``random_edges_per_chunk`` random edges whose endpoints are both drawn from
    ``[start, start + chunk_size)`` (self-loops possible). Every edge carries a
    random integer weight in ``[0, max_weight)``. With the defaults this yields
    1000 chunks x (500 + 1000) = 1.5M edges, all generated on the driver.

    Args:
        num_chunks: number of disjoint chunks.
        chunk_size: chain length per chunk.
        chunk_stride: distance between the first ids of consecutive chunks;
            keep it larger than ``chunk_size`` so chunks stay disjoint.
        random_edges_per_chunk: number of extra random edges per chunk.
        max_weight: exclusive upper bound for edge weights.
        seed: seed for edge/weight generation; ``None`` gives a fresh random graph.

    Returns:
        ``Graph(edges, vertices)``. ``edges`` has columns ``src, dst, weight``;
        ``vertices`` has column ``id`` holding every id that occurs in an edge.
    """
    import random  # local: the notebook's import cell only brings in randrange

    rng = random.Random(seed)
    edges = []
    for chunk in range(num_chunks):
        start = chunk * chunk_stride
        end = start + chunk_size
        # chain keeps the whole chunk connected
        edges.extend((v, v + 1, rng.randrange(max_weight)) for v in range(start, end))
        # random shortcuts; each is a full (src, dst, weight) triple so every row
        # matches the 3-column schema and unpacks like the chain edges
        edges.extend(
            (rng.randrange(start, end), rng.randrange(start, end), rng.randrange(max_weight))
            for _ in range(random_edges_per_chunk)
        )

    # Explicit schema: no inference pass, and an empty edge list still works.
    edges_df = spark.createDataFrame(edges, "src long, dst long, weight long")

    vertex_ids = sorted({v for src, dst, _ in edges for v in (src, dst)})
    vertices_df = spark.createDataFrame([(v,) for v in vertex_ids], "id long")
    return Graph(edges_df, vertices_df)

In [None]:
def small_graph() -> Graph:
    """Return a tiny hand-written weighted graph on vertices 1..6.

    Edges (src -> dst, weight): 1->2 (1), 1->3 (5), 2->3 (1), 3->4 (1),
    4->5 (1) and the self-loop 6->6 (1).
    """
    vertex_rows = [(vertex,) for vertex in range(1, 7)]
    vertices_frame = spark.createDataFrame(vertex_rows).toDF("id")

    edge_rows = [(1, 2, 1), (1, 3, 5), (2, 3, 1), (3, 4, 1), (4, 5, 1), (6, 6, 1)]
    edges_frame = spark.createDataFrame(edge_rows).toDF("src", "dst", "weight")

    return Graph(edges_frame, vertices_frame)

In [None]:
from datetime import datetime
def timer(handler: Callable[[], object]) -> None:
    """Call ``handler()`` once and print the elapsed time as a ``timedelta``.

    Uses ``time.perf_counter`` (a monotonic, high-resolution clock) instead of
    wall-clock ``datetime.now()``, so clock adjustments cannot skew the measurement.
    The printed format (``H:MM:SS.ffffff``) is the same as before.
    """
    import time
    from datetime import timedelta

    started = time.perf_counter()
    handler()
    print(timedelta(seconds=time.perf_counter() - started))

In [None]:
@dataclass
class Scope:
    @verify(UNIQUE)
    class Direction(StrEnum):
        IN = "IN"
        OUT = "OUT"
        INOUT = "INOUT"

    def __init__(self, edges: DataFrame):
        self.inward = edges_df.withColumnRenamed("dst", "id").withColumnRenamed("src", "nbr").withColumn("dir", F.lit("IN"))
        self.outward = edges_df.withColumnRenamed("src", "id").withColumnRenamed("dst", "nbr").withColumn("dir", F.lit("OUT"))
        self.all_nbr = self.outward.union(self.inward)

    inward: DataFrame
    outward: DataFrame
    all_nbr: DataFrame

    def __call__(self, dir: 'Scope.Direction') -> DataFrame:
        match dir:
            case Scope.Direction.IN:
                return self.inward
            case Scope.Direction.OUT:
                return self.outward
            case Scope.Direction.INOUT:
                return self.all_nbr
            case _:
                raise ValueError(f"Unknown {dir}")

    def __iter__(self):
        yield self.inward
        yield self.outward
        yield self.all_nbr
        

# Компоненты связности - Connected Components

## Классический алгоритм

### Подготовка данных

In [None]:
# Large synthetic graph (see graph()) for the classic driver-side algorithm.
edges_df, vertices_df = graph()

In [None]:
# Driver-side adjacency list for the classic DFS: vertex -> list of neighbours.
# Connected components of a directed graph are taken as *weakly* connected (the
# GraphLab cells below also look at neighbours in both directions), so every edge
# is recorded in both orientations. With outgoing edges only, the result would
# depend on visiting order: a vertex visited before its predecessors would end
# up in a separate component.
edges = (
    edges_df
        .select(col("src"), col("dst"))
        .union(edges_df.select(col("dst").alias("src"), col("src").alias("dst")))
        .groupBy(col("src"))
        .agg(F.collect_list("dst").alias("dsts"))
        .collect()
)
edges = { src: dsts for src, dsts in edges }

In [None]:
# Driver-side vertex index: id -> collected Row (run_cc() iterates over its keys).
vertices = dict((vertex_row.id, vertex_row) for vertex_row in vertices_df.collect())

### Вычисление

In [None]:
visited = set()
def dfs_cc(k: int, current_component: int, components: dict) -> None:
    global edges
    global visited

    visited.add(k)
    if current_component in components:
        components[current_component].append(k)
    else:
        components[current_component] = [k]

    if k not in edges: return
    for v in edges[k]:
        if v in visited: continue
        dfs_cc(v, current_component, components)

def run_cc() -> dict:
    global vertices
    global visited
    component = 0
    components = {}
    for k in vertices.keys():
        if k in visited: continue
        dfs_cc(k, component, components)
        component += 1

    return components

In [None]:
# Run the classic algorithm: dict of component id -> list of vertex ids.
classic_cc_result = run_cc()

## Pregel

### Подготовка данных

In [None]:
# Fresh synthetic graph for the Pregel version.
edges_df, vertices_df = graph()

In [None]:
# Vertex schema (graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# Initial Pregel state: every vertex carries its own id as its component label.
cc_df = vertices_df.selectExpr("id", "id AS value")
cc_df.printSchema()

### Идея

In [None]:
# Register the inputs as SQL views so one superstep can be written in SQL.
edges_df.createOrReplaceTempView("edges")
cc_df.createOrReplaceTempView("vertices")
# One superstep: each vertex receives the minimum label of its in-neighbours
# (edges are followed src -> dst only) and keeps the smaller of that message and
# its own label. The LEFT JOIN keeps vertices that received no message; for them
# the message is NULL (LEAST skips NULLs in Spark SQL).
spark.sql("""
WITH messages AS (
    SELECT e.dst id
         , MIN(v.value) message
      FROM edges e
      JOIN vertices v
        ON (e.src = v.id)
     GROUP BY e.dst
),
superstep AS (
    SELECT id
         , LEAST(message, value) value
      FROM vertices v
      LEFT JOIN messages m
     USING (id)
)
SELECT *
  FROM superstep
""")

### Реализация

In [None]:
# Connected components are *weakly* connected components (the GraphLab cell below
# looks at neighbours in both directions), so labels must also travel against the
# edge direction: send messages over every edge in both orientations.
cc_message_edges_df = edges_df.select("src", "dst").union(
    edges_df.select(col("dst").alias("src"), col("src").alias("dst"))
)

for i in range(15):
    cc_df = (
        cc_message_edges_df.join(cc_df, col("src") == col("id"))
            # message: smallest label among each vertex's neighbours
            .select(col("dst").alias("id"), col("value"))
            .groupby(col("id")).agg(F.min("value").alias("message"))
            # compute: the right join keeps vertices without messages, and least()
            # skips their NULL message
            .join(cc_df, "id", "right")
            .select("id", F.least("message", "value").alias("value"))
    )
    # checkpoint periodically to cut the growing lineage
    if i % 5 == 0:
        cc_df = cc_df.checkpoint()

In [None]:
# Number of distinct labels, i.e. the number of components found.
cc_df.groupBy("value").count().count()

## GraphLab

### Подготовка данных

In [None]:
# Fresh synthetic graph for the GraphLab version.
edges_df, vertices_df = graph()

In [None]:
# Vertex schema (graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# GraphLab-style scope: each edge seen from both endpoints.
# OUT rows: id = src, nbr = dst; IN rows: id = dst, nbr = src.
outward = edges_df.selectExpr("src AS id", "dst AS nbr", "'OUT' AS dir")
inward = edges_df.selectExpr("dst AS id", "src AS nbr", "'IN' AS dir")
scope_df = outward.union(inward)

In [None]:
# Initial labels: each vertex is its own component.
gl_cc_result_df = vertices_df.selectExpr("id", "id AS cc")

In [None]:
# GraphLab-style label propagation: each vertex takes the minimum label over
# itself and every neighbour in its scope (both edge directions).
for i in range(10):
    # gather: smallest neighbour label per vertex
    min_nbr_df = (
        scope_df.alias("s")
        .join(gl_cc_result_df.alias("r"), on=col("s.nbr") == col("r.id"), how="inner")
        .groupBy(col("s.id"))
        .agg(F.min(col("r.cc")).alias("acc_cc"))
    )

    # apply: right join keeps vertices without neighbours; least() skips their NULL
    gl_cc_result_df = (
        min_nbr_df.alias("a")
        .join(gl_cc_result_df.alias("r"), on="id", how="right")
        .select("id", F.least(col("r.cc"), col("a.acc_cc")).alias("cc"))
    )

    # checkpoint periodically to cut the growing lineage
    if i % 5 == 0:
        gl_cc_result_df = gl_cc_result_df.checkpoint()

In [None]:
# Number of distinct labels found by the GraphLab version.
gl_cc_result_df.groupBy("cc").count().count()

### Реализация

In [None]:
# Initial labels: every vertex is its own component. Initialised explicitly so the
# cell runs on a fresh kernel (no other cell defines `result_df`).
result_df = vertices_df.select(col("id"), col("id").alias("value"))

for i in range(10):
    # for each edge src -> dst, src sees dst's label ...
    outward_cc_df = (
        edges_df.join(result_df, col("dst") == col("id"))
            .select(
                col("src").alias("id"),
                col("dst"),
                col("value"))
    )
    # ... and dst sees src's label
    inward_cc_df = (
        edges_df.join(result_df, col("src") == col("id"))
            .select(
                col("dst").alias("id"),
                col("src"),
                col("value"))
    )
    inward_result_df = (
        inward_cc_df
            .groupBy("id")
            .agg(F.min("value").alias("min_in_cc"))
    )
    outward_result_df = (
        outward_cc_df
            .groupBy("id")
            .agg(F.min("value").alias("min_out_cc"))
    )
    # new label = min(own, in-neighbour min, out-neighbour min); least() skips
    # the NULLs produced by the left joins
    result_df = (
        result_df
            .join(inward_result_df, "id", "left")
            .join(outward_result_df, "id", "left")
            .select("id", F.least("value", "min_in_cc", "min_out_cc").alias("value"))
    )
    if i % 5 == 0:
        result_df = result_df.checkpoint()

In [None]:
# Number of distinct labels after the loop above.
result_df.groupBy("value").count().count()

In [None]:
from datetime import datetime
# Reset to the initial labels so the timing covers a complete run from scratch,
# not 10 extra iterations on top of whatever `result_df` an earlier cell left.
result_df = vertices_df.select(col("id"), col("id").alias("value"))
start = datetime.now()

for i in range(10):
    # list outward nodes
    outward_cc_df = (
        edges_df.join(result_df, col("dst") == col("id"))
            .select(
                col("src").alias("id"),
                col("dst"),
                col("value"))
    )
    # list inward nodes
    inward_cc_df = (
        edges_df.join(result_df, col("src") == col("id"))
            .select(
                col("dst").alias("id"),
                col("src"),
                col("value"))
    )
    inward_result_df = (
        inward_cc_df
            .groupBy("id")
            .agg(F.min("value").alias("min_in_cc"))
    )
    outward_result_df = (
        outward_cc_df
            .groupBy("id")
            .agg(F.min("value").alias("min_out_cc"))
    )
    result_df = (
        result_df
            .join(inward_result_df, "id", "left")
            .join(outward_result_df, "id", "left")
            .select("id", F.least("value", "min_in_cc", "min_out_cc").alias("value"))
    )
    if i % 5 == 0:
        result_df = result_df.checkpoint()

# the count() action forces evaluation, so it belongs inside the timed span
print(result_df.groupBy("value").count().count())
print(datetime.now() - start)

## PowerGraph

In [None]:
# Small hand-written graph for the PowerGraph version.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# Neighbourhood views (inward / outward / all_nbr) of the current edges.
scope = Scope(edges_df)

In [None]:
pg_cc_result_df = vertices_df.select(col("id"), col("id").alias("cc"))
# Neighbours in both directions. Combined by column name because the inward and
# outward views order their id/nbr columns differently; a positional union would
# make every inward row a duplicate of an outward row.
all_nbr = scope.outward.unionByName(scope.inward)

for i in range(8):
    # gather: labels of each vertex's neighbours. Alias-qualified names avoid
    # attribute references that may become ambiguous once `all_nbr` is also part
    # of `pg_cc_result_df`'s lineage; the inner join avoids a NULL-id group for
    # vertices without neighbours.
    gather_df = (
        all_nbr.alias("s")
            .join(pg_cc_result_df.alias("r"), col("r.id") == col("s.nbr"))
            .select(col("s.id"), col("r.cc").alias("nbr_cc"))
    )
    # sum: minimum neighbour label per vertex
    sum_df = gather_df.groupBy("id").agg(F.min("nbr_cc").alias("acc_cc"))
    # apply: right join keeps vertices without neighbours; least() skips their NULL
    apply_df = (
        sum_df
            .join(pg_cc_result_df, "id", "right")
            .select(col("id"), F.least("acc_cc", "cc").alias("cc"))
    )
    # scatter is implicit: the next gather reads the new labels
    scatter_df = apply_df
    pg_cc_result_df = scatter_df
    if i % 5 == 0:
        pg_cc_result_df = pg_cc_result_df.checkpoint()

pg_cc_result_df.groupBy("cc").count().count()

# Кратчайшее расстояние - Single Source Shortest Path

## Pregel

### Подготовка данных

In [None]:
# Small hand-written graph for the Pregel shortest-path version.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
def get_min_dist_init_df(start: int = 1, vertices: DataFrame | None = None) -> DataFrame:
    """Initial shortest-path state: ``value`` = 0 for ``start``, a large sentinel otherwise.

    Args:
        start: id of the source vertex.
        vertices: vertex DataFrame with an ``id`` column. Defaults to the global
            ``vertices_df`` read at call time (i.e. whichever graph is loaded).

    Returns:
        ``vertices`` plus a ``value`` column.

    The sentinel is ``sys.maxsize // 2``, presumably so that adding edge weights
    to an unreached vertex's distance cannot overflow a 64-bit long.
    """
    if vertices is None:
        vertices = vertices_df
    return vertices.withColumn(
        "value",
        F.when(col("id") == F.lit(start), F.lit(0)).otherwise(F.lit(sys.maxsize // 2)),
    )

min_dist_df = get_min_dist_init_df()
min_dist_df.printSchema()

### Реализация

In [None]:
# get_min_dist_init_df() puts the initial distances in a "value" column; this cell
# works with "dist", so rename it first.
min_dist_df = get_min_dist_init_df(start=1).withColumnRenamed("value", "dist")
for _ in range(5):
    min_dist_df = (
        edges_df.join(min_dist_df, col("src") == col("id"))
            # message along src -> dst: distance of src plus the edge weight
            .select(col("dst").alias("id"), F.expr("dist + weight").alias("message"))
            .groupby(col("id")).agg(F.min("message").alias("message"))
            # keep the shorter of current and offered distance; least() skips NULLs
            .join(min_dist_df, "id", "right")
            .select("id", F.least("message", "dist").alias("dist"))
    )

min_dist_df.show()

## GraphLab

In [None]:
# Small hand-written graph for the GraphLab shortest-path version.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# Neighbourhood views of the current edges (inward is used below).
scope = Scope(edges_df)

In [None]:
# Initial distances come from get_min_dist_init_df() (there is no get_min_dist_df);
# it names the column "value", this cell works with "dist".
min_dist_df = get_min_dist_init_df(start=1).withColumnRenamed("value", "dist")
inward = scope.inward

for _ in range(5):
    # gather over in-neighbours: dist(nbr) + weight of edge nbr -> id, min per vertex
    min_dist_nbr_df = (
        inward.alias("s")
            .join(min_dist_df.alias("r"), col("s.nbr") == col("r.id"))
            .select(col("s.id"), F.expr("r.dist + s.weight").alias("dist"))
            .groupBy("id")
            .agg(F.min("dist").alias("acc_min_dist"))
    )

    # apply: keep the shorter distance; least() skips NULL for vertices with no offer
    min_dist_df = (
        min_dist_nbr_df.alias("a")
            .join(min_dist_df.alias("r"), "id", "right")
            .select("id", F.least(col("r.dist"), col("a.acc_min_dist")).alias("dist"))
    )
min_dist_df.show()

## PowerGraph

In [None]:
# Small hand-written graph for the PowerGraph shortest-path version.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# Neighbourhood views of the current edges (inward is used below).
scope = Scope(edges_df)

In [None]:
# Initial distances come from get_min_dist_init_df() (there is no get_min_dist_df);
# it names the column "value", this cell works with "dist".
pg_min_dist_result_df = get_min_dist_init_df(start=1).withColumnRenamed("value", "dist")
inward = scope.inward
for _ in range(5):
    # gather: distance offered by each in-neighbour (its dist + edge weight)
    gather_df = (
        inward.alias("s")
            .join(
                pg_min_dist_result_df.alias("r"),
                col("r.id") == col("s.nbr"))
            .select(col("s.id"), F.expr("r.dist + s.weight").alias("dist"))
    )
    # sum: best offer per vertex
    sum_df = gather_df.groupBy("id").agg(F.min("dist").alias("acc_dist"))
    # apply: keep the shorter distance; least() skips NULL for vertices with no offer
    apply_df = (
        sum_df
            .join(pg_min_dist_result_df, "id", "right")
            .select(col("id"), F.least("acc_dist", "dist").alias("dist"))
    )
    scatter_df = apply_df
    pg_min_dist_result_df = scatter_df

pg_min_dist_result_df.show()

# Топологическая сортировка - Topological Sort

## Pregel

### Подготовка данных

In [None]:
# Small hand-written graph for the Pregel topological-sort version.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

### Реализация

In [None]:
# Pregel-style levelling: every vertex starts at level 1 and repeatedly takes
# max(own level, highest in-neighbour level + 1).
sorted_df = vertices_df.withColumn("ord", F.lit(1))

for _ in range(5):
    sorted_df = (
        edges_df
        # a self-loop would keep raising its own vertex's level
        .filter(col("src") != col("dst"))
        .join(sorted_df, on=col("src") == col("id"))
        .select(col("dst").alias("id"), col("ord").alias("message"))
        .groupBy("id")
        .agg(F.max(col("message")).alias("message"))
        # right join keeps vertices without predecessors; greatest() skips NULLs
        .join(sorted_df, on="id", how="right")
        .select("id", F.greatest(col("message") + 1, col("ord")).alias("ord"))
    )

sorted_df.orderBy("ord").show()

## GraphLab

### Подготовка данных

In [None]:
# Small hand-written graph for the GraphLab topological-sort versions.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# Neighbourhood views of the current edges (inward/outward are used below).
scope = Scope(edges_df)

### Реализация I

In [None]:
sorted_df = vertices_df.withColumn("ord", F.lit(1))
inward = scope.inward

for _ in range(5):
    # gather: highest level among each vertex's in-neighbours. Scope views rename
    # src/dst to nbr/id, so self-loops are dropped with id != nbr (the columns
    # src/dst do not exist here).
    sorted_nbr_df = (
        inward.where(col("id") != col("nbr")).alias("s")
            .join(sorted_df.alias("r"), col("s.nbr") == col("r.id"))
            .groupBy(col("s.id"))
            .agg(F.max(col("r.ord")).alias("acc_ord"))
    )

    # apply: level = max(own level, best predecessor + 1); greatest() skips NULLs
    sorted_df = (
        sorted_nbr_df.alias("a")
            .join(sorted_df.alias("r"), "id", "right")
            .select("id", F.greatest(col("r.ord"), F.expr("acc_ord + 1")).alias("ord"))
    )

sorted_df.orderBy("ord").show()

### Реализация II

In [None]:
inward = scope.inward
outward = scope.outward

# Random initial positions in [0, 1); a fixed seed makes reruns reproducible.
sorted_df = vertices_df.withColumn("ord", F.rand(seed=42))

for _ in range(5):
    # highest position among each vertex's predecessors (in-neighbours)
    inward_nbr_df = (
        inward.alias("s")
            .join(sorted_df.alias("r"), col("s.nbr") == col("r.id"))
            .groupBy(col("s.id"))
            .agg(F.max(col("r.ord")).alias("in_acc_ord"))
    )
    # lowest position among each vertex's successors (out-neighbours)
    outward_nbr_df = (
        outward.alias("s")
            .join(sorted_df.alias("r"), col("s.nbr") == col("r.id"))
            .groupBy(col("s.id"))
            .agg(F.min(col("r.ord")).alias("out_acc_ord"))
    )
    # Move each vertex to the midpoint between its predecessors and successors,
    # with 0 / 1 standing in for "no predecessor" / "no successor". Half the gap
    # (|min_out - max_in| / 2) is a distance, not a position, and would not order
    # the vertices.
    sorted_df = (
        inward_nbr_df.join(outward_nbr_df, "id", "full")
            .join(sorted_df, "id", "right")
            .select(
                "id",
                F.coalesce("in_acc_ord", F.lit(0)).alias("max_in"),
                F.coalesce("out_acc_ord", F.lit(1)).alias("min_out"),
            )
            .select("id", F.expr("(max_in + min_out) / 2").alias("ord"))
    )

sorted_df.orderBy("ord").show()

## PowerGraph

In [None]:
# Small hand-written graph for the PowerGraph topological-sort version.
edges_df, vertices_df = small_graph()

In [None]:
# Vertex schema (small_graph() names the column id).
vertices_df.printSchema()

In [None]:
# Edge schema (small_graph() names the columns src, dst, weight).
edges_df.printSchema()

In [None]:
# Neighbourhood views of the current edges (inward is used below).
scope = Scope(edges_df)

In [None]:
sorted_df = vertices_df.withColumn("ord", F.lit(1))

inward = scope.inward
for _ in range(5):
    # gather: level of every in-neighbour. Scope views rename src/dst to nbr/id,
    # so self-loops are dropped with id != nbr.
    gather_df = (
        inward.where(col("id") != col("nbr")).alias("s")
            .join(
                sorted_df.alias("r"),
                col("r.id") == col("s.nbr"))
            .select(col("s.id"), col("r.ord"))
    )
    # sum: highest predecessor level per vertex
    sum_df = gather_df.groupBy("id").agg(F.max("ord").alias("acc_ord"))
    # apply: level = max(own level, best predecessor + 1); greatest() skips NULLs
    apply_df = (
        sum_df
            .join(sorted_df, "id", "right")
            .select(col("id"), F.greatest(F.expr("acc_ord + 1"), "ord").alias("ord"))
    )
    scatter_df = apply_df
    sorted_df = scatter_df

sorted_df.orderBy("ord").show()

# Общий алгоритм

## Pregel

In [None]:
def pregel_superstep(edges: DataFrame, values: DataFrame, message: Column, reducer: Callable[[Column], Column], computer: Column, **columns: Column) -> DataFrame:
    """Run one Pregel superstep and return the new vertex state.

    Every edge ``src -> dst`` except self-loops sends ``message`` to ``dst``.
    ``message`` is evaluated on the edge row joined with the source vertex's row,
    so it may use ``value``, ``weight``, etc. Messages per destination are
    combined with ``reducer``. Every vertex then gets ``value = computer``,
    evaluated on its own row plus the combined ``message``; ``message`` is NULL
    for vertices that received nothing.

    Args:
        edges: DataFrame with ``src`` / ``dst`` (plus whatever ``message`` reads).
        values: vertex state with an ``id`` column (plus whatever ``message`` and
            ``computer`` read, usually ``value``).
        message: Column expression for the per-edge message.
        reducer: aggregate such as ``F.min`` / ``F.max`` / ``F.sum``.
        computer: Column expression for the new ``value``.
        **columns: extra columns to carry into the result, named by keyword,
            e.g. ``out_deg=col("out_deg")``.

    Returns:
        DataFrame with ``id``, ``value`` and one column per keyword in ``columns``.
    """
    # extra columns keep the keyword name they were passed under
    additional_columns = [
        (col(column) if isinstance(column, str) else column).alias(name)
        for name, column in columns.items()
    ]
    return (
        edges.where(col("src") != col("dst"))  # self-loops never send messages
            .join(values, col("src") == col("id"))
            .select(col("dst").alias("id"), message.alias("message"))
            .groupby(col("id"))
            .agg(reducer(col("message")).alias("message"))
            # right join: vertices that received nothing keep a NULL message
            .join(values, "id", "right")
            .select("id", computer.alias("value"), *additional_columns)
    )

In [None]:
# Small hand-written graph for the generic pregel_superstep examples below.
edges_df, vertices_df = small_graph()

### Связные компоненты - Connected Components

In [None]:
# Weakly connected components: labels must also travel against the edge
# direction, so run the superstep over the edges in both orientations.
undirected_edges_df = edges_df.select("src", "dst").union(
    edges_df.select(col("dst").alias("src"), col("src").alias("dst"))
)
cc_df = vertices_df.withColumn("value", col("id"))

for _ in range(5):
    cc_df = pregel_superstep(
        edges=undirected_edges_df,
        values=cc_df,
        message=col("value"),
        reducer=F.min,
        computer=F.least(col("value"), col("message"))
    )

cc_df.show()

### Кратчайшее расстояние - Single Source Shortest Path

In [None]:
pregel_min_dist_df = get_min_dist_init_df(start=1)

for _ in range(5):
    pregel_min_dist_df = pregel_superstep(
        edges=edges_df,
        values=pregel_min_dist_df,
        # distance offered to dst: distance of src plus the edge weight
        message=F.expr("value + weight"),
        reducer=F.min,
        computer=F.least("message", "value")
    )

# Show the distances computed by the loop above.
pregel_min_dist_df.show()

### Топологическая сортировка - Topological Sort

In [None]:
# Hand-written topological levelling: skip self-loops, send each vertex's level
# to its successors, and set level = max(own level, highest received + 1).
sorted_df = vertices_df.withColumn("value", F.lit(1))

for _ in range(5):
    sorted_df = (
        edges_df
        .filter(col("src") != col("dst"))
        .join(sorted_df, on=col("src") == col("id"))
        .select(col("dst").alias("id"), col("value").alias("message"))
        .groupBy("id")
        .agg(F.max("message").alias("message"))
        .join(sorted_df, on="id", how="right")
        .select("id", F.greatest(col("message") + 1, col("value")).alias("value"))
    )
sorted_df.orderBy("value").show()

In [None]:
# The same levelling through the generic superstep: each vertex's level becomes
# max(own level, highest predecessor level + 1).
pregel_sort_df = vertices_df.withColumn("value", F.lit(1))

for _ in range(5):
    pregel_sort_df = pregel_superstep(
        edges_df,
        pregel_sort_df,
        message=col("value"),
        reducer=F.max,
        computer=F.greatest(col("message") + 1, col("value")),
    )

pregel_sort_df.orderBy("value").show()

### PageRank

In [None]:
# In/out degree per vertex. Vertices with no in/out edges get 1 instead of NULL
# (out_deg is used as a divisor in the PageRank computer below).
in_deg = (
    edges_df
    .groupBy("dst")
    .agg(F.count(F.lit(1)).alias("in_deg"))
    .withColumnRenamed("dst", "id")
)
out_deg = (
    edges_df
    .groupBy("src")
    .agg(F.count(F.lit(1)).alias("out_deg"))
    .withColumnRenamed("src", "id")
)
deg_vert_df = (
    vertices_df
    .join(in_deg, on="id", how="left")
    .join(out_deg, on="id", how="left")
    .select(
        "id",
        F.coalesce("in_deg", F.lit(1)).alias("in_deg"),
        F.coalesce("out_deg", F.lit(1)).alias("out_deg"),
    )
)

In [None]:
# Every vertex starts with value 1.
page_rank_result_df = deg_vert_df.select("*", F.lit(1).alias("value"))
page_rank_result_df.show()

In [None]:
# PageRank with damping 0.85. `value` holds rank / out_deg, i.e. the share a
# vertex passes along each out-edge, so summing the messages gives the incoming
# contributions. A vertex that receives no message gets 0 contribution
# (coalesce(message, 0)) rather than reusing its own value, which would inflate
# the rank of vertices without in-links.
for i in range(10):
    page_rank_result_df = pregel_superstep(
        edges=edges_df,
        values=page_rank_result_df,
        message=col("value"),
        reducer=F.sum,
        computer=F.expr("(0.15 + 0.85 * coalesce(message, 0)) / out_deg"),
        out_deg=col("out_deg")
    )
    if i % 5 == 0:
        page_rank_result_df = page_rank_result_df.checkpoint()

# Multiply back by out_deg to display the rank itself next to the stored share.
page_rank_result_df.withColumn("rank", col("value") * col("out_deg")).show()