#### This notebook has code for a song recommendation system using MapReduce framework on Spark. The dataset is the Million Songs dataset,obtained from Kaggle. It contains 1M records of 110K users. The key fields are user id, song id and playcount. The code includes 3 parts:
#### 1. Building a rating for each song based on user's playcount
#### 2. Identifying similar users who have liked the same song as a given user and recommending top 5 songs based on similar user preferences
#### 3. Computing cosine similarity metric for each user pair and identifying top 5 most similar user pairs

In [1]:
import findspark
findspark.init('C://apachespark')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext(conf = conf)

In [2]:
triplet = sc.textFile(r"kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

In [3]:
triplet.take(5)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1']]

In [4]:
songs = sc.textFile(r"kaggle_songs.txt") \
    .map(lambda line: line.split(" ")) 

In [5]:
songs.take(5)

[['SOAAADD12AB018A9DD', '1'],
 ['SOAAADE12A6D4F80CC', '2'],
 ['SOAAADF12A8C13DF62', '3'],
 ['SOAAADZ12A8C1334FB', '4'],
 ['SOAAAFI12A6D4F9C66', '5']]

#### Rating for each song has been calculated as the playcount for the song to the total playcount of all songs listened to by the user

In [8]:
triplet.map(lambda x:(x[0],x[2])).reduceByKey(lambda x,y:int(x)+int(y)).take(5)

[('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', 54),
 ('f6e34f0a68d5ea1344511e33486f956de361db78', 219),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588', 56),
 ('ed199f27a41066e37414c3fe9eefb2ae372b8819', 24),
 ('c1d24ce8cd80e40aa8d803d5ddfceb91a6b5d75d', 15)]

In [9]:
total=triplet.map(lambda x:(x[0],x[2])).reduceByKey(lambda x,y:int(x)+int(y))
triplet.map(lambda x:(x[0],[x[1],x[2]])).join(total).take(5)

[('bcb1e6d620cf522390d5c92bae26936928e0b588', (['25150', '26'], 56)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588', (['177172', '1'], 56)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588', (['212753', '8'], 56)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588', (['25890', '1'], 56)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588', (['259912', '1'], 56))]

In [10]:
triplet.map(lambda x:(x[0],[x[1],x[2]])).join(total)\
       .map(lambda x:((x[0],x[1][0][0]),[x[1][0][1],x[1][1]]))\
       .mapValues(lambda x:float(x[0])/x[1]).take(5)

[(('bcb1e6d620cf522390d5c92bae26936928e0b588', '25150'), 0.4642857142857143),
 (('bcb1e6d620cf522390d5c92bae26936928e0b588', '177172'),
  0.017857142857142856),
 (('bcb1e6d620cf522390d5c92bae26936928e0b588', '212753'), 0.14285714285714285),
 (('bcb1e6d620cf522390d5c92bae26936928e0b588', '25890'), 0.017857142857142856),
 (('bcb1e6d620cf522390d5c92bae26936928e0b588', '259912'),
  0.017857142857142856)]

In [11]:
rating=triplet.map(lambda x:(x[0],[x[1],x[2]])).join(total)\
              .map(lambda x:((x[0],x[1][0][0]),[x[1][0][1],x[1][1]]))\
              .mapValues(lambda x:round(float(x[0])/x[1],2))
triplet.map(lambda x:((x[0],x[1]),x[2])).join(rating).take(5)

[(('00f7c493ee64884998ea98d9f5bed87bc4a0afcf', '25150'), ('5', 0.28)),
 (('eead1f1e3ad91575346f9ce826ddaea19bff80ed', '25150'), ('4', 0.2)),
 (('5a7ee1e5519d1652b79d90f98f477654dcea8193', '25150'), ('1', 0.04)),
 (('9e8bb9a929188a6ffbd27c932df116029182a9ff', '25150'), ('1', 0.01)),
 (('6d6acd6ffddc69d0da4b9ceeed8de0a1f3a8e12d', '25150'), ('2', 0.04))]

In [55]:
rating_triplet= triplet.map(lambda x:((x[0],x[1]),x[2])).join(rating)\
               .map(lambda x:[x[0][0],x[0][1],x[1][0],x[1][1]])

In [56]:
rating_triplet.take(2)

[['00f7c493ee64884998ea98d9f5bed87bc4a0afcf', '25150', '5', 0.28],
 ['eead1f1e3ad91575346f9ce826ddaea19bff80ed', '25150', '4', 0.2]]

#### Recommended songs for a given user has been computed below using the below steps:
#### Identifying users who have liked the same song and rated it higher
#### Exploring other songs listened to by these users and recommending top 5 amongst them

In [57]:
user='00f7c493ee64884998ea98d9f5bed87bc4a0afcf'
song='25150'
user_rating=0.28

In [15]:
def liked(col):
    if((col[1] == song) and (col[3] > user_rating)):
        return True
    return False
users = rating_triplet.filter(liked) \
              .map(lambda line:(line[0],1) )

In [16]:
users.take(5)

[('7af67014f661a3aa87072cbbd2dddb7a72e496e0', 1),
 ('2c6580e546c87f828e08e9a75551a6e4dac69d3a', 1),
 ('4a16fd8943913c0268b360bd12f37a4736c2b897', 1),
 ('d19062a5a043da421e6027e4d554ca2a961e6414', 1),
 ('92e4a1223dbde968430e9e122b53f69e70fe2f94', 1)]

In [17]:
rating_triplet.map(lambda x:(x[0],[x[1],x[2],x[3]])).join(users).filter(lambda x:x[1][0][0]!=song).take(5)

[('4a16fd8943913c0268b360bd12f37a4736c2b897', (['25150', '10', 0.37], 1)),
 ('4a16fd8943913c0268b360bd12f37a4736c2b897', (['32544', '1', 0.04], 1))]

In [18]:
rating_triplet.map(lambda x:(x[0],[x[1],x[2],x[3]]))\
              .join(users).filter(lambda x:x[1][0][0]!=song)\
              .map(lambda x:(x[1][0][0],x[1][0][2]))\
              .reduceByKey(lambda x,y:round(x+y,2))\
              .takeOrdered(5,key=lambda x:-x[1])""

[('25150', 303.15),
 ('12985', 20.92),
 ('288653', 15.18),
 ('68212', 7.38),
 ('319911', 6.44)]

In [62]:
rating_triplet = sc.parallelize(rating_triplet.take(10000))
rating_triplet.take(5)

[['00f7c493ee64884998ea98d9f5bed87bc4a0afcf', '25150', '5', 0.28],
 ['eead1f1e3ad91575346f9ce826ddaea19bff80ed', '25150', '4', 0.2],
 ['5a7ee1e5519d1652b79d90f98f477654dcea8193', '25150', '1', 0.04],
 ['9e8bb9a929188a6ffbd27c932df116029182a9ff', '25150', '1', 0.01],
 ['6d6acd6ffddc69d0da4b9ceeed8de0a1f3a8e12d', '25150', '2', 0.04]]

#### Below code does the following:
#### Compute cosine similarity metric for each user pair
#### Identify unique user pair sets and showcase top 5 most similar user pair sets
#### Recommend songs for a given user based on similar users, identified through cosine similarity

In [63]:
def checkSimilar(row):
    if (row[1][0][0]!=row[1][1][0] and row[1][0][1]==row[1][1][1]):
        return True
    return False

In [93]:
user_pair = rating_triplet.map(lambda x:(1,x)).join(rating_triplet.map(lambda x:(1,x))).filter(checkSimilar)\
                                  .map(lambda x:((x[1][0][0],x[1][1][0]),(float(x[1][0][3]),float(x[1][1][3]))))

In [92]:
user_pair_temp = rating_triplet.map(lambda x:(1,x)).join(rating_triplet.map(lambda x:(1,x))).filter(checkSimilar)\
                                  .map(lambda x:((x[1][0][0],x[1][1][0]),1))\
                                  .reduceByKey(lambda x,y: x + y)

In [99]:
user_pair_final = user_pair.join(user_pair_temp)\
                            .filter(lambda x: x[1][1]>1)\
                            .map(lambda x: ((x[0][0], x[0][1]),(x[1][0][0],x[1][0][1])))

In [105]:
user_pair_final.take(5)

[(('51fb0c6f89320241c4daaf73a37d8e582d628935',
   '615c1f145c6cf993371658357b6ab73a4ac4131e'),
  (0.12, 0.03)),
 (('51fb0c6f89320241c4daaf73a37d8e582d628935',
   '615c1f145c6cf993371658357b6ab73a4ac4131e'),
  (0.12, 0.03)),
 (('40e43d7235c5bc26b510e8888b47892c2245e677',
   'cadde6d1f0ef29f3c6ef39b1c1346b70bb962f73'),
  (0.01, 0.5)),
 (('40e43d7235c5bc26b510e8888b47892c2245e677',
   'cadde6d1f0ef29f3c6ef39b1c1346b70bb962f73'),
  (0.01, 0.1)),
 (('e385bdf19e767b0f0fd0e2f944c4b4247d13f2e6',
   'f1078f1db06630f7f3dbaa5cee50b2a1c377f051'),
  (0.08, 0.04))]

In [106]:
def getCosine(value):
    a=value[0]
    b=value[1]
    return (a*b)
num=user_pair_final.mapValues(getCosine).reduceByKey(lambda x,y:x+y)
den1=user_pair_final.mapValues(lambda x:x[0]).mapValues(lambda x:x*x).reduceByKey(lambda x,y:x+y)
den2=user_pair_final.mapValues(lambda x:x[1]).mapValues(lambda x:x*x).reduceByKey(lambda x,y:x+y)

In [107]:
num.join(den1).join(den2).take(5)
#.mapValues(lambda x:x[0]).take(5)

[(('51fb0c6f89320241c4daaf73a37d8e582d628935',
   '615c1f145c6cf993371658357b6ab73a4ac4131e'),
  ((0.0072, 0.0288), 0.0018)),
 (('40e43d7235c5bc26b510e8888b47892c2245e677',
   'cadde6d1f0ef29f3c6ef39b1c1346b70bb962f73'),
  ((0.006, 0.0002), 0.26)),
 (('e385bdf19e767b0f0fd0e2f944c4b4247d13f2e6',
   'f1078f1db06630f7f3dbaa5cee50b2a1c377f051'),
  ((0.009600000000000001, 0.0128), 0.008)),
 (('3bac2b72d06b520b910025271889a796b153bd0d',
   'ce0cf4c545df0c6b67360c821161753343fd7880'),
  ((0.0112, 0.0164), 0.0097)),
 (('615c1f145c6cf993371658357b6ab73a4ac4131e',
   'c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2'),
  ((0.0111, 0.0018), 0.07250000000000001))]

In [108]:
from math import sqrt
num.join(den1).join(den2).mapValues(lambda x:round(x[0][0]/(sqrt(x[0][1])*sqrt(x[1])),2)).take(5)

[(('51fb0c6f89320241c4daaf73a37d8e582d628935',
   '615c1f145c6cf993371658357b6ab73a4ac4131e'),
  1.0),
 (('40e43d7235c5bc26b510e8888b47892c2245e677',
   'cadde6d1f0ef29f3c6ef39b1c1346b70bb962f73'),
  0.83),
 (('e385bdf19e767b0f0fd0e2f944c4b4247d13f2e6',
   'f1078f1db06630f7f3dbaa5cee50b2a1c377f051'),
  0.95),
 (('3bac2b72d06b520b910025271889a796b153bd0d',
   'ce0cf4c545df0c6b67360c821161753343fd7880'),
  0.89),
 (('615c1f145c6cf993371658357b6ab73a4ac4131e',
   'c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2'),
  0.97)]

In [109]:
cos_rdd=num.join(den1).join(den2).mapValues(lambda x:round(x[0][0]/(sqrt(x[0][1])*sqrt(x[1])),2))

In [110]:
cos_sorted=cos_rdd.map(lambda x:[x[0][0],x[0][1],x[1]]).sortBy(lambda a:-a[2])
cos_sorted.collect()

[['51fb0c6f89320241c4daaf73a37d8e582d628935',
  '615c1f145c6cf993371658357b6ab73a4ac4131e',
  1.0],
 ['7b58845ea93b5f8abad8a4ebe6cc9b163a8b0b94',
  'edc8a52250534d875fede3f0457f476f2a8c7c45',
  1.0],
 ['edc8a52250534d875fede3f0457f476f2a8c7c45',
  '7b58845ea93b5f8abad8a4ebe6cc9b163a8b0b94',
  1.0],
 ['c57f169f48927db0c273d37a9176c120e13aa8ae',
  'beb67d633b3b86c7cb034d5eed1c422f366475a4',
  1.0],
 ['beb67d633b3b86c7cb034d5eed1c422f366475a4',
  'c57f169f48927db0c273d37a9176c120e13aa8ae',
  1.0],
 ['5c96a5e4de377f182b1ef8d06e988d040dc460ac',
  'f762bf7822c6b531d7f08f4748804ed0d2e1a5fd',
  1.0],
 ['f762bf7822c6b531d7f08f4748804ed0d2e1a5fd',
  '5c96a5e4de377f182b1ef8d06e988d040dc460ac',
  1.0],
 ['b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  'c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  1.0],
 ['789503f133190d106f822d79b99a0fcd7d9e3c05',
  'ae865f60390efcd8a19fbec1dfbd7e1cb1a0fc0e',
  1.0],
 ['615c1f145c6cf993371658357b6ab73a4ac4131e',
  '51fb0c6f89320241c4daaf73a37d8e582d628935',
  1.0],


In [139]:
cos_temp=cos_sorted.map(lambda x:(x[0],1))\
                   .distinct()\
                   .join(cos_sorted.map(lambda x: ((x[0]),(x[1],x[2]))))\
                   .map(lambda x: (x[0], x[1][1][0], x[1][1][1]))
                   #.groupByKey().map(lambda x:(x[0],list(x[1])))\
                    # .mapValues(lambda x:list(x[0]))\
                     #.map(lambda x:[x[0],x[1][0],x[1][1]])
cos_temp.collect()

[('beb67d633b3b86c7cb034d5eed1c422f366475a4',
  'c57f169f48927db0c273d37a9176c120e13aa8ae',
  1.0),
 ('cadde6d1f0ef29f3c6ef39b1c1346b70bb962f73',
  '40e43d7235c5bc26b510e8888b47892c2245e677',
  0.83),
 ('2a05e960080e185a4865dbdfced458a889a7a4e4',
  '7bcf634fb0551fde20bb340696b9e3de1a7ae29e',
  0.99),
 ('49aa6a7697d7f60742bc151ebdb5e22e3ce5099e',
  'e0757fd648ae07c975ce8d95dd272dbc984ceb66',
  0.94),
 ('eda0650f3f13c6de426899884dfce19ce947f3f8',
  'ba2ce8bd47bcc9767ca1698203729efe8338fdb5',
  1.0),
 ('86a6cb1ffb3d614ac68aee425f77bddd579d69cd',
  'e55219352fe85e41383e1382838844b07118f3c8',
  0.92),
 ('51fb0c6f89320241c4daaf73a37d8e582d628935',
  '615c1f145c6cf993371658357b6ab73a4ac4131e',
  1.0),
 ('51fb0c6f89320241c4daaf73a37d8e582d628935',
  'b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  0.98),
 ('51fb0c6f89320241c4daaf73a37d8e582d628935',
  'c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  0.97),
 ('ce0cf4c545df0c6b67360c821161753343fd7880',
  'fa2fcc956b2048532afdf72aebd20b5c9e4369cd',
  

In [143]:
cos_temp1 = cos_temp.map(lambda x:(x[1],1))\
                   .distinct()\
                   .join(cos_temp.map(lambda x: ((x[1]),(x[0],x[2]))))\
                   .map(lambda x: (x[1][1][0], x[0], x[1][1][1]))
cos_temp1.collect()

[('c57f169f48927db0c273d37a9176c120e13aa8ae',
  'beb67d633b3b86c7cb034d5eed1c422f366475a4',
  1.0),
 ('8176bce662b8cf98359ea75f60b4cadc03a1cfa3',
  'ba729be25caed0641882b89034453ac859e3a8c7',
  0.84),
 ('8e5cf0c719a4129d49157b501913b20665334a0b',
  'c97def7a4a0a88cc6100337c6979743051b99fa3',
  0.96),
 ('073447c401a1c2e647eeb294b19471392baeb70c',
  '826316aca97d9ea8a679c4e22cce52d876fb7f42',
  1.0),
 ('b0661e5d6a1f9f4e5f1a24990b1eb94cdf21a4fa',
  '823d03faa2648c9a72d6446a11adbb7e70d252b3',
  0.89),
 ('f762bf7822c6b531d7f08f4748804ed0d2e1a5fd',
  '5c96a5e4de377f182b1ef8d06e988d040dc460ac',
  1.0),
 ('51fb0c6f89320241c4daaf73a37d8e582d628935',
  'b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  0.98),
 ('c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  'b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  1.0),
 ('615c1f145c6cf993371658357b6ab73a4ac4131e',
  'b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  0.98),
 ('789503f133190d106f822d79b99a0fcd7d9e3c05',
  'ae865f60390efcd8a19fbec1dfbd7e1cb1a0fc0e',
  1

In [154]:
cos_temp2=cos_temp1.map(lambda x: (x[0], (x[1], x[2])))
cos_temp3=cos_temp1.map(lambda x: (x[1], (x[0], x[2])))

In [155]:
cos_temp2.collect()

[('c57f169f48927db0c273d37a9176c120e13aa8ae',
  ('beb67d633b3b86c7cb034d5eed1c422f366475a4', 1.0)),
 ('8176bce662b8cf98359ea75f60b4cadc03a1cfa3',
  ('ba729be25caed0641882b89034453ac859e3a8c7', 0.84)),
 ('8e5cf0c719a4129d49157b501913b20665334a0b',
  ('c97def7a4a0a88cc6100337c6979743051b99fa3', 0.96)),
 ('073447c401a1c2e647eeb294b19471392baeb70c',
  ('826316aca97d9ea8a679c4e22cce52d876fb7f42', 1.0)),
 ('b0661e5d6a1f9f4e5f1a24990b1eb94cdf21a4fa',
  ('823d03faa2648c9a72d6446a11adbb7e70d252b3', 0.89)),
 ('f762bf7822c6b531d7f08f4748804ed0d2e1a5fd',
  ('5c96a5e4de377f182b1ef8d06e988d040dc460ac', 1.0)),
 ('51fb0c6f89320241c4daaf73a37d8e582d628935',
  ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc', 0.98)),
 ('c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc', 1.0)),
 ('615c1f145c6cf993371658357b6ab73a4ac4131e',
  ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc', 0.98)),
 ('789503f133190d106f822d79b99a0fcd7d9e3c05',
  ('ae865f60390efcd8a19fbec1dfbd7e1cb1a0fc0e', 1.

In [156]:
cos_temp3.collect()

[('beb67d633b3b86c7cb034d5eed1c422f366475a4',
  ('c57f169f48927db0c273d37a9176c120e13aa8ae', 1.0)),
 ('ba729be25caed0641882b89034453ac859e3a8c7',
  ('8176bce662b8cf98359ea75f60b4cadc03a1cfa3', 0.84)),
 ('c97def7a4a0a88cc6100337c6979743051b99fa3',
  ('8e5cf0c719a4129d49157b501913b20665334a0b', 0.96)),
 ('826316aca97d9ea8a679c4e22cce52d876fb7f42',
  ('073447c401a1c2e647eeb294b19471392baeb70c', 1.0)),
 ('823d03faa2648c9a72d6446a11adbb7e70d252b3',
  ('b0661e5d6a1f9f4e5f1a24990b1eb94cdf21a4fa', 0.89)),
 ('5c96a5e4de377f182b1ef8d06e988d040dc460ac',
  ('f762bf7822c6b531d7f08f4748804ed0d2e1a5fd', 1.0)),
 ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  ('51fb0c6f89320241c4daaf73a37d8e582d628935', 0.98)),
 ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  ('c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2', 1.0)),
 ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  ('615c1f145c6cf993371658357b6ab73a4ac4131e', 0.98)),
 ('ae865f60390efcd8a19fbec1dfbd7e1cb1a0fc0e',
  ('789503f133190d106f822d79b99a0fcd7d9e3c05', 1.

In [157]:
cos_temp2.subtractByKey(cos_temp3).collect()
l1=cos_temp2.subtractByKey(cos_temp3)

In [158]:
cos_temp3.subtractByKey(cos_temp2).collect()
l2=cos_temp3.subtractByKey(cos_temp2)

In [163]:
cos_temp4=cos_temp2.intersection(cos_temp3)
l3=cos_temp4

In [164]:
cos_final=l1.union(l2).union(l3)

In [166]:
cos_final.takeOrdered(5, key = lambda x: -x[1][1])

[('e0757fd648ae07c975ce8d95dd272dbc984ceb66',
  ('b2d7e885c05769e3da347e81af37f78657254e65', 1.0)),
 ('f762bf7822c6b531d7f08f4748804ed0d2e1a5fd',
  ('5c96a5e4de377f182b1ef8d06e988d040dc460ac', 1.0)),
 ('c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  ('b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc', 1.0)),
 ('826316aca97d9ea8a679c4e22cce52d876fb7f42',
  ('073447c401a1c2e647eeb294b19471392baeb70c', 1.0)),
 ('789503f133190d106f822d79b99a0fcd7d9e3c05',
  ('ae865f60390efcd8a19fbec1dfbd7e1cb1a0fc0e', 1.0))]

In [167]:
cos_final.map(lambda x:[x[0],x[1][0],x[1][1]]).sortBy(lambda x:-x[2]).collect()

[['e0757fd648ae07c975ce8d95dd272dbc984ceb66',
  'b2d7e885c05769e3da347e81af37f78657254e65',
  1.0],
 ['f762bf7822c6b531d7f08f4748804ed0d2e1a5fd',
  '5c96a5e4de377f182b1ef8d06e988d040dc460ac',
  1.0],
 ['c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  'b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  1.0],
 ['826316aca97d9ea8a679c4e22cce52d876fb7f42',
  '073447c401a1c2e647eeb294b19471392baeb70c',
  1.0],
 ['789503f133190d106f822d79b99a0fcd7d9e3c05',
  'ae865f60390efcd8a19fbec1dfbd7e1cb1a0fc0e',
  1.0],
 ['beb67d633b3b86c7cb034d5eed1c422f366475a4',
  'c57f169f48927db0c273d37a9176c120e13aa8ae',
  1.0],
 ['34d3b4d4d4c07e864de04d331afb35215e95ebdd',
  'b2d7e885c05769e3da347e81af37f78657254e65',
  1.0],
 ['b952fd13fdbdc6b23fdc33d0f9d6e65fcf0ddebc',
  'c7af4f6b55beebda80cc1c3a54f20b71c1cbb5a2',
  1.0],
 ['34d3b4d4d4c07e864de04d331afb35215e95ebdd',
  'e0757fd648ae07c975ce8d95dd272dbc984ceb66',
  1.0],
 ['eda0650f3f13c6de426899884dfce19ce947f3f8',
  'ba2ce8bd47bcc9767ca1698203729efe8338fdb5',
  1.0],


In [168]:
givenUser = 'e0757fd648ae07c975ce8d95dd272dbc984ceb66'
top_users = cos_sorted.filter(lambda x: x[0]==givenUser).map(lambda x: (x[1],1))
top_users.collect()

[('b2d7e885c05769e3da347e81af37f78657254e65', 1),
 ('34d3b4d4d4c07e864de04d331afb35215e95ebdd', 1),
 ('fd9c3c7438f9505f956a52c98e58dbfee25c63aa', 1),
 ('49aa6a7697d7f60742bc151ebdb5e22e3ce5099e', 1)]

In [169]:
triplet_temp = rating_triplet.map(lambda x: (x[0], (x[1], x[2], x[3])))
user_temp = triplet_temp.join(top_users)
user_temp.map(lambda x: x[1][0][0]).distinct().collect()

['167248', '54386', '177172']