# Chapter 2

## Connect and Read in Data

In [None]:
# Connect to Spark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Data_Wrangling").getOrCreate()

In [17]:
# Set parameters
file_location = "Ch02/Chapter2_Data/movie_data_part1.csv"
file_type = "csv"
infer_schema = "False"
first_row_is_header = "True"
delimiter = "|"

In [18]:
# Read in dataframe
df_raw = (spark.read.format(file_type)
                    .option("inferSchema", infer_schema)
                    .option("header", first_row_is_header)
                    .option("sep", delimiter)
                    .load(file_location))

## Exploratory Analysis

In [19]:
df_raw.printSchema()

root
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)



In [20]:
df_raw.dtypes

[('belongs_to_collection', 'string'),
 ('budget', 'string'),
 ('id', 'string'),
 ('original_language', 'string'),
 ('original_title', 'string'),
 ('overview', 'string'),
 ('popularity', 'string'),
 ('production_companies', 'string'),
 ('production_countries', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('runtime', 'string'),
 ('status', 'string'),
 ('tagline', 'string'),
 ('title', 'string'),
 ('vote_average', 'string')]

In [21]:
df_raw.count()

43998

In [23]:
print(f'The total number of records is {df_raw.count()}!')

The total number of records is 43998!


In [24]:
select_columns=['id','budget','popularity','release_date','revenue','title']

In [26]:
df = df_raw.select(*select_columns)

In [27]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- title: string (nullable = true)



In [28]:
df.show()

+-----+-------+------------------+------------+-------+--------------------+
|   id| budget|        popularity|release_date|revenue|               title|
+-----+-------+------------------+------------+-------+--------------------+
|43000|      0|             2.503|  1962-05-23|      0|The Elusive Corporal|
|43001|      0|              5.51|  1962-11-12|      0|  Sundays and Cybele|
|43002|      0|              5.62|  1962-05-24|      0|Lonely Are the Brave|
|43003|      0|             7.159|  1975-03-12|      0|          F for Fake|
|43004| 500000|             3.988|  1962-10-09|      0|Long Day's Journe...|
|43006|      0|             3.194|  1962-03-09|      0|           My Geisha|
|43007|      0|             2.689|  1962-10-31|      0|Period of Adjustment|
|43008|      0|             6.537|  1959-03-13|      0|    The Hanging Tree|
|43010|      0|             4.297|  1962-01-01|      0|Sherlock Holmes a...|
|43011|      0|             4.417|  1962-01-01|      0|  Sodom and Gomorrah|

## Missing Values

In [29]:
# Import sql functions
from pyspark.sql.functions import *

In [31]:
# Calculate the missing values in one column
df.filter((df['popularity']=="")|df['popularity'].isNull()|isnan(df['popularity'])).count()

215

In [33]:
# Calculate the missing values in all columns
df.select([count(when((col(c)=='')|col(c).isNull()|isnan(c), c)).alias(c) for c in df.columns]).show()

+---+------+----------+------------+-------+-----+
| id|budget|popularity|release_date|revenue|title|
+---+------+----------+------------+-------+-----+
|125|   125|       215|         221|    215|  304|
+---+------+----------+------------+-------+-----+



In [35]:
# One way frequencies
df.groupBy(df['title']).count().show()

+--------------------+-----+
|               title|count|
+--------------------+-----+
|   The Corn Is Green|    1|
|Meet The Browns -...|    1|
|Morenita, El Esca...|    1|
| Father Takes a Wife|    1|
|The Werewolf of W...|    1|
|My Wife Is a Gang...|    1|
|Depeche Mode: Tou...|    1|
|  A Woman Is a Woman|    1|
|History Is Made a...|    1|
|      Colombian Love|    1|
|        Ace Attorney|    1|
|     Not Like Others|    1|
|40 Guns to Apache...|    1|
|          Middle Men|    1|
|         It's a Gift|    1|
|    La Vie de Bohème|    1|
|Rasputin: The Mad...|    1|
|The Ballad of Jac...|    1|
|         How to Deal|    1|
|             Freaked|    1|
+--------------------+-----+
only showing top 20 rows



In [36]:
# One way freq and sort
df.groupBy(df['title']).count().sort(desc("count")).show(10, False)

+--------------------+-----+
|title               |count|
+--------------------+-----+
|NULL                |304  |
|Les Misérables      |8    |
|The Three Musketeers|8    |
|Cinderella          |8    |
|A Christmas Carol   |7    |
|Hamlet              |7    |
|The Island          |7    |
|Dracula             |7    |
|Frankenstein        |7    |
|Framed              |6    |
+--------------------+-----+
only showing top 10 rows



In [39]:
# Sort and filter one way frequencies
df_temp = df.filter((df['title']!='') & (df['title'].isNotNull()) & (~isnan(df['title'])))

In [42]:
# Find titles that are repeated more than 4 times
(df_temp.groupBy(df_temp['title'])
        .count()
        .filter("`count`>4")
        .sort(col("count").desc())
        .show(10,False))

+--------------------+-----+
|title               |count|
+--------------------+-----+
|Les Misérables      |8    |
|The Three Musketeers|8    |
|Cinderella          |8    |
|A Christmas Carol   |7    |
|Frankenstein        |7    |
|Hamlet              |7    |
|The Island          |7    |
|Dracula             |7    |
|First Love          |6    |
|Beauty and the Beast|6    |
+--------------------+-----+
only showing top 10 rows



## Casting Variables

In [44]:
# Show types
df.dtypes

[('id', 'string'),
 ('budget', 'string'),
 ('popularity', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('title', 'string')]

In [46]:
# Fix the budget variable
df = df.withColumn('budget', df['budget'].cast("float"))

In [47]:
df.dtypes

[('id', 'string'),
 ('budget', 'float'),
 ('popularity', 'string'),
 ('release_date', 'string'),
 ('revenue', 'string'),
 ('title', 'string')]

In [50]:
df.show(10)

+-----+--------+----------+------------+-------+--------------------+
|   id|  budget|popularity|release_date|revenue|               title|
+-----+--------+----------+------------+-------+--------------------+
|43000|     0.0|     2.503|  1962-05-23|      0|The Elusive Corporal|
|43001|     0.0|      5.51|  1962-11-12|      0|  Sundays and Cybele|
|43002|     0.0|      5.62|  1962-05-24|      0|Lonely Are the Brave|
|43003|     0.0|     7.159|  1975-03-12|      0|          F for Fake|
|43004|500000.0|     3.988|  1962-10-09|      0|Long Day's Journe...|
|43006|     0.0|     3.194|  1962-03-09|      0|           My Geisha|
|43007|     0.0|     2.689|  1962-10-31|      0|Period of Adjustment|
|43008|     0.0|     6.537|  1959-03-13|      0|    The Hanging Tree|
|43010|     0.0|     4.297|  1962-01-01|      0|Sherlock Holmes a...|
|43011|     0.0|     4.417|  1962-01-01|      0|  Sodom and Gomorrah|
+-----+--------+----------+------------+-------+--------------------+
only showing top 10 

In [52]:
# Looping over columns
from pyspark.sql.types import *
int_vars = ['id']
float_vars = ['budget', 'popularity', 'revenue']
date_vars = ['release_date']

for column in float_vars:
    df=df.withColumn(column, df[column].cast(FloatType()))

for column in int_vars:
    df=df.withColumn(column, df[column].cast(IntegerType()))

for column in date_vars:
    df=df.withColumn(column, df[column].cast(DateType()))

df.dtypes

[('id', 'int'),
 ('budget', 'float'),
 ('popularity', 'float'),
 ('release_date', 'date'),
 ('revenue', 'float'),
 ('title', 'string')]

In [53]:
df.show(10, False)

+-----+--------+----------+------------+-------+---------------------------------------+
|id   |budget  |popularity|release_date|revenue|title                                  |
+-----+--------+----------+------------+-------+---------------------------------------+
|43000|0.0     |2.503     |1962-05-23  |0.0    |The Elusive Corporal                   |
|43001|0.0     |5.51      |1962-11-12  |0.0    |Sundays and Cybele                     |
|43002|0.0     |5.62      |1962-05-24  |0.0    |Lonely Are the Brave                   |
|43003|0.0     |7.159     |1975-03-12  |0.0    |F for Fake                             |
|43004|500000.0|3.988     |1962-10-09  |0.0    |Long Day's Journey Into Night          |
|43006|0.0     |3.194     |1962-03-09  |0.0    |My Geisha                              |
|43007|0.0     |2.689     |1962-10-31  |0.0    |Period of Adjustment                   |
|43008|0.0     |6.537     |1959-03-13  |0.0    |The Hanging Tree                       |
|43010|0.0     |4.297

## Descriptive Statistics