In [1]:
from pyspark.sql import SparkSession
# Build the session used by every later cell: Hive support enabled,
# running against the "local" master.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local")
    .getOrCreate()
)

In [2]:
# Listening log and the metadata table, both stored as parquet.
data = sparkSession.read.parquet("/data/sample264")
meta = sparkSession.read.parquet("/data/meta")

# A second, independently loaded copy of the same log with the track and
# timestamp columns renamed; it is joined against `data` in a later cell.
# NOTE(review): presumably loaded separately (rather than derived from `data`)
# so the self-join columns stay distinguishable -- confirm before simplifying.
dataforNext = sparkSession.read.parquet("/data/sample264")
dataforNext = dataforNext.select(
    'userId',
    dataforNext['trackId'].alias('trackId1'),
    dataforNext['timestamp'].alias('timestamp1'),
)

## Normalization can be done with the following function

In [3]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, sum

def norm(df, key1, key2, field, n):
    """Keep the top ``n`` rows per ``key1`` and normalize ``field`` within each group.

    Rows are ranked inside every ``key1`` partition by ``field`` in descending
    order; only the first ``n`` rows of each partition are kept. ``field`` is
    then divided by the sum of ``field`` over the kept rows of the same
    ``key1``.

    Parameters:
        df: Spark DataFrame containing at least the columns ``key1`` and ``field``.
        key1: name of the column that defines the groups.
        key2: not used by the computation; kept so existing callers that pass
            it positionally continue to work.
        field: name of the numeric column to rank by and to normalize.
        n: maximum number of rows retained per ``key1`` value.

    Returns:
        A cached DataFrame holding the retained rows of ``df`` plus two extra
        columns: ``"sum_" + field`` (per-group total) and ``"norm_" + field``
        (``field`` divided by that total).
    """
    # Imported here so the function is self-contained: it no longer depends on
    # `col` being imported by a later notebook cell, nor on a module-level
    # `sum` that shadows the Python builtin.
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    window = Window.partitionBy(key1).orderBy(F.col(field).desc())

    # Rank rows inside each group and keep only the n highest-valued ones.
    topsDF = df.withColumn("row_number", F.row_number().over(window)) \
        .filter(F.col("row_number") <= n) \
        .drop("row_number")

    # Per-group total of the retained rows. groupBy() already keeps the
    # grouping column in the result, so it is not repeated inside agg()
    # (repeating it would duplicate the key column).
    tmpDF = topsDF.groupBy(key1).agg(F.sum(F.col(field)).alias("sum_" + field))

    normalizedDF = topsDF.join(tmpDF, key1, "inner") \
        .withColumn("norm_" + field, F.col(field) / F.col("sum_" + field)) \
        .cache()

    return normalizedDF

In [4]:
from pyspark.sql import Window
from pyspark.sql.functions import col, rank,when

# Pair each listen in `data` with the listens of the same user in
# `dataforNext` that are a different track and happened strictly later,
# but no more than 420 timestamp units later.
dataJoined = (
    data.join(dataforNext, data.userId == dataforNext.userId)
    .where(data.trackId != dataforNext.trackId1)
    .where(
        (dataforNext.timestamp1 - data.timestamp <= 420)
        & (dataforNext.timestamp1 - data.timestamp > 0)
    )
    .select(
        data.userId,
        data.trackId.alias('trackId1'),
        dataforNext.trackId1.alias('trackId2'),
    )
    # "id" receives the smaller of the two track ids ...
    .withColumn(
        "id",
        when(col('trackId1') <= col('trackId2'), col('trackId1'))
        .otherwise(col('trackId2')),
    )
    # ... and "id2" the larger one, so a pair is keyed the same way
    # regardless of which track was played first.
    .withColumn(
        "id2",
        when(col('trackId2') >= col('trackId1'), col('trackId2'))
        .otherwise(col('trackId1')),
    )
)

# Number of co-occurrences per (id, id2) pair, then top-40 normalization per id.
trackTrack = dataJoined.groupBy('id', 'id2').count()
trackTrackNorm = norm(trackTrack, "id", "id2", "count", 40)


