![DataStax Academy](https://s3.amazonaws.com/datastaxtraining/vq8Jr36Gk48v/datastax-academy.svg "DataStax Academy")

# Exercise 05.04 - Key-Value Pairs: Pair RDD Joins

## Background

This exercise uses pair RDD joins to validate that the data in two separate tables is consistent.

The data comes from the Cassandra `videos` table and the `videos_by_tag` table.

Note: Two columns may contain a null value: `avg_rating` and `description`. Be sure to use the `Option[]` data type for those columns.
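As a minimal sketch of what the note above means in practice, the plain Scala snippet below models a row whose nullable columns are wrapped in `Option`. The `VideoRow` case class, its field names, the `Float` rating type, and the sample values are all hypothetical and chosen only for illustration; check the actual table schema before reusing them.

```scala
import java.util.UUID

// Hypothetical row type: the two nullable columns are wrapped in Option,
// so a missing value is represented as None instead of null.
case class VideoRow(
  videoId: UUID,
  title: String,
  avgRating: Option[Float],     // assumed type; may be absent
  description: Option[String]   // may be absent
)

// Made-up sample rows: one with both optional values, one with neither.
val rated = VideoRow(UUID.randomUUID(), "Sample A", Some(4.5f), Some("A sample description"))
val unrated = VideoRow(UUID.randomUUID(), "Sample B", None, None)

// Option forces the missing case to be handled explicitly.
def ratingLabel(v: VideoRow): String =
  v.avgRating.map(_.toString).getOrElse("unrated")

println(ratingLabel(rated))
println(ratingLabel(unrated))
```

Handling the missing case with `map` and `getOrElse` avoids the `NullPointerException` that a bare `Float` or `String` field could trigger on a null column.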

Keep in mind that there is a 1-n relationship between the `videos` table and the `videos_by_tag` table, i.e. the same video can appear multiple times in `videos_by_tag`.
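The effect of the 1-n relationship on a join can be sketched without a cluster. The snippet below uses plain Scala collections as a stand-in for the two pair RDDs and mimics inner-join semantics with a for-comprehension; it is not the Spark API itself, and the integer ids and titles are made-up sample data.

```scala
// Made-up sample data: one entry per video on the left,
// several entries per video (one per tag) on the right.
val videos = Seq(1 -> "Title A", 2 -> "Title B")
val videosByTag = Seq(
  1 -> "Title A",
  1 -> "Title A",
  2 -> "Title B",
  2 -> "Title B (alt)"
)

// Mimic an inner join on the key: one (key, (left, right)) element
// is produced for every matching pair of entries.
val joined = for {
  (id, t1) <- videos
  (id2, t2) <- videosByTag
  if id == id2
} yield (id, (t1, t2))

// Keep only the pairs whose titles disagree.
val mismatches = joined.collect {
  case (id, (t1, t2)) if t1 != t2 => (id, t1, t2)
}

println(joined.size)
mismatches.foreach(println)
```

Each video on the left is paired with every one of its tag rows on the right, so the joined result has as many elements as there are matching rows in the larger table.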

***

## Directions

#### 1. Use a pair RDD join to check that the `title` of each video is consistent between the `videos` table and the `videos_by_tag` table. The output should list the video id and the titles of the videos that are not consistent.

In [9]:
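// Connector implicits (cassandraTable, saveToCassandra, SomeColumns);
// harmless if already imported earlier in the notebook.
import com.datastax.spark.connector._

// Read (video_id, title) pairs from each table; video_id is the join key.
val video = sc.cassandraTable("killr_video", "videos")
    .select("video_id", "title")
    .as((i: java.util.UUID, t: String) => (i, t))
val videoByTag = sc.cassandraTable("killr_video", "videos_by_tag")
    .select("video_id", "title")
    .as((i: java.util.UUID, t: String) => (i, t))

// Join on video_id and keep only the rows whose two titles differ.
video.join(videoByTag)
    .map{case (id, (t1, t2)) => (id, t1, t2)}
    .filter(tu => tu._2 != tu._3)
    .take(10)
    .foreach(println)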

There should not be any inconsistent titles in the current data set.

#### 2. Run the following code box to insert some additional videos into the two tables. These videos will have inconsistent titles.

In [None]:
// Rows for `videos`: (video_id, title, release_year)
val inconsistentVideos = sc.parallelize(Seq(("5ac5ce38-ad94-11e5-bf7f-feff819cdc9f", "Finding Nemo 2", 2017),
                                            ("7c22eb38-ad94-11e5-bf7f-feff819cdc9f", "The Lego Batman Movie", 2017)))

// Rows for `videos_by_tag`: (tag, video_id, title); the second video appears under two tags
val inconsistentVideosByTag = sc.parallelize(Seq(("nemo","5ac5ce38-ad94-11e5-bf7f-feff819cdc9f", "Finding Dory"),
                                                 ("legos","7c22eb38-ad94-11e5-bf7f-feff819cdc9f", "The Lego Movie 2"),
                                                 ("batman","7c22eb38-ad94-11e5-bf7f-feff819cdc9f", "The Lego Movie 2")))

// Write each RDD to its table, mapping tuple fields to the listed columns in order
inconsistentVideos.saveToCassandra("killr_video", "videos", SomeColumns("video_id","title","release_year"))
inconsistentVideosByTag.saveToCassandra("killr_video", "videos_by_tag", SomeColumns("tag","video_id","title"))

#### 3. Try running your code again. The videos inserted in step 2 should now be listed as inconsistent.

In [None]:
// Re-read both tables so the rows inserted in step 2 are included.
val video = sc.cassandraTable("killr_video", "videos")
    .select("video_id", "title")
    .as((i: java.util.UUID, t: String) => (i, t))
val videoByTag = sc.cassandraTable("killr_video", "videos_by_tag")
    .select("video_id", "title")
    .as((i: java.util.UUID, t: String) => (i, t))

// Join on video_id and keep only the rows whose two titles differ.
video.join(videoByTag)
    .map{case (id, (t1, t2)) => (id, t1, t2)}
    .filter(tu => tu._2 != tu._3)
    .take(10)
    .foreach(println)
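
Because a video appears once per tag in `videos_by_tag`, a video with an inconsistent title is reported once for each of its tag rows. One possible refinement, sketched below on a small in-memory data set rather than the Cassandra tables, is to call `distinct()` after the filter so each mismatch is listed once. The short string ids are placeholders, and the sketch assumes Spark is on the classpath; `SparkContext.getOrCreate` reuses the notebook's context if one is already running.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("distinct-sketch"))

// Placeholder data: one video whose title differs, stored under two tags.
val video = sc.parallelize(Seq("v1" -> "The Lego Batman Movie"))
val videoByTag = sc.parallelize(Seq(
  "v1" -> "The Lego Movie 2",
  "v1" -> "The Lego Movie 2"
))

val mismatches = video.join(videoByTag)
  .filter { case (_, (t1, t2)) => t1 != t2 }     // keep differing titles
  .map { case (id, (t1, t2)) => (id, t1, t2) }   // flatten for printing
  .distinct()                                    // collapse per-tag duplicates
  .collect()

mismatches.foreach(println)
```

Note that `distinct()` triggers an additional shuffle, which is negligible here but worth weighing on large tables.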