In [1]:
import pyspark.conf
import pyspark.sql
SparkConf = pyspark.conf.SparkConf
SparkSession = pyspark.sql.SparkSession
spark = SparkSession.builder \
            .appName("Intro") \
            .config('spark.executor.memory', '2g') \
            .config('spark.driver.memory','8g') \
            .config("spark.sql.crossJoin.enabled", "true")\
            .getOrCreate()

In [2]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.types import IntegerType

In [3]:
from pyspark.ml.recommendation import ALS
import random

In [4]:
datadir = '/home/ubuntu/profiledata_06-May-2005/'

In [5]:
base = datadir
rawUserArtistData = spark.read.text(base + "user_artist_data.txt")
rawArtistData = spark.read.text(base + "artist_data.txt")
rawArtistAlias = spark.read.text(base + "artist_alias.txt")

### preparation

In [6]:
for _ in rawUserArtistData.take(5):
    print(_)

Row(value='1000002 1 55')
Row(value='1000002 1000006 33')
Row(value='1000002 1000007 8')
Row(value='1000002 1000009 144')
Row(value='1000002 1000010 314')


In [7]:
def fx(row):
    cols = row.value.split(' ')
    user, artist = cols[:2]
    return int(user), int(artist)
# end def
    
userArtistDF = rawUserArtistData.rdd.map(fx).toDF(["user", "artist"])

In [8]:
userArtistDF.agg(F.min("user"), F.max("user"), F.min("artist"), F.max("artist")).show()

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+



In [9]:
def buildArtistByID(rawArtistData):
    def func(row):
        try:
            (_id, name) = row.value.split('\t')
        except ValueError:
            return None, None
        # end try
        if (name.strip() == ''):
            return None, None
        else:
            try:
                return int(_id), name.strip()
            except:
                return None, None
            # end try
        # end if
    # end def
    return rawArtistData.rdd.map(func).toDF(["id", "name"]).na.drop()
# end def

In [10]:
def buildArtistAlias(rawArtistAlias):
    def func(row):
        try:
            artist, alias = row.value.split('\t')
        except ValueError:
            return None, None
        # end try
        if (artist.strip()==''):
            return None, None
        else:
            return int(artist), int(alias)
        # end if
    # end def
    return dict(rawArtistAlias.rdd.map(func).collect())
# end def

In [11]:
artistByID = buildArtistByID(rawArtistData)
artistAlias = buildArtistAlias(rawArtistAlias)

In [12]:
(badID, goodID) = next(iter(artistAlias.items()))

In [13]:
artistByID.filter(F.col('id').isin([badID, goodID])).show()

+-------+------------------+
|     id|              name|
+-------+------------------+
|1109457|             P.O.S|
|2097152|DJ Tiesto -  P.O.S|
+-------+------------------+



### model

In [14]:
bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))

In [15]:
def buildCounts(rawUserArtistData, bArtistAlias):
    def func(row):
        try:
            userID, artistID, count = map(int, row.value.split(' '))
            finalArtistID = bArtistAlias.value.get(artistID, artistID)
            return (userID, finalArtistID, count)
        except ValueError:
            return None, None, None
        # end try
    # end def
    return rawUserArtistData.rdd.map(func).toDF(["user", "artist", "count"]).na.drop()
# end def
trainData = buildCounts(rawUserArtistData, bArtistAlias).cache()

In [16]:
model = ALS().\
    setSeed(random.randrange(0,10000000)).\
    setImplicitPrefs(True).\
    setRank(10).\
    setRegParam(0.01).\
    setAlpha(1.0).\
    setMaxIter(5).\
    setUserCol("user").\
    setItemCol("artist").\
    setRatingCol("count").\
    setPredictionCol("prediction").\
    fit(trainData)

In [17]:
trainData.unpersist()

DataFrame[user: bigint, artist: bigint, count: bigint]

