In [4]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Chap2').getOrCreate()
spark

In [5]:
file_location = 'movie_data_part1.csv'
file_type = 'csv'
InferSchema = False
first_row_is_header = True
delimiter = "|"

In [6]:
df = spark.read.format(file_type)\
.option("inferschema", InferSchema)\
.option('header', first_row_is_header)\
.option('sep', delimiter)\
.load(file_location)

In [7]:
#df.count()
print("The total number of records is " + str(df.count()))

The total number of records is 43998


In [8]:
df.printSchema()

root
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)



In [9]:
df.dtypes

[('belongs_to_collection', 'string'),
 ('budget', 'string'),
 ('id', 'string'),
 ('original_language', 'string'),
 ('original_title', 'string'),
 ('overview', 'string'),
 ('popularity', 'string'),
 ('production_companies', 'string'),
 ('production_countries', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('runtime', 'string'),
 ('status', 'string'),
 ('tagline', 'string'),
 ('title', 'string'),
 ('vote_average', 'string')]

In [10]:
#subsetting the dataframe
columns = ['id', 'budget', 'popularity', 'release_date', 'revenue', 'title']
df = df.select(columns)
df.show(25, False)

+-----+-------+------------------+------------+-------+--------------------------------------------------------+
|id   |budget |popularity        |release_date|revenue|title                                                   |
+-----+-------+------------------+------------+-------+--------------------------------------------------------+
|43000|0      |2.503             |1962-05-23  |0      |The Elusive Corporal                                    |
|43001|0      |5.51              |1962-11-12  |0      |Sundays and Cybele                                      |
|43002|0      |5.62              |1962-05-24  |0      |Lonely Are the Brave                                    |
|43003|0      |7.159             |1975-03-12  |0      |F for Fake                                              |
|43004|500000 |3.988             |1962-10-09  |0      |Long Day's Journey Into Night                           |
|43006|0      |3.194             |1962-03-09  |0      |My Geisha                                

In [11]:
df.select(df[2], df[1]).show()

+------------------+-------+
|        popularity| budget|
+------------------+-------+
|             2.503|      0|
|              5.51|      0|
|              5.62|      0|
|             7.159|      0|
|             3.988| 500000|
|             3.194|      0|
|             2.689|      0|
|             6.537|      0|
|             4.297|      0|
|             4.417|      0|
|4.7219999999999995|7000000|
|             2.543|      0|
|             4.303|      0|
|             3.493|      0|
|             2.851|      0|
|             4.047|      0|
|             2.661|      0|
|             3.225|      0|
|              5.72|      0|
|             3.292|      0|
+------------------+-------+
only showing top 20 rows



In [12]:
#missing value
from pyspark.sql.functions import *

df.filter((df['popularity'] == '') | (df['popularity'].isNull()) | isnan(df['popularity'])).count()

215

In [13]:
df.select([count(when((col(c) == '')  | (col(c).isNull()) | (isnan(col(c))), c)).alias(c) for c in df.columns]).show()

+---+------+----------+------------+-------+-----+
| id|budget|popularity|release_date|revenue|title|
+---+------+----------+------------+-------+-----+
|125|   125|       215|         221|    215|  304|
+---+------+----------+------------+-------+-----+



In [14]:
df.groupBy(col('Title')).count().show()

+--------------------+-----+
|               Title|count|
+--------------------+-----+
|   The Corn Is Green|    1|
|Meet The Browns -...|    1|
|Morenita, El Esca...|    1|
| Father Takes a Wife|    1|
|The Werewolf of W...|    1|
|My Wife Is a Gang...|    1|
|Depeche Mode: Tou...|    1|
|  A Woman Is a Woman|    1|
|History Is Made a...|    1|
|      Colombian Love|    1|
|        Ace Attorney|    1|
|     Not Like Others|    1|
|40 Guns to Apache...|    1|
|          Middle Men|    1|
|         It's a Gift|    1|
|    La Vie de Bohème|    1|
|Rasputin: The Mad...|    1|
|The Ballad of Jac...|    1|
|         How to Deal|    1|
|             Freaked|    1|
+--------------------+-----+
only showing top 20 rows



In [15]:
df.groupBy(col('Title')).count().sort(desc("count")).show(20, False)

+-----------------------------+-----+
|Title                        |count|
+-----------------------------+-----+
|null                         |304  |
|The Three Musketeers         |8    |
|Les Misérables               |8    |
|Cinderella                   |8    |
|Dracula                      |7    |
|The Island                   |7    |
|Hamlet                       |7    |
|Frankenstein                 |7    |
|A Christmas Carol            |7    |
|Crime and Punishment         |6    |
|Beauty and the Beast         |6    |
|Treasure Island              |6    |
|Cleopatra                    |6    |
|The Hound of the Baskervilles|6    |
|First Love                   |6    |
|Borderline                   |6    |
|Framed                       |6    |
|The Lost World               |6    |
|Wuthering Heights            |5    |
|The Intruder                 |5    |
+-----------------------------+-----+
only showing top 20 rows



In [16]:
df_temp = df.filter((col('Title') != '') | (col('Title').isNotNull()) | (~isnan(col('Title'))))
df_temp.show(10, False)

+-----+------+----------+------------+-------+---------------------------------------+
|id   |budget|popularity|release_date|revenue|title                                  |
+-----+------+----------+------------+-------+---------------------------------------+
|43000|0     |2.503     |1962-05-23  |0      |The Elusive Corporal                   |
|43001|0     |5.51      |1962-11-12  |0      |Sundays and Cybele                     |
|43002|0     |5.62      |1962-05-24  |0      |Lonely Are the Brave                   |
|43003|0     |7.159     |1975-03-12  |0      |F for Fake                             |
|43004|500000|3.988     |1962-10-09  |0      |Long Day's Journey Into Night          |
|43006|0     |3.194     |1962-03-09  |0      |My Geisha                              |
|43007|0     |2.689     |1962-10-31  |0      |Period of Adjustment                   |
|43008|0     |6.537     |1959-03-13  |0      |The Hanging Tree                       |
|43010|0     |4.297     |1962-01-01  |0    

In [17]:
df_temp.groupBy('Title').count().filter(col('count') >= 4).sort(col('count').desc()).show(10, False)

+--------------------+-----+
|Title               |count|
+--------------------+-----+
|null                |304  |
|The Three Musketeers|8    |
|Les Misérables      |8    |
|Cinderella          |8    |
|The Island          |7    |
|Frankenstein        |7    |
|A Christmas Carol   |7    |
|Hamlet              |7    |
|Dracula             |7    |
|Framed              |6    |
+--------------------+-----+
only showing top 10 rows



In [18]:
#del df_temp

In [19]:
df.dtypes

[('id', 'string'),
 ('budget', 'string'),
 ('popularity', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('title', 'string')]

In [20]:
df = df.withColumn("budget", col('budget').cast('float'))

In [21]:
df.dtypes

[('id', 'string'),
 ('budget', 'float'),
 ('popularity', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('title', 'string')]

In [22]:
df.show(10, False)

+-----+--------+----------+------------+-------+---------------------------------------+
|id   |budget  |popularity|release_date|revenue|title                                  |
+-----+--------+----------+------------+-------+---------------------------------------+
|43000|0.0     |2.503     |1962-05-23  |0      |The Elusive Corporal                   |
|43001|0.0     |5.51      |1962-11-12  |0      |Sundays and Cybele                     |
|43002|0.0     |5.62      |1962-05-24  |0      |Lonely Are the Brave                   |
|43003|0.0     |7.159     |1975-03-12  |0      |F for Fake                             |
|43004|500000.0|3.988     |1962-10-09  |0      |Long Day's Journey Into Night          |
|43006|0.0     |3.194     |1962-03-09  |0      |My Geisha                              |
|43007|0.0     |2.689     |1962-10-31  |0      |Period of Adjustment                   |
|43008|0.0     |6.537     |1959-03-13  |0      |The Hanging Tree                       |
|43010|0.0     |4.297

In [23]:
df.dtypes

[('id', 'string'),
 ('budget', 'float'),
 ('popularity', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('title', 'string')]

In [24]:
int_vars = ['id']
date_vars = ['release_date']
float_vars = ['budget' , 'popularity', 'revenue']

In [25]:
from pyspark.sql.types import *

for column in int_vars:
    df = df.withColumn(column, col(column).cast(IntegerType()))
    
for column in date_vars:
    df = df.withColumn(column, col(column).cast(DateType()))

for column in float_vars:
    df = df.withColumn(column, col(column).cast(FloatType()))


In [26]:
df.dtypes

[('id', 'int'),
 ('budget', 'float'),
 ('popularity', 'float'),
 ('release_date', 'date'),
 ('revenue', 'float'),
 ('title', 'string')]

In [27]:
df.show(20, False)

+-----+---------+----------+------------+---------+--------------------------------------------------+
|id   |budget   |popularity|release_date|revenue  |title                                             |
+-----+---------+----------+------------+---------+--------------------------------------------------+
|43000|0.0      |2.503     |1962-05-23  |0.0      |The Elusive Corporal                              |
|43001|0.0      |5.51      |1962-11-12  |0.0      |Sundays and Cybele                                |
|43002|0.0      |5.62      |1962-05-24  |0.0      |Lonely Are the Brave                              |
|43003|0.0      |7.159     |1975-03-12  |0.0      |F for Fake                                        |
|43004|500000.0 |3.988     |1962-10-09  |0.0      |Long Day's Journey Into Night                     |
|43006|0.0      |3.194     |1962-03-09  |0.0      |My Geisha                                         |
|43007|0.0      |2.689     |1962-10-31  |0.0      |Period of Adjustment  

In [28]:
df.describe().show()

+-------+------------------+--------------------+-----------------+--------------------+--------------------+
|summary|                id|              budget|       popularity|             revenue|               title|
+-------+------------------+--------------------+-----------------+--------------------+--------------------+
|  count|             43784|               43873|            43783|               43783|               43694|
|   mean|44502.304312077475|   3736901.834963166|5.295444259579189|   9697079.597382545|            Infinity|
| stddev|27189.646588626343|1.5871814952777334E7|6.168030519208248|5.6879384496288106E7|                 NaN|
|    min|                 2|                 0.0|              0.6|                 0.0|!Women Art Revolu...|
|    max|            100988|               3.8E8|            180.0|        2.78796518E9|       시크릿 Secret|
+-------+------------------+--------------------+-----------------+--------------------+--------------------+



In [29]:
df_temp = df.filter((col('budget') != 0) & (col('budget').isNotNull()) & (~isnan(col('budget'))))

In [30]:
median = df_temp.approxQuantile('budget', [0.5], 0.1)
median

[6000000.0]

In [31]:
median = df_temp.approxQuantile(float_vars, [0.5], 0.1)
median

[[6000000.0], [8.567000389099121], [1700000.0]]

In [32]:
df.agg(countDistinct(col('title')).alias('count')).show()

+-----+
|count|
+-----+
|41138|
+-----+



In [33]:
df.select('title').distinct().show(10, False)

+---------------------------------------------+
|title                                        |
+---------------------------------------------+
|The Corn Is Green                            |
|Meet The Browns - The Play                   |
|Morenita, El Escandalo                       |
|Father Takes a Wife                          |
|The Werewolf of Washington                   |
|My Wife Is a Gangster                        |
|Depeche Mode: Touring the Angel Live in Milan|
|A Woman Is a Woman                           |
|History Is Made at Night                     |
|Colombian Love                               |
+---------------------------------------------+
only showing top 10 rows



In [34]:
df.withColumn('year', year(col('release_date')))\
.withColumn('month', month(col('release_date'))).show()

+-----+---------+----------+------------+---------+--------------------+----+-----+
|   id|   budget|popularity|release_date|  revenue|               title|year|month|
+-----+---------+----------+------------+---------+--------------------+----+-----+
|43000|      0.0|     2.503|  1962-05-23|      0.0|The Elusive Corporal|1962|    5|
|43001|      0.0|      5.51|  1962-11-12|      0.0|  Sundays and Cybele|1962|   11|
|43002|      0.0|      5.62|  1962-05-24|      0.0|Lonely Are the Brave|1962|    5|
|43003|      0.0|     7.159|  1975-03-12|      0.0|          F for Fake|1975|    3|
|43004| 500000.0|     3.988|  1962-10-09|      0.0|Long Day's Journe...|1962|   10|
|43006|      0.0|     3.194|  1962-03-09|      0.0|           My Geisha|1962|    3|
|43007|      0.0|     2.689|  1962-10-31|      0.0|Period of Adjustment|1962|   10|
|43008|      0.0|     6.537|  1959-03-13|      0.0|    The Hanging Tree|1959|    3|
|43010|      0.0|     4.297|  1962-01-01|      0.0|Sherlock Holmes a...|1962

In [35]:
df_temp = df.withColumn('release_year', year('release_date'))

In [36]:
df_temp = df_temp.withColumn('relase month', month('release_date')) \
                  .withColumn('relase day', dayofmonth('release_date'))  

In [37]:
df_temp.show()

+-----+---------+----------+------------+---------+--------------------+------------+------------+----------+
|   id|   budget|popularity|release_date|  revenue|               title|release_year|relase month|relase day|
+-----+---------+----------+------------+---------+--------------------+------------+------------+----------+
|43000|      0.0|     2.503|  1962-05-23|      0.0|The Elusive Corporal|        1962|           5|        23|
|43001|      0.0|      5.51|  1962-11-12|      0.0|  Sundays and Cybele|        1962|          11|        12|
|43002|      0.0|      5.62|  1962-05-24|      0.0|Lonely Are the Brave|        1962|           5|        24|
|43003|      0.0|     7.159|  1975-03-12|      0.0|          F for Fake|        1975|           3|        12|
|43004| 500000.0|     3.988|  1962-10-09|      0.0|Long Day's Journe...|        1962|          10|         9|
|43006|      0.0|     3.194|  1962-03-09|      0.0|           My Geisha|        1962|           3|         9|
|43007|   

In [38]:
df_temp.groupBy('release_year').agg(countDistinct('title').alias('count')).sort(col('release_year').desc()).show(20, False)

+------------+-----+
|release_year|count|
+------------+-----+
|2020        |1    |
|2018        |4    |
|2017        |6    |
|2016        |7    |
|2015        |13   |
|2014        |32   |
|2013        |106  |
|2012        |634  |
|2011        |2090 |
|2010        |2064 |
|2009        |2225 |
|2008        |2052 |
|2007        |1896 |
|2006        |1719 |
|2005        |1530 |
|2004        |1379 |
|2003        |1199 |
|2002        |1123 |
|2001        |1020 |
|2000        |932  |
+------------+-----+
only showing top 20 rows



In [39]:
df.filter(df['title'].like('Meet M%')).show(10, False)

+-----+---------+----------+------------+---------+--------------------+
|id   |budget   |popularity|release_date|revenue  |title               |
+-----+---------+----------+------------+---------+--------------------+
|20430|0.0      |3.614     |2004-01-29  |0.0      |Meet Market         |
|58401|0.0      |6.461     |2010-11-12  |0.0      |Meet My Friend      |
|909  |1707561.0|7.877     |1944-11-28  |7566000.0|Meet Me in St. Louis|
|94452|0.0      |0.6       |2007-04-08  |0.0      |Meet Mr. Daddy      |
|56928|5000000.0|5.649     |2010-06-04  |31649.0  |Meet Monica Velour  |
+-----+---------+----------+------------+---------+--------------------+



In [40]:
df.filter(~ df['title'].like('%s')).show(10, False)

+-----+--------+----------+------------+-------+---------------------------------------+
|id   |budget  |popularity|release_date|revenue|title                                  |
+-----+--------+----------+------------+-------+---------------------------------------+
|43000|0.0     |2.503     |1962-05-23  |0.0    |The Elusive Corporal                   |
|43001|0.0     |5.51      |1962-11-12  |0.0    |Sundays and Cybele                     |
|43002|0.0     |5.62      |1962-05-24  |0.0    |Lonely Are the Brave                   |
|43003|0.0     |7.159     |1975-03-12  |0.0    |F for Fake                             |
|43004|500000.0|3.988     |1962-10-09  |0.0    |Long Day's Journey Into Night          |
|43006|0.0     |3.194     |1962-03-09  |0.0    |My Geisha                              |
|43007|0.0     |2.689     |1962-10-31  |0.0    |Period of Adjustment                   |
|43008|0.0     |6.537     |1959-03-13  |0.0    |The Hanging Tree                       |
|43010|0.0     |4.297

In [41]:
df.filter(df['title'].like('%s')).show(10, False)

+-----+------+----------+------------+------------+----------------------+
|id   |budget|popularity|release_date|revenue     |title                 |
+-----+------+----------+------------+------------+----------------------+
|43016|0.0   |2.851     |1962-01-01  |0.0         |Waltz of the Toreadors|
|43020|0.0   |5.72      |1961-06-15  |0.0         |The Colossus of Rhodes|
|43026|0.0   |3.444     |1961-12-13  |0.0         |Paris Belongs to Us   |
|43031|0.0   |1.847     |1960-02-19  |0.0         |Devi - The Goddess    |
|43035|0.0   |6.383     |1960-03-01  |0.0         |Heller in Pink Tights |
|43047|0.0   |3.988     |1960-12-08  |0.0         |The Sundowners        |
|43059|0.0   |1.932     |2002-07-23  |0.0         |Con Express           |
|43074|1.44E8|17.568    |2016-07-14  |2.29147504E8|Ghostbusters          |
|43076|0.0   |3.252     |2006-09-15  |0.0         |Sons                  |
|43078|0.0   |1.248     |2006-06-15  |0.0         |Whaledreamers         |
+-----+------+----------+

In [42]:
df.filter(df['title'].rlike('\w*ove')).show(10,False)

+-----+------+----------+------------+------------+------------------------+
|id   |budget|popularity|release_date|revenue     |title                   |
+-----+------+----------+------------+------------+------------------------+
|43100|0.0   |7.252     |1959-10-07  |0.0         |General Della Rovere    |
|43152|0.0   |5.126     |2001-06-21  |0.0         |Love on a Diet          |
|43191|0.0   |4.921     |1952-08-29  |0.0         |Beware, My Lovely       |
|43281|0.0   |2.411     |1989-11-22  |0.0         |Love Without Pity       |
|43343|0.0   |3.174     |1953-12-25  |0.0         |Easy to Love            |
|43347|3.0E7 |14.863    |2010-11-22  |1.02820008E8|Love & Other Drugs      |
|43362|0.0   |1.705     |1952-02-23  |0.0         |Love Is Better Than Ever|
|43363|0.0   |2.02      |1952-05-29  |0.0         |Lovely to Look At       |
|43395|0.0   |4.758     |1950-11-10  |0.0         |Two Weeks with Love     |
|43455|0.0   |4.669     |1948-08-23  |0.0         |The Loves of Carmen     |

In [43]:
df.filter(df['title'].contains('ove')).show(10, False)

+-----+------+----------+------------+------------+------------------------+
|id   |budget|popularity|release_date|revenue     |title                   |
+-----+------+----------+------------+------------+------------------------+
|43100|0.0   |7.252     |1959-10-07  |0.0         |General Della Rovere    |
|43152|0.0   |5.126     |2001-06-21  |0.0         |Love on a Diet          |
|43191|0.0   |4.921     |1952-08-29  |0.0         |Beware, My Lovely       |
|43281|0.0   |2.411     |1989-11-22  |0.0         |Love Without Pity       |
|43343|0.0   |3.174     |1953-12-25  |0.0         |Easy to Love            |
|43347|3.0E7 |14.863    |2010-11-22  |1.02820008E8|Love & Other Drugs      |
|43362|0.0   |1.705     |1952-02-23  |0.0         |Love Is Better Than Ever|
|43363|0.0   |2.02      |1952-05-29  |0.0         |Lovely to Look At       |
|43395|0.0   |4.758     |1950-11-10  |0.0         |Two Weeks with Love     |
|43455|0.0   |4.669     |1948-08-23  |0.0         |The Loves of Carmen     |

In [44]:
var = df.select(df.colRegex("`re\w*`")).show()

+------------+---------+
|release_date|  revenue|
+------------+---------+
|  1962-05-23|      0.0|
|  1962-11-12|      0.0|
|  1962-05-24|      0.0|
|  1975-03-12|      0.0|
|  1962-10-09|      0.0|
|  1962-03-09|      0.0|
|  1962-10-31|      0.0|
|  1959-03-13|      0.0|
|  1962-01-01|      0.0|
|  1962-01-01|      0.0|
|  1962-11-21|4000000.0|
|  1962-04-17|      0.0|
|  1962-10-24|      0.0|
|  1962-12-07|      0.0|
|  1962-01-01|      0.0|
|  1961-10-11|      0.0|
|  1961-06-02|      0.0|
|  2010-05-28|      0.0|
|  1961-06-15|      0.0|
|  2008-08-22|      0.0|
+------------+---------+
only showing top 20 rows



In [45]:
var = df.select(df.colRegex("`re\w*`")).columns
var

['release_date', 'revenue']

In [46]:
df.select(df.colRegex("`re\w*`")).printSchema()

root
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)



In [47]:
df.select(df.colRegex("`\w*e`")).printSchema()

root
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)



In [48]:
mean_pop = df.agg({'popularity' : 'mean'}).collect()[0]['avg(popularity)']
count_df = df.count()

In [49]:
print(mean_pop, count_df)

5.295444259579189 43998


In [50]:
df = df.withColumn('mean_pop', lit(mean_pop))
df.show()

+-----+---------+----------+------------+---------+--------------------+-----------------+
|   id|   budget|popularity|release_date|  revenue|               title|         mean_pop|
+-----+---------+----------+------------+---------+--------------------+-----------------+
|43000|      0.0|     2.503|  1962-05-23|      0.0|The Elusive Corporal|5.295444259579189|
|43001|      0.0|      5.51|  1962-11-12|      0.0|  Sundays and Cybele|5.295444259579189|
|43002|      0.0|      5.62|  1962-05-24|      0.0|Lonely Are the Brave|5.295444259579189|
|43003|      0.0|     7.159|  1975-03-12|      0.0|          F for Fake|5.295444259579189|
|43004| 500000.0|     3.988|  1962-10-09|      0.0|Long Day's Journe...|5.295444259579189|
|43006|      0.0|     3.194|  1962-03-09|      0.0|           My Geisha|5.295444259579189|
|43007|      0.0|     2.689|  1962-10-31|      0.0|Period of Adjustment|5.295444259579189|
|43008|      0.0|     6.537|  1959-03-13|      0.0|    The Hanging Tree|5.295444259579189|

In [51]:
df = df.withColumn('variance', pow((df['popularity'] - df['mean_pop']),2))
df.show()

+-----+---------+----------+------------+---------+--------------------+-----------------+--------------------+
|   id|   budget|popularity|release_date|  revenue|               title|         mean_pop|            variance|
+-----+---------+----------+------------+---------+--------------------+-----------------+--------------------+
|43000|      0.0|     2.503|  1962-05-23|      0.0|The Elusive Corporal|5.295444259579189|   7.797744825681142|
|43001|      0.0|      5.51|  1962-11-12|      0.0|  Sundays and Cybele|5.295444259579189|0.046034263963398346|
|43002|      0.0|      5.62|  1962-05-24|      0.0|Lonely Are the Brave|5.295444259579189| 0.10533635435520017|
|43003|      0.0|     7.159|  1975-03-12|      0.0|          F for Fake|5.295444259579189|  3.4728396990815216|
|43004| 500000.0|     3.988|  1962-10-09|      0.0|Long Day's Journe...|5.295444259579189|   1.709410711356759|
|43006|      0.0|     3.194|  1962-03-09|      0.0|           My Geisha|5.295444259579189|   4.416067952

In [52]:
variance_sum = df.agg({'variance' : 'sum'}).collect()[0]['sum(variance)']
variance_sum

1665668.6984729888

In [53]:
variance_pop = variance_sum/count_df-1
variance_pop

36.85782759382219

In [54]:
x=df.agg({'variance': 'sum'}).collect()[0]['sum(variance)']
x

1665668.6984729888

In [55]:
def new_cols(budget,popularity):
    if budget < 10000000:
        budget_cat = "Small"
    elif budget<100000000:
        budget_cat = "Medium"
    else:
        budget_cat = "Big"
        
    if popularity < 3:
        ratings = "low"
    elif popularity < 5:
        ratings = 'mid'
    else:
        ratings = 'high'
    
    return budget_cat, ratings

In [56]:
udfb = udf(new_cols, StructType([StructField("budget_cat",StringType(), True), StructField('ratings', StringType(), True)]))

In [57]:
temp_df = df.select('id', 'budget', 'popularity')\
            .withColumn('newcat', udfb("budget","popularity"))
temp_df.show(10, False)

+-----+--------+----------+-------------+
|id   |budget  |popularity|newcat       |
+-----+--------+----------+-------------+
|43000|0.0     |2.503     |{Small, low} |
|43001|0.0     |5.51      |{Small, high}|
|43002|0.0     |5.62      |{Small, high}|
|43003|0.0     |7.159     |{Small, high}|
|43004|500000.0|3.988     |{Small, mid} |
|43006|0.0     |3.194     |{Small, mid} |
|43007|0.0     |2.689     |{Small, low} |
|43008|0.0     |6.537     |{Small, high}|
|43010|0.0     |4.297     |{Small, mid} |
|43011|0.0     |4.417     |{Small, mid} |
+-----+--------+----------+-------------+
only showing top 10 rows



In [58]:
df_with_newcols = temp_df.select('id', 'budget', 'popularity', 'newcat') \
                          .withColumn('budget_cat', temp_df.newcat.getItem("budget_cat"))\
                          .withColumn('ratings', temp_df.newcat.getItem("ratings")).drop('newcat')

df_with_newcols.show(10,False)

+-----+--------+----------+----------+-------+
|id   |budget  |popularity|budget_cat|ratings|
+-----+--------+----------+----------+-------+
|43000|0.0     |2.503     |Small     |low    |
|43001|0.0     |5.51      |Small     |high   |
|43002|0.0     |5.62      |Small     |high   |
|43003|0.0     |7.159     |Small     |high   |
|43004|500000.0|3.988     |Small     |mid    |
|43006|0.0     |3.194     |Small     |mid    |
|43007|0.0     |2.689     |Small     |low    |
|43008|0.0     |6.537     |Small     |high   |
|43010|0.0     |4.297     |Small     |mid    |
|43011|0.0     |4.417     |Small     |mid    |
+-----+--------+----------+----------+-------+
only showing top 10 rows



In [59]:
df_with_newcols = df.select('id', 'budget', 'popularity') \
                    .withColumn('budget_cat', when(df['budget'] < 10000000, 'small').when(df['budget'] < 100000000, 'medium').otherwise('big'))\
                     .withColumn('ratings', when(df['popularity'] < 3, 'low').when(df['popularity'] < 5, 'Mid').otherwise('High'))
    
df_with_newcols.show()

+-----+---------+----------+----------+-------+
|   id|   budget|popularity|budget_cat|ratings|
+-----+---------+----------+----------+-------+
|43000|      0.0|     2.503|     small|    low|
|43001|      0.0|      5.51|     small|   High|
|43002|      0.0|      5.62|     small|   High|
|43003|      0.0|     7.159|     small|   High|
|43004| 500000.0|     3.988|     small|    Mid|
|43006|      0.0|     3.194|     small|    Mid|
|43007|      0.0|     2.689|     small|    low|
|43008|      0.0|     6.537|     small|   High|
|43010|      0.0|     4.297|     small|    Mid|
|43011|      0.0|     4.417|     small|    Mid|
|43012|7000000.0|     4.722|     small|    Mid|
|43013|      0.0|     2.543|     small|    low|
|43014|      0.0|     4.303|     small|    Mid|
|43015|      0.0|     3.493|     small|    Mid|
|43016|      0.0|     2.851|     small|    low|
|43017|      0.0|     4.047|     small|    Mid|
|43018|      0.0|     2.661|     small|    low|
|43019|      0.0|     3.225|     small| 

In [60]:
col_to_drop = ['budget_cat']
df_with_newcols = df_with_newcols.drop(*col_to_drop)
df_with_newcols.show(10,False)

+-----+--------+----------+-------+
|id   |budget  |popularity|ratings|
+-----+--------+----------+-------+
|43000|0.0     |2.503     |low    |
|43001|0.0     |5.51      |High   |
|43002|0.0     |5.62      |High   |
|43003|0.0     |7.159     |High   |
|43004|500000.0|3.988     |Mid    |
|43006|0.0     |3.194     |Mid    |
|43007|0.0     |2.689     |low    |
|43008|0.0     |6.537     |High   |
|43010|0.0     |4.297     |Mid    |
|43011|0.0     |4.417     |Mid    |
+-----+--------+----------+-------+
only showing top 10 rows



In [61]:
df_with_newcols = df_with_newcols.withColumnRenamed('id','film_id') \
                    .withColumnRenamed('ratings','film_ratings')

In [62]:
df_with_newcols.show(10, False)

+-------+--------+----------+------------+
|film_id|budget  |popularity|film_ratings|
+-------+--------+----------+------------+
|43000  |0.0     |2.503     |low         |
|43001  |0.0     |5.51      |High        |
|43002  |0.0     |5.62      |High        |
|43003  |0.0     |7.159     |High        |
|43004  |500000.0|3.988     |Mid         |
|43006  |0.0     |3.194     |Mid         |
|43007  |0.0     |2.689     |low         |
|43008  |0.0     |6.537     |High        |
|43010  |0.0     |4.297     |Mid         |
|43011  |0.0     |4.417     |Mid         |
+-------+--------+----------+------------+
only showing top 10 rows



In [63]:
new_names = [('budget','budget_film'), ('popularity', 'popularity_file')]

In [64]:
df_with_newcols = df_with_newcols.select(list(map(lambda old,new: col(old).alias(new), *zip(*new_names))))
df_with_newcols.show(10, False)

+-----------+---------------+
|budget_film|popularity_file|
+-----------+---------------+
|0.0        |2.503          |
|0.0        |5.51           |
|0.0        |5.62           |
|0.0        |7.159          |
|500000.0   |3.988          |
|0.0        |3.194          |
|0.0        |2.689          |
|0.0        |6.537          |
|0.0        |4.297          |
|0.0        |4.417          |
+-----------+---------------+
only showing top 10 rows



In [65]:
df.show(10, False)

+-----+--------+----------+------------+-------+---------------------------------------+-----------------+--------------------+
|id   |budget  |popularity|release_date|revenue|title                                  |mean_pop         |variance            |
+-----+--------+----------+------------+-------+---------------------------------------+-----------------+--------------------+
|43000|0.0     |2.503     |1962-05-23  |0.0    |The Elusive Corporal                   |5.295444259579189|7.797744825681142   |
|43001|0.0     |5.51      |1962-11-12  |0.0    |Sundays and Cybele                     |5.295444259579189|0.046034263963398346|
|43002|0.0     |5.62      |1962-05-24  |0.0    |Lonely Are the Brave                   |5.295444259579189|0.10533635435520017 |
|43003|0.0     |7.159     |1975-03-12  |0.0    |F for Fake                             |5.295444259579189|3.4728396990815216  |
|43004|500000.0|3.988     |1962-10-09  |0.0    |Long Day's Journey Into Night          |5.29544425957918

In [66]:
df_with_cols = df.select('id', 'budget', 'popularity')\
               .withColumn('budget_cat', when(df['budget'] < 10000000, 'Small').when(df['budget'] < 100000000, 'Medium').otherwise('big'))\
                .withColumn('ratings', when(df['popularity'] < 3,'Low').when(df['popularity'] < 5, 'Mid').otherwise('High'))
    

df_with_cols.show(10,False)

+-----+--------+----------+----------+-------+
|id   |budget  |popularity|budget_cat|ratings|
+-----+--------+----------+----------+-------+
|43000|0.0     |2.503     |Small     |Low    |
|43001|0.0     |5.51      |Small     |High   |
|43002|0.0     |5.62      |Small     |High   |
|43003|0.0     |7.159     |Small     |High   |
|43004|500000.0|3.988     |Small     |Mid    |
|43006|0.0     |3.194     |Small     |Mid    |
|43007|0.0     |2.689     |Small     |Low    |
|43008|0.0     |6.537     |Small     |High   |
|43010|0.0     |4.297     |Small     |Mid    |
|43011|0.0     |4.417     |Small     |Mid    |
+-----+--------+----------+----------+-------+
only showing top 10 rows



In [67]:
df_with_cols = df_with_cols.withColumn('Concat_budget_ratings', concat(col('budget_cat'), col('ratings')))
df_with_cols.show(10,False)

+-----+--------+----------+----------+-------+---------------------+
|id   |budget  |popularity|budget_cat|ratings|Concat_budget_ratings|
+-----+--------+----------+----------+-------+---------------------+
|43000|0.0     |2.503     |Small     |Low    |SmallLow             |
|43001|0.0     |5.51      |Small     |High   |SmallHigh            |
|43002|0.0     |5.62      |Small     |High   |SmallHigh            |
|43003|0.0     |7.159     |Small     |High   |SmallHigh            |
|43004|500000.0|3.988     |Small     |Mid    |SmallMid             |
|43006|0.0     |3.194     |Small     |Mid    |SmallMid             |
|43007|0.0     |2.689     |Small     |Low    |SmallLow             |
|43008|0.0     |6.537     |Small     |High   |SmallHigh            |
|43010|0.0     |4.297     |Small     |Mid    |SmallMid             |
|43011|0.0     |4.417     |Small     |Mid    |SmallMid             |
+-----+--------+----------+----------+-------+---------------------+
only showing top 10 rows



In [68]:
df_with_cols = df_with_cols.withColumn('Small_concat', trim(lower(concat(col('budget_cat'), lit(' '), col('ratings')))))

In [69]:
df_with_cols.show(10,False)

+-----+--------+----------+----------+-------+---------------------+------------+
|id   |budget  |popularity|budget_cat|ratings|Concat_budget_ratings|Small_concat|
+-----+--------+----------+----------+-------+---------------------+------------+
|43000|0.0     |2.503     |Small     |Low    |SmallLow             |small low   |
|43001|0.0     |5.51      |Small     |High   |SmallHigh            |small high  |
|43002|0.0     |5.62      |Small     |High   |SmallHigh            |small high  |
|43003|0.0     |7.159     |Small     |High   |SmallHigh            |small high  |
|43004|500000.0|3.988     |Small     |Mid    |SmallMid             |small mid   |
|43006|0.0     |3.194     |Small     |Mid    |SmallMid             |small mid   |
|43007|0.0     |2.689     |Small     |Low    |SmallLow             |small low   |
|43008|0.0     |6.537     |Small     |High   |SmallHigh            |small high  |
|43010|0.0     |4.297     |Small     |Mid    |SmallMid             |small mid   |
|43011|0.0     |

In [70]:
df_with_cols.registerTempTable('temp_data')



In [71]:
spark.sql('select ratings, count(*) from temp_data group by ratings').show(10,False)

+-------+--------+
|ratings|count(1)|
+-------+--------+
|High   |16856   |
|Low    |14865   |
|Mid    |12277   |
+-------+--------+



In [72]:
spark.sql('select popularity, case when popularity < 3 then "Low" when popularity < 5 then "Mid" else "High" end as bucket from  temp_data').show(10,False)

+----------+------+
|popularity|bucket|
+----------+------+
|2.503     |Low   |
|5.51      |High  |
|5.62      |High  |
|7.159     |High  |
|3.988     |Mid   |
|3.194     |Mid   |
|2.689     |Low   |
|6.537     |High  |
|4.297     |Mid   |
|4.417     |Mid   |
+----------+------+
only showing top 10 rows



In [73]:
df_with_newcols = df_with_cols.filter((df_with_cols['popularity'].isNotNull()) & (~isnan(df_with_cols['popularity'])) )
df_with_newcols.show(10, False)

+-----+--------+----------+----------+-------+---------------------+------------+
|id   |budget  |popularity|budget_cat|ratings|Concat_budget_ratings|Small_concat|
+-----+--------+----------+----------+-------+---------------------+------------+
|43000|0.0     |2.503     |Small     |Low    |SmallLow             |small low   |
|43001|0.0     |5.51      |Small     |High   |SmallHigh            |small high  |
|43002|0.0     |5.62      |Small     |High   |SmallHigh            |small high  |
|43003|0.0     |7.159     |Small     |High   |SmallHigh            |small high  |
|43004|500000.0|3.988     |Small     |Mid    |SmallMid             |small mid   |
|43006|0.0     |3.194     |Small     |Mid    |SmallMid             |small mid   |
|43007|0.0     |2.689     |Small     |Low    |SmallLow             |small low   |
|43008|0.0     |6.537     |Small     |High   |SmallHigh            |small high  |
|43010|0.0     |4.297     |Small     |Mid    |SmallMid             |small mid   |
|43011|0.0     |

In [74]:
from pyspark.sql.window import *

df_with_newcols = df_with_newcols.select('id', 'budget', 'popularity', ntile(10).over(Window.partitionBy()\
                    .orderBy(df_with_cols['popularity'].desc())).alias("decile_rank"))

df_with_newcols.show(10, False)

+----+------+----------+-----------+
|id  |budget|popularity|decile_rank|
+----+------+----------+-----------+
|null|0.6   |180.0     |1          |
|null|3.585 |180.0     |1          |
|6795|6.5E7 |161.547   |2          |
|null|0.939 |159.0     |3          |
|null|0.877 |155.0     |4          |
|null|4.892 |153.0     |5          |
|null|0.616 |150.0     |6          |
|null|1.4   |150.0     |7          |
|null|0.674 |150.0     |8          |
|null|2.923 |142.0     |9          |
+----+------+----------+-----------+
only showing top 10 rows



### title ###custom

### title ###custom

In [75]:
df_with_newcols.groupBy('decile_rank')\
                .agg(\
                     min('popularity').alias('min_popularity'),\
                     max('popularity').alias('max_popularity'),\
                    count('popularity')\
                    ).show()

+-----------+--------------+--------------+-----------------+
|decile_rank|min_popularity|max_popularity|count(popularity)|
+-----------+--------------+--------------+-----------------+
|          1|        10.185|         180.0|             4379|
|          2|         7.481|        10.182|             4379|
|          3|         5.841|         7.481|             4379|
|          4|         4.823|         5.841|             4378|
|          5|         4.054|         4.822|             4378|
|          6|         3.383|         4.054|             4378|
|          7|         2.747|         3.383|             4378|
|          8|         2.075|         2.747|             4378|
|          9|         1.389|         2.075|             4378|
|         10|           0.6|         1.389|             4378|
+-----------+--------------+--------------+-----------------+



In [76]:
from pyspark.sql.window import *
df_second_best = df.select('id', 'popularity', 'release_date', year(col('release_date')).alias('year'))
df_second_best.show()

+-----+----------+------------+----+
|   id|popularity|release_date|year|
+-----+----------+------------+----+
|43000|     2.503|  1962-05-23|1962|
|43001|      5.51|  1962-11-12|1962|
|43002|      5.62|  1962-05-24|1962|
|43003|     7.159|  1975-03-12|1975|
|43004|     3.988|  1962-10-09|1962|
|43006|     3.194|  1962-03-09|1962|
|43007|     2.689|  1962-10-31|1962|
|43008|     6.537|  1959-03-13|1959|
|43010|     4.297|  1962-01-01|1962|
|43011|     4.417|  1962-01-01|1962|
|43012|     4.722|  1962-11-21|1962|
|43013|     2.543|  1962-04-17|1962|
|43014|     4.303|  1962-10-24|1962|
|43015|     3.493|  1962-12-07|1962|
|43016|     2.851|  1962-01-01|1962|
|43017|     4.047|  1961-10-11|1961|
|43018|     2.661|  1961-06-02|1961|
|43019|     3.225|  2010-05-28|2010|
|43020|      5.72|  1961-06-15|1961|
|43021|     3.292|  2008-08-22|2008|
+-----+----------+------------+----+
only showing top 20 rows



In [77]:
year_window = Window.partitionBy(df_second_best['year']).orderBy(df_second_best['popularity'].desc())

In [78]:
df_second_best = df_second_best.select('id', 'popularity', 'year',rank().over(year_window).alias('rank') )

In [79]:
df_second_best.filter((df_second_best['year']==1970) & (df_second_best['rank']==2)).show()

+-----+----------+----+----+
|   id|popularity|year|rank|
+-----+----------+----+----+
|11202|    14.029|1970|   2|
+-----+----------+----+----+



In [80]:
df.show(10, False)

+-----+--------+----------+------------+-------+---------------------------------------+-----------------+--------------------+
|id   |budget  |popularity|release_date|revenue|title                                  |mean_pop         |variance            |
+-----+--------+----------+------------+-------+---------------------------------------+-----------------+--------------------+
|43000|0.0     |2.503     |1962-05-23  |0.0    |The Elusive Corporal                   |5.295444259579189|7.797744825681142   |
|43001|0.0     |5.51      |1962-11-12  |0.0    |Sundays and Cybele                     |5.295444259579189|0.046034263963398346|
|43002|0.0     |5.62      |1962-05-24  |0.0    |Lonely Are the Brave                   |5.295444259579189|0.10533635435520017 |
|43003|0.0     |7.159     |1975-03-12  |0.0    |F for Fake                             |5.295444259579189|3.4728396990815216  |
|43004|500000.0|3.988     |1962-10-09  |0.0    |Long Day's Journey Into Night          |5.29544425957918

In [81]:
#df_with_newcols.write.format('csv').option('delimiter', '|').save(r'C:\Users\HP\Pyspark\output_df')

In [82]:
#df_with_newcols.write.mode('overwrite').csv(r'C:\Users\HP\Pyspark\output_df')

In [83]:
#df_with_newcols.write.saveAsTable('film_ratings')

In [84]:
df_Pandas = df_with_newcols.toPandas()

In [85]:
df_Pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 43783 entries, 0 to 43782
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   id           43694 non-null  float64
 1   budget       43783 non-null  float32
 2   popularity   43783 non-null  float32
 3   decile_rank  43783 non-null  int32  
dtypes: float32(2), float64(1), int32(1)
memory usage: 855.3 KB


In [86]:
df_py = spark.createDataFrame(df_Pandas)

In [87]:
df_py.show(10,False)

+------+------------------+-----------------+-----------+
|id    |budget            |popularity       |decile_rank|
+------+------------------+-----------------+-----------+
|NaN   |0.6000000238418579|180.0            |1          |
|NaN   |3.5850000381469727|180.0            |1          |
|6795.0|6.5E7             |161.5469970703125|1          |
|NaN   |0.9390000104904175|159.0            |1          |
|NaN   |0.8769999742507935|155.0            |1          |
|NaN   |4.892000198364258 |153.0            |1          |
|NaN   |1.399999976158142 |150.0            |1          |
|NaN   |0.6740000247955322|150.0            |1          |
|NaN   |0.6159999966621399|150.0            |1          |
|NaN   |0.8399999737739563|142.0            |1          |
+------+------------------+-----------------+-----------+
only showing top 10 rows



In [88]:
file_location = "movie_data_part2_v1.csv"
delimter = '|'
header = True
file_type = "csv"
inferSchema = False

In [90]:
df_p1 = spark.read.format(file_type)\
        .option('header', header)\
        .option('inferSchema',inferSchema )\
        .option('sep', delimter)\
        .option('header', header)\
        .load(file_location)

In [92]:
df_p1.printSchema()

root
 |-- cast: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- video: string (nullable = true)
 |-- id: string (nullable = true)



In [93]:
df_p1.count()

24998

In [95]:
df_p1 = df_p1.withColumn('id', df_p1['id'].cast('integer'))
df_p1 = df_p1.filter((df_p1['id'].isNotNull()) & (~isnan(df_p1['id'])))
df_p1.count()

24998

In [98]:
df.join(df_p1, df['id'] == df_p1['id'], 'inner').printSchema()
df.join(df_p1, df['id'] == df_p1['id'], 'inner').count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = false)
 |-- variance: double (nullable = true)
 |-- cast: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- video: string (nullable = true)
 |-- id: integer (nullable = true)



24998

In [100]:
# Left Join
df.join(df_p1, df['id'] == df_p1['id'],'left').printSchema()
df.join(df_p1, df['id'] == df_p1['id'],'left').count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = false)
 |-- variance: double (nullable = true)
 |-- cast: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- video: string (nullable = true)
 |-- id: integer (nullable = true)



