USARE LINK PREDICTION CON ADAMIC ADAR PER PAGE RANK

In [13]:
# pip install hdfs
# ./hdfscli.cfg

#     [global]
#     default.alias = dev

#     [dev.alias]
#     url = http://localhost:9870

# from hdfs import Config

# client = Config().get_client('dev')
# test = client.list('/test')
# print(test)

# with client.read('/test/movies.csv') as reader:
#     movies = reader.read()

In [14]:
# Make the local Spark installation importable before `pyspark` is imported in the next cell.
import findspark
findspark.init()

In [15]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from graphframes import GraphFrame

# Creazione della SparkSession
# spark = SparkSession.builder.appName("Spark GraphFrames Example").getOrCreate()

try:
    # Build (or reuse) a SparkSession attached to the standalone cluster master.
    # NOTE(review): fs.defaultFS points at hdfs://localhost:9001 while the data
    # cells read from hdfs://localhost:50010 -- confirm which NameNode address is intended.
    spark = (
        SparkSession.builder
        .appName("Spark GraphFrames Example")
        .master("spark://192.168.0.112:7077")
        .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9001")
        .getOrCreate()
    )

    # Report the Spark version of the session
    print(f"Spark Version: {spark.version}")

    # Report which master the session is actually using
    master_config = spark.conf.get("spark.master")
    print(f"Connected to master: {master_config}")

    # Sanity check: create and display an empty DataFrame
    test_df = spark.createDataFrame([], schema="id INT, value STRING")
    test_df.show()

    print("Connection successful, Spark is ready!")

except Exception as e:
    print("Failed to connect to Spark master.")
    print(f"Error: {e}")
    # Re-raise so the notebook stops here: every later cell needs `spark`, and
    # swallowing the failure would only surface as an unrelated NameError.
    raise


Spark Version: 3.5.3
Connected to master: spark://192.168.0.112:7077
+---+-----+
| id|value|
+---+-----+
+---+-----+

Connection successful, Spark is ready!


In [16]:
# Smoke test: read one CSV from HDFS and preview it to confirm that jobs run
try:
    simple_df = (
        spark.read
        .options(header=True, inferSchema=True)
        .csv("hdfs://localhost:50010/test/ratings.csv")
    )
    simple_df.show(n=5)
    print("Spark job completed successfully!")
except Exception as err:
    print(f"Error: {err}")


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

Spark job completed successfully!


In [17]:
# # Verifica accesso a HDFS
# hdfs_files = spark.read.format("text").load("hdfs://localhost:9001/test")
# hdfs_files.show()

In [18]:
# Load the ratings and movies tables from HDFS (first row is the header, column types are inferred)
ratings = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('hdfs://localhost:50010/test/ratings.csv')
)
movies = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('hdfs://localhost:50010/test/movies.csv')
)

In [19]:
# Attach the movie columns to every rating with an inner join on movieId, then preview the result
user_movie_matrix = ratings.join(movies, "movieId", "inner")
user_movie_matrix.show(n=5)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 5 rows



In [20]:
# # Controllo per film comuni valutati da più utenti
# common_movies = user_movie_matrix.groupBy("title").count()
# common_movies.filter(col("count") > 1).show(10)

In [21]:
# Rating -> edge-weight lookup table: low ratings map to negative weights,
# ratings from 2 to 3 map to zero, and high ratings map to positive weights.

mapping_score = dict([
    (0.5, -1.0),
    (1, -1.0),
    (1.5, -0.5),
    (2, 0.0),
    (2.5, 0.0),
    (3, 0.0),
    (3.5, 0.5),
    (4, 1.0),
    (4.5, 1.1),
    (5, 1.2),
])

In [22]:
# Translate each rating into its edge weight through `mapping_score`.
# The fallback must be a Python float: the UDF is declared as FloatType, and a
# Python int returned under FloatType is turned into NULL by PySpark, which would
# silently propagate into every weight computed downstream.
map_score_udf = spark.udf.register(
    "map_score",
    lambda x: float(mapping_score.get(x, 0.0)),
    FloatType(),
)
user_movie_matrix = user_movie_matrix.withColumn("weight", map_score_udf(col("rating")))

In [23]:
# Build the edge table: one weighted user -> movie edge per rating.
# NOTE(review): user and movie ids are both cast to plain strings, so user "1"
# and movie "1" share the same vertex id -- confirm whether a prefix is needed.
edges = user_movie_matrix.selectExpr(
    "CAST(userId AS STRING) AS src",
    "CAST(movieId AS STRING) AS dst",
    "weight",
)

In [24]:
# Build the vertex table: distinct users tagged bipartite=0, distinct movies tagged bipartite=1.
# NOTE(review): ids of the two partitions live in the same string namespace, so
# the union may contain the same id twice (once per partition) -- TODO confirm.

