![DataStax Academy](https://s3.amazonaws.com/datastaxtraining/vq8Jr36Gk48v/datastax-academy.svg "DataStax Academy")

# Exercise 03.04 - Cassandra Connector: Cassandra Save Data

## Background

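This exercise processes data read from Cassandra and saves the results back into Cassandra. The source data comes from the `videos_by_year_title` table in the `killr_video` keyspace; its columns correspond to the fields of the `Video` case class defined in step 1.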

Note: The `avg_rating` and `description` columns may contain null values. Be sure to use the `Option[]` data type for those columns.
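As a rough illustration of why `Option[]` suits nullable columns, the sketch below uses plain Scala collections rather than a Cassandra table. The `Rated` case class and its sample rows are hypothetical stand-ins, not part of the exercise data.

```scala
// Hypothetical stand-in for a row with a nullable avg_rating column.
case class Rated(title: String, avg_rating: Option[Float])

val rows = Seq(
  Rated("has rating", Some(3.5f)),
  Rated("no rating", None) // a null column value is represented as None
)

// exists returns false for None, so unrated rows are skipped without a null check.
val lowRated = rows.filter(_.avg_rating.exists(_ <= 4))

// getOrElse supplies a default when the value is missing.
val labels = rows.map(r => r.title + ": " + r.avg_rating.map(_.toString).getOrElse("unrated"))

labels.foreach(println)
```

Modelling the column as `Option[Float]` lets the compiler force every use site to decide what a missing rating means, instead of risking a null at runtime.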

***

## Directions

#### 1. Create an RDD of the videos from 2014 in the `videos_by_year_title` table.

In [1]:
case class Video( added_year : Int,
                  title : String, 
                  video_id : java.util.UUID, 
                  added_date : java.util.Date,
                  avg_rating : Option[Float], 
                  description : Option[String], 
                  user_id : java.util.UUID ) 

val videos = sc.cassandraTable("killr_video", "videos_by_year_title").as(Video).where("added_year = 2014")

#### 2. Filter the RDD to select videos that have an average user rating of 4 or lower, and count the number of videos.

In [2]:
// exists is false for a missing rating, so unrated videos are excluded
val filterRating = videos.filter( video => video.avg_rating.exists(_ <= 4) )
filterRating.count

Long = 9

#### 3. Save the resulting RDD into a new Cassandra table, `worst_2014_videos`, using the `saveAsCassandraTable` action in the `killr_video` keyspace.

In [3]:
filterRating.saveAsCassandraTable("killr_video", "worst_2014_videos", SomeColumns("added_year","title","video_id",
                                                                                  "added_date","avg_rating",
                                                                                  "description","user_id"))

If the `worst_2014_videos` table already exists, you must drop it before the code above will run.
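One way to drop the table from within the notebook is to send a CQL statement through the connector's `CassandraConnector`. This is a minimal sketch that assumes the notebook's SparkContext is available as `sc`, as in the cells above; the `dropCql` name is only illustrative. It would need to run before the save in step 3.

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// Illustrative name; IF EXISTS makes the statement a no-op when the table is absent.
val dropCql = "DROP TABLE IF EXISTS killr_video.worst_2014_videos"

// Assumes `sc` is the SparkContext provided by the notebook kernel.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute(dropCql)
}
```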

#### 4. Take a look at the schema of the table that was created.

The Jupyter kernel used here has a special feature for viewing a table schema, with the syntax `%%showschema [keyspace][.table]`.

In [4]:
%%showschema killr_video.worst_2014_videos

| Column | Type | Key |
| --- | --- | --- |
| added_year | IntType | partition key |
| added_date | TimestampType | |
| avg_rating | FloatType | |
| description | VarCharType | |
| title | VarCharType | |
| user_id | UUIDType | |
| video_id | UUIDType | |


#### 5. Try to read all of the rows from the newly created table, selecting the title, added year, and average rating.

In [5]:
val worstVideos = sc.cassandraTable("killr_video", "worst_2014_videos").select("title","added_year","avg_rating")
worstVideos.collect.foreach(println)

CassandraRow{title: Sinbad: The Fifth Voyage, added_year: 2014, avg_rating: 3.8}


How many videos were returned? Is this the count that you had prior to saving to the new table?

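Spark infers the schema from the RDD, creates the new table, and loads it with the RDD's contents. However, only the first column (`added_year`) is used as the primary key. Because every row in the filtered RDD has an `added_year` of 2014, each write overwrites the previous one and only a single row remains, which is probably not how you would want to define the table.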
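Cassandra writes are upserts: a row written with an existing primary key replaces the stored row. The sketch below imitates that behaviour with a plain Scala `Map`, so it runs without a cluster. The `SampleRow` case class and the `upsertAll` helper are hypothetical, and the three sample rows are borrowed from the output shown in this exercise.

```scala
// Hypothetical stand-in for a few columns of the filtered RDD.
case class SampleRow(added_year: Int, title: String, avg_rating: Float)

val sample = Seq(
  SampleRow(2014, "Jinn", 4.0f),
  SampleRow(2014, "Left Behind", 3.8f),
  SampleRow(2014, "Cymbeline", 3.6f)
)

// Model a table as a Map from primary key to row:
// writing a row whose key already exists replaces the stored row.
def upsertAll[K](rows: Seq[SampleRow])(key: SampleRow => K): Map[K, SampleRow] =
  rows.foldLeft(Map.empty[K, SampleRow])((table, row) => table.updated(key(row), row))

// Primary key = added_year only: every row collides on 2014.
val keyedByYear = upsertAll(sample)(_.added_year)

// Primary key = (title, added_year): every row keeps its own key.
val keyedByTitleAndYear = upsertAll(sample)(r => (r.title, r.added_year))

println(keyedByYear.size)         // 1
println(keyedByTitleAndYear.size) // 3
```

In this simulation the last row written wins. In a real cluster the surviving row depends on write timestamps, so which single video remains is not something this sketch predicts.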

#### 6. Save the earlier filtered RDD to Cassandra in a new table `worst_2014_videos_ex`, this time using the `saveAsCassandraTableEx` action. Define the table so that data is partitioned by the title, and sorted by added year.

In [6]:
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.types._

case class reorderedVideo( title : String,
                           added_year : Int, 
                           added_date : java.util.Date, 
                           avg_rating : Option[Float], 
                           description : Option[String], 
                           user_id : java.util.UUID, 
                           video_id : java.util.UUID )

val tableDef = TableDef( "killr_video", "worst_2014_videos_ex",
                         Seq(new ColumnDef("title", PartitionKeyColumn, TextType)),
                         Seq(new ColumnDef("added_year", ClusteringColumn(0), IntType)),
                         Seq(new ColumnDef("added_date", RegularColumn, TimestampType),
                             new ColumnDef("avg_rating", RegularColumn, FloatType),
                             new ColumnDef("description", RegularColumn, TextType),
                             new ColumnDef("user_id", RegularColumn, UUIDType),
                             new ColumnDef("video_id", RegularColumn, UUIDType))                    
)

val mappedColumns = filterRating.map(m => reorderedVideo(m.title, m.added_year, m.added_date, m.avg_rating, 
                                                        m.description, m.user_id, m.video_id))

mappedColumns.saveAsCassandraTableEx(tableDef)

#### 7. Check the schema of this new table.

In [7]:
%%showschema killr_video.worst_2014_videos_ex

| Column | Type | Key |
| --- | --- | --- |
| title | VarCharType | partition key |
| added_year | IntType | cluster key 0 |
| added_date | TimestampType | |
| avg_rating | FloatType | |
| description | VarCharType | |
| user_id | UUIDType | |
| video_id | UUIDType | |


#### 8. Read all of the rows from the new table, selecting the title, added year, and average rating.

In [8]:
val worstVideosEx = sc.cassandraTable("killr_video", "worst_2014_videos_ex").select("title","added_year","avg_rating")
worstVideosEx.collect.foreach(println)

CassandraRow{title: Leprechaun: Origins, added_year: 2014, avg_rating: 4.0}
CassandraRow{title: Jinn, added_year: 2014, avg_rating: 4.0}
CassandraRow{title: Left Behind, added_year: 2014, avg_rating: 3.8}
CassandraRow{title: Monsters: Dark Continent, added_year: 2014, avg_rating: 3.8}
CassandraRow{title: Hercules Reborn, added_year: 2014, avg_rating: 3.9}
CassandraRow{title: Cymbeline, added_year: 2014, avg_rating: 3.6}
CassandraRow{title: Lap Dance, added_year: 2014, avg_rating: 3.6}
CassandraRow{title: Sinbad: The Fifth Voyage, added_year: 2014, avg_rating: 3.8}
CassandraRow{title: Alien Outpost, added_year: 2014, avg_rating: 3.9}
