In [95]:
from pyspark.sql import Window, SparkSession
from pyspark.sql.functions import explode, collect_list, size, col, row_number,lit,udf,sum
from pyspark.sql.types import ArrayType,IntegerType
import os


In [96]:
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))
sparkSession = SparkSession.builder.enableHiveSupport().master("local [2]").getOrCreate()
basedf = sparkSession.read.parquet("/data/sample264").repartition(40)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


# Task 1
#####  Build the edges of the type “track-track”. To do it you will need to count the collaborative similarity between all the tracks: if a user has started listening to track B within 7 minutes after starting track A, then you should add 1 to the weight of the edge from vertex A to vertex B (initial weight is equal to 0). 
##### Sort the resulting Data Frame in the descending order by the column norm_weight, and then in the ascending order this time first by “id1”, then by “id2”. Take top 40 rows, select only the columns “id1”, “id2”, and print the columns “id1”, “id2” of the resulting dataframe.

In [71]:
def weight(tm1,tm2):
    if (tm2-tm1)<7*60:
        return 1
    return 0

weightCal = udf(lambda x,y:weight(x,y),IntegerType())

In [81]:
joinDF = basedf.alias("a").join(basedf.alias("b"), on="userId") \
    .filter("a.trackId!=b.trackId and a.timestamp<b.timestamp") \
    .withColumn("weight",weightCal("a.timestamp","b.timestamp")).cache()
weightedDF = joinDF.groupBy("a.trackId","b.trackId").sum("weight").select("a.trackId","b.trackId",col("sum(weight)").alias("grp_weight")).cache()

In [82]:
## Window function 1: top 50 & 2: top 40
window = Window.partitionBy("a.trackId").orderBy(col("grp_weight").desc())
part = Window.partitionBy("a.trackId")
orderW = Window.orderBy(col("norm_W").desc())

"""
1. window for
"""
task1DF= weightedDF.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= 50) \
        .withColumn("sct_weight", sum(col("grp_weight")).over(part)) \
        .withColumn("norm_w", col("grp_weight")/col("sct_weight")) \
        .withColumn("ordered_rnum", row_number().over(orderW)) \
        .filter(col("ordered_rnum")<=40) \
        .select("a.trackId","b.trackId","norm_w").cache()

In [83]:
#task1DF.groupBy("a.trackId").sum("norm_w").show()
trackToTrack = task1DF.orderBy(col("norm_w").desc(),col("a.trackId"),col("b.trackId")).select("a.trackId","b.trackId")
trackToTrack.show()

+-------+-------+
|trackId|trackId|
+-------+-------+
| 803197| 810149|
| 803540| 904830|
| 807146| 818096|
| 813285| 935934|
| 814192| 845924|
| 816875| 821157|
| 818210| 831204|
| 824552| 913245|
| 829407| 848614|
| 831647| 922317|
| 832512| 919212|
| 833948| 936902|
| 833951| 865598|
| 835491| 888819|
| 839453| 902331|
| 841537| 824306|
| 842850| 940362|
| 844651| 897648|
| 845675| 966405|
| 847624| 805686|
+-------+-------+
only showing top 20 rows



# task 2
##### Build the edges of the type “user-track”. Take the amount of times the track was listened by the user as the weight of the edge from the user’s vertex to the track’s vertex.
##### For each user take top-1000 and normalize them.
##### Sort the resulting Data Frame in descending order by the column norm_weight, and then in ascending order this time first by “id1”, then by “id2”. Take top 40 rows, select only the columns “id1”, “id2”, and print the columns “id1”, “id2” of the resulting dataframe.

In [80]:
UTdf = basedf.groupBy("userId","trackId").count()
win = Window.partitionBy("userId").orderBy(col("count").desc())
part = Window.partitionBy("userId")

task2DF  = UTdf.withColumn("row_num",row_number().over(win)).filter("row_num<=1000") \
    .withColumn("wt_user",sum("count").over(part)) \
    .withColumn("norm_weight",col("count")/col("wt_user")) \
    .orderBy(col("norm_weight").desc(),"userId","trackId").cache()

task2DF.show(40)

+------+-------+-----+-------+-------+-----------+
|userId|trackId|count|row_num|wt_user|norm_weight|
+------+-------+-----+-------+-------+-----------+
|    66| 965774|    1|      1|      1|        1.0|
|   116| 867268|    1|      1|      1|        1.0|
|   128| 852564|    1|      1|      1|        1.0|
|   131| 880170|    1|      1|      1|        1.0|
|   195| 946408|    5|      1|      5|        1.0|
|   215| 860111|    1|      1|      1|        1.0|
|   235| 897176|    1|      1|      1|        1.0|
|   300| 857973|    1|      1|      1|        1.0|
|   321| 915545|    1|      1|      1|        1.0|
|   328| 943482|    1|      1|      1|        1.0|
|   333| 818202|    1|      1|      1|        1.0|
|   346| 864911|    1|      1|      1|        1.0|
|   356| 961308|    1|      1|      1|        1.0|
|   428| 943572|    3|      1|      3|        1.0|
|   431| 902497|    1|      1|      1|        1.0|
|   445| 831381|    1|      1|      1|        1.0|
|   488| 841340|    2|      1| 