In [18]:
model.userFactors.select("features").show(truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                  |
+------------------------------------------------------------------------------------------------------------------------------------------+
|[-0.53729624, -0.028494425, -0.06313752, -0.19646008, -0.26367348, 0.023305353, 0.07086386, 0.24986394, 0.44049087, 1.3109902]            |
|[-0.007965173, 0.06658514, -0.21245287, 0.16654468, -0.12804265, 0.21028997, -0.15430218, -0.020562911, 0.17293294, -0.13754143]          |
|[0.003127225, 0.0029438583, 0.001785691, 8.5407676E-4, -0.0012625128, 0.0056536254, 0.0030825643, 9.866614E-4, -0.0012622245, 0.006635723]|
|[-0.91178507, -0.062251084, -0.31099108, -0.73389816, -1.2645013, 0.14698508, -0.73243695, 0.15284924, 0.43211594, 1.0212367]             |
|[-0.7790672,

In [19]:
userID = 2093760

existingArtistIDs = trainData.\
    filter(F.col("user") == userID).rdd.\
    map(lambda row: int(row.artist)).collect()

In [20]:
artistByID = buildArtistByID(rawArtistData)

In [21]:
artistByID.filter(F.col("id").isin(existingArtistIDs)).show()

+-------+---------------+
|     id|           name|
+-------+---------------+
|   1180|     David Gray|
|    378|  Blackalicious|
|    813|     Jurassic 5|
|1255340|The Saw Doctors|
|    942|         Xzibit|
+-------+---------------+



In [22]:
toRecommend = model.itemFactors.\
        select(F.col("id").alias("artist")).\
        withColumn("user", F.lit(userID))

In [23]:
def makeRecommendations(model, userID, howMany):
    toRecommend = model.itemFactors.\
        select(F.col("id").alias("artist")).\
        withColumn("user", F.lit(userID))
    ans = model.transform(toRecommend).\
        select(["artist", "prediction"]).\
        orderBy(F.col("prediction"), ascending=False).\
        limit(howMany)
    return ans
# end def

In [24]:
topRecommendations = makeRecommendations(model, userID, 5)
topRecommendations.show()

+-------+-----------+
| artist| prediction|
+-------+-----------+
|   2814|0.029694578|
|1001819|0.029153192|
|1300642|0.029045615|
|   4605|0.027700486|
|1037970| 0.02766227|
+-------+-----------+



In [None]:
recommendedArtistIDs = topRecommendations.select("artist").rdd.map(lambda row: int(row['artist'])).collect()

In [28]:
artistByID.filter(F.col("id").isin(recommendedArtistIDs)).show()

+-------+----------+
|     id|      name|
+-------+----------+
|   2814|   50 Cent|
|   4605|Snoop Dogg|
|   1811|   Dr. Dre|
|1001819|      2Pac|
|1300642|  The Game|
+-------+----------+



In [29]:
model.userFactors.unpersist()
model.itemFactors.unpersist()

DataFrame[id: int, features: array<float>]

### evaluate

In [30]:
bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))

In [31]:
allData = buildCounts(rawUserArtistData, bArtistAlias)

In [32]:
trainData, cvData = allData.randomSplit([0.9, 0.1])

In [33]:
trainData.cache()
cvData.cache()

DataFrame[user: bigint, artist: bigint, count: bigint]

In [34]:
allArtistIDs = allData.select("artist").rdd.map(lambda row: int(row["artist"])).distinct().collect()

In [35]:
bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)

In [36]:
positiveData = cvData

In [37]:
def predictMostListened(train):
    listenCounts = train.groupBy("artist").\
        agg(F.sum("count").alias("prediction")).\
        select(["artist", "prediction"])
    def func(allData):
        return allData.\
          join(listenCounts, ["artist"], "left_outer").\
          select(["user", "artist", "prediction"])
    # end def
    return func
# end def
predictFunction = predictMostListened(trainData)

