In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, isnan, count, udf, round as spark_round
from pyspark.sql.types import StringType, FloatType


from src.utils.s3_manager import S3Manager
import seaborn as sns
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import os 
import nltk


In [2]:
# Root folder holding the project datasets. Set the ALLOCINE_DATA_DIR environment
# variable to point elsewhere; the original local path is kept as the fallback so
# the notebook behaves as before when the variable is unset.
data_dir = os.environ.get("ALLOCINE_DATA_DIR", "/Users/ilan/big-data-airflow-project/data")

In [3]:
# Obtain the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("EDA with Spark")
    .getOrCreate()
)

24/05/21 20:35:17 WARN Utils: Your hostname, Ordinateur-portable-de-Ilan.local resolves to a loopback address: 127.0.0.1; using 192.168.1.18 instead (on interface en0)
24/05/21 20:35:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 20:35:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [12]:
# Load the Allocine movies dataset. Parquet files carry their own schema, so the
# CSV-only `header` / `inferSchema` reader options are not passed.
df = spark.read.parquet(f"{data_dir}/allocine_movies.parquet")

In [13]:
# Print each column's name, type and nullability for the loaded DataFrame.
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Runtime: string (nullable = true)
 |-- Genre: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Actors: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Action: integer (nullable = true)
 |-- Adventure: integer (nullable = true)
 |-- Animation: integer (nullable = true)
 |-- Biopic: integer (nullable = true)
 |-- Comedy: integer (nullable = true)
 |-- Crime: integer (nullable = true)
 |-- Documentary: integer (nullable = true)
 |-- Drama: integer (nullable = true)
 |-- Horror: integer (nullable = true)
 |-- Famille: integer (nullable = true)
 |-- Guerre: integer (nullable = true)
 |-- MusicalRomance: integer (nullable = true)
 |-- Sci-Fi: integer (nullable = true)
 |-- Thriller: integer (nullable = true)
 |-- Western: integer (nullable = true)


In [None]:
# Display the first rows of the freshly loaded DataFrame.
df.show()

In [None]:
# Report the dataset dimensions as (row count, column count).
n_rows = df.count()
n_cols = len(df.columns)
print("The shape of the allocine dataset is ", (n_rows, n_cols))

In [None]:
# Discard the "Release Date" and "Director" columns.
columns_to_drop = ["Release Date", "Director"]
df = df.drop(*columns_to_drop)

# Renaming columns to match the Netflix dataset

In [None]:
# Apply each old-name -> new-name rename in turn.
column_renames = {"Duration": "Runtime", "Synopsis": "Summary"}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

# Dealing with missing values

