In [1]:
#Path of csv file
%fs ls  /FileStore/tables/movie_metadata1.csv

In [2]:
%sql
-- Remove any earlier definition so the CREATE TABLE cell can be re-run cleanly.
DROP TABLE IF EXISTS movie_metadata

In [3]:
%sql
-- Register the movie-metadata CSV as a table with an explicit schema.
-- Columns are matched to the CSV fields by position; header "true" skips the
-- first (header) line instead of reading it as data.
CREATE TABLE movie_metadata (
  color STRING,
  director_name STRING,
  num_critic_for_reviews DOUBLE,
  duration DOUBLE,
  director_facebook_likes DOUBLE,
  actor_3_facebook_likes DOUBLE,
  -- Actor name (text), like actor_1_name / actor_3_name. It was previously
  -- declared DOUBLE: names cannot be parsed as numbers, so the field (or, on
  -- older Spark versions, the whole row) came back NULL.
  actor_2_name STRING,
  actor_1_facebook_likes DOUBLE,
  gross DOUBLE,
  genres STRING,
  actor_1_name STRING,
  movie_title STRING,
  num_voted_users DOUBLE,
  cast_total_facebook_likes DOUBLE,
  actor_3_name STRING,
  facenumber_in_poster DOUBLE,
  plot_keywords STRING,
  movie_imdb_link STRING,
  num_user_for_reviews DOUBLE,
  language STRING,
  country STRING,
  content_rating STRING,
  budget DOUBLE,
  title_year DOUBLE,
  actor_2_facebook_likes DOUBLE,
  imdb_score DOUBLE,
  aspect_ratio DOUBLE,
  movie_facebook_likes DOUBLE
  )
USING com.databricks.spark.csv
OPTIONS (path "/FileStore/tables/movie_metadata.csv", header "true")

In [4]:
# Importing Dataset: read the registered movie_metadata table as a DataFrame.
dataset = spark.table("movie_metadata")
# Names of every raw column (not used by the later cells shown here).
cols = list(dataset.columns)


In [5]:
# Keep the label (imdb_score) plus the seven numeric columns used as model
# inputs. Every other column (text fields and the remaining numeric fields)
# is left out. The names are listed in their original table order.
df = dataset.select(
  'duration',
  'director_facebook_likes',
  'actor_3_facebook_likes',
  'actor_1_facebook_likes',
  'facenumber_in_poster',
  'budget',
  'actor_2_facebook_likes',
  'imdb_score')
# Discard any row with a null in one of the kept columns.
df = df.na.drop()
display(df)

In [6]:
from pyspark.sql.functions import col  # builds a Column reference from a name string

# Cast every remaining column to double (keeping its name) so VectorAssembler
# receives numeric input. The kept columns are declared DOUBLE in the table
# schema, so this acts as a safeguard.
df = df.select(*(col(name).cast("double").alias(name) for name in df.columns))
df.printSchema()

In [7]:
from pyspark.ml.feature import VectorAssembler

# Every column except the label feeds the feature vector.
# .index() raises ValueError if 'imdb_score' is missing.
label_position = df.columns.index('imdb_score')
featuresCols = df.columns[:label_position] + df.columns[label_position + 1:]
assembler = VectorAssembler(inputCols=featuresCols, outputCol="features")


In [8]:
from pyspark.ml import Pipeline

# Single-stage Pipeline wrapping the VectorAssembler (ref[1]).
stages = [assembler]
pipeline = Pipeline(stages=stages)

# fit() computes whatever the stages need; transform() then appends the
# assembled "features" column to df.
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)


In [9]:
# Reduce the frame to just the label and the assembled feature vector.
selectedcols = ["imdb_score", "features"]
df = df.select(*selectedcols)
display(df)

In [10]:
# Hold out ~20% of rows for the final evaluation.
# A fixed seed makes the split, and every metric computed from it,
# reproducible across re-runs.
trainingData, testData = df.randomSplit([0.8, 0.2], seed=42)

# Function-call print: the Python 2 `print x` statement is a SyntaxError on
# Python 3 runtimes.
print("Training rows:", trainingData.count())
print("Test rows:", testData.count())


In [11]:
from pyspark.ml.regression import RandomForestRegressor

# Create an initial RandomForest model.
# imdb_score is a continuous DOUBLE score and the notebook scores models with
# RegressionEvaluator(rmse), so a regressor is required. RandomForestClassifier
# expects integer class-index labels and does not fit this target.
# A fixed seed makes the forest's random sampling reproducible.
rf = RandomForestRegressor(labelCol="imdb_score", featuresCol="features", seed=42)

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [12]:
# Make predictions on test data using the Transformer.transform() method.
# The fitted model appends its output to testData (the "prediction" column
# read by the display cell below).
predictions = rfModel.transform(testData)

In [13]:
predictions.printSchema()  # inspect the columns transform() added before selecting from them

In [14]:
# Show each test row's true imdb_score next to the initial model's prediction.
selected = predictions.select(["imdb_score", "prediction"])
display(selected)

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error between the label and prediction columns used by
# `rf` (lower is better). The cross-validator below uses this same evaluator.
evaluator = RegressionEvaluator(
    labelCol=rf.getLabelCol(),
    predictionCol=rf.getPredictionCol(),
    metricName="rmse",
)


In [16]:
# Create ParamGrid for Cross Validation (CrossValidator is imported here for the next cell)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Candidate hyper-parameters: 3 depths x 2 bin counts x 2 forest sizes
# = 12 combinations.
grid = ParamGridBuilder()
grid.addGrid(rf.maxDepth, [2, 4, 6])
grid.addGrid(rf.maxBins, [20, 60])
grid.addGrid(rf.numTrees, [5, 20])
paramGrid = grid.build()

In [17]:
# Create 5-fold CrossValidator (ref[2]).
# Each grid setting is trained on 4/5 of trainingData and scored (RMSE) on the
# remaining fold. The best setting is then refit on all of trainingData.
# A fixed seed keeps the fold assignment, and so the chosen setting,
# reproducible across re-runs.
cv = CrossValidator(estimator=rf, evaluator=evaluator, estimatorParamMaps=paramGrid,
                    numFolds=5, seed=42)

cvModel = cv.fit(trainingData)

In [18]:
# Score the held-out test set, which cross-validation never saw, so the
# model's error (RMSE, not accuracy) on new data can be measured.
# cvModel.transform() predicts with the best model found by cross-validation.
predictions = cvModel.transform(testData)

In [19]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model: RMSE of the test-set predictions (lower is better).
# As the cell's last expression, the value is displayed.
evaluator.evaluate(predictions)

In [20]:
# Show each test row's true imdb_score next to the best model's prediction.
selected = predictions.select(["imdb_score", "prediction"])
display(selected)

In [21]:
bestModel = cvModel.bestModel  # the model refit with the winning grid setting

In [22]:
# Generate predictions for entire dataset.
# NOTE: df includes the training rows, so these predictions are not an
# unbiased error estimate; use the test-set RMSE above for that.
finalPredictions = bestModel.transform(df)

In [None]:
- ref[1]: Spark ML Pipelines, example pipeline: https://spark.apache.org/docs/latest/ml-pipeline.html#example-pipeline
- ref[2]: Spark ML tuning (CrossValidator, ParamGridBuilder): https://spark.apache.org/docs/latest/ml-tuning.html

The text in the document by Neha Gaikwad and Nupur Deshpande is licensed under CC BY 3.0 https://creativecommons.org/licenses/by/3.0/us/
The code in the document by Neha Gaikwad and Nupur Deshpande is licensed under the MIT License https://opensource.org/licenses/MIT