## Spark setup & Libraries

In [1]:
# spark setup
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import MinHashLSH, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, when, explode, lit, array_contains


# libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json

# show all rows with df.head
pd.options.display.max_columns = None
# remove warnings
import warnings
warnings.filterwarnings('ignore')



## create spark session

In [2]:
spark = SparkSession.builder \
  .appName("DIS_project_5") \
  .master("local[*]") \
  .config("spark.driver.memory", "10G") \
  .config("spa\rk.driver.maxResultSize", "40g") \
  .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
  .getOrCreate()
spark
sc = spark.sparkContext

your 131072x1 screen size is bogus. expect trouble
24/11/01 16:25:39 WARN Utils: Your hostname, MadioLaptop resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/01 16:25:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/01 16:25:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [15]:
sc.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.driver.port', '42773'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '10G'),
 ('

## load community data

In [3]:
json_file_path = "../Community Detection/10K.json"
with open(json_file_path, 'r') as f:
    community_data = json.load(f)

community_rdd = sc.parallelize(community_data)
print(community_rdd.take(3))
print(community_rdd.count())
print(type(community_rdd))

                                                                                

[{'community_id': 1, 'nodes': [371, 340, 378, 667, 974, 368, 627, 64, 168, 257, 925, 767, 549, 890, 899, 226, 45, 71, 72, 381, 707, 973, 812, 839, 96, 433, 670, 468, 466, 876, 402, 646, 593, 366, 612], 'edges': [{'node1': 371, 'node2': 549, 'begintijd': 20240318013535, 'eindtijd': 20240318030219}, {'node1': 371, 'node2': 812, 'begintijd': 20240412020025, 'eindtijd': 20240412053609}, {'node1': 371, 'node2': 670, 'begintijd': 20240606074446, 'eindtijd': 20240606110610}, {'node1': 340, 'node2': 378, 'begintijd': 20240918161950, 'eindtijd': 20240918172041}, {'node1': 378, 'node2': 466, 'begintijd': 20240918071254, 'eindtijd': 20240918140257}, {'node1': 667, 'node2': 767, 'begintijd': 20241101133614, 'eindtijd': 20241101175233}, {'node1': 368, 'node2': 627, 'begintijd': 20240112043512, 'eindtijd': 20240112122716}, {'node1': 368, 'node2': 767, 'begintijd': 20240205225900, 'eindtijd': 20240206062846}, {'node1': 64, 'node2': 627, 'begintijd': 20240407064043, 'eindtijd': 20240407074414}, {'node

## lsh

In [4]:
# Step 1: Collect unique nodes and edges across all communities
all_nodes = sorted({node for community in community_rdd.collect() for node in community['nodes']})
all_edges = sorted({(edge['node1'], edge['node2']) for community in community_rdd.collect() for edge in community['edges']})

# Step 2: Convert RDD to DataFrame with binary columns for each unique node and edge
def create_binary_features(community):
    features = {}
    # Binary columns for nodes
    for node in all_nodes:
        features[f'node_{node}'] = 1 if node in community['nodes'] else 0
    # Binary columns for edges
    for edge in all_edges:
        features[f'edge_{edge[0]}_{edge[1]}'] = 1 if edge in community['edges'] else 0
    # Include the community_id for reference
    features['community_id'] = community['community_id']
    return Row(**features)

# Apply create_binary_features to each community in the RDD
binary_features_rdd = community_rdd.map(create_binary_features)

# Convert the RDD to a DataFrame
data_df = spark.createDataFrame(binary_features_rdd)


#step 3; assemble all the node and edge columns into a single feature column using vector assembler
feature_cols = [f"node_{node}" for node in all_nodes] + [f"edge_{edge[0]}_{edge[1]}" for edge in all_edges]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
vector_df = assembler.transform(data_df)
print((vector_df.select(["community_id"]).take(2)))

24/11/01 16:25:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 5:>                                                          (0 + 1) / 1]

[Row(community_id=1), Row(community_id=2)]


                                                                                

In [5]:
# step 4; initialise MinHashLSH and fit the model
minhash = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = minhash.fit(vector_df)

# transform data and show hashes
transformed_df = model.transform(vector_df)
print(transformed_df.show())


24/11/01 16:25:50 WARN DAGScheduler: Broadcasting large task binary with size 1429.1 KiB
24/11/01 16:25:52 WARN DAGScheduler: Broadcasting large task binary with size 1429.1 KiB
24/11/01 16:25:53 WARN DAGScheduler: Broadcasting large task binary with size 1429.1 KiB
                                                                                

+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----

In [6]:
# test lsh model
# key_test_community = {'community_id': 1, 'nodes': [1, 2, 9, 7, 3, 8], 'edges': [{'node1': 1, 'node2': 2, 'begintijd': 20240921180004, 'eindtijd': 20240921225419}, {'node1': 1, 'node2': 9, 'begintijd': 20240318051804, 'eindtijd': 20240318115006}, {'node1': 1, 'node2': 7, 'begintijd': 20240112134348, 'eindtijd': 20240112143558}, {'node1': 1, 'node2': 3, 'begintijd': 20240928030750, 'eindtijd': 20240928065938}, {'node1': 2, 'node2': 9, 'begintijd': 20241204014916, 'eindtijd': 20241204073145}, {'node1': 2, 'node2': 7, 'begintijd': 20240417225406, 'eindtijd': 20240418030243}, {'node1': 2, 'node2': 3, 'begintijd': 20240726103240, 'eindtijd': 20240726171152}, {'node1': 2, 'node2': 8, 'begintijd': 20241022091933, 'eindtijd': 20241022141128}, {'node1': 7, 'node2': 9, 'begintijd': 20240326143919, 'eindtijd': 20240326184538}, {'node1': 7, 'node2': 8, 'begintijd': 20240225113935, 'eindtijd': 20240225140514}, {'node1': 3, 'node2': 9, 'begintijd': 20240710155403, 'eindtijd': 20240710164441}, {'node1': 3, 'node2': 7, 'begintijd': 20240625050041, 'eindtijd': 20240625050327}, {'node1': 8, 'node2': 9, 'begintijd': 20240218112750, 'eindtijd': 20240218145533}]}

first_key_vector = transformed_df.filter(transformed_df.community_id == 2).select("features").first()["features"]
print(first_key_vector)
# print(transformed_df.show(8))

# remove vector from dataframe with id 2
no_target_transformed_df = transformed_df.filter(transformed_df.community_id != 2)

number_of_neighbours = 5
# model.approxNearestNeighbors(no_target_transformed_df, first_key_vector, numNearestNeighbors=number_of_neighbours).show()
model.approxNearestNeighbors(no_target_transformed_df, first_key_vector, numNearestNeighbors=number_of_neighbours).select("community_id").show()

24/11/01 16:25:55 WARN DAGScheduler: Broadcasting large task binary with size 1190.4 KiB


(1722,[14,17,67,88,93,94,115,137,151,203,243,300,301,365,384,388,397,423,446,451,549,655,671,676,775,829],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])


24/11/01 16:25:57 WARN DAGScheduler: Broadcasting large task binary with size 1205.8 KiB
[Stage 10:>                                                       (0 + 12) / 12]

+------------+
|community_id|
+------------+
+------------+



                                                                                

In [7]:
# nearest neighbours multiple at ones
# print(model.approxSimilarityJoin(transformed_df, transformed_df, 0.6).show())
print(transformed_df.show(1))
print(model.approxSimilarityJoin(transformed_df, transformed_df.show(1), 100).select("datasetA.community_id", "datasetB.community_id").show())

24/11/01 16:25:59 WARN DAGScheduler: Broadcasting large task binary with size 1429.1 KiB
                                                                                

+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----

24/11/01 16:26:00 WARN DAGScheduler: Broadcasting large task binary with size 1429.1 KiB


+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----

Py4JJavaError: An error occurred while calling o128.approxSimilarityJoin.
: java.lang.NullPointerException
	at org.apache.spark.ml.feature.LSHModel.processDataset(LSH.scala:224)
	at org.apache.spark.ml.feature.LSHModel.approxSimilarityJoin(LSH.scala:279)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
# find N nearest neighbors, by looping through each record in the data and finding the nearest neighbors for each record and saving them
lsh_communities = []
for key in df:
  # key_ = key

In [None]:
# pass data of community and the nearest neighbor communities to Nils function
#todo add Nils function here

## test method, alternative data

In [8]:
vec_df1 = vector_df.take(2)
print(f"features in sparsevector of community 1: \n{vec_df1[1].features}")
# sparse_vec_df1 = Vectors.sparse(vec_df1)
# print(sparse_vec_df1)

formatted_data = vector_df.select("community_id", "features").rdd.map(lambda row: (row.community_id, row.features))
print(formatted_data.collect())

# Row(nodes, edges, community_id=1, features=SparseVector(1722, {39: 1.0, 55: 1.0, 60: 1.0, 61: 1.0, 82: 1.0, 142: 1.0, 195: 1.0, 223: 1.0, 296: 1.0, 320: 1.0, 322: 1.0, 325: 1.0, 330: 1.0, 332: 1.0, 351: 1.0, 375: 1.0, 405: 1.0, 407: 1.0, 475: 1.0, 512: 1.0, 528: 1.0, 541: 1.0, 558: 1.0, 579: 1.0, 582: 1.0, 614: 1.0, 664: 1.0, 702: 1.0, 727: 1.0, 760: 1.0, 772: 1.0, 780: 1.0, 804: 1.0, 846: 1.0, 847: 1.0})), 

24/11/01 16:26:28 WARN DAGScheduler: Broadcasting large task binary with size 1189.6 KiB


features in sparsevector of community 1: 
(1722,[14,17,67,88,93,94,115,137,151,203,243,300,301,365,384,388,397,423,446,451,549,655,671,676,775,829],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])


24/11/01 16:26:28 WARN DAGScheduler: Broadcasting large task binary with size 1186.2 KiB


[(1, SparseVector(1722, {39: 1.0, 55: 1.0, 60: 1.0, 61: 1.0, 82: 1.0, 142: 1.0, 195: 1.0, 223: 1.0, 296: 1.0, 320: 1.0, 322: 1.0, 325: 1.0, 330: 1.0, 332: 1.0, 351: 1.0, 375: 1.0, 405: 1.0, 407: 1.0, 475: 1.0, 512: 1.0, 528: 1.0, 541: 1.0, 558: 1.0, 579: 1.0, 582: 1.0, 614: 1.0, 664: 1.0, 702: 1.0, 727: 1.0, 760: 1.0, 772: 1.0, 780: 1.0, 804: 1.0, 846: 1.0, 847: 1.0})), (2, SparseVector(1722, {14: 1.0, 17: 1.0, 67: 1.0, 88: 1.0, 93: 1.0, 94: 1.0, 115: 1.0, 137: 1.0, 151: 1.0, 203: 1.0, 243: 1.0, 300: 1.0, 301: 1.0, 365: 1.0, 384: 1.0, 388: 1.0, 397: 1.0, 423: 1.0, 446: 1.0, 451: 1.0, 549: 1.0, 655: 1.0, 671: 1.0, 676: 1.0, 775: 1.0, 829: 1.0})), (3, SparseVector(1722, {44: 1.0, 50: 1.0, 105: 1.0, 165: 1.0, 249: 1.0, 270: 1.0, 283: 1.0, 303: 1.0, 350: 1.0, 358: 1.0, 362: 1.0, 379: 1.0, 401: 1.0, 417: 1.0, 450: 1.0, 517: 1.0, 564: 1.0, 589: 1.0, 591: 1.0, 620: 1.0, 630: 1.0, 638: 1.0, 719: 1.0, 732: 1.0, 733: 1.0, 765: 1.0, 799: 1.0, 835: 1.0, 855: 1.0})), (4, SparseVector(1722, {9: 1.0,

                                                                                

In [9]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

warnings.filterwarnings('ignore')


# define OG community dataset
temp_data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
             (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0])),
             (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]))]
# print(f"temp_data:\n {temp_data}")


temp_data2 = formatted_data
# print(f"temp_data2: \n {temp_data2}")


# create dataframe
temp_df = spark.createDataFrame(temp_data, ["id", "features"])
# print(f"temp_df:\n {temp_df.collect()}")

temp_df2 = spark.createDataFrame(temp_data2, ["id", "features"])
# print(f"temp_df2:\n {temp_df2.collect()}")


# create model on dataset
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
mh.setSeed(42)
model = mh.fit(temp_df)
# print(f"model:\n {model}")

model2 = mh.fit(temp_df2)
# print(f"model2:\n {model2}")


# example apporx neighbour
# key = Vectors.sparse(6, [0, 1], [1.0, 1.0])
# model.approxNearestNeighbors(temp_df, key, 3).collect()


# getting cross reverence for approximty for all
result = model.approxSimilarityJoin(temp_df, temp_df, threshold=float('inf')).filter("datasetA.id != datasetB.id")
results_df = result.select(
  col("datasetA.id").alias("idA"),
  col("datasetB.id").alias("idB"),
  col("distCol").alias("JaccardDistance")
  )
print(f"count of temp_df: {temp_df.count()}")
print(f"count of results_df: {results_df.count()}")
results_df.show()

result2 = model2.approxSimilarityJoin(temp_df2, temp_df2, threshold=float('inf'))
results_df2 = result2.select(
  col("datasetA.id").alias("idA"),
  col("datasetB.id").alias("idB"),
  col("distCol").alias("JaccardDistance")
  )
print(f"count of temp_df2: {temp_df2.count()}")
print(f"count of results_df2: {results_df2.count()}")
results_df2.show()

# print results
# print(f"result type: {type(results_df)}")
# print(f"result2 type: {type(results_df2)}")



#todo add groupby for larger groups, filter only top 5 results and then you have for each community ID the most similiar community ID's

24/11/01 16:27:00 WARN DAGScheduler: Broadcasting large task binary with size 1187.6 KiB
24/11/01 16:27:00 WARN DAGScheduler: Broadcasting large task binary with size 1196.6 KiB


count of temp_df: 3


                                                                                

count of results_df: 6
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  0|  2|            0.5|
|  2|  0|            0.5|
|  0|  1|            0.8|
|  2|  1|            0.5|
|  1|  2|            0.5|
|  1|  0|            0.8|
+---+---+---------------+



24/11/01 16:27:04 WARN DAGScheduler: Broadcasting large task binary with size 1196.3 KiB
24/11/01 16:27:05 WARN DAGScheduler: Broadcasting large task binary with size 1249.9 KiB


count of temp_df2: 53


24/11/01 16:27:06 WARN DAGScheduler: Broadcasting large task binary with size 1249.9 KiB
24/11/01 16:27:10 WARN DAGScheduler: Broadcasting large task binary with size 1324.1 KiB
24/11/01 16:27:10 WARN DAGScheduler: Broadcasting large task binary with size 1260.0 KiB
24/11/01 16:27:10 WARN DAGScheduler: Broadcasting large task binary with size 1249.9 KiB


count of results_df2: 53


24/11/01 16:27:10 WARN DAGScheduler: Broadcasting large task binary with size 1249.9 KiB
24/11/01 16:27:13 WARN DAGScheduler: Broadcasting large task binary with size 1324.1 KiB


+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
| 50| 50|            0.0|
| 22| 22|            0.0|
| 12| 12|            0.0|
| 17| 17|            0.0|
| 28| 28|            0.0|
| 11| 11|            0.0|
| 39| 39|            0.0|
| 53| 53|            0.0|
| 52| 52|            0.0|
|  9|  9|            0.0|
| 25| 25|            0.0|
| 23| 23|            0.0|
|  4|  4|            0.0|
| 32| 32|            0.0|
|  6|  6|            0.0|
| 38| 38|            0.0|
| 46| 46|            0.0|
| 37| 37|            0.0|
|  3|  3|            0.0|
| 40| 40|            0.0|
+---+---+---------------+
only showing top 20 rows



24/11/01 16:27:13 WARN DAGScheduler: Broadcasting large task binary with size 1261.1 KiB
                                                                                

In [14]:
print(results_df2.rdd.getNumPartitions())
results_df2.repartition(100).rdd.getNumPartitions()
print(results_df2.rdd.getNumPartitions())

1


24/11/01 16:31:25 WARN DAGScheduler: Broadcasting large task binary with size 1249.9 KiB
24/11/01 16:31:25 WARN DAGScheduler: Broadcasting large task binary with size 1249.9 KiB

1


24/11/01 16:31:28 WARN DAGScheduler: Broadcasting large task binary with size 1324.1 KiB
24/11/01 16:31:28 WARN DAGScheduler: Broadcasting large task binary with size 1261.0 KiB