In [5]:
# How many times each user appears with each track in the log.
userTrack = data.groupBy("userId", "trackId").count()

# Top 1000 tracks per user, normalized; the normalized weight is scaled by 0.5
# and the columns are renamed to the common (id, id2, norm_count) layout.
userTrackNorm = norm(userTrack, "userId", "trackId", "count", 1000).select(
    col("userId").alias("id"),
    col("trackId").alias("id2"),
    (col("norm_count") * 0.5).alias("norm_count"),
)


In [6]:
# How many times each user appears with each artist in the log.
userArtist = data.groupBy("userId", "artistId").count()

# Top 100 artists per user, normalized; the normalized weight is scaled by 0.5
# and the columns are renamed to the common (id, id2, norm_count) layout.
userArtistNorm = norm(userArtist, "userId", "artistId", "count", 100).select(
    col("userId").alias("id"),
    col("artistId").alias("id2"),
    (col("norm_count") * 0.5).alias("norm_count"),
)

In [7]:
# How many times each artist appears with each track in the log.
artistTrack = data.groupBy("artistId", "trackId").count()

# Top 100 tracks per artist, normalized; the normalized weight is scaled by 0.5
# and the columns are renamed to the common (id, id2, norm_count) layout.
artistTrackNorm = norm(artistTrack, "artistId", "trackId", "count", 100).select(
    col("artistId").alias("id"),
    col("trackId").alias("id2"),
    (col("norm_count") * 0.5).alias("norm_count"),
)



In [9]:
# The user whose edges are inspected below.
user = 776748

# Attach the metadata rows of type 'track' to the user-track edges,
# then keep only the edges of the selected user.
userTrackMeta = (
    userTrackNorm
    .join(meta, (userTrackNorm['id2'] == meta['Id']) & (meta['type'] == 'track'))
    .where(userTrackNorm['id'] == user)
)

# Same for the user-artist edges, using the metadata rows of type 'artist'.
userArtistMeta = (
    userArtistNorm
    .join(meta, (userArtistNorm['id2'] == meta['Id']) & (meta['type'] == 'artist'))
    .where(userArtistNorm['id'] == user)
)

In [10]:
# Stack the (Artist, Name) pairs of the user's artists and tracks,
# order them by Artist and then Name (both ascending) and fetch the first 40.
result = (
    userArtistMeta.select("Artist", "Name")
    .union(userTrackMeta.select("Artist", "Name"))
    .orderBy("Artist", "Name")
    .take(40)
)

In [11]:
# Print each (Artist, Name) row on its own line. The parenthesised
# single-argument form produces the same text under Python 2 and is also
# valid Python 3, where the bare `print` statement is a syntax error.
for val in result:
    print("%s %s" % val)

Artist: 3 Doors Down Artist: 3 Doors Down
Artist: 3 Doors Down Kryptonite
Artist: 311 Artist: 311
Artist: 311 Beautiful disaster
Artist: Blur Artist: Blur
Artist: Blur Girls and Boys
Artist: Clawfinger Artist: Clawfinger
Artist: Clawfinger Nothing Going On
Artist: Disturbed Artist: Disturbed
Artist: Disturbed The Vengeful One
Artist: Gotthard Artist: Gotthard
Artist: Gotthard Eagle
Artist: Green Day 21 Guns
Artist: Green Day Artist: Green Day
Artist: Green Day Kill The DJ
Artist: Iggy Pop Artist: Iggy Pop
Artist: Iggy Pop Sunday
Artist: Korn Artist: Korn
Artist: Korn Here To Stay
Artist: Linkin Park Artist: Linkin Park
Artist: Linkin Park In The End
Artist: Linkin Park Numb
Artist: Lordi Artist: Lordi
Artist: Lordi Hard Rock Hallelujah
Artist: Nickelback Artist: Nickelback
Artist: Nickelback She Keeps Me Up
Artist: Nomy Artist: Nomy
Artist: Nomy Cocaine
Artist: Papa Roach Artist: Papa Roach
Artist: Papa Roach Getting Away With Murder
Artist: Rise Against Artist: Rise Against
Artist: Ri