<a href="https://colab.research.google.com/github/megmenegazzi/AMD-project/blob/main/AMD_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Install pyspark packages**

In [None]:
!pip install pyspark

In [None]:
!pip install -q findspark

## **Create the Spark session and context**

In [None]:
# Build (or reuse) a Spark session whose master is "local[*]".
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
import pyspark
# Show the type of the session object as the cell output.
type(spark)

In [None]:
# Keep a handle on the SparkContext that belongs to the session.
sc = spark.sparkContext

In [None]:
import os
import findspark
import pandas as pd

## **Load Data**

In [None]:
# Kaggle credentials are taken from the environment and requested interactively
# when they are missing, so that no secret is stored in the notebook.
# SECURITY: the API key that used to be hardcoded in this cell must be
# considered leaked and should be revoked from the Kaggle account settings.
from getpass import getpass

for _kaggle_var in ("KAGGLE_USERNAME", "KAGGLE_KEY"):
    # Never overwrite a value that the user already exported.
    if not os.environ.get(_kaggle_var):
        os.environ[_kaggle_var] = getpass(f"{_kaggle_var}: ")

In [None]:
# Download the ashirwadsangwan/imdb-dataset archive with the Kaggle CLI and unzip it.
!kaggle datasets download ashirwadsangwan/imdb-dataset --unzip

## **Clean Data**

In [None]:
# Read the tab-separated principals file as a DataFrame (the first line is the
# header) and keep at most 10000 rows.
data = (
    spark.read
    .option("delimiter", "\t")
    .option("header", True)
    .csv("title.principals.tsv/data.tsv")
    .limit(10000)
)

In [None]:
# Mark the DataFrame for caching, then count its rows (shown as the cell output).
data.cache()
data.count()

In [None]:
# Look at five rows of the raw data.
data.take(5)

In [None]:
# Discard the columns that are not used in the rest of the notebook.
data1 = data.drop(
    "ordering",
    "category",
    "job",
    "characters",
)

In [None]:
# Look at five rows after dropping the unused columns.
data1.take(5)

In [None]:
# Print the remaining columns and their types.
data1.printSchema()

In [None]:
# Select the two id columns under new names: tconst -> title, nconst -> actor.

data2 = data1.selectExpr("tconst as title", "nconst as actor")

data2.printSchema()

In [None]:
# Define the functions that strip the two-character prefix from an identifier
# and keep only its numeric part.

from pyspark.sql.functions import udf,col
from pyspark.sql.types import LongType


def _numeric_id(identifier):
  """Return the integer that follows the two-character prefix, or None for a missing id."""
  if identifier is None:
    return None
  return int(identifier[2:])


# The return type is declared explicitly: without it `udf` defaults to
# StringType, so the integers produced above would not be stored as numbers.
udf_title_change = udf(_numeric_id, LongType())
udf_actor_change = udf(_numeric_id, LongType())



In [None]:
# Apply the two UDFs so that both id columns lose their two-character prefix.

data3 = data2.withColumn("title",udf_title_change(col("title")))
data4 = data3.withColumn("actor",udf_actor_change(col("actor")))

data4.show(5)

## **Load and analyze Dataset on Spark**

In [None]:
# Switch from the DataFrame to its underlying RDD.

rdd = data4.rdd

In [None]:

# Turn every row into a plain (title, actor) tuple.
simple_rdd = rdd.map(tuple)


# TODO: sampling a fraction of the dataset (seed 42) was planned here but is not implemented.


simple_rdd.take(5)

In [None]:
# Number of (title, actor) pairs.
simple_rdd.count()

In [None]:
# Swap key and value so that the actor comes first and the title second.

inverted = simple_rdd.map(lambda pair: (pair[1], pair[0]))
inverted.take(2)

In [None]:
# Number of (actor, title) pairs.
inverted.count()

In [None]:
# Link the movies that share an actor: the self-join on the actor key yields
# one (actor, (title_a, title_b)) entry for every pair of titles of that actor,
# in both orders and including the pairs where both titles are the same.

joined = inverted.join(inverted)
joined.take(2) 

In [None]:
# Drop the entries that link a title to itself.

filtered = joined.filter(lambda entry: entry[1][0] != entry[1][1])
filtered.take(2)

In [None]:
# Keep only the (title, title) pair of every entry and forget the actor key.

links = filtered.map(lambda entry: entry[1])
links.take(2)