# Task 3
###### Build the edges of the type “user-artist”. Take the amount of times the user has listened to the artist’s tracks as the weight of the edge from the user’s vertex to the artist’s vertex.
###### Sort the resulting Data Frame in descending order by the column norm_weight, and then in ascending order this time first by “id1”, then by “id2”. Take top 40 rows, select only the columns “id1”, “id2”, and print the columns “id1”, “id2” of the resulting dataframe.

In [104]:
artWindow = Window.partitionBy("userId").orderBy(col("count").desc())
wtArt = Window.partitionBy("userId")
usartDF= basedf.groupBy("userId","artistId").count().withColumn("row_num",row_number().over(artWindow)) \
    .filter("row_num <= 100").withColumn("wt_user",sum("count").over(wtArt)) \
    .withColumn("norm_wt",col("count")/col("wt_user")) \
    .orderBy(col("norm_wt").desc(),"userId","artistId").cache()

In [103]:
basedf.filter("userId==131").show()

+------+-------+--------+----------+
|userId|trackId|artistId| timestamp|
+------+-------+--------+----------+
|   131| 880170|  983068|1501576153|
+------+-------+--------+----------+



In [105]:
#usartDF.filter("count!=wt_user").show(40)
usartDF.show(40)

+------+--------+-----+-------+-------+-------+
|userId|artistId|count|row_num|wt_user|norm_wt|
+------+--------+-----+-------+-------+-------+
|    66|  993426|    1|      1|      1|    1.0|
|   116|  974937|    1|      1|      1|    1.0|
|   128| 1003021|    1|      1|      1|    1.0|
|   131|  983068|    1|      1|      1|    1.0|
|   195|  997265|    5|      1|      5|    1.0|
|   215|  991696|    1|      1|      1|    1.0|
|   235|  990642|    1|      1|      1|    1.0|
|   288| 1000564|   13|      1|     13|    1.0|
|   300| 1003362|    1|      1|      1|    1.0|
|   321|  986172|    1|      1|      1|    1.0|
|   328|  967986|    1|      1|      1|    1.0|
|   333| 1000416|    1|      1|      1|    1.0|
|   346|  982037|    1|      1|      1|    1.0|
|   356|  974846|    1|      1|      1|    1.0|
|   374| 1003167|    4|      1|      4|    1.0|
|   428|  993161|    3|      1|      3|    1.0|
|   431|  969340|    1|      1|      1|    1.0|
|   445|  970387|    1|      1|      1| 

# Task 4
###### Build the edges of the type “artist-track”. Take the amount of times the track HAS BEEN listened by all users as the weight of the edge from the artist’s vertex to the track’s vertex.
###### Sort the resulting Data Frame in descending order by the column norm_weight, and then in ascending order this time first by “id1”, then by “id2”. Take top 40 rows, select only the columns “id1”, “id2”, and print the columns “id1”, “id2” of the resulting dataframe.

In [107]:
artTrackWindow = Window.partitionBy("artistId").orderBy(col("count").desc())
partArt = Window.partitionBy("artistId")
artTracDF= basedf.groupBy("artistId","trackId").count().withColumn("row_num",row_number().over(artTrackWindow)) \
    .filter("row_num <= 100").withColumn("wt_user",sum("count").over(partArt)) \
    .withColumn("norm_wt",col("count")/col("wt_user")) \
    .orderBy(col("norm_wt").desc(),"artistId","trackId").cache()

In [108]:
artTracDF.show(40)

+--------+-------+-----+-------+-------+-------+
|artistId|trackId|count|row_num|wt_user|norm_wt|
+--------+-------+-----+-------+-------+-------+
|  967993| 869415|    2|      1|      2|    1.0|
|  967998| 947428|    3|      1|      3|    1.0|
|  968004| 927380|    1|      1|      1|    1.0|
|  968017| 859321|    1|      1|      1|    1.0|
|  968022| 852786|    1|      1|      1|    1.0|
|  968034| 807671|    1|      1|      1|    1.0|
|  968038| 964150|    1|      1|      1|    1.0|
|  968042| 835935|    1|      1|      1|    1.0|
|  968043| 913568|    1|      1|      1|    1.0|
|  968046| 935077|    1|      1|      1|    1.0|
|  968047| 806127|    3|      1|      3|    1.0|
|  968065| 907906|   12|      1|     12|    1.0|
|  968073| 964586|    1|      1|      1|    1.0|
|  968086| 813446|    3|      1|      3|    1.0|
|  968092| 837129|    4|      1|      4|    1.0|
|  968118| 914441|    2|      1|      2|    1.0|
|  968125| 821410|    1|      1|      1|    1.0|
|  968140| 953008|  