In [1]:
# Initialization: point this kernel at the local Spark installation. This must
# run before pyspark is imported and before any SparkSession is created.
import os
import sys

# Spark location plus the Python interpreters for the workers and the driver.
# To run under Python 2 instead, set both interpreters to /usr/bin/python2.7.
# NOTE(review): worker (python3.6) and driver (python3) are given different
# paths; PySpark expects matching minor versions — confirm python3 is 3.6 here.
os.environ.update(
    SPARK_HOME="/home/talentum/spark",
    PYSPARK_PYTHON="/usr/bin/python3.6",
    PYSPARK_DRIVER_PYTHON="/usr/bin/python3",
)
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Make the bundled pyspark and py4j archives importable. Both go to the front
# of sys.path: pyspark.zip first, py4j second.
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.7-src.zip",
]

# Extra Maven packages for spark-submit (spark-xml + spark-avro); list any
# package you need here. Alternatives used previously:
#   '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
#   '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
#   '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
# NOTE(review): the spark-avro version should match the installed Spark version — confirm 2.4.3.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum as _sum, when
from pyspark.ml.feature import VectorAssembler, StandardScaler


# One SparkSession for the whole notebook (getOrCreate reuses an existing one),
# with Hive support enabled; `sc` is a shortcut to its SparkContext.
spark = (
    SparkSession.builder
    .appName("Spark SQL basic example")
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Load the raw movies table: first row is the header, column types are inferred.
# NOTE(review): no multiLine/escape options are set. If free-text columns
# (overview, tagline) contain embedded newlines or escaped quotes, records may be
# split; the small null counts in numeric columns below could be a symptom — TODO verify.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('movies.csv')
)

In [4]:
# Per-column null tally of the raw data: one output column per input column,
# holding the number of rows where that column is null.
null_counts = df.select(*(
    F.sum(F.when(F.col(column).isNull(), 1).otherwise(0)).alias(column)
    for column in df.columns
))

null_counts.show()

+---+-----+------+-----------------+--------+----------+--------------------+------------+------+-------+-------+------+-------+------------+----------+-------+--------+-----------+-------------+---------------+
| id|title|genres|original_language|overview|popularity|production_companies|release_date|budget|revenue|runtime|status|tagline|vote_average|vote_count|credits|keywords|poster_path|backdrop_path|recommendations|
+---+-----+------+-----------------+--------+----------+--------------------+------------+------+-------+-------+------+-------+------------+----------+-------+--------+-----------+-------------+---------------+
|  0|    4|210307|                9|  118240|         7|              384902|       51569|     5|      2|  34334|     2| 613810|          27|         5| 224638|  511524|     184582|       498988|         686050|
+---+-----+------+-----------------+--------+----------+--------------------+------------+------+-------+-------+------+-------+------------+----------+

In [5]:
# Replace missing genres/credits with a placeholder so those rows are not
# removed by the later dropna().
# NOTE(review): the placeholder is a single space " ", not "" — confirm intended.
df_filled = df.na.fill({"genres": " ", "credits": " "})

In [6]:
# Re-check nulls after the fill: genres and credits should now report 0.
null_counts = df_filled.select(*(
    F.sum(F.when(F.col(column).isNull(), 1).otherwise(0)).alias(column)
    for column in df_filled.columns
))

null_counts.show()

+---+-----+------+-----------------+--------+----------+--------------------+------------+------+-------+-------+------+-------+------------+----------+-------+--------+-----------+-------------+---------------+
| id|title|genres|original_language|overview|popularity|production_companies|release_date|budget|revenue|runtime|status|tagline|vote_average|vote_count|credits|keywords|poster_path|backdrop_path|recommendations|
+---+-----+------+-----------------+--------+----------+--------------------+------------+------+-------+-------+------+-------+------------+----------+-------+--------+-----------+-------------+---------------+
|  0|    4|     0|                9|  118240|         7|              384902|       51569|     5|      2|  34334|     2| 613810|          27|         5|      0|  511524|     184582|       498988|         686050|
+---+-----+------+-----------------+--------+----------+--------------------+------------+------+-------+-------+------+-------+------------+----------+

In [7]:
# Keep only movies whose status is 'Released'. Despite the variable name, the
# status column is still present here; it is dropped in the next cell.
df_without_status = df_filled.where(F.col("status") == "Released")

In [8]:
# Remove columns that are not part of the exported dataset, including `status`,
# which is constant after the 'Released' filter.
df_cols_dropped = df_without_status.drop(
    "id",
    "production_companies",
    "tagline",
    "keywords",
    "poster_path",
    "backdrop_path",
    "recommendations",
    "status",
)

In [9]:
# Preview the first rows (long values truncated) of the trimmed table.
df_cols_dropped.show(n=20, truncate=True)