43998

In [101]:
# Right Join
df.join(df_p1, df['id'] == df_p1['id'],'right').printSchema()
df.join(df_p1, df['id'] == df_p1['id'],'right').count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = true)
 |-- variance: double (nullable = true)
 |-- cast: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- video: string (nullable = true)
 |-- id: integer (nullable = true)



24998

In [103]:
# Outer Join
df.join(df_p1, df['id'] == df_p1['id'],'outer').printSchema()
df.join(df_p1, df['id'] == df_p1['id'],'outer').count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = true)
 |-- variance: double (nullable = true)
 |-- cast: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- video: string (nullable = true)
 |-- id: integer (nullable = true)



43998

In [104]:
# Left Semi Join
df.join(df_p1, df['id'] == df_p1['id'],'left_semi').printSchema()
df.join(df_p1, df['id'] == df_p1['id'],'left_semi').count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = false)
 |-- variance: double (nullable = true)



24998

In [106]:
df.join(broadcast(df_p1), df['id'] == df_p1['id'], 'inner').printSchema()
df.join(broadcast(df_p1), df['id'] == df_p1['id'], 'inner').count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = false)
 |-- variance: double (nullable = true)
 |-- cast: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- video: string (nullable = true)
 |-- id: integer (nullable = true)



24998

In [111]:
df = df.withColumn('release_year', year(df['release_date']))
df.printSchema()
df.count()

