In [1]:
import os

# Bootstrap the PySpark shell in this kernel by running Spark's own shell.py
# in the notebook namespace.
# ``execfile`` exists only on Python 2; compiling and exec-ing the file does the
# same thing on Python 2 and 3, and the ``with`` block closes the file handle.
_shell_path = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
with open(_shell_path) as _shell_file:
    exec(compile(_shell_file.read(), _shell_path, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [89]:
from pyspark.sql import SparkSession

# Obtain (or reuse) a Hive-enabled session running locally with two worker threads.
# The master URL must be written without a space ("local[2]"): "local [2]" is not
# a valid master URL and only went unnoticed because getOrCreate() returned the
# session that had already been started by the shell bootstrap.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local[2]")
    .getOrCreate()
)

In [90]:
# Load the two parquet inputs: the event table (``data``) and the lookup table
# joined on ``id`` when the results are reported (``meta``).
data, meta = (
    sparkSession.read.parquet(path)
    for path in ("/data/sample264", "/data/meta")
)

## Normalization can be done with the following function

In [91]:
def norm(df, key1, key2, field, n):
    """Keep the top ``n`` rows per ``key1`` and normalize ``field`` within each group.

    For every value of ``key1`` the rows are ordered by ``field`` descending and
    only the first ``n`` are kept. ``field`` is then divided by the sum of
    ``field`` over the kept rows of the same ``key1``.

    Args:
        df: Spark DataFrame containing at least the columns ``key1`` and ``field``.
        key1: Name of the column that defines the groups.
        key2: Unused; kept so existing calls keep working.
        field: Name of the numeric column to rank by and normalize.
        n: Maximum number of rows to keep per group.

    Returns:
        A cached DataFrame with the kept rows plus two extra columns:
        ``"sum_" + field`` (group total) and ``"norm_" + field`` (share of the total).
    """
    # Imported here so the function does not rely on another notebook cell
    # having already bound ``Window``/``col``/``row_number`` and having replaced
    # the builtin ``sum`` with the Spark aggregate.
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    sum_field = "sum_" + field
    window = Window.partitionBy(key1).orderBy(F.col(field).desc())

    # row_number gives each row a 1-based position inside its key1 group.
    topsDF = (
        df.withColumn("row_number", F.row_number().over(window))
        .filter(F.col("row_number") <= n)
        .drop("row_number")
    )

    # groupBy already emits key1, so only the aggregate is requested here;
    # listing key1 again inside agg() would only duplicate the column.
    tmpDF = topsDF.groupBy(key1).agg(F.sum(F.col(field)).alias(sum_field))

    normalizedDF = (
        topsDF.join(tmpDF, key1, "inner")
        .withColumn("norm_" + field, F.col(field) / F.col(sum_field))
        .cache()
    )

    return normalizedDF

In [109]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, sum,desc,when,col, rank,lit,round,rank

# Id of the user the ranking is computed for.
user = 776748

# Weight of the user-indicator term in the update
#   v = alpha * u_idx + (1 - alpha) * sigma
alpha = 0.15

# Multipliers applied to the normalized counts of each edge type.
beta_user_artist, beta_user_track = 0.5, 0.5
beta_track_track, beta_artist_track = 1, 1

## Task 1: track track

In [110]:
# Second view of the events with trackId renamed, so both sides of the
# self-join below expose distinguishable track columns.
data_b = data.withColumnRenamed("trackId", "b_trackId")

# Pair up events of the same user that refer to different tracks and whose
# timestamps differ by at most 420 (presumably seconds -- confirm the unit).
# NOTE(review): op_weight is not read by any of the cells shown here.
train_data = (
    data.alias("a")
    .join(data_b.alias("b"), "userId")
    .where("a.trackId!=b.b_trackId")
    .withColumn("op_weight", col("b.timestamp") - col("a.timestamp"))
    .where("abs(b.timestamp - a.timestamp) <= 420")
)

# Number of such pairings for every ordered (trackId, b_trackId) pair.
track = train_data.groupBy("trackId", "b_trackId").count()

# Keep the 1000 strongest neighbours per track, normalize their counts and
# expose them as (source, target, next_val) edges.
trackNorm = (
    norm(track, "trackId", "b_trackId", "count", 1000)
    .select(
        col("trackId").alias("source"),
        col("b_trackId").alias("target"),
        (col("norm_count") * beta_track_track).alias("next_val"),
    )
)

track_track = trackNorm.select("source", "target", "next_val").cache()

## Task 2: user track

In [111]:
# Number of rows per (userId, trackId) pair.
users = data.groupBy("userId", "trackId").count()

# user -> track edges: top 1000 tracks per user, counts normalized per user
# and scaled by beta_user_track.
user_track = (
    norm(users, "userId", "trackId", "count", 1000)
    .select(
        col("userId").alias("source"),
        col("trackId").alias("target"),
        (col("norm_count") * beta_user_track).alias("next_val"),
    )
    .cache()
)

## Task 3: user artist

In [112]:
# Number of rows per (userId, artistId) pair.
users = data.groupBy("userId", "artistId").count()

# user -> artist edges: top 1000 artists per user, counts normalized per user
# and scaled by beta_user_artist.
user_artist = (
    norm(users, "userId", "artistId", "count", 1000)
    .select(
        col("userId").alias("source"),
        col("artistId").alias("target"),
        (col("norm_count") * beta_user_artist).alias("next_val"),
    )
    .cache()
)

## Task 4: artist track

In [113]:
# Number of rows per (artistId, trackId) pair.
# (The name ``users`` is reused from the previous cells although the rows are
# keyed by artist here.)
users = data.groupBy("artistId", "trackId").count()

# artist -> track edges: top 100 tracks per artist, counts normalized per
# artist and scaled by beta_artist_track.
artist_track = (
    norm(users, "artistId", "trackId", "count", 100)
    .select(
        col("artistId").alias("source"),
        col("trackId").alias("target"),
        (col("norm_count") * beta_artist_track).alias("next_val"),
    )
    .cache()
)

## Task 5: user 776748

In [114]:
# Rows belonging to the selected user, tagged with a constant value of 1.0.
user_only = data.where(col("userId") == user).withColumn("val", lit(1.0))

# Distinct artists appearing in those rows, as (id, val).
user_onlyArtist = (
    user_only.alias("a")
    .selectExpr("artistId as id", "val")
    .distinct()
)

# Distinct tracks appearing in those rows, as (id, val).
user_onlyTrack = (
    user_only.alias("b")
    .selectExpr("trackId as id", "val")
    .distinct()
)

## Task 6: combine the edges and rank the vertices for the user

In [115]:
# Stack all four edge sets into one (source, target, next_val) table.
# union() matches columns by position, so every input must keep this order.
edges = (
    track_track
    .union(user_track)
    .union(user_artist)
    .union(artist_track)
    .cache()
)

In [116]:
# One row per distinct user id; v is 1.0 for the selected user and 0.0 otherwise.
users_v = (
    data.select(col("userId").alias("id"))
    .distinct()
    .withColumn("v", when(col("id") == user, 1.0).otherwise(0.0))
)

In [117]:
# One row per distinct track id; v is taken from user_onlyTrack when the track
# is present there and is 0.0 otherwise.
tracks = (
    data.select(col("trackId").alias("id"))
    .distinct()
    .join(user_onlyTrack, "id", "left")
    .select(
        "id",
        when(col("val").isNotNull(), col("val")).otherwise(0.0).alias("v"),
    )
)
         

In [118]:
# One row per distinct artist id; v is taken from user_onlyArtist when the
# artist is present there and is 0.0 otherwise.
artists = (
    data.select(col("artistId").alias("id"))
    .distinct()
    .join(user_onlyArtist, "id", "left")
    .select(
        "id",
        when(col("val").isNotNull(), col("val")).otherwise(0.0).alias("v"),
    )
)

In [119]:
# Initial (id, v) vector over all vertices: users, then artists, then tracks.
# The statement previously ended in a dangling "\" continuation with nothing
# after it; the expression is now complete on its own line.
x = users_v.union(artists).union(tracks)

In [120]:
# Indicator vector over the same vertex ids as x: u_idx is 1.0 where id equals
# the selected user and 0.0 everywhere else.
u = x.select(
    "id",
    when(col("id") == user, 1.0).otherwise(0.0).alias("u_idx"),
).cache()

In [121]:
# Five update rounds. Each round pushes the current values along the edges and
# then mixes the result with the user indicator:
#   v_new(id) = alpha * u_idx(id) + (1 - alpha) * sum over edges into id of v(source) * next_val
# NOTE: the loop rebinds ``x``, so re-running this cell continues from the
# previous result instead of starting over.
for _ in range(5):
    # Vertices without outgoing edges get next_val = 0.0 after the left join.
    next_v = (
        x.join(edges, x["id"] == edges["source"], "left")
        .na.fill(0.0, ["next_val"])
        .groupBy("target")
        .agg(sum(col("v") * col("next_val")).alias("sigma"))
    )

    # Vertices that received nothing this round get sigma = 0.0.
    x = (
        u.join(next_v, u["id"] == next_v["target"], "left")
        .na.fill(0.0, ["sigma"])
        .select(
            col("id"),
            (alpha * col("u_idx") + (1 - alpha) * col("sigma")).alias("v"),
        )
        .cache()
    )

In [122]:
# Global ranking by v, highest first. The window has no partitionBy, so the
# ranking is computed over the whole joined table at once.
window = Window.orderBy(col("v").desc())

# Top-40 vertices (excluding the user itself) that have an entry in ``meta``.
# - The rank filter is applied while the ``rank`` column still exists, instead
#   of after a select that had already dropped it.
# - An explicit orderBy makes the row order returned by take() deterministic
#   rather than an accident of how the window was evaluated.
# - take(40) still caps the output when ties push more than 40 rows to rank <= 40.
results = (
    x.where(col("id") != user)
    .join(meta, "id")
    .withColumn("rank", rank().over(window))
    .where(col("rank") <= 40)
    .orderBy(col("v").desc())
    .select(col("Name"), col("Artist"), round(col("v"), 5))
    .take(40)
)

In [123]:
# Print one "<Name> <Artist> <score>" line per result row.
# The template is a unicode literal: on Python 2 a byte-string template would
# try to ASCII-encode unicode names and raise UnicodeEncodeError on any
# non-ASCII title. On Python 3 the prefix is a no-op.
for name, artist, score in results:
    print(u"{} {} {}".format(name, artist, score))

Kill The DJ Artist: Green Day 1.42809
Come Out and Play Artist: The Offspring 1.37473
I Hate Everything About You Artist: Three Days Grace 1.37362
Prayer Of The Refugee Artist: Rise Against 1.35278
Eagle Artist: Gotthard 1.21412
21 Guns Artist: Green Day 1.17302
Beautiful disaster Artist: 311 0.92155
Wait And Bleed Artist: Slipknot 0.92155
Here To Stay Artist: Korn 0.91653
Hard Rock Hallelujah Artist: Lordi 0.91653
Nothing Going On Artist: Clawfinger 0.80983
In The End Artist: Linkin Park 0.80292
Numb Artist: Linkin Park 0.80292
Sky is Over Artist: Serj Tankian 0.68799
Kryptonite Artist: 3 Doors Down 0.68799
Take It Out On Me Artist: Thousand Foot Krutch 0.47024
Girls and Boys Artist: Blur 0.40245
Cocaine Artist: Nomy 0.20893
Getting Away With Murder Artist: Papa Roach 0.20648
Artist: Green Day Artist: Green Day 0.01181
Artist: Clawfinger Artist: Clawfinger 0.00472
Artist: The Offspring Artist: The Offspring 0.00472
Artist: Linkin Park Artist: Linkin Park 0.00472
The Vengeful One Artis