+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+
|               title|              genres|original_language|            overview|popularity|release_date|     budget|     revenue|runtime|vote_average|vote_count|             credits|
+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+
|   Meg 2: The Trench|Action-Science Fi...|               en|An exploratory di...|  8763.998|  2023-08-02|129000000.0| 352056482.0|  116.0|       7.079|    1365.0|Jason Statham-Wu ...|
| The Pope's Exorcist|Horror-Mystery-Th...|               en|Father Gabriele A...|  5953.227|  2023-04-05| 18000000.0|  65675816.0|  103.0|       7.433|     545.0|Russell Crowe-Dan...|
|Deadpool & Wolverine|Action-Comedy-Sci...|               en|A listless Wad

In [10]:
# Number of released movies before dropping rows with nulls.
n_released_rows = df_cols_dropped.count()
n_released_rows

717058

In [11]:
# Drop every row that still has a null in any remaining column.
df_no_na = df_cols_dropped.na.drop(how="any")

In [12]:
# Preview rows that survived the null filter.
df_no_na.show(n=20, truncate=True)

+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+
|               title|              genres|original_language|            overview|popularity|release_date|     budget|     revenue|runtime|vote_average|vote_count|             credits|
+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+
|   Meg 2: The Trench|Action-Science Fi...|               en|An exploratory di...|  8763.998|  2023-08-02|129000000.0| 352056482.0|  116.0|       7.079|    1365.0|Jason Statham-Wu ...|
| The Pope's Exorcist|Horror-Mystery-Th...|               en|Father Gabriele A...|  5953.227|  2023-04-05| 18000000.0|  65675816.0|  103.0|       7.433|     545.0|Russell Crowe-Dan...|
|Deadpool & Wolverine|Action-Comedy-Sci...|               en|A listless Wad

In [13]:
# Row count after removing rows containing nulls.
n_rows_without_nulls = df_no_na.count()
n_rows_without_nulls

553375

In [14]:
# Sanity check after dropna(): every column should report 0 nulls.
null_counts = df_no_na.select(*(
    F.sum(F.when(F.col(column).isNull(), 1).otherwise(0)).alias(column)
    for column in df_no_na.columns
))

null_counts.show()

+-----+------+-----------------+--------+----------+------------+------+-------+-------+------------+----------+-------+
|title|genres|original_language|overview|popularity|release_date|budget|revenue|runtime|vote_average|vote_count|credits|
+-----+------+-----------------+--------+----------+------------+------+-------+-------+------------+----------+-------+
|    0|     0|                0|       0|         0|           0|     0|      0|      0|           0|         0|      0|
+-----+------+-----------------+--------+----------+------------+------+-------+-------+------------+----------+-------+



In [15]:
# Remove rows that are exact duplicates across all columns.
df_no_dup = df_no_na.distinct()

In [16]:
# Preview the de-duplicated rows (order is not guaranteed after distinct).
df_no_dup.show(n=20, truncate=True)

+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+
|               title|              genres|original_language|            overview|popularity|release_date|     budget|     revenue|runtime|vote_average|vote_count|             credits|
+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+
|John Wick: Chapter 4|Action-Thriller-C...|               en|With the price on...|   659.169|  2023-03-22| 90000000.0| 426978565.0|  170.0|       7.827|    4159.0|Keanu Reeves-Donn...|
|Harry Potter and ...|   Adventure-Fantasy|               en|Harry Ron and Her...|   117.479|  2010-11-17|250000000.0| 954305868.0|  146.0|       7.754|   18658.0|Daniel Radcliffe-...|
|            Renfield|Comedy-Horror-Fan...|               en|Having grown s

In [17]:
# Row count after de-duplication.
n_unique_rows = df_no_dup.count()
n_unique_rows

553345

In [18]:
# df_no_dup_mov = df_no_dup.dropDuplicates(subset=["title"])

In [19]:
# df_no_dup_mov.show()

In [20]:
# df_no_dup_mov.count()

In [21]:
# df_no_dup_mov.show()

In [22]:
# df_no_dup_mov.columns

In [23]:
# df_year.coalesce(1).write.option("header", "true").csv("Project_data_2")

In [24]:
# Derive `release_year` as the text before the first "-" in release_date
# (e.g. "2023-03-22" -> "2023"). withColumn already names the new column, so no
# `.alias` is needed. Chaining `.alias(...)` onto the DataFrame would only set a
# DataFrame-level alias; it would not name the column.
# NOTE(review): release_year stays a string; cast it if numeric use is intended.
df_year = df_no_dup.withColumn(
    "release_year",
    F.split(F.col("release_date"), "-").getItem(0),
)

In [25]:
# Preview the table with the new release_year column.
df_year.show(n=20, truncate=True)