In [None]:
# Count the missing values of every column.
# `isnan` is only defined for floating-point data, so it is applied to float/double
# columns only; all other types (strings, arrays, integers, ...) are checked for
# nulls alone. This avoids an analysis error on non-numeric columns such as an
# array-typed "Genre".
missing_values = df.select([
    count(
        when(
            (col(c).isNull() | isnan(col(c))) if dtype in ("float", "double") else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c, dtype in df.dtypes
])
missing_values.show()

#### Runtime column

In [None]:
# Keep only the rows whose Runtime is present.
df = df.na.drop(subset=["Runtime"])

In [None]:
def convert_runtime_to_interval(runtime):
    """Bucket a runtime string such as "1h 35min" into a coarse duration label.

    Args:
        runtime: Text holding an hour part ("<n>h"), a minute part ("<n>min"),
            or both, e.g. "1h 35min", "2h" or "45min". May be None.

    Returns:
        One of '< 30 minutes', '30 - 60 mins', '1-2 hour' or '> 2 hrs'.
        None is returned when ``runtime`` is None or does not contain exactly
        one or two numbers, so a malformed value becomes a null instead of
        failing the whole Spark job.
    """
    # Local import keeps the function self-contained when it is used as a UDF.
    import re

    if runtime is None:
        return None

    numbers = [int(token) for token in re.findall(r"\d+", runtime)]
    if len(numbers) == 2:
        hours, minutes = numbers
    elif len(numbers) == 1:
        # A lone number means hours when an "h" marker is present ("2h"),
        # and minutes otherwise ("45min" -- note "min" contains no "h").
        if "h" in runtime.lower():
            hours, minutes = numbers[0], 0
        else:
            hours, minutes = 0, numbers[0]
    else:
        return None

    # Compare in whole minutes to stay clear of floating-point boundaries.
    total_minutes = hours * 60 + minutes
    if total_minutes > 120:
        return '> 2 hrs'
    if total_minutes < 30:
        return '< 30 minutes'
    if total_minutes < 60:
        return '30 - 60 mins'
    return '1-2 hour'
    

In [None]:
# Wrap the bucketing function as a Spark UDF that yields a string column.
convert_runtime_udf = udf(f=convert_runtime_to_interval, returnType=StringType())

In [None]:
# Overwrite Runtime with the label produced by the UDF.
runtime_bucket = convert_runtime_udf(col("Runtime"))
df = df.withColumn("Runtime", runtime_bucket)

In [None]:
# Inspect the rows now that Runtime holds the UDF's label.
df.show()

#### Rating column

We will merge the spectator rating and the press rating into a single column called `Rating`.

In [None]:
from pyspark.sql.functions import regexp_replace

# Normalise both rating columns the same way: decimal comma -> decimal point,
# the "--" placeholder -> null, then cast the text to a float.
for rating_column in ("Press Rating", "Spectator Rating"):
    df = df.withColumn(rating_column, regexp_replace(col(rating_column), ",", "."))
    df = df.withColumn(
        rating_column,
        when(col(rating_column) == "--", None).otherwise(col(rating_column)),
    )
    df = df.withColumn(rating_column, col(rating_column).cast(FloatType()))

In [None]:
from pyspark.sql.functions import mean

# Compute both column means in a single aggregation (one Spark job instead of two).
# Filling one column never changes the other column's mean, so the values match
# what computing them one after the other would give.
rating_means = df.agg(
    mean(col("Press Rating")).alias("mean_press"),
    mean(col("Spectator Rating")).alias("mean_spectator"),
).first()
mean_press = rating_means["mean_press"]
mean_spectator = rating_means["mean_spectator"]

# Replace missing ratings with their column mean. A mean is None when the column
# has no non-null value at all; such a column is left untouched rather than
# passing None to na.fill.
fill_values = {
    column_name: column_mean
    for column_name, column_mean in (
        ("Press Rating", mean_press),
        ("Spectator Rating", mean_spectator),
    )
    if column_mean is not None
}
if fill_values:
    df = df.na.fill(fill_values)

In [None]:
# Average the press and spectator ratings into one Rating column rounded to one
# decimal, then discard the two source columns.
average_rating = (col("Press Rating") + col("Spectator Rating")) / 2
df = (
    df.withColumn("Rating", spark_round(average_rating, 1))
    .drop("Press Rating", "Spectator Rating")
)

In [None]:
# Inspect the rows after the two rating columns were merged into Rating.
df.show()

#### Transform the genre column. We will create one column for each genre and fill it with 1 if the movie belongs to this genre, 0 otherwise

In [None]:
from pyspark.sql.functions import split

# Turn the ", "-separated Genre text into an array of genre names.
genre_array = split(col("Genre"), ", ")
df = df.withColumn("Genre", genre_array)

# Genre names to look for in the Genre array.
genres = [
    "Action",
    "Adventure",
    "Drama",
    "Sci-Fi",
    "Crime",
    "Thriller",
    "Comedy",
    "Biography",
    "Documentary",
]

In [None]:
from pyspark.sql.functions import array_contains

# Add one integer flag column per genre: the boolean "Genre contains <genre>"
# test is cast to an integer.
for genre in genres:
    has_genre = array_contains(col("Genre"), genre)
    df = df.withColumn(genre, has_genre.cast("integer"))

In [None]:
# Inspect the rows with the per-genre flag columns added.
df.show()

In [None]:
# Count the rows for each distinct Genre value, most frequent first.
genre_counts = df.groupBy("Genre").count()
genre_counts.orderBy(col("count").desc()).show()

In [9]:
# printSchema() writes the schema itself and returns None, so wrapping it in
# print() would add a stray "None" line after the schema.
df.printSchema()

NameError: name 'df' is not defined