user_vertices = (
    user_movie_matrix
    .selectExpr("CAST(userId AS STRING) AS id")
    .distinct()
    .withColumn("bipartite", lit(0))
)
movie_vertices = (
    user_movie_matrix
    .selectExpr("CAST(movieId AS STRING) AS id")
    .distinct()
    .withColumn("bipartite", lit(1))
)
vertices = user_vertices.union(movie_vertices)

In [25]:
# Build the bipartite user-movie graph from the vertex and edge tables
user_movie_graph = GraphFrame(vertices, edges)



In [26]:
# User-user projection
def project_user_user_graph(user_movie_graph):
    """Project the bipartite graph onto its user side.

    Two users are linked once for every destination node (movie) they share;
    the edge weight is the sum of the two original edge weights. Edges are
    emitted in both directions and are not aggregated, so users sharing several
    movies are connected by several parallel edges.

    Args:
        user_movie_graph: GraphFrame whose edges have `src`, `dst` and `weight`
            columns (src = user, dst = movie).

    Returns:
        GraphFrame containing only the user vertices (`bipartite == 0`, when
        that column is present) and the projected user-user edges.
    """
    edges = user_movie_graph.edges
    user_user_edges = (
        edges.alias("e1")
        .join(edges.alias("e2"), col("e1.dst") == col("e2.dst"))
        # Drop self-pairs produced by the self-join
        .filter(col("e1.src") != col("e2.src"))
        .select(
            col("e1.src").alias("src"),
            col("e2.src").alias("dst"),
            (col("e1.weight") + col("e2.weight")).alias("weight"),
        )
    )

    # A user-user graph must not carry the movie vertices of the bipartite graph
    vertices = user_movie_graph.vertices
    if "bipartite" in vertices.columns:
        vertices = vertices.filter(col("bipartite") == 0)
    return GraphFrame(vertices, user_user_edges)

user_user_graph = project_user_user_graph(user_movie_graph)

In [27]:
# Movie-movie projection
def project_movie_movie_graph(user_movie_graph):
    """Project the bipartite graph onto its movie side.

    Two movies are linked once for every source node (user) that rated both;
    the edge weight is the sum of the two original edge weights. Edges are
    emitted in both directions and are not aggregated.

    NOTE(review): the self-join produces a number of edges quadratic in each
    user's number of ratings; this may be what overloads the shuffle when
    PageRank runs on the result -- TODO confirm.

    Args:
        user_movie_graph: GraphFrame whose edges have `src`, `dst` and `weight`
            columns (src = user, dst = movie).

    Returns:
        GraphFrame containing only the movie vertices (`bipartite == 1`, when
        that column is present) and the projected movie-movie edges.
    """
    edges = user_movie_graph.edges
    movie_movie_edges = (
        edges.alias("e1")
        .join(edges.alias("e2"), col("e1.src") == col("e2.src"))
        # Drop self-pairs produced by the self-join
        .filter(col("e1.dst") != col("e2.dst"))
        .select(
            col("e1.dst").alias("src"),
            col("e2.dst").alias("dst"),
            (col("e1.weight") + col("e2.weight")).alias("weight"),
        )
    )

    # Keep only movie vertices: user vertices would otherwise take part in
    # PageRank and could be returned as "recommended movies".
    vertices = user_movie_graph.vertices
    if "bipartite" in vertices.columns:
        vertices = vertices.filter(col("bipartite") == 1)
    return GraphFrame(vertices, movie_movie_edges)

movie_movie_graph = project_movie_movie_graph(user_movie_graph)

In [28]:
# Build the preference vector of a user
def create_preference_vector(user_id, user_movie_graph):
    """Return the user's preference vector as a ``{node_id: probability}`` dict.

    When the user's outgoing edge weights sum to a positive value, each
    destination gets ``weight / total``. Otherwise the vector is uniform over
    every movie vertex (``bipartite == 1``) of the graph.

    Args:
        user_id: id of the user, compared against the `src` edge column.
        user_movie_graph: GraphFrame with `src`/`dst`/`weight` edges and a
            `bipartite` vertex column.

    Returns:
        dict mapping destination ids to their share of preference; empty when
        the fallback applies and the graph has no movie vertices.
    """
    rows = (
        user_movie_graph.edges
        .filter(col("src") == user_id)
        .select("dst", "weight")
        .collect()
    )
    # A NULL weight counts as 0.0 so that the sum below cannot fail on None
    edges = [(row["dst"], row["weight"] if row["weight"] is not None else 0.0) for row in rows]

    tot = sum(weight for _, weight in edges)
    if tot > 0:
        return {movie: weight / tot for movie, weight in edges}

    movies = [
        row["id"]
        for row in user_movie_graph.vertices.filter(col("bipartite") == 1).select("id").collect()
    ]
    if not movies:
        # Nothing to spread the preference over; avoids a division by zero
        return {}
    uniform = 1 / len(movies)
    return {movie: uniform for movie in movies}

