## initializing spark


In [1]:
#importing spark
import findspark
findspark.init()

import pyspark
sc=pyspark.SparkContext().getOrCreate()

## importing data from mongodb 

In [2]:
from pyspark.sql import *
spark = SparkSession.builder.config("spark.mongodb.input.uri=mongodb://127.0.0.1/sai.movie_meta").getOrCreate()

## reading data into spark dataframe

In [3]:
#df1 = spark.read.option("header","true").option("inferSchema","true").csv("movie_metadata.csv")
df1=spark.read.format("mongo").option("uri","mongodb://127.0.0.1/sai.movie_meta").load()

In [4]:
df1

DataFrame[_id: struct<oid:string>, actor_1_facebook_likes: string, actor_1_name: string, actor_2_facebook_likes: string, actor_2_name: string, actor_3_facebook_likes: string, actor_3_name: string, aspect_ratio: string, budget: string, cast_total_facebook_likes: int, color: string, content_rating: string, country: string, director_facebook_likes: string, director_name: string, duration: string, facenumber_in_poster: string, genres: string, gross: string, imdb_score: double, language: string, movie_facebook_likes: int, movie_imdb_link: string, movie_title: string, num_critic_for_reviews: string, num_user_for_reviews: string, num_voted_users: int, plot_keywords: string, title_year: string]

In [5]:
df1.show()

+--------------------+----------------------+------------------+----------------------+--------------------+----------------------+--------------------+------------+---------+-------------------------+-----+--------------+-----------+-----------------------+-----------------+--------+--------------------+--------------------+---------+----------+--------+--------------------+--------------------+--------------------+----------------------+--------------------+---------------+--------------------+----------+
|                 _id|actor_1_facebook_likes|      actor_1_name|actor_2_facebook_likes|        actor_2_name|actor_3_facebook_likes|        actor_3_name|aspect_ratio|   budget|cast_total_facebook_likes|color|content_rating|    country|director_facebook_likes|    director_name|duration|facenumber_in_poster|              genres|    gross|imdb_score|language|movie_facebook_likes|     movie_imdb_link|         movie_title|num_critic_for_reviews|num_user_for_reviews|num_voted_users|       

## selecting required Features and droping remaining columns for recommendation

In [6]:
cols = ['director_name','actor_1_name','actor_2_name','actor_3_name','genres','movie_title']
df1=df1.select(*cols)
df1.show()

+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    director_name|      actor_1_name|        actor_2_name|        actor_3_name|              genres|         movie_title|
+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    James Cameron|       CCH Pounder|    Joel David Moore|           Wes Studi|Action|Adventure|...|              Avatar|
|   Gore Verbinski|       Johnny Depp|       Orlando Bloom|      Jack Davenport|Action|Adventure|...|Pirates of the Ca...|
|   Andrew Stanton|      Daryl Sabara|     Samantha Morton|        Polly Walker|Action|Adventure|...|         John Carter|
|       Sam Mendes|   Christoph Waltz|        Rory Kinnear|    Stephanie Sigman|Action|Adventure|...|             Spectre|
|Christopher Nolan|         Tom Hardy|      Christian Bale|Joseph Gordon-Levitt|     Action|Thriller|The Dark Knight R...|
|      Doug Walk

## filling na values as 'unknown' 

In [7]:
#unkown as it is string it occupies min  8 byte ,if we use null then it occupies 0 bytes
df1.na.fill({'actor_1_name': 'unknown','actor_1_name': 'unknown','actor_1_name': 'unknown','director_name':'unknown'})

DataFrame[director_name: string, actor_1_name: string, actor_2_name: string, actor_3_name: string, genres: string, movie_title: string]

In [8]:
df1.show()

+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    director_name|      actor_1_name|        actor_2_name|        actor_3_name|              genres|         movie_title|
+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    James Cameron|       CCH Pounder|    Joel David Moore|           Wes Studi|Action|Adventure|...|              Avatar|
|   Gore Verbinski|       Johnny Depp|       Orlando Bloom|      Jack Davenport|Action|Adventure|...|Pirates of the Ca...|
|   Andrew Stanton|      Daryl Sabara|     Samantha Morton|        Polly Walker|Action|Adventure|...|         John Carter|
|       Sam Mendes|   Christoph Waltz|        Rory Kinnear|    Stephanie Sigman|Action|Adventure|...|             Spectre|
|Christopher Nolan|         Tom Hardy|      Christian Bale|Joseph Gordon-Levitt|     Action|Thriller|The Dark Knight R...|
|      Doug Walk

