# Capitulo 3: Recomendando Musicas utilizando el Audioscrobbler Data Set

# En qué concisten las recomendaciones utilizando Filtros Colaborativos

<img src="./imagenes/cf-01.png" style="width:60%"/>

<img src="./imagenes/cf-02.png" style="width:60%"/>

<img src="./imagenes/cf-03.png" style="width:60%"/>

<img src="./imagenes/cf-04.png" style="width:60%"/>

<img src="./imagenes/cf-05.png" style="width:60%"/>

<img src="./imagenes/Collaborative_filtering.gif" />

# Alternating Least Squares

<h3 style="text-align:center">Algoritmo de factorizacion de matrices utilizado para construir el Colaborative Filtering</h3>
<img src="./imagenes/mllib_rec_engine_image005.png" style="width:60%"/>

## Librerias a ser utilizadas

In [1]:
# from pyspark.mllib.recommendation import *
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import random
from operator import *
import findspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.mllib.evaluation import BinaryClassificationMetrics
findspark.find()

'/usr/local/spark'

## Se crea una sesion de spark para ejecutar los jobs

In [2]:
# Se construye una SparkSession 
spark = SparkSession.builder \
    .master("local") \
    .appName("Modelo de Recomendacion de musicas") \
    .config("spark.executor.memory", "8gb") \
    .getOrCreate()

sc = spark.sparkContext

# Descripcion del Audioscrobbler Dataset

El Audioscrobbler Dataset es un dataset obtenido del sitio last.fm e incluye los siguientes archivos



<h3 style="text-align:center"> artist_data.txt :</h3>
<p style="text-align:center">Las columnas de este dataset son de la forma:<p>


| artist_id | artist_name |
|------|------|
|1240105 |	André Visior |
|1240113 |	riow arai |
|1106617 |	Bloque |
|6776115 |	小松正夫 |



<h3 style="text-align:center">artist_alias.txt : </h3>
<p style="text-align:center">Dado que en el primer dataset, un mismo artista puede aparecer con nombres escritos de diferentes maneras, se les asigna un alias. Las columnas de este dataset son de la forma:</p>

| artist_id | artist_alias |
|------|------|
|1027859 |	1252408 |
|1017615 |	668 |
|6745885 |	1268522 |
|1018110 |	1018110 |

<h3 style="text-align:center"> user_artist_data.txt : </h3>
<p style="text-align:center">Este archivo es el mas importante de todos ya que se utilizará para generar el dataset. Las columnas de este dataset son de la forma:</p>

| user_id | artist_id | veces_reproducido |
|------|------|------|
|1059637 | 1000010 | 238 |
|1059637 | 1000049 | 1 |
|1059637 | 1000056 | 1 |
|1059637 | 1000062 | 11 |

# Preparando los Datos

## Parseo de los datos
Como los datos deben ser extraidos de un archivo txt definimos una funcion que parsea dichos datos

In [3]:
def parser(s, delimeters=" ", to_int=None):
    '''
    @param: s, string a ser parseado
    @param: delimiters, el criterio segun el cual hacer el split
    @param: to_int, luego del split, que elementos convertir a int
    parser parsea una entrada segun el tipo de delimitador,
    luego si to_int contiene una lista de indices, convierte los
    valores de s a int segun esta lista
    '''
    s = s.split(delimeters)
    # si to_int es distinto de None
    if to_int:
        # se convierte el elemento s[i] en int donde i es un elemento de to_int
        # el cual contiene una lista de indices de los elementos que deben ser convertidos a int
        # sino, dicho elemento de ese no necesita ser convertido a int y se mantiene su tipo original
        for i in to_int:
            s[i] = int(s[i])
    return tuple(s)

## Se parsean los datos de los 3 archivos

In [4]:
# Path que especifica donde se encuentran los datos
dataPath = "/home/jovyan/work/data/audioscrobbling/"

 Se indica de donde tomar los datos de los artistas y se hace el map que parsea los datos,
 para este dataset solo el primer elemento, que corresponde al __user_id__ 

In [5]:
artistData = sc.textFile(dataPath+"artist_data_small.txt").map(lambda x: parser(x,'\t',[0]))
# artistData = sc.textFile(dataPath+"artist_data.txt").map(lambda x: parser(x,'\t',[0]))

 Se indica de donde tomar los aliases de los artistas y se hace el map que parsea los datos.
 Tanto el __user_id__ como el __user_alias__ se deben convertir a int