+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+------------+
|               title|              genres|original_language|            overview|popularity|release_date|     budget|     revenue|runtime|vote_average|vote_count|             credits|release_year|
+--------------------+--------------------+-----------------+--------------------+----------+------------+-----------+------------+-------+------------+----------+--------------------+------------+
|John Wick: Chapter 4|Action-Thriller-C...|               en|With the price on...|   659.169|  2023-03-22| 90000000.0| 426978565.0|  170.0|       7.827|    4159.0|Keanu Reeves-Donn...|        2023|
|Harry Potter and ...|   Adventure-Fantasy|               en|Harry Ron and Her...|   117.479|  2010-11-17|250000000.0| 954305868.0|  146.0|       7.754|   18658.0|Daniel Radcliffe-...|        2010|
|         

In [26]:
# df_year.coalesce(1).write.option("header", "true").csv("Project_data_1")

In [27]:
# release_date is redundant once release_year has been derived from it.
# Re-running this cell is a no-op because the column is already gone.
df_year = df_year.drop(F.col("release_date"))

In [28]:
# numerical_features = ['popularity', 'budget', 'revenue', 'runtime', 'vote_average', 'vote_count', 'release_year']

# for col_name in numerical_features:
#     df_year = df_year.withColumn(col_name, col(col_name).cast("float"))


In [29]:
# Final null check on the export-ready table, including release_year.
null_counts = df_year.select(*(
    F.sum(F.when(F.col(column).isNull(), 1).otherwise(0)).alias(column)
    for column in df_year.columns
))

null_counts.show()

+-----+------+-----------------+--------+----------+------+-------+-------+------------+----------+-------+------------+
|title|genres|original_language|overview|popularity|budget|revenue|runtime|vote_average|vote_count|credits|release_year|
+-----+------+-----------------+--------+----------+------+-------+-------+------------+----------+-------+------------+
|    0|     0|                0|       0|         0|     0|      0|      0|           0|         0|      0|           0|
+-----+------+-----------------+--------+----------+------+-------+-------+------------+----------+-------+------------+



In [30]:
# df_year = df_year.dropna()

In [31]:
# null_counts = df_year.select([_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_year.columns])

# null_counts.show()

In [32]:
# Number of rows that will be exported.
n_export_rows = df_year.count()
n_export_rows

553345

In [33]:
# Preview the export-ready table.
df_year.show(n=20, truncate=True)

+--------------------+--------------------+-----------------+--------------------+----------+-----------+------------+-------+------------+----------+--------------------+------------+
|               title|              genres|original_language|            overview|popularity|     budget|     revenue|runtime|vote_average|vote_count|             credits|release_year|
+--------------------+--------------------+-----------------+--------------------+----------+-----------+------------+-------+------------+----------+--------------------+------------+
|John Wick: Chapter 4|Action-Thriller-C...|               en|With the price on...|   659.169| 90000000.0| 426978565.0|  170.0|       7.827|    4159.0|Keanu Reeves-Donn...|        2023|
|Harry Potter and ...|   Adventure-Fantasy|               en|Harry Ron and Her...|   117.479|250000000.0| 954305868.0|  146.0|       7.754|   18658.0|Daniel Radcliffe-...|        2010|
|            Renfield|Comedy-Horror-Fan...|               en|Having grown s

In [34]:
# Export the cleaned data as a single CSV part file (coalesce(1) -> one
# partition), replacing any previous output. A header row is written so the
# column names survive the round trip; the source was read with header=True.
df_year.coalesce(1).write.mode("overwrite").option("header", "true").csv("project_data_csv")

In [44]:
# Export the same data as Parquet in a single part file, overwriting prior output.
df_year.coalesce(1).write.parquet("project_data_parquet", mode="overwrite")

In [35]:

# assembler = VectorAssembler(inputCols=numerical_features, outputCol="numerical_features")
# df_vector = assembler.transform(df_year)


# scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")
# scaler_model = scaler.fit(df_vector)
# df_scaler = scaler_model.transform(df_vector)

In [36]:
# df_scaler = df_scaler.withColumn(
#     'combined_text', 
#     F.concat_ws(" ", col('title'), col('original_language'), col('overview'), col('credits'), col('genres'))
# )

In [37]:
# null_counts = df_scaler.select([_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_scaler.columns])

# null_counts.show()

In [38]:
# df_scaler.show()

In [39]:
# df_string = df_scaler.selectExpr([f"cast({col} as string) as {col}" for col in df_scaler.columns])

In [40]:
# df_string.show()

In [41]:
# df_string.write.csv('file:///home/talentum/shared/Project/data')

In [42]:
# df_no_dup_mov.coalesce(1).write.option("header", "true").csv("Project_data")