In [None]:
# Define the function that merges two neighbour lists of the same title.

def adj(x,y):
  """Append every element of `y` to the list `x` in place and return `x`.

  `list.extend` is used instead of an element-by-element loop; unlike that
  loop it also terminates when `x` and `y` are the same list object.
  """
  x.extend(y)
  return x

In [None]:
# Wrap every linked title in a one-element list so that the lists can be merged per key.
adjacency1 = links.mapValues(lambda neighbour: [neighbour])
adjacency1.take(2)

In [None]:
# Build the adjacency lists: merge the one-element lists of every title into a
# single list of linked titles.

adjacency = adjacency1.reduceByKey(adj)
adjacency.take(2) 

In [None]:
# Define the function that turns one adjacency list into weighted links.

def conn(x):
  """Expand `(key, neighbours)` into a list of `(key, neighbour, weight)` triples.

  Every triple carries the same weight, 1 divided by the number of
  neighbours. An empty neighbour list yields an empty result.
  """
  node, neighbours = x
  return [(node, neighbour, 1/len(neighbours)) for neighbour in neighbours]



In [None]:
# Create the connection list: one (title, linked_title, weight) triple per link.

connection = adjacency.flatMap(conn)
connection.take(10) 

In [None]:
# Sorted list of the distinct first elements of the connection triples.

KL = connection.map(lambda entry: entry[0]).distinct().collect()
KL.sort()

In [None]:
# Dictionary that maps every movie id to its position in the sorted key list.

dizionario = {movie_id: position for position, movie_id in enumerate(KL)}

In [None]:
# Translate the movie ids of every connection triple into positions in KL.

def remap(x):
  """Convert `(source_id, target_id, weight)` into `(source_index, (target_index, weight))`.

  Both ids are looked up in the module-level `dizionario`; an id that is
  missing from it raises KeyError.
  """
  source_id, target_id, weight = x
  return (dizionario[source_id], (dizionario[target_id], weight))


mapped = connection.map(remap)

In [None]:
# Create the arrays used by the page rank iteration.

import numpy as np
n = len(KL)
# Start from the uniform vector: every one of the n entries is 1/n.
page_rank = np.ones(n)/n
# Previous iterate, initialised to all ones; presumably chosen so that it
# differs from page_rank before the first iteration.
old_page_rank = np.ones(n)

In [None]:
# Preview a few link weights instead of collecting the whole RDD on the driver.
mapped.mapValues(lambda x : x[1]).take(10)

In [None]:
# Define the function used to decide when the page rank iteration has converged.

def l2distance(v, q):
    """Return the sum of the squared element-wise differences between `v` and `q`.

    No square root is taken, so the value is the squared Euclidean distance.

    Raises:
        ValueError: if the two sequences have different lengths.
    """
    size_v, size_q = len(v), len(q)
    if size_v != size_q:
        raise ValueError(f'Cannot compute the distance of two vectors of size {size_v} and {size_q}')

    total = 0
    for v_el, q_el in zip(v, q):
        total += (q_el - v_el)**2
    return total

In [None]:
# Compute the page rank by power iteration.
#
# `mapped` holds one entry (source, (destination, weight)) per link, where
# weight is 1 / (number of links leaving the source).  One step is
#     new_rank[destination] = sum over incoming links of weight * rank[source]
# so every contribution is keyed by the destination and uses the rank of the
# source.  Keying by the source and reading the rank of the destination makes
# the uniform start vector a fixed point, and the loop would stop after a
# single pass without ranking anything.
#
# NOTE: no damping (teleport) term is applied, so on some graphs the iteration
# may oscillate instead of converging; max_iterations bounds the loop.

tolerance = 1e-11  # same value as the literal 10e-12 used previously
max_iterations = 350

iteration = 0
while(l2distance(old_page_rank, page_rank) >= tolerance and iteration < max_iterations):
    page_rank_values = (mapped
                        .map(lambda link: (link[1][0], link[1][1]*page_rank[link[0]]))
                        .reduceByKey(lambda a, b: a+b)
                        .collect()
                       )
    old_page_rank = page_rank

    # Put every score at its own index: a node that received no contribution
    # gets 0 instead of shifting the positions of all the following nodes.
    scores = dict(page_rank_values)
    page_rank = np.array([scores.get(i, 0.0) for i in range(len(old_page_rank))])

    print(iteration)
    iteration += 1