**Page Rank**

In [34]:
# Prediction function

def predict_user(user_id, user_movie_graph, movie_movie_graph,
                 reset_probability=0.95, max_iter=20, top_n=10):
    """Recommend the highest-PageRank movies the user has not rated yet.

    NOTE(review): the ranking is a global PageRank of `movie_movie_graph`; the
    user's preferences are only used to exclude already-rated movies.
    NOTE(review): GraphFrames' `resetProbability` is the probability of jumping
    to a random vertex (library default 0.15); 0.95 is kept as the default here
    only for backward compatibility -- confirm it is not meant to be a damping
    factor.
    NOTE(review): when `user_movie_graph` also contains predicted user-user
    links, their destinations are user ids and are excluded too; since user and
    movie ids share one namespace this may hide movies -- TODO confirm.

    Args:
        user_id: id of the user, compared against the `src` edge column.
        user_movie_graph: GraphFrame with the user's outgoing edges.
        movie_movie_graph: GraphFrame on which PageRank is computed.
        reset_probability: value passed to `pageRank(resetProbability=...)`.
        max_iter: value passed to `pageRank(maxIter=...)`.
        top_n: maximum number of recommendations returned.

    Returns:
        list of at most `top_n` vertex ids, ordered by decreasing PageRank.
    """
    # Every destination of the user's edges counts as already seen, whatever
    # the sign of the weight: a badly rated movie has still been watched.
    seen_rows = (
        user_movie_graph.edges
        .filter(col("src") == user_id)
        .select("dst")
        .distinct()
        .collect()
    )
    already_seen = [row["dst"] for row in seen_rows]

    # PageRank over the movie graph
    pagerank_results = movie_movie_graph.pageRank(
        resetProbability=reset_probability, maxIter=max_iter
    )

    ranked = pagerank_results.vertices
    # Only movie vertices are valid recommendations
    if "bipartite" in ranked.columns:
        ranked = ranked.filter(col("bipartite") == 1)

    item_rank = (
        ranked.select("id", "pagerank")
        .filter(~col("id").isin(already_seen))
        .orderBy(col("pagerank").desc())
    )

    # Only the top rows are brought back to the driver
    return [row["id"] for row in item_rank.limit(top_n).collect()]




**Link Prediction**

In [35]:
from pyspark.sql.functions import collect_list
def calculate_adamic_adar(graph):
    """Score every unordered pair of source nodes with the Adamic-Adar index.

    For a pair (v1, v2) the index is the sum, over every common neighbour z,
    of ``1 / log(degree(z))``. Here the neighbours of a node are the
    destinations of its outgoing edges, and ``degree(z)`` is the number of
    distinct sources with an edge to z.

    The per-destination degrees are collected to the driver and captured by
    the UDF, so this assumes the set of destination nodes fits in memory.

    Args:
        graph: GraphFrame whose edges have `src` and `dst` columns.

    Returns:
        DataFrame with columns `v1`, `v2` (with v1 < v2) and `score`. Pairs
        without common neighbours are kept with a score of 0.0; callers filter
        by threshold.
    """
    # De-duplicate so that parallel edges inflate neither neighbour lists nor degrees
    links = graph.edges.select("src", "dst").distinct()

    # Neighbour list of every source node
    neighbors = links.groupBy("src").agg(collect_list("dst").alias("neighbors"))

    # Degree of every potential common neighbour
    degree_by_node = {
        row["dst"]: row["count"] for row in links.groupBy("dst").count().collect()
    }

    # All unordered pairs of source nodes
    neighbors_df = neighbors.alias("n1").join(
        neighbors.alias("n2"), col("n1.src") < col("n2.src")
    ).select(
        col("n1.src").alias("v1"),
        col("n2.src").alias("v2"),
        col("n1.neighbors").alias("neighbors_v1"),
        col("n2.neighbors").alias("neighbors_v2")
    )

    def compute_adamic_adar(neighbors_v1, neighbors_v2):
        """Adamic-Adar index of two neighbour lists (0.0 when they are disjoint)."""
        common_neighbors = set(neighbors_v1).intersection(neighbors_v2)
        score = 0.0
        for node in common_neighbors:
            degree = degree_by_node.get(node, 0)
            # A common neighbour has degree >= 2; the guard avoids log(1) == 0
            if degree > 1:
                score += 1.0 / np.log(degree)
        return float(score)

    compute_adamic_adar_udf = udf(compute_adamic_adar, FloatType())

    adamic_adar_scores = neighbors_df.withColumn(
        "score", compute_adamic_adar_udf(col("neighbors_v1"), col("neighbors_v2"))
    )

    return adamic_adar_scores.select("v1", "v2", "score")