In [6]:
artistAlias = sc.textFile(dataPath+"artist_alias_small.txt").map(lambda x: parser(x,'\t', [0,1]))
# artistAlias = sc.textFile(dataPath+"artist_alias.txt").map(lambda x: parser(x,'\t', [0,1]))

 Se leen los datos de los artistas y se los mapea, 
 retorna pares clave-valor a artistAliasMap, como un __diccionario__

In [7]:
artistAliasMap = artistAlias.collectAsMap()

 Se obtienen los datos de los usuarios.
 Todos los elementos de cada registro se convierten a __int__

In [8]:
userArtistData = sc.textFile(dataPath+"user_artist_data_small.txt").map(lambda x: parser(x,' ',[0,1,2]))
# userArtistData = sc.textFile(dataPath+"user_artist_data.txt").map(lambda x: parser(x,' ',[0,1,2]))

 Luego se obtiene un nuevo RDD donde el nombre de los artistas se
 convierten a sus alias equivalentes utilizando __get__ que 
 en caso de no encontrarse el alias en el diccionario, se utiliza el valor pasado por defecto
 en este caso, el valor en si (  x [ 1 ]  )

In [9]:
userArtistData = userArtistData.map(lambda x: (x[0], artistAliasMap.get(x[1], x[1]), x[2]))

# Creacion de los conjuntos de datos para entrenar el modelo ALS
 El dataset userArtistData se divide de la siguiente manera
 - 90% trainingData
 - 10% testData

In [10]:
trainingData, testData = userArtistData.randomSplit([90,10], 13)

## Se persisten los RDD con un nivel de almacenamiento predeterminado

In [11]:
trainingData.cache()
testData.cache()

PythonRDD[8] at RDD at PythonRDD.scala:48

In [12]:
# Visualizacion de los RDDs
print (trainingData.take(3))
print (testData.take(3))
print (trainingData.count())
print (testData.count())

[(1059637, 1000010, 238), (1059637, 1000049, 1), (1059637, 1000056, 1)]
[(1059637, 1000094, 1), (1059637, 1000112, 423), (1059637, 1000433, 10)]
44564
4917


# Entrenamiento y Prueba del modelo

## Entrenamiento del modelo
El algoritmo de ALS toma los siguientes hiperparametros
- rank = 10 El numero de factores latentes en el modelo, o equivalentemente, el numero de columnas k en los parametros en las matrices de usuarios y musicas. En casos no triviales, esto tambien es su rango
- iterations = 5 El numero de iteraciones que corre la factorizacion. Mas iteraciones toman más tiempo en computarse pero producen mejores factorizaciones
- lambda = 0.01 Un parametro de overfitting estandar. Valores más grandes resisten el overfiting, pero valores que son muy grandes afectan negativamente la __exactitud__ de la factorizacion
- alpha = 1.0 controla los pesos relativos de las interacciones observadas y no observadas entre usuario - producto (musica) en la factorizacion

In [13]:
training = trainingData.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
# cfModel = ALS.trainImplicit(training, rank=10, seed=345)
cfModel = ALS.trainImplicit(training, rank=10, iterations=20, lambda_=0.01, blocks=-1, alpha=1.0, nonnegative=False, seed=345)
# cfModel.save(sc, "/home/jovyan/work/models/full")

# Validacion del Modelo

## Curva ROC
<img src="./imagenes/ROC_space-2.png" style="width:60%;" />

<h3 style="text-align:center"> Para calcular el grado de acierto del modelo se calcula el Area Bajo la Curva (AUC) </h3>
<img src="./imagenes/Curvas-ROC.png" />

In [14]:
# Se obtienen todos los artist_id y se realiza un broadcast de los mismos
allitemIDs = userArtistData.map(lambda x: x[1]).distinct().collect()
bAllItemIDs = sc.broadcast(allitemIDs)

In [15]:
from random import randint

# Depende del numero de items en userIDAndPosItemIDs,
# crea un conjunto de productos 'negativos' para cada usuario.
# estos son elegidos de forma randomica
# de entre todos los otros items, excluyendo aquellos que son
# "positivos para el usuario"
# NOTE: mapPartitions opera sobre muchos pares (usuario, item-positivo) a la vez
# NOTE: flatMap divide las colecciones de arriba en un gran conjunto de tuplas
def xtractNegative(userIDAndPosItemIDs):
    def pickEnoughNegatives(line):
        userID = line[0]
        posItemIDSet = set(line[1])
        negative = []
        allItemIDs = bAllItemIDs.value
        # Se mantienen tantos ejemplos negativos como positivos por usuario. Pueden haber duplicados
        i = 0
        while (i < len(allItemIDs) and len(negative) < len(posItemIDSet)):
            itemID = allItemIDs[randint(0,len(allItemIDs)-1)]
            if itemID not in posItemIDSet:
                negative.append(itemID)
            i += 1
        
        # El resultado es una coleccion de tuplas (usario, item-negativo)
        return map(lambda itemID: (userID, itemID), negative)

    # Se inicializa un RNG y el conjunto de IDs de objetos una vez por particion 
    return map(pickEnoughNegatives, userIDAndPosItemIDs)

