<h1><b>Movieslen Dataset analysis using HIVE and Tableau on AWS</b></h1>

<h2>DATA CLEANING</h2>

<h3>Movies</h3>

In [58]:
from pyspark.sql.types import *
#creating schema for movies dataframe
schema4 = StructType([StructField('movieid', IntegerType(), True),StructField('title', StringType(), True),StructField('genres', StringType(), True)])
#creating movies dataframe and read csv file from s3 bucket
movies = spark.read.csv("s3a://project555/movies/movies.csv", schema=schema4 , header=True)
movies.show(7)

+-------+--------------------+--------------------+
|movieid|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 7 rows

In [4]:
movies.printSchema()

root
 |-- movieid: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

In [30]:
#Splitting the genres column of dataframe
from pyspark.sql.functions import split, explode
movies = movies.withColumn("genres", explode(split("genres", "[|]")))
movies.show(7)

+-------+----------------+---------+
|movieid|           title|   genres|
+-------+----------------+---------+
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
|      2|  Jumanji (1995)|Adventure|
|      2|  Jumanji (1995)| Children|
+-------+----------------+---------+
only showing top 7 rows

In [31]:
#Seperating the year from title column
#creating the function insertYear which seperate the year
import re
def insertYear(title):
  year = re.search('\(([0-9]{4})', title)
  if year is not None:
    year = year.group(1)
  else:
    year = 0
  return int(year)

sqlContext.udf.register("yearCleansing", insertYear)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

#udf used to cast integer datatype to new created 'year' column

year_udf = udf(insertYear, IntegerType())
df_movie_updated = movies.select("movieid","title","genres",year_udf("title").alias("year"))
df_movie_updated.show(7)

+-------+----------------+---------+----+
|movieid|           title|   genres|year|
+-------+----------------+---------+----+
|      1|Toy Story (1995)|Adventure|1995|
|      1|Toy Story (1995)|Animation|1995|
|      1|Toy Story (1995)| Children|1995|
|      1|Toy Story (1995)|   Comedy|1995|
|      1|Toy Story (1995)|  Fantasy|1995|
|      2|  Jumanji (1995)|Adventure|1995|
|      2|  Jumanji (1995)| Children|1995|
+-------+----------------+---------+----+
only showing top 7 rows

In [8]:
#data cleaning
#checking Null values in  given dataframe
df_movie_updated.filter(df_movie_updated.movieid.isNull()).count()
df_movie_updated.filter(df_movie_updated.title.isNull()).count()
df_movie_updated.filter(df_movie_updated.genres.isNull()).count()
df_movie_updated.filter(df_movie_updated.year.isNull()).count()

0

In [13]:
#data cleaning
#checking '0' values in  given dataframe
df_movie_updated.filter(df_movie_updated.movieid==0).count()
df_movie_updated.filter(df_movie_updated.title==0).count()
df_movie_updated.filter(df_movie_updated.genres==0).count()
df_movie_updated.filter(df_movie_updated.year==0).count()

394

In [15]:
#Removing the '0' value rows from year column
df_movie_updated = df_movie_updated.filter(df_movie_updated.year!=0)

In [16]:
df_movie_updated.filter(df_movie_updated.year==0).count()

0

In [17]:
#Storing cleaned csv file as ORC format in S3 bucket
df_movie_updated.write.format("orc").save("s3a://project555/new/movies_clean.orc/")

<h3>Ratings</h3>

In [32]:
from pyspark.sql.types import *
#define schema and create df and read csv from s3 bucket
schema5 = (StructType().add("userid", IntegerType()).add("movieid", IntegerType()).add("rating", FloatType()).add("timestamp", LongType()))
ratings = spark.read.csv("s3a://project555/ratings/ratings.csv", schema=schema5, header=True)

import pyspark.sql.functions as func
#functions func is used to round the rating column
ratings = ratings.withColumn("rating", func.round(ratings["rating"], 1))
ratings.show(7)

+------+-------+------+----------+
|userid|movieid|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
+------+-------+------+----------+
only showing top 7 rows

In [19]:
#data cleaning
#checking Null values in given dataframe
ratings.filter(ratings.userid.isNull()).count()
ratings.filter(ratings.movieid.isNull()).count()
ratings.filter(ratings.rating.isNull()).count()
ratings.filter(ratings.timestamp.isNull()).count()

0

In [21]:
#data cleaning
#checking '0' values in  given dataframe
ratings.filter(ratings.userid==0).count()
ratings.filter(ratings.movieid==0).count()
ratings.filter(ratings.rating==0).count()
ratings.filter(ratings.timestamp==0).count()

0

In [33]:
#Convert the timestamp column from df into DateTime 
from pyspark.sql import functions as func 
ratings= ratings.withColumn('Datetime', func.from_unixtime('timestamp').cast(TimestampType()))
ratings.show(7)

