In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory="2600m"
launcher.conf.set("spark.sql.warehouse.dir", "hdfs://bd-hm:9000/user/hive/warehouse")
launcher.conf.set("spark.sql.catalogImplementation","hive")

#### Read in ratings data as RDD

In [2]:
val rdd= sc.textFile("hdfs://bd-hm:9000/hadoop-user/data/ratings")
rdd.take(4).foreach(println)

Intitializing Scala interpreter ...

Spark Web UI available at http://bd-hm:8088/proxy/application_1572480568928_0001
SparkContext available as 'sc' (version = 2.4.4, master = yarn, app id = application_1572480568928_0001)
SparkSession available as 'spark'


1	1#1097#1907#2321#2018#260#938#1246#2028#150#3408#2340#919#527#914#3186#1270#2355#48#531#2692#783#661#608#745#1035#2918#1022#2687#1028#1029#2804#1836#2797#2398#2791#1961#1287#1721#1962#1545#2294#1193#588#1197#3114#595#2762#1566#594#1207#720#3105
2	1103#1372#1096#1917#1370#2194#1090#3809#1357#21#2728#265#2717#1610#2236#515#1873#1124#318#2490#292#1385#780#3578#2268#349#2501#3471#3468#1597#3068#1834#1293#3071#95#3735#1544#3255#368#110#380#3256#3257#589#1537#590#356#593#3035#1084#1801#2278#3030#1552#1792#2852#2067#1217#3699#2312#1225#2858#2321#3418#3147#1244#1245#1246#1784#1247#2028#2002#1253#442#1527#2006#648#920#3678#163#647#434#1259#165#2353#3451#2359#1265#902#3654#2571#1968#2916#736#3334#3095#1957#1408#1953#459#1955#457#1954#3893#2126#2396#2943#1962#2881#235#1188#1193#982#1198#498#1196#1945#1442#1687#2628#1207#1210#3107#3105#480#2427#3108#1690#1213
3	1641#3534#552#2735#260#2997#3552#1615#2470#1394#1378#1379#1136#1580#1304#1049#1291#590#104#593#1079#2617#2858#2871#3421#653#2006#648#126

rdd: org.apache.spark.rdd.RDD[String] = hdfs://bd-hm:9000/hadoop-user/data/ratings MapPartitionsRDD[1] at textFile at <console>:25


#### Clean data

In [3]:
val ratings_pairs= rdd.map(line=>(line.split("\t")(0), line.split("\t")(1))).flatMapValues(l=>l.split("[\\t#]"))
ratings_pairs.take(4).foreach(println)

(1,1)
(1,1097)
(1,1907)
(1,2321)


ratings_pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at flatMapValues at <console>:26


#### Turn RDD into DF and save table

In [4]:
val ratings_cols=Seq("user", "movie")
ratings_pairs.toDF(ratings_cols:_*).write.saveAsTable("ratings")

org.apache.spark.sql.AnalysisException:  Table `ratings` already exists.;

In [5]:
sql("select * from ratings").show(5)

+----+-----+
|user|movie|
+----+-----+
|   1|    1|
|   1| 1097|
|   1| 1907|
|   1| 2321|
|   1| 2018|
+----+-----+
only showing top 5 rows



#### Read in movies data as RDD

In [6]:
val rddMovies= sc.textFile("hdfs://bd-hm:9000/hadoop-user/data/movies")
rddMovies.take(4).foreach(println)

1#Toy Story (1995)#Animation|Children's|Comedy
2#Jumanji (1995)#Adventure|Children's|Fantasy
3#Grumpier Old Men (1995)#Comedy|Romance
4#Waiting to Exhale (1995)#Comedy|Drama


rddMovies: org.apache.spark.rdd.RDD[String] = hdfs://bd-hm:9000/hadoop-user/data/movies MapPartitionsRDD[9] at textFile at <console>:25


#### Clean data

In [7]:
val movies_pairs= rddMovies.map(line=>(line.split("#")(0), line.split("#")(1)))
movies_pairs.take(4).foreach(println)

(1,Toy Story (1995))
(2,Jumanji (1995))
(3,Grumpier Old Men (1995))
(4,Waiting to Exhale (1995))


movies_pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at map at <console>:26


#### Turn RDD into DF and save table

In [8]:
val movies_cols=Seq("movie", "title")
movies_pairs.toDF(movies_cols:_*).write.saveAsTable("movies")

org.apache.spark.sql.AnalysisException:  Table `movies` already exists.;

In [9]:
sql("select * from movies").show(5)

+-----+--------------------+
|movie|               title|
+-----+--------------------+
| 2020|Dangerous Liaison...|
| 2021|         Dune (1984)|
| 2022|Last Temptation o...|
| 2023|Godfather: Part I...|
| 2024| Rapture, The (1991)|
+-----+--------------------+
only showing top 5 rows



#### Write query to extract final metrics

In [10]:
sql("select r1.user user1, r2.user user2, count (distinct r1.movie) count_common_movies, collect_set(r3.title) titles_list from ratings r1, ratings r2, movies r3 where r1.movie=r2.movie and r1.user<r2.user and r1.movie=r3.movie group by r1.user, r2.user having count (distinct r1.movie) > 50 order by count_common_movies desc").show(10)

+-----+-----+-------------------+--------------------+
|user1|user2|count_common_movies|         titles_list|
+-----+-----+-------------------+--------------------+
| 1680| 1941|               1169|[Airport '77 (197...|
| 1181| 1680|               1013|[Titan A.E. (2000...|
| 1680| 2063|                977|[Vampire in Brook...|
| 1181| 1941|                938|[Gremlins (1984),...|
| 1941| 2909|                936|[Airport '77 (197...|
| 1680|  424|                925|[Gremlins (1984),...|
| 1680| 1980|                923|[Gremlins (1984),...|
| 1941| 2063|                920|[Primal Fear (199...|
| 1680|  889|                916|[Before and After...|
| 1015| 1680|                909|[Primal Fear (199...|
+-----+-----+-------------------+--------------------+
only showing top 10 rows