root
 |-- id: integer (nullable = true)
 |-- budget: float (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: float (nullable = true)
 |-- title: string (nullable = true)
 |-- mean_pop: double (nullable = false)
 |-- variance: double (nullable = true)
 |-- release_year: integer (nullable = true)



43998

In [112]:
df.dropDuplicates(['title', 'release_year']).count()

43643

In [113]:
# Histogram Demonstration
%matplotlib inline
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
# Processing the data in Spark. We can use the histogram function from the RDD 
histogram_data = df.select('popularity').rdd.flatMap(lambda x: x).histogram(25)
# Loading the Computed Histogram into a Pandas Dataframe for plotting
hist_df=pd.DataFrame(list(zip(*histogram_data)), columns=['bin', 'frequency'])

In [116]:
histogram_data

([0.6000000238418579,
  7.776000022888184,
  14.95200002193451,
  22.128000020980835,
  29.30400002002716,
  36.480000019073486,
  43.65600001811981,
  50.83200001716614,
  58.00800001621246,
  65.18400001525879,
  72.36000001430511,
  79.53600001335144,
  86.71200001239777,
  93.88800001144409,
  101.06400001049042,
  108.24000000953674,
  115.41600000858307,
  122.5920000076294,
  129.76800000667572,
  136.94400000572205,
  144.12000000476837,
  151.2960000038147,
  158.47200000286102,
  165.64800000190735,
  172.82400000095367,
  180.0],
 [35637,
  6835,
  981,
  168,
  49,
  12,
  3,
  3,
  3,
  5,
  5,
  15,
  11,
  15,
  10,
  8,
  7,
  3,
  1,
  3,
  3,
  2,
  2,
  0,
  2])