In [0]:
import time
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import sum as _sum, size

In [0]:
def load_table_df(path):
    """
    Load a text file into a Spark DataFrame through the CSV reader.

    The file is read with the reader's default delimiter, the first line is
    used as the header row, and column types are inferred from the data.

    :param path: location of the file to read
    :return: DataFrame holding the parsed file
    """
    # Resolve the session here instead of relying on a `spark` global, which
    # the visible notebook cells never define. getOrCreate() returns the
    # already-running session when there is one.
    session = SparkSession.builder.getOrCreate()
    reader = (
        session.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    return reader.load(path)

In [0]:
# Read the raw file into a DataFrame; its tab-separated fields are split
# into separate columns later by clean_table_df.
df = load_table_df('/FileStore/tables/web_Google.txt')

In [0]:
def clean_table_df(df):
    """
    Turn the raw frame into integer ``key`` / ``value`` columns.

    The first column of ``df`` is split on tab characters: field 0 becomes
    ``key`` and field 1 becomes ``value``, both cast to integers, and the
    original column is dropped. Rows whose ``key`` or ``value`` is null after
    the cast are removed (presumably leftover header/comment lines of the
    input file -- confirm against the data).

    :param df: DataFrame whose first column holds tab-separated text
    :return: DataFrame with ``key`` and ``value`` integer columns
    """
    # Named `source_col` rather than `col` so it does not shadow the
    # star-imported pyspark `col()` function inside this function.
    source_col = df.columns[0]

    # Build the split expression once and reuse it for both fields.
    fields = split(df[source_col], '\t')

    return (
        df.withColumn('key', fields.getItem(0).cast(IntegerType()))
          .withColumn('value', fields.getItem(1).cast(IntegerType()))
          .drop(source_col)
          # A row needs both endpoints to be usable as a pair.
          .dropna(subset=['key', 'value'])
    )

In [0]:
# Replace the raw frame with its integer (key, value) form.
# NOTE: this rebinds `df`, so re-running this cell without first re-running
# the load cell applies clean_table_df to an already-cleaned frame.
df = clean_table_df(df)

In [0]:
def fccg_iterate(df):
    """
    Run one map / reduce / dedup round over a frame of (key, value) pairs.

    Steps:
      1. MAP    - append the reversed copy of every pair, so each id is
                  grouped with every id it is paired with.
      2. REDUCE - per key, collect the distinct values and their minimum;
                  keep only keys greater than that minimum, then emit
                  (key, min) and (v, min) for every collected value v
                  (this includes the row (min, min)).
      3. DEDUP  - drop duplicate output rows.

    :param df: DataFrame with columns ``key`` and ``value``
    :return: tuple ``(pairs_df, new_count)`` where ``pairs_df`` has columns
             ``key`` and ``value`` and ``new_count`` is the sum, over the kept
             keys, of ``len(adj_list) - 1`` (0 when no key is kept)
    """
    # MAP
    reversed_pairs = df.select(col("value").alias("key"), col("key").alias("value"))
    map_df = df.union(reversed_pairs)

    # REDUCE
    # Group by 'key', gather the distinct values, then take their minimum.
    df_agg = (
        map_df.groupBy("key")
        .agg(collect_set("value").alias("adj_list"))
        .withColumn("min_value", array_min(col("adj_list")))
    )

    # Keep only the keys that have a smaller id among their values
    df_agg_filtered = df_agg.filter(col("key") > col("min_value"))

    # Count the new pairs with a single aggregation job:
    # sum(size - 1) equals sum(size) - row_count.
    pair_total = df_agg_filtered.agg(_sum(size("adj_list") - 1)).first()[0]
    # The sum is NULL (None) when no row survived the filter; report 0 then
    # instead of failing on None arithmetic.
    new_count = pair_total if pair_total is not None else 0

    # Prepend the key to its adjacency list so the key itself is re-emitted too
    df_concat = df_agg_filtered.select(
        concat(array(col("key")), col("adj_list")).alias("adj_list"),
        col("min_value"),
    )
    # One output row per list element, each paired with the group's minimum
    df_exploded = df_concat.select(
        explode("adj_list").alias("key"),
        col("min_value").alias("value"),
    )

    # DEDUP
    df_final = df_exploded.distinct()

    return df_final, new_count

In [0]:
def compute_fccg(df):
    """
    Apply fccg_iterate repeatedly until a round reports zero new pairs.

    After every round the number of new pairs is printed. Each round's result
    is persisted and materialized so that the next round (and the caller)
    reads it from the cache instead of recomputing all earlier rounds; the
    previous round's cached copy is then released. The input frame itself is
    never unpersisted.

    :param df: DataFrame with columns ``key`` and ``value``
    :return: the DataFrame produced by the last round; it is left persisted,
             so the caller may call ``unpersist()`` on it once done
    """
    nb_iteration = 0
    cached_df = None  # last frame persisted by this function, if any

    while True:
        nb_iteration += 1

        df, count = fccg_iterate(df)

        # Materialize this round's result before releasing the previous one:
        # a cache that is already loaded survives the parent's unpersist().
        df = df.persist()
        df.count()
        if cached_df is not None:
            cached_df.unpersist()
        cached_df = df

        print(f"Number of new pairs for iteration #{nb_iteration}:\t{count}")
        if count == 0:
            print("\nNo new pair, end of computation")
            break

    return df

In [0]:
# Run the iterative computation on the cleaned frame; compute_fccg prints
# the number of new pairs after every iteration.
final_df = compute_fccg(df)

Number of new pairs for iteration #1:	7223780
Number of new pairs for iteration #2:	4881437
Number of new pairs for iteration #3:	3333689
Number of new pairs for iteration #4:	3914963
Number of new pairs for iteration #5:	1909847
Number of new pairs for iteration #6:	87132
Number of new pairs for iteration #7:	1326


In [0]:
# Preview the first rows of the resulting (key, value) pairs.
final_df.show()

+---+-----+
|key|value|
+---+-----+
|  1|    0|
|  6|    0|
|  4|    0|
| 16|    0|
|  0|    0|
| 14|    0|
| 21|    0|
| 12|    0|
|  2|    0|
| 11|    0|
| 20|    0|
| 13|    0|
|  5|    0|
|  8|    0|
|  9|    0|
|  3|    0|
| 10|    0|
| 19|    0|
| 17|    0|
| 15|    0|
+---+-----+
only showing top 20 rows



In [0]:
# Number of clusters: count the distinct entries of the `value` column,
# i.e. the distinct minimum ids that fccg_iterate assigned to the keys.
final_df.select('value').distinct().count()

Out[31]: 1