In [1]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
.appName("MovieLens PySpark Project") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()


spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/18 19:45:15 WARN Utils: Your hostname, sakhawat, resolves to a loopback address: 127.0.1.1; using 10.147.169.209 instead (on interface wlp3s0)
25/12/18 19:45:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/18 19:45:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [3]:
movies = spark.read.csv("dataset/movies_metadata.csv", header=True, inferSchema=True)
ratings = spark.read.csv("dataset/ratings.csv", header=True, inferSchema=True)
keywords = spark.read.csv("dataset/keywords.csv", header=True, inferSchema=True)
credits = spark.read.csv("dataset/credits.csv", header=True, inferSchema=True)
links = spark.read.csv("dataset/links.csv", header=True, inferSchema=True)

                                                                                

In [4]:
movies.printSchema()
ratings.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [5]:
movies_clean = movies \
.withColumn("budget", col("budget").cast("double")) \
.withColumn("revenue", col("revenue").cast("double")) \
.withColumn("vote_average", col("vote_average").cast("double")) \
.withColumn("vote_count", col("vote_count").cast("double")) \
.withColumn("popularity", col("popularity").cast("double")) \
.withColumn("runtime", col("runtime").cast("double"))


movies_clean = movies_clean.filter(
(col("budget") > 0) &
(col("revenue") > 0) &
(col("vote_count") > 0)
)

In [6]:
feature_cols = ["budget", "vote_count", "popularity", "runtime"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [20]:
from pyspark.sql.functions import when, trim, regexp_replace

def clean_numeric(colname):
    return when(
        trim(col(colname)) == "", None
    ).otherwise(
        regexp_replace(col(colname), "[^0-9.]", "")
    ).cast("double")


movies_clean = movies \
    .withColumn("budget", clean_numeric("budget")) \
    .withColumn("revenue", clean_numeric("revenue")) \
    .withColumn("vote_average", clean_numeric("vote_average")) \
    .withColumn("vote_count", clean_numeric("vote_count")) \
    .withColumn("popularity", clean_numeric("popularity")) \
    .withColumn("runtime", clean_numeric("runtime"))


In [21]:
movies_clean = movies_clean.filter(
    col("budget").isNotNull() &
    col("revenue").isNotNull() &
    col("vote_count").isNotNull() &
    col("vote_average").isNotNull() &
    (col("budget") > 0) &
    (col("revenue") > 0)
)


In [22]:
feature_cols = ["budget", "vote_count", "popularity", "runtime"]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"
)


In [23]:
data_rev = assembler.transform(
    movies_clean.select(*feature_cols, "revenue")
).dropna()

train_rev, test_rev = data_rev.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(
    featuresCol="features",
    labelCol="revenue"
)

lr_model = lr.fit(train_rev)


25/12/18 19:43:32 ERROR Executor: Exception in task 4.0 in stage 31.0 (TID 136)
org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"cast" was called from
line 8 in cell [20]

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:145)
	at org.apache.spark.sql.errors.QueryExecutionErrors.invalidInputInCastToNumberError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowI

NumberFormatException: [CAST_INVALID_INPUT] The value '' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"cast" was called from
line 8 in cell [20]