+------+-------+------+----------+-------------------+
|userid|movieid|rating| timestamp|           Datetime|
+------+-------+------+----------+-------------------+
|     1|    307|   3.5|1256677221|2009-10-27 21:00:21|
|     1|    481|   3.5|1256677456|2009-10-27 21:04:16|
|     1|   1091|   1.5|1256677471|2009-10-27 21:04:31|
|     1|   1257|   4.5|1256677460|2009-10-27 21:04:20|
|     1|   1449|   4.5|1256677264|2009-10-27 21:01:04|
|     1|   1590|   2.5|1256677236|2009-10-27 21:00:36|
|     1|   1591|   1.5|1256677475|2009-10-27 21:04:35|
+------+-------+------+----------+-------------------+
only showing top 7 rows

In [34]:
##splitting the Datetime column into Date and Time
import pyspark.sql.functions as F
ratings_split = F.split(ratings['Datetime'], ' ')
ratings = ratings.withColumn('date', ratings_split.getItem(0))
##ratings = ratings.withColumn('time', ratings_split.getItem(1))
ratings = ratings.drop('timestamp','Datetime')
ratings.show(7)

+------+-------+------+----------+
|userid|movieid|rating|      date|
+------+-------+------+----------+
|     1|    307|   3.5|2009-10-27|
|     1|    481|   3.5|2009-10-27|
|     1|   1091|   1.5|2009-10-27|
|     1|   1257|   4.5|2009-10-27|
|     1|   1449|   4.5|2009-10-27|
|     1|   1590|   2.5|2009-10-27|
|     1|   1591|   1.5|2009-10-27|
+------+-------+------+----------+
only showing top 7 rows

In [24]:
#Storing cleaned csv file as ORC format in S3 bucket
ratings.write.format("orc").save("s3a://project555/new/ratings_clean.orc/")

<h3>Tags</h3>

In [35]:
from pyspark.sql.types import *
#define schema and create df and read csv from s3 bucket
schema6 = (StructType().add("userid", IntegerType()).add("movieid", IntegerType()).add("tag", StringType()).add("timestamp", LongType()))
tags = spark.read.csv("s3a://project555/tags/tags.csv", schema=schema6, header=True)
tags.show(7)

+------+-------+------------+----------+
|userid|movieid|         tag| timestamp|
+------+-------+------------+----------+
|    14|    110|        epic|1443148538|
|    14|    110|    Medieval|1443148532|
|    14|    260|      sci-fi|1442169410|
|    14|    260|space action|1442169421|
|    14|    318|imdb top 250|1442615195|
|    14|    318|     justice|1442615192|
|    14|    480|   Dinosaurs|1443148563|
+------+-------+------------+----------+
only showing top 7 rows

In [36]:
#Convert the timestamp column from df into DateTime 
from pyspark.sql import functions as func 
tag= tags.withColumn('Datetime', func.from_unixtime('timestamp').cast(TimestampType()))
tag.show(7)

+------+-------+------------+----------+-------------------+
|userid|movieid|         tag| timestamp|           Datetime|
+------+-------+------------+----------+-------------------+
|    14|    110|        epic|1443148538|2015-09-25 02:35:38|
|    14|    110|    Medieval|1443148532|2015-09-25 02:35:32|
|    14|    260|      sci-fi|1442169410|2015-09-13 18:36:50|
|    14|    260|space action|1442169421|2015-09-13 18:37:01|
|    14|    318|imdb top 250|1442615195|2015-09-18 22:26:35|
|    14|    318|     justice|1442615192|2015-09-18 22:26:32|
|    14|    480|   Dinosaurs|1443148563|2015-09-25 02:36:03|
+------+-------+------------+----------+-------------------+
only showing top 7 rows

In [37]:
#data cleaning
#checking Null values in given dataframe
tag.filter(tag.userid.isNull()).count()
tag.filter(tag.movieid.isNull()).count()
tag.filter(tag.tag.isNull()).count()
tag.filter(tag.timestamp.isNull()).count()

2

In [38]:
#droping Null values from timestamp column
tag = tag.dropna(subset=["timestamp"])

tag.filter(tag.timestamp.isNull()).count()
tag.filter(tag.Datetime.isNull()).count()

0

In [39]:
##splitting the Datetime column into Date and Time
import pyspark.sql.functions as F
tagclean = F.split(tag['Datetime'], ' ')
tag = tag.withColumn('date', tagclean.getItem(0))
##tag= tag.withColumn('time', tagclean.getItem(1))

##Drop the timestamp and Datetime Column
tag_clean = tag.drop('timestamp','Datetime')
tag_clean.show(7)

+------+-------+------------+----------+
|userid|movieid|         tag|      date|
+------+-------+------------+----------+
|    14|    110|        epic|2015-09-25|
|    14|    110|    Medieval|2015-09-25|
|    14|    260|      sci-fi|2015-09-13|
|    14|    260|space action|2015-09-13|
|    14|    318|imdb top 250|2015-09-18|
|    14|    318|     justice|2015-09-18|
|    14|    480|   Dinosaurs|2015-09-25|
+------+-------+------------+----------+
only showing top 7 rows