In [36]:
# Plot histogram for Adamic-Adar Index
def plot_adamic_adar_histogram(adamic_adar_scores, bin_width=0.01):
    """Plot the distribution of Adamic-Adar scores.

    Args:
        adamic_adar_scores: either an object exposing ``collect()`` (such as
            the DataFrame returned by `calculate_adamic_adar`) or an iterable
            of rows/tuples whose third element is the score.
        bin_width: width of each histogram bin.

    Returns:
        None. Prints a message and draws nothing when there are no scores.
    """
    # A Spark DataFrame cannot be iterated row by row; collect it first
    if hasattr(adamic_adar_scores, "collect"):
        rows = adamic_adar_scores.collect()
    else:
        rows = adamic_adar_scores

    scores = [float(row[2]) for row in rows if row[2] is not None]
    if not scores:
        print("No Adamic-Adar scores to plot.")
        return

    # Always at least one bin, with the last edge at or beyond the maximum so
    # the largest score is not dropped.
    n_bins = max(1, int(np.ceil(max(scores) / bin_width)))
    bins = np.linspace(0.0, n_bins * bin_width, n_bins + 1)

    plt.hist(scores, bins=bins, edgecolor='black', alpha=0.7)
    plt.xlabel('Adamic-Adar Index')
    plt.ylabel('Frequency')
    plt.title('Histogram of Adamic-Adar Index for Predicted Edges')
    plt.show()


In [37]:
# Link prediction
# Score the node pairs of the bipartite graph with calculate_adamic_adar
adamic_adar_scores = calculate_adamic_adar(user_movie_graph)

In [38]:
# Print the number of scored pairs; count on the cluster instead of collecting every row to the driver
print(adamic_adar_scores.count())

185745


In [39]:
# #plot to find a correct threshold
# plot_adamic_adar_histogram(adamic_adar_scores)

In [40]:
def add_predicted_links(graph, predicted_edges, threshold):
    """Return a copy of `graph` extended with the predicted links above a threshold.

    The filtering and the union are done in Spark, so no score is brought to
    the driver and an empty selection simply adds no edge.

    NOTE(review): the predicted pairs connect two *source* nodes of the scored
    graph (user-user links for the bipartite graph), so the result is no longer
    strictly bipartite -- confirm this is intended.

    Args:
        graph: GraphFrame whose edges have `src`, `dst` and `weight` columns.
        predicted_edges: DataFrame with `v1`, `v2` and `score` columns.
        threshold: only pairs with ``score > threshold`` become edges.

    Returns:
        GraphFrame with the vertices of `graph` and its edges plus the new ones.
    """
    # Keep the weight column type of the existing edges
    weight_type = graph.edges.schema["weight"].dataType

    new_edges = (
        predicted_edges
        .filter(col("score") > threshold)
        .select(
            col("v1").alias("src"),
            col("v2").alias("dst"),
            col("score").cast(weight_type).alias("weight"),
        )
    )

    # Match columns by name so the result does not depend on column order
    extended_graph = GraphFrame(graph.vertices, graph.edges.unionByName(new_edges))

    return extended_graph


In [41]:
# Extend the bipartite graph with the predicted links scoring above 0.5
# NOTE(review): 0.5 looks hand-picked; the histogram cell meant to tune it is commented out -- TODO confirm.
user_movie_graph_extended = add_predicted_links(user_movie_graph, adamic_adar_scores, 0.5)

**Prediction**

In [42]:
# Predict movies for a user, using the extended graph for the user's edges
# and the movie-movie projection for the PageRank ranking
user =10
recommended_movies = predict_user(user, user_movie_graph_extended, movie_movie_graph)
print(f"Recommended movies for user {user}: {recommended_movies[:10]}")

Py4JJavaError: An error occurred while calling o381.run.
: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 91 (mapPartitions at GraphImpl.scala:207) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 9 partition 0
	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1747)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11(MapOutputTracker.scala:1694)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11$adapted(MapOutputTracker.scala:1693)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1693)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1335)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1297)
	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:141)
	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:220)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
	at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1597)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
	at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1597)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2031)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1039)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1037)
	at org.apache.spark.graphx.lib.PageRank$.runWithOptions(PageRank.scala:199)
	at org.apache.spark.graphx.lib.PageRank$.runWithOptions(PageRank.scala:144)
	at org.graphframes.lib.PageRank$.run(PageRank.scala:130)
	at org.graphframes.lib.PageRank.run(PageRank.scala:104)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
# Stop the SparkSession once the analysis is finished
spark.stop()