In [9]:
from pyspark.sql.functions import *
df1.withColumn("genres",regexp_replace("genres","[|]", " "))
#df1.withColumn("genres", explode(split("genres","[|]"))).show()

DataFrame[director_name: string, actor_1_name: string, actor_2_name: string, actor_3_name: string, genres: string, movie_title: string]

## seperating different Genres

In [10]:
newdf=df1.withColumn("genres",regexp_replace("genres","[|]", " "))

In [11]:
newdf.show()

+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    director_name|      actor_1_name|        actor_2_name|        actor_3_name|              genres|         movie_title|
+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    James Cameron|       CCH Pounder|    Joel David Moore|           Wes Studi|Action Adventure ...|              Avatar|
|   Gore Verbinski|       Johnny Depp|       Orlando Bloom|      Jack Davenport|Action Adventure ...|Pirates of the Ca...|
|   Andrew Stanton|      Daryl Sabara|     Samantha Morton|        Polly Walker|Action Adventure ...|         John Carter|
|       Sam Mendes|   Christoph Waltz|        Rory Kinnear|    Stephanie Sigman|Action Adventure ...|             Spectre|
|Christopher Nolan|         Tom Hardy|      Christian Bale|Joseph Gordon-Levitt|     Action Thriller|The Dark Knight R...|
|      Doug Walk

In [14]:
newdf["movie_title"][1]

Column<b'movie_title[1]'>

In [15]:
#selecting only movie_title column
newdf.select("movie_title").show()

+--------------------+
|         movie_title|
+--------------------+
|              Avatar|
|Pirates of the Ca...|
|         John Carter|
|             Spectre|
|The Dark Knight R...|
|Star Wars: Episod...|
|        Spider-Man 3|
|             Tangled|
|Avengers: Age of ...|
|Harry Potter and ...|
|Batman v Superman...|
|    Superman Returns|
|   Quantum of Solace|
|Pirates of the Ca...|
|     The Lone Ranger|
|        Man of Steel|
|The Chronicles of...|
|        The Avengers|
|Pirates of the Ca...|
|The Hobbit: The B...|
+--------------------+
only showing top 20 rows



In [16]:
#printing first 2 titles
newdf.select("movie_title").head(2)

[Row(movie_title='Avatar'),
 Row(movie_title="Pirates of the Caribbean: At World's End")]

## null terminating char at the end

In [19]:
newDF = newdf.withColumn("movie_title", split(newdf['movie_title'], '\xa0')[0])

In [20]:
newDF.show()

+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    director_name|      actor_1_name|        actor_2_name|        actor_3_name|              genres|         movie_title|
+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    James Cameron|       CCH Pounder|    Joel David Moore|           Wes Studi|Action Adventure ...|              Avatar|
|   Gore Verbinski|       Johnny Depp|       Orlando Bloom|      Jack Davenport|Action Adventure ...|Pirates of the Ca...|
|   Andrew Stanton|      Daryl Sabara|     Samantha Morton|        Polly Walker|Action Adventure ...|         John Carter|
|       Sam Mendes|   Christoph Waltz|        Rory Kinnear|    Stephanie Sigman|Action Adventure ...|             Spectre|
|Christopher Nolan|         Tom Hardy|      Christian Bale|Joseph Gordon-Levitt|     Action Thriller|The Dark Knight R...|
|      Doug Walk

In [21]:
newDF.select("movie_title").head(2)

[Row(movie_title='Avatar'),
 Row(movie_title="Pirates of the Caribbean: At World's End")]

## converting title column to lower case

In [22]:
new_DF=newDF.withColumn('movie_title',lower(newDF.movie_title))

In [33]:
new_DF.show()

+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    director_name|      actor_1_name|        actor_2_name|        actor_3_name|              genres|         movie_title|
+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+
|    James Cameron|       CCH Pounder|    Joel David Moore|           Wes Studi|Action Adventure ...|              avatar|
|   Gore Verbinski|       Johnny Depp|       Orlando Bloom|      Jack Davenport|Action Adventure ...|pirates of the ca...|
|       Sam Mendes|   Christoph Waltz|        Rory Kinnear|    Stephanie Sigman|Action Adventure ...|             spectre|
|Christopher Nolan|         Tom Hardy|      Christian Bale|Joseph Gordon-Levitt|     Action Thriller|the dark knight r...|
|      Doug Walker|       Doug Walker|          Rob Walker|                null|         Documentary|star wars: episod...|
|   Andrew Stant

## saving it as new csv file

In [34]:
#new_DF.write.csv('data.csv')

new_DF.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('moutput.csv')