In [40]:
#Storing cleaned csv file as ORC format in S3 bucket
tag_clean.write.format("orc").save("s3a://project555/new/tags_clean.orc/")

##Read the multiple file from s3 bucket, give the path of folder contain multiple files
#dff = sqlContext.read.format('orc').load("s3a://project555/new_cleaning/links_clean.orc/")

<h3>Links</h3>

In [41]:
#create a schema in spark for link file
from pyspark.sql.types import *
#define schema and create df and read csv from s3 bucket
schema1 = StructType([StructField('movieid', IntegerType(), True),StructField('imdbid',  IntegerType(), True),StructField('tmdbid', IntegerType(), True)])

#read data from s3 bucket
link = spark.read.csv("s3a://project555/links/links.csv", schema=schema1 , header=True)
link.show(7)

+-------+------+------+
|movieid|imdbid|tmdbid|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
|      6|113277|   949|
|      7|114319| 11860|
+-------+------+------+
only showing top 7 rows

In [42]:
#data cleaning
#checking Null values in  given dataframe

link.filter(link.movieid.isNull()).count()
link.filter(link.imdbid.isNull()).count()
link.filter(link.tmdbid.isNull()).count()

181

In [43]:
#cleaning Null values from tmdbId Column

link = link.dropna(subset=["tmdbid"])
link.filter(link.tmdbid.isNull()).count()

0

In [44]:
#data cleaning
#checking '0' values in  given dataframe
link.filter(link.movieid==0).count()
link.filter(link.imdbid==0).count()
link.filter(link.tmdbid==0).count()

0

In [45]:
#Storing cleaned csv file as ORC format in S3 bucket
link.write.format("orc").save("s3a://project555/new/links_clean.orc")

<h3>Genome-tags</h3>

In [46]:
from pyspark.sql.types import *
#define schema and create df and read csv from s3 bucket
schema3 = StructType([StructField('tagid', IntegerType(), True),StructField('tag', StringType(), True)])

genome_tags = spark.read.csv("s3a://project555/genome-tags/genome-tags.csv", schema=schema3, header=True)
genome_tags.show(7)

+-----+------------+
|tagid|         tag|
+-----+------------+
|    1|         007|
|    2|007 (series)|
|    3|18th century|
|    4|       1920s|
|    5|       1930s|
|    6|       1950s|
|    7|       1960s|
+-----+------------+
only showing top 7 rows

In [47]:
#data cleaning
#checking Null values in given dataframe
genome_tags.filter(genome_tags.tagid.isNull()).count()
genome_tags.filter(genome_tags.tag.isNull()).count()

0

In [48]:
#data cleaning
#checking '0' values in given dataframe
genome_tags.filter(genome_tags.tagid==0).count()
genome_tags.filter(genome_tags.tag==0).count()

0

In [49]:
#Storing cleaned csv file as ORC format in S3 bucket
genome_tags.write.format("orc").save("s3a://project555/new/genome_tags_clean.orc/")

<h3>Genome-scores</h3>

In [50]:
from pyspark.sql.types import *
#define schema and create df and read csv from s3 bucket
schema2 = StructType([StructField('movieid', IntegerType(), True),StructField('tagid',  IntegerType(), True),StructField('relevance', FloatType(), True)])

genome_scores = spark.read.csv("s3a://project555/genome-scores/genome-scores.csv", schema=schema2 , header=True)

import pyspark.sql.functions as func
#functions func is used to round the relevance column
genome_scores = genome_scores.withColumn("relevance", func.round(genome_scores["relevance"], 3))
genome_scores.show(7)

+-------+-----+---------+
|movieid|tagid|relevance|
+-------+-----+---------+
|      1|    1|    0.029|
|      1|    2|    0.024|
|      1|    3|    0.054|
|      1|    4|    0.069|
|      1|    5|     0.16|
|      1|    6|    0.195|
|      1|    7|    0.076|
+-------+-----+---------+
only showing top 7 rows

In [51]:
#data cleaning
#checking Null values in given dataframe
genome_scores.filter(genome_scores.movieid.isNull()).count()
genome_scores.filter(genome_scores.tagid.isNull()).count()
genome_scores.filter(genome_scores.relevance.isNull()).count()

0

In [52]:
#data cleaning
#checking '0' values in given dataframe
genome_scores.filter(genome_scores.movieid==0).count()
genome_scores.filter(genome_scores.tagid==0).count()
genome_scores.filter(genome_scores.relevance==0).count()

10

In [55]:
genome_scores = genome_scores.filter(genome_scores.relevance!=0)

In [56]:
genome_scores.filter(genome_scores.relevance==0).count()

0

In [57]:
#Storing cleaned csv file as ORC format in S3 bucket
genome_scores.write.format("orc").save("s3a://project555/new/genome_scores_clean.orc/")