def ratioOfCorrectRanks(positiveRatings, negativeRatings):
    
    # Encuentra elementos en arr cuyo index >= start y tiene valores mas pequenios que x
    # arr es un array ordenado
    def findNumElementsSmallerThan(arr, x, start=0):
        left = start
        right = len(arr) -1
        # si x es mayor que el elemento mas grande en arr
        if start > right or x > arr[right]:
            return right + 1
        mid = -1
        while left <= right:
            mid = (left + right) // 2 # floordiv
            if arr[mid] < x:
                left = mid + 1
            elif arr[mid] > x:
                right = mid - 1
            else:
                while mid-1 >= start and arr[mid-1] == x:
                    mid -= 1
                return mid
        return mid if arr[mid] > x else mid + 1
    
    # AUC puede ser visto como la probabilidad de que un item randomico positivo obtenga
    # un puntaje mayor que un item randomico negativo. Aqui se computa la 
    # proporcion de todos los pares positivo-negativos que son rankeados correctamente. 
    # El resultado es igual a la metrica AUC
    correct = 0 ## L
    total = 0 ## L
    
    # ordenar positiveRatings es mas costoso
    # positiveRatings = np.array(map(lambda x: x.rating, positiveRatings))

    negativeRatings = list(map(lambda x:x.rating, negativeRatings))
    
    #np.sort(positiveRatings)
    negativeRatings.sort()# = np.sort(negativeRatings)
    total = len(positiveRatings)*len(negativeRatings)
    
    for positive in positiveRatings:
        # conteo de pares correctamente rankeados
        correct += findNumElementsSmallerThan(negativeRatings, positive.rating)

    # Retorna AUC: fraccion de pares ranqueados correctamente
    return float(correct) / total

def calculateAUC(positiveData, bAllItemIDs, predictFunction):
    # Toma los datos como 'positivos' y los mapea a tuplas
    positiveUserProducts = positiveData.map(lambda r: (r[0], r[1]))
    # Se realiza predicciones para todos los elementos, 
    # incluyendo una puntuacion numerica, y agrupados por usuario
    positivePredictions = predictFunction(positiveUserProducts).groupBy(lambda r: r.user)
    
    # Se crea un conjunto de productos "negativos" para cada usuario. 
    # Estos son elegidos randomicamente de entre todos los otros items, 
    # excluyendo aquellos que son positivos para el usuario
    negativeUserProducts = positiveUserProducts.groupByKey() \
                                .mapPartitions(xtractNegative).flatMap(lambda x: x)
    # Se realizan predicciones sobre los demas
    negativePredictions = predictFunction(negativeUserProducts).groupBy(lambda r: r.user)
    
    return (
            positivePredictions.join(negativePredictions)
                .values()
                .map(
                    lambda positive_negativeRatings: \
                    ratioOfCorrectRanks(positive_negativeRatings[0], positive_negativeRatings[1])
                )
                .mean()
            )

In [16]:
auc = calculateAUC(testData, bAllItemIDs, cfModel.predictAll)
print("auc=",auc)

auc= 0.8073447229476185


## Prueba del modelo con diferentes usuarios

In [17]:
recommended = map(lambda x: x.product, cfModel.recommendProducts(1059637, 5))
for i, artist in enumerate(recommended):
    print ("Artist %s: %s" % (i, artistData.lookup(artist)[0],))

Artist 0: The Starting Line
Artist 1: From Autumn to Ashes
Artist 2: New Found Glory
Artist 3: Taking Back Sunday
Artist 4: My Chemical Romance


In [18]:
recommended = map(lambda x: x.product, cfModel.recommendProducts(2064012, 5))
for i, artist in enumerate(recommended):
    print ("Artist %s: %s" % (i, artistData.lookup(artist)[0],))

Artist 0: Alanis Morissette
Artist 1: Pixies
Artist 2: The Offspring
Artist 3: Rage Against the Machine
Artist 4: Nirvana


In [19]:
spark.stop()

<h1 style="text-align:center"> FIN </h1>

FIN DEL TP