In [116]:
def areaUnderCurve(positiveData, bAllArtistIDs, predictFunction):
    positivePredictions = predictFunction(positiveData.select(["user", "artist"])).\
    withColumnRenamed("prediction", "positivePrediction")

    # BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    # small AUC problems, and it would be inefficient, when a direct computation is available.

    # Create a set of "negative" products for each user. These are randomly chosen
    # from among all of the other artists, excluding those that are "positive" for the user.
    def func(item):
        userID, userIDAndPosArtistIDs = item
        posItemIDSet = list(userIDAndPosArtistIDs[1])
        negative = []
        allArtistIDs = bAllArtistIDs.value
        i = 0
        # Make at most one pass over all artists to avoid an infinite loop.
        # Also stop when number of negative equals positive set size
        while (i < len(allArtistIDs) and len(negative) < len(posItemIDSet)):
            artistID = random.choice(allArtistIDs)
            # Only add distinct IDs
            if (artistID not in posItemIDSet):
                negative.append(artistID)
            # end def
            i += 1
        # end while
        # Return the set with user ID added back
        return (userID, artistID)
    # end def

    # BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    # small AUC problems, and it would be inefficient, when a direct computation is available.

    # Create a set of "negative" products for each user. These are randomly chosen
    # from among all of the other artists, excluding those that are "positive" for the user.
    negativeData = positiveData.select(["user", "artist"]).rdd.\
        map(lambda row: (int(row['user']), int(row['artist']))).\
        groupByKey().map(lambda item: (item[0], (item[0], item[1]))).\
        map(func).toDF(["user", "artist"])

    # Make predictions on the rest:
    negativePredictions = predictFunction(negativeData).\
        withColumnRenamed("prediction", "negativePrediction")

    # Join positive predictions to negative predictions by user, only.
    # This will result in a row for every possible pairing of positive and negative
    # predictions within each user.
    joinedPredictions = positivePredictions.join(negativePredictions, "user").\
        select(["user", "positivePrediction", "negativePrediction"]).cache()

    # Count the number of pairs per user
    allCounts = joinedPredictions.\
        groupBy("user").agg(F.count(F.lit("1")).alias("total")).\
        select(["user", "total"])
    # Count the number of correctly ordered pairs per user
    correctCounts = joinedPredictions.\
        filter(F.col("positivePrediction") > F.col("negativePrediction")).\
        groupBy("user").agg(F.count("user").alias("correct")).\
        select(["user", "correct"])

    # Combine these, compute their ratio, and average over all users
    meanAUC = allCounts.join(correctCounts, ["user"], "left_outer").\
        select("user", (F.coalesce(F.col("correct"), F.lit(0)) / F.col("total")).alias("auc")).\
        agg(F.mean("auc"))

    meanAUC = meanAUC.collect()[0]
    meanAUC = list(meanAUC.asDict().values())[0]
    
    joinedPredictions.unpersist()
    return meanAUC
# end def

In [117]:
mostListenedAUC = areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData))
print(mostListenedAUC)

0.8782432847197721


In [120]:
evaluations = []
for rank in (5, 30):
    for regParam in (1.0, 0.0001):
        for alpha in (1.0, 40.0):
            model = ALS().\
                setSeed(random.randrange(0,10000000)).\
                setImplicitPrefs(True).\
                setRank(rank).setRegParam(regParam).\
                setAlpha(alpha).setMaxIter(20).\
                setUserCol("user").setItemCol("artist").\
                setRatingCol("count").setPredictionCol("prediction").\
                fit(trainData)

            auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)

            model.userFactors.unpersist()
            model.itemFactors.unpersist()

            ans = (auc, (rank, regParam, alpha))
            evaluations.append(ans)
        # end for
    # end for
# end for

for _ in reversed(sorted(evaluations)):
    print(_)
# end for

trainData.unpersist()
cvData.unpersist()

Py4JJavaError: An error occurred while calling o2522.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5154.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5154.0 (TID 10104, localhost, executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1915)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1124)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1117)
	at org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1712)
	at org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1653)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:983)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:970)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:970)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$fit$1.apply(ALS.scala:676)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$fit$1.apply(ALS.scala:658)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:658)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:569)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1915)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [58]:
def recommend(rawUserArtistData, rawArtistData, rawArtistAlias):

    bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))
    allData = buildCounts(rawUserArtistData, bArtistAlias).cache()
    model = ALS().\
        setSeed(random.randrange(0,10000000)).\
        setImplicitPrefs(True).\
        setRank(10).setRegParam(1.0).setAlpha(40.0).setMaxIter(20).\
        setUserCol("user").setItemCol("artist").\
        setRatingCol("count").setPredictionCol("prediction").\
        fit(allData)
    allData.unpersist()

    userID = 2093760
    topRecommendations = makeRecommendations(model, userID, 5)

    recommendedArtistIDs = topRecommendations.select("artist").rdd.map(lambda row: int(row["artist"])).collect()
    artistByID = buildArtistByID(rawArtistData)
    artistByID.join(spark.createDataFrame(recommendedArtistIDs, IntegerType()).toDF("id"), "id").\
        select("name").show()

    model.userFactors.unpersist()
    model.itemFactors.unpersist()
# end def
recommend(rawUserArtistData, rawArtistData, rawArtistAlias)

+-----------+
|       name|
+-----------+
|  Green Day|
|  [unknown]|
|The Beatles|
|     Eminem|
|         U2|
+-----------+

