In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Window

execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [2]:
sparkSession = SparkSession.builder.enableHiveSupport().master("local [2]").getOrCreate()

data = sparkSession.read.parquet("/data/sample264")
meta = sparkSession.read.parquet("/data/meta")

In [3]:
userToTrackWindow1 = Window.partitionBy('userId').orderBy(col('count').desc())
userToTrackWindow2 = Window.partitionBy('userId')

userToTrack_tmp = data.groupBy(col("userId"), col("trackId")) \
                      .count() \
                      .withColumn("rank", rank().over(userToTrackWindow1)) \
                      .filter(col("rank") <= 1000) \
                      .withColumn('sum_count', sum(col('count')).over(userToTrackWindow2)) \
                      .cache()
                    
userToTrack = userToTrack_tmp.withColumn('norm_count', col('count') / col('sum_count') * 0.4) \
                             .select(col("userId").alias("id1"), col("trackId").alias("id2"), col("norm_count").alias("weight"))

In [4]:
userToTrack2 = userToTrack_tmp.withColumn('norm_count', col('count') / col('sum_count') * 1.0) \
                              .select(col("userId").alias("id1"), col("trackId").alias("id2"), col("norm_count").alias("weight"))

In [5]:
userToArtistWindow1 = Window.partitionBy('userId').orderBy(col('count').desc())
userToArtistWindow2 = Window.partitionBy('userId')

userToArtist = data.groupBy(col("userId"), col("artistId")) \
                   .count() \
                   .withColumn("rank", rank().over(userToArtistWindow1)) \
                   .filter(col("rank") <= 100) \
                   .withColumn('sum_count', sum(col('count')).over(userToArtistWindow2)) \
                   .withColumn('norm_count', col('count') / col('sum_count') * 0.6) \
                   .select(col("userId").alias("id1"), col("artistId").alias("id2"), col("norm_count").alias("weight")) \
                   .cache()

In [6]:
artistToTrackWindow1 = Window.partitionBy('artistId').orderBy(col('count').desc())
artistToTrackWindow2 = Window.partitionBy('artistId')

artistToTrack = data.groupBy(col("artistId"), col("trackId")) \
                    .count() \
                    .withColumn("rank", rank().over(artistToTrackWindow1)) \
                    .filter(col("rank") <= 100) \
                    .withColumn('sum_count', sum(col('count')).over(artistToTrackWindow2)) \
                    .withColumn('norm_count', col('count') / col('sum_count') * 0.7) \
                    .select(col("artistId").alias("id1"), col("trackId").alias("id2"), col("norm_count").alias("weight")) \
                    .cache()

In [7]:
data2 = data.select(col('userId').alias('userId'), 
                    col('trackId').alias('trackId2'), 
                    col('artistId').alias('artistId2'), 
                    col('timestamp').alias('timestamp2'))

In [8]:
trackToTrackWindow1 = Window.partitionBy('trackId').orderBy(col('count').desc())
trackToTrackWindow2 = Window.partitionBy('trackId')

trackToTrak = data.join(data2, 'userId', 'inner') \
                  .filter((col('trackId') < col('trackId2')) & (abs(col('timestamp') - col('timestamp2')) < 7 * 60)) \
                  .groupBy(col('trackId'), col('trackId2')) \
                  .count() \
                  .withColumn("rank", rank().over(trackToTrackWindow1)) \
                  .filter(col("rank") <= 40) \
                  .withColumn('sum_count', sum(col('count')).over(trackToTrackWindow2)) \
                  .withColumn('norm_count', col('count') / col('sum_count') * 0.3) \
                  .select(col("trackId").alias("id1"), col("trackId2").alias("id2"), col("norm_count").alias("weight")) \
                  .cache()

In [9]:
init_vector = userToTrack.union(userToArtist) \
                         .join(meta.withColumn("id2", col("id")), on="id2", how="inner")\
                         .filter( col("id") == 776748 )\
                         .orderBy(col("Artist"), col("Name"))\
                         .withColumn("weight", lit(1.))\
                         .select(col("id2").alias("id1"), col("weight").alias("vertex_weight"))
                    
user_vector = spark.createDataFrame(data=[(776748, 1.)], schema=init_vector.schema)

In [10]:
edges = userToTrack.union(userToArtist) \
                   .union(userToTrack2) \
                   .union(artistToTrack) \
                   .union(trackToTrak)

In [11]:
x = init_vector
alpha = .15

for i in range(5):
    step = x.join(edges, on="id1", how="inner") \
            .withColumn("_weight", col("weight") * col("vertex_weight") * ( 1 - alpha )) \
            .select(col("id2").alias("id1"), col("_weight").alias("vertex_weight"))

    x = user_vector.withColumn("_weight", col("vertex_weight") * alpha ) \
                   .select(col("id1"), col("_weight").alias("vertex_weight")) \
                   .union(step) 

In [12]:
reccomendations = x.select(col("id1").alias("id"), col("vertex_weight")) \
                   .join(meta, on="id", how="inner") \
                   .filter(col("type") == 'track') \
                   .orderBy(col("vertex_weight").desc()) \
                   .select(col("Name"), col("Artist"), col("vertex_weight")) \
                   .take(40)

In [13]:
for val in reccomendations:
    print "%s %s %.5f" % val

Kill The DJ Artist: Green Day 0.01417
Come Out and Play Artist: The Offspring 0.00944
21 Guns Artist: Green Day 0.00944
Nothing Going On Artist: Clawfinger 0.00944
Kill The DJ Artist: Green Day 0.00567
Kill The DJ Artist: Green Day 0.00506
Take It Out On Me Artist: Thousand Foot Krutch 0.00472
Cocaine Artist: Nomy 0.00472
Prayer Of The Refugee Artist: Rise Against 0.00472
Getting Away With Murder Artist: Papa Roach 0.00472
I Hate Everything About You Artist: Three Days Grace 0.00472
Wait And Bleed Artist: Slipknot 0.00472
In The End Artist: Linkin Park 0.00472
Beautiful disaster Artist: 311 0.00472
Hard Rock Hallelujah Artist: Lordi 0.00472
Numb Artist: Linkin Park 0.00472
Sunday Artist: Iggy Pop 0.00472
Eagle Artist: Gotthard 0.00472
Girls and Boys Artist: Blur 0.00472
Sky is Over Artist: Serj Tankian 0.00472
She Keeps Me Up Artist: Nickelback 0.00472
Here To Stay Artist: Korn 0.00472
Kryptonite Artist: 3 Doors Down 0.00472
The Vengeful One Artist: Disturbed 0.00472
Come Out and Play 