# Set up PySpark in Colab

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
!java -version

openjdk version "11.0.7" 2020-04-14
OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu218.04, mixed mode, sharing)


In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('FinalProj').getOrCreate()

In [0]:
from pyspark.sql.types import IntegerType

# Import Dataset and Clean the Data

In [0]:
# this helps you connect to the google drive
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
!ls '/content/drive/My Drive/ConFiveDance/Big Data Final Project/movie_list.csv'

'/content/drive/My Drive/ConFiveDance/Big Data Final Project/movie_list.csv'


In [0]:
# read the movie_list.csv which contains the basic info of movie e.g. revenue
movie = spark.read\
            .option('header', 'true')\
            .option('inferSchema', 'true')\
            .csv('/content/drive/My Drive/ConFiveDance/Big Data Final Project/movie_list.csv')

In [0]:
movie.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- mojo_url: string (nullable = true)
 |-- gross_revenue: integer (nullable = true)
 |-- open_weekend_revenue: double (nullable = true)
 |-- release_date: timestamp (nullable = true)
 |-- distributer: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
# convert the budget & open_weekend_revenue columns into integer type
movie = movie.withColumn("budget", movie["budget"].cast(IntegerType()))
movie = movie.withColumn("open_weekend_revenue", movie["open_weekend_revenue"].cast(IntegerType()))

In [0]:
# examine the schema again
movie.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- mojo_url: string (nullable = true)
 |-- gross_revenue: integer (nullable = true)
 |-- open_weekend_revenue: integer (nullable = true)
 |-- release_date: timestamp (nullable = true)
 |-- distributer: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
!ls '/content/drive/My Drive/ConFiveDance/Big Data Final Project/rt_reviews_all.csv'

'/content/drive/My Drive/ConFiveDance/Big Data Final Project/rt_reviews_all.csv'


In [0]:
# read the rt_reviews_new.csv which contains the reviews of movies webscraped from rotten tomato
review = spark.read\
            .option('header', 'true')\
            .option('inferSchema', 'true')\
            .csv('/content/drive/My Drive/ConFiveDance/Big Data Final Project/rt_reviews_all.csv')

In [0]:
review.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- release_date_rt: string (nullable = true)
 |-- critic: string (nullable = true)
 |-- fresh_or_rotten: string (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- original_score: string (nullable = true)
 |-- top_critic: string (nullable = true)



In [0]:
review.show(5)

+---+--------------------+---------------+---------------+---------------+--------------------+-----------------+--------------+----------+
|_c0|         movie_title|release_date_rt|         critic|fresh_or_rotten|              review|      review_date|original_score|top_critic|
+---+--------------------+---------------+---------------+---------------+--------------------+-----------------+--------------+----------+
|  0|Beauty and the Beast|   Mar 17, 2017|  Avaryl Halley|          fresh|It was fun in parts.|     May 11, 2020|          null|     False|
|  1|Beauty and the Beast|   Mar 17, 2017|   Andrew Galdi|          fresh|I actually liked ...|     May 11, 2020|          null|     False|
|  2|Beauty and the Beast|   Mar 17, 2017| Steven Prokopy|         rotten|As elaborate and ...|      May 5, 2020|          null|     False|
|  3|Beauty and the Beast|   Mar 17, 2017|    Tim Brennan|          fresh|The tale of Beaut...|February 19, 2020|          null|     False|
|  4|Beauty and the 

In [0]:
from pyspark.sql.functions import unix_timestamp, to_timestamp
# convert the date string format to timestamp format
review = review.withColumn("release_date_rt", to_timestamp(unix_timestamp("release_date_rt", "MMM d, yyyy")))
review = review.withColumn("review_date", to_timestamp(unix_timestamp("review_date", "MMM d, yyyy")))

In [0]:
review.show(5)

+---+--------------------+-------------------+---------------+---------------+--------------------+-------------------+--------------+----------+
|_c0|         movie_title|    release_date_rt|         critic|fresh_or_rotten|              review|        review_date|original_score|top_critic|
+---+--------------------+-------------------+---------------+---------------+--------------------+-------------------+--------------+----------+
|  0|Beauty and the Beast|2017-03-17 00:00:00|  Avaryl Halley|          fresh|It was fun in parts.|2020-05-11 00:00:00|          null|     False|
|  1|Beauty and the Beast|2017-03-17 00:00:00|   Andrew Galdi|          fresh|I actually liked ...|2020-05-11 00:00:00|          null|     False|
|  2|Beauty and the Beast|2017-03-17 00:00:00| Steven Prokopy|         rotten|As elaborate and ...|2020-05-05 00:00:00|          null|     False|
|  3|Beauty and the Beast|2017-03-17 00:00:00|    Tim Brennan|          fresh|The tale of Beaut...|2020-02-19 00:00:00|     

In [0]:
review.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- release_date_rt: timestamp (nullable = true)
 |-- critic: string (nullable = true)
 |-- fresh_or_rotten: string (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- original_score: string (nullable = true)
 |-- top_critic: string (nullable = true)



# Feature Engineering

## Link the critics to the movies dataframes
Some movies share the same title but were released on different dates. Therefore, when joining the movie dataframe and the review dataframe, we need to make sure the movie titles from the two datasets equal and the movie release dates equal as well. In this way, the critics can be correctly linked to the movies they commentted on. We also select the pre-released reviews only, because we care about whether harbinger of failures can predict an outcome of a movie.

In [0]:
movie.createOrReplaceTempView("t1")
review.createOrReplaceTempView("t2")
# join the movie and reviews dataframes, and only select the pre-released reviews
review_movie = spark.sql("""
                        SELECT t1.movie_title, DATE(release_date), gross_revenue, open_weekend_revenue, budget, critic, fresh_or_rotten,
                        DATE(review_date), original_score, top_critic
                        FROM t1 
                        JOIN t2 ON t1.movie_title = t2.movie_title
                        AND t1.release_date = t2.release_date_rt
                        WHERE DATE(t2.release_date_rt) > DATE(t2.review_date)
                        """)
review_movie.show(10)

+--------------------+------------+-------------+--------------------+---------+--------------------+---------------+-----------+--------------+----------+
|         movie_title|release_date|gross_revenue|open_weekend_revenue|   budget|              critic|fresh_or_rotten|review_date|original_score|top_critic|
+--------------------+------------+-------------+--------------------+---------+--------------------+---------------+-----------+--------------+----------+
|Beauty and the Beast|  2017-03-17|    504014165|           174750616|160000000|      Leonard Maltin|          fresh| 2017-03-16|          null|     False|
|Beauty and the Beast|  2017-03-17|    504014165|           174750616|160000000|        Gary Wolcott|          fresh| 2017-03-16|           5/5|     False|
|Beauty and the Beast|  2017-03-17|    504014165|           174750616|160000000|       Willie Waffle|          fresh| 2017-03-16|         3.5/4|     False|
|Beauty and the Beast|  2017-03-17|    504014165|           1747

In [0]:
review_movie.select("movie_title").distinct().count()

1455

In [0]:
review_movie.count()

46405

## Split the classification and prediction set & Define success of a movie
The classification set covers movies released from 2017 to 2018, and the prediction set includes movies released in 2019.
The purpose of the split is to use classification set to evaluate the critics on whether they are potentially harbingers of failures. Then we examine whether these critics can help predict the success or failure of the movies in the prediction set. 

To define the success of a movie, we calculate the following metric: <br/> 

\begin{align}
 \frac{Opening\;Weekend\;Revenue}{Gross\;Revenue}
\end{align}

If the metric is greater or equal to 0.3, then we say the movie is successful.

We also indicate whether the critic recommended a movie and whether the critic recommended a flop movie.

In [0]:
review_movie.createOrReplaceTempView("t1")
class_set = spark.sql("""
                        SELECT movie_title, release_date, gross_revenue, open_weekend_revenue, budget, critic, fresh_or_rotten,
                        review_date, original_score, top_critic, open_weekend_revenue/gross_revenue AS open2gross, 
                        CASE 
                          WHEN open_weekend_revenue/gross_revenue >=0.3 THEN 1
                          ELSE 0
                        END AS SuccessCT1,
                        CASE
                          WHEN fresh_or_rotten = "fresh" THEN 1
                          ELSE 0
                        END AS Whether_Recommend,
                        CASE
                          WHEN open_weekend_revenue/gross_revenue < 0.3 AND fresh_or_rotten = "fresh" THEN 1
                          ELSE 0
                        END AS Whether_Recommend_Flop
                        FROM t1 
                        WHERE release_date BETWEEN DATE("2017-01-01") AND DATE("2018-12-31")
                        """)
class_set.show(10)

+--------------------+------------+-------------+--------------------+---------+--------------------+---------------+-----------+--------------+----------+-----------------+----------+-----------------+----------------------+
|         movie_title|release_date|gross_revenue|open_weekend_revenue|   budget|              critic|fresh_or_rotten|review_date|original_score|top_critic|       open2gross|SuccessCT1|Whether_Recommend|Whether_Recommend_Flop|
+--------------------+------------+-------------+--------------------+---------+--------------------+---------------+-----------+--------------+----------+-----------------+----------+-----------------+----------------------+
|Beauty and the Beast|  2017-03-17|    504014165|           174750616|160000000|      Leonard Maltin|          fresh| 2017-03-16|          null|     False|0.346717668143315|         1|                1|                     0|
|Beauty and the Beast|  2017-03-17|    504014165|           174750616|160000000|        Gary Wol

In [0]:
class_set.select("movie_title").distinct().count()

1002

In [0]:
review_movie.createOrReplaceTempView("t1")
pred_set = spark.sql("""
                        SELECT movie_title, release_date, gross_revenue, open_weekend_revenue, budget, critic, fresh_or_rotten,
                        review_date, original_score, top_critic, open_weekend_revenue/gross_revenue AS open2gross, 
                        CASE 
                          WHEN open_weekend_revenue/gross_revenue >=0.3 THEN 1
                          ELSE 0
                        END AS SuccessCT1,
                        CASE
                          WHEN fresh_or_rotten = "fresh" THEN 1
                          ELSE 0
                        END AS Whether_Recommend,
                        CASE
                          WHEN open_weekend_revenue/gross_revenue < 0.3 AND fresh_or_rotten = "fresh" THEN 1
                          ELSE 0
                        END AS Whether_Recommend_Flop
                        FROM t1 
                        WHERE release_date BETWEEN DATE("2019-01-01") AND DATE("2019-12-31")
                        """)
pred_set.show(10)

+-----------------+------------+-------------+--------------------+---------+-------------------+---------------+-----------+--------------+----------+-------------------+----------+-----------------+----------------------+
|      movie_title|release_date|gross_revenue|open_weekend_revenue|   budget|             critic|fresh_or_rotten|review_date|original_score|top_critic|         open2gross|SuccessCT1|Whether_Recommend|Whether_Recommend_Flop|
+-----------------+------------+-------------+--------------------+---------+-------------------+---------------+-----------+--------------+----------+-------------------+----------+-----------------+----------------------+
|Avengers: Endgame|  2019-04-26|    858373000|           357115007|356000000|Jeffrey M. Anderson|          fresh| 2019-04-25|           4/4|     False|0.41603709226641566|         1|                1|                     0|
|Avengers: Endgame|  2019-04-26|    858373000|           357115007|356000000|           Rob Dean|       

In [0]:
pred_set.select("movie_title").distinct().count()

453

In [0]:
# calculate the proportion of successful movies in the prediction set
pred_set.filter(pred_set.SuccessCT1 == 1).select("movie_title").distinct().count()/pred_set.select("movie_title").distinct().count()

0.4481236203090508

## Calculate FlopAffinity Index
We use flop affinity index to indicate the extent of a critic having a taste being different from the mass population.


\begin{align}
 FlopAffinity = \frac{Number\, of\, Flops\, Recommended}{Number\,of\,New\,Movies\,Recommended}
\end{align}


In [0]:
class_set.createOrReplaceTempView("t1")
class_critic = spark.sql("""
                        SELECT movie_title, critic, top_critic, open2gross, 
                        SuccessCT1, Whether_Recommend, Whether_Recommend_Flop,
                        (SUM(Whether_Recommend_Flop) OVER(
                          PARTITION BY critic 
                          RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
                          /SUM(Whether_Recommend) OVER(
                          PARTITION BY critic 
                          RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) AS FlopAffinity
                        FROM t1
                        """)
class_critic.show(10)

+--------------------+-------------+----------+--------------------+----------+-----------------+----------------------+-------------------+
|         movie_title|       critic|top_critic|          open2gross|SuccessCT1|Whether_Recommend|Whether_Recommend_Flop|       FlopAffinity|
+--------------------+-------------+----------+--------------------+----------+-----------------+----------------------+-------------------+
|      The Red Turtle|Ashley Moreno|     False|0.022788061268539026|         0|                1|                     1| 0.6666666666666666|
|My Entire High Sc...|Ashley Moreno|     False| 0.19865569153492155|         0|                1|                     1| 0.6666666666666666|
|   Support the Girls|Ashley Moreno|     False|   0.396262507357269|         1|                1|                     0| 0.6666666666666666|
|        Wonder Woman| Cary Darling|      True| 0.25026812605736476|         0|                1|                     1|0.47619047619047616|
|      Thor: 

## Group Critics According to their FlopAffinity
We first filter out those critics who recommended no more than  2 movies. Then we group the rest critics into four groups according to the quartile of the FlopAffinity index.

25% percentile of FlopAffinity is 40%. <br/>
50% percentile of FlopAffinity is 60%. <br/>
75% percentile of FlopAffinity is 75%. <br/>

Group1: FlopAffinity $\in$ [0, 40%] <br/>
Group2: FlopAffinity $\in$ (40%, 60%] <br/>
Group3: FlopAffinity $\in$ (60%, 75%] <br/>
Group4: FlopAffinity $>$ 75% <br/>
Other: critics who recommended no more than 2 movies


In [0]:
class_critic.createOrReplaceTempView("t1")
class_dist = spark.sql("""
                        SELECT movie_title, critic, top_critic, open2gross, 
                        SuccessCT1, Whether_Recommend, Whether_Recommend_Flop,
                        FlopAffinity, (SUM(Whether_Recommend) OVER(PARTITION BY critic RANGE BETWEEN UNBOUNDED PRECEDING
                        AND UNBOUNDED FOLLOWING)) AS Total_Recommend_byCritic
                        FROM t1
                        """)
class_dist.show(10)

+--------------------+-------------+----------+--------------------+----------+-----------------+----------------------+-------------------+------------------------+
|         movie_title|       critic|top_critic|          open2gross|SuccessCT1|Whether_Recommend|Whether_Recommend_Flop|       FlopAffinity|Total_Recommend_byCritic|
+--------------------+-------------+----------+--------------------+----------+-----------------+----------------------+-------------------+------------------------+
|      The Red Turtle|Ashley Moreno|     False|0.022788061268539026|         0|                1|                     1| 0.6666666666666666|                       3|
|My Entire High Sc...|Ashley Moreno|     False| 0.19865569153492155|         0|                1|                     1| 0.6666666666666666|                       3|
|   Support the Girls|Ashley Moreno|     False|   0.396262507357269|         1|                1|                     0| 0.6666666666666666|                       3|
|   

In [0]:
critic_flop_affinity = class_dist.filter(class_dist.Total_Recommend_byCritic > 2).select("critic", "FlopAffinity").distinct()

In [0]:
import pyspark.sql.functions as F

flop_affinity_percentile = critic_flop_affinity.agg(F.expr('percentile(FlopAffinity, array(0.25))')[0].alias('%25'),
                             F.expr('percentile(FlopAffinity, array(0.50))')[0].alias('%50'),
                             F.expr('percentile(FlopAffinity, array(0.75))')[0].alias('%75'))
flop_affinity_percentile.show()

+---+---+----+
|%25|%50| %75|
+---+---+----+
|0.4|0.6|0.75|
+---+---+----+



In [0]:
class_critic.createOrReplaceTempView("t1")
class_group = spark.sql("""
                        SELECT movie_title, critic, top_critic, open2gross, 
                        SuccessCT1, Whether_Recommend, Whether_Recommend_Flop,
                        FlopAffinity,  
                        CASE 
                          WHEN (SUM(Whether_Recommend) OVER (PARTITION BY critic 
                          RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) <= 2 THEN "Other" 
                          WHEN FlopAffinity <= 0.4 THEN "Group1"
                          WHEN FlopAffinity <= 0.6 THEN "Group2"
                          WHEN FlopAffinity <= 0.75 THEN "Group3"
                          ELSE "Group4"
                        END AS Group
                        FROM t1
                        """)
class_group.show(10)

+--------------------+-------------+----------+--------------------+----------+-----------------+----------------------+-------------------+------+
|         movie_title|       critic|top_critic|          open2gross|SuccessCT1|Whether_Recommend|Whether_Recommend_Flop|       FlopAffinity| Group|
+--------------------+-------------+----------+--------------------+----------+-----------------+----------------------+-------------------+------+
|      The Red Turtle|Ashley Moreno|     False|0.022788061268539026|         0|                1|                     1| 0.6666666666666666|Group3|
|My Entire High Sc...|Ashley Moreno|     False| 0.19865569153492155|         0|                1|                     1| 0.6666666666666666|Group3|
|   Support the Girls|Ashley Moreno|     False|   0.396262507357269|         1|                1|                     0| 0.6666666666666666|Group3|
|        Wonder Woman| Cary Darling|      True| 0.25026812605736476|         0|                1|               

Look at the number of critics in each group

In [0]:
class_group.filter(class_group.Group == "Group1").select("critic").distinct().count()

315

In [0]:
class_group.filter(class_group.Group == "Group2").select("critic").distinct().count()

307

In [0]:
class_group.filter(class_group.Group == "Group3").select("critic").distinct().count()

298

In [0]:
class_group.filter(class_group.Group == "Group4").select("critic").distinct().count()

276

In [0]:
class_group.filter(class_group.Group == "Other").select("critic").distinct().count()

983

## Join Critics in Classification Set and Critics in the Prediction Set

In [0]:
class_group.createOrReplaceTempView("class")
pred_set.createOrReplaceTempView("pred")

pred_join_critic = spark.sql("""
                        SELECT DISTINCT pred.movie_title, release_date, pred.critic, pred.top_critic, 
                        pred.SuccessCT1, pred.Whether_Recommend, Group 
                        FROM pred
                        JOIN class ON pred.critic = class.critic
                        """)
pred_join_critic.show(10)

+--------------------+------------+-------------+----------+----------+-----------------+------+
|         movie_title|release_date|       critic|top_critic|SuccessCT1|Whether_Recommend| Group|
+--------------------+------------+-------------+----------+----------+-----------------+------+
|         Being Frank|  2019-06-14|Ashley Moreno|     False|         0|                1|Group3|
|Fast & Furious Pr...|  2019-08-02| Cary Darling|      True|         1|                1|Group2|
|Once Upon a Time....|  2019-07-26| Cary Darling|      True|         0|                1|Group2|
|           Yesterday|  2019-06-28| Cary Darling|      True|         0|                1|Group2|
|          Gemini Man|  2019-10-11| Cary Darling|      True|         1|                0|Group2|
|        Queen & Slim|  2019-11-27| Cary Darling|      True|         0|                0|Group2|
|The Kid Who Would...|  2019-01-25| Cary Darling|      True|         1|                1|Group2|
|         The Kitchen|  2019-0

In [0]:
pred_join_critic.filter(pred_join_critic.Group == "Other").select("critic").distinct().count()

335

In [0]:
pred_join_critic.filter(pred_join_critic.Group == "Group1").select("critic").distinct().count()

242

In [0]:
pred_join_critic.filter(pred_join_critic.Group == "Group2").select("critic").distinct().count()

258

In [0]:
pred_join_critic.filter(pred_join_critic.Group == "Group3").select("critic").distinct().count()

237

In [0]:
pred_join_critic.filter(pred_join_critic.Group == "Group4").select("critic").distinct().count()

210

## Create Regression Tables for 2 models
Model 1 - Not Discriminating the critics

In [0]:
pred_join_critic.createOrReplaceTempView("t1")

pred_reg_model1 = spark.sql("""
                        SELECT movie_title, SuccessCT1, SUM(Whether_Recommend) AS Total_Recommend
                        FROM t1
                        GROUP BY movie_title, SuccessCT1
                        """)
pred_reg_model1.show(10)

+--------------------+----------+---------------+
|         movie_title|SuccessCT1|Total_Recommend|
+--------------------+----------+---------------+
|I Do Not Care If ...|         0|             12|
|               Greta|         1|             43|
|      The Third Wife|         0|              9|
|    The Cat Rescuers|         0|              2|
|          Gemini Man|         1|             27|
|               Buddy|         0|              4|
|             Frankie|         0|             20|
|      A Faithful Man|         0|             15|
|Marianne & Leonar...|         0|             11|
|The Last Black Ma...|         0|             41|
+--------------------+----------+---------------+
only showing top 10 rows



In [0]:
pred_reg_model1.count()

452

Model 2 - Group the Critics

In [0]:
#pivot the table by summing the recommend counts in each critic group
pred_reg_model2 = pred_join_critic.groupby(pred_join_critic.movie_title, 
                                                pred_join_critic.SuccessCT1).pivot("Group").sum("Whether_Recommend")

In [0]:
# replace the null values with 0, as some movies were not reviewed by any customers in one group
pred_reg_model2 = pred_reg_model2.na.fill(0)

In [0]:
pred_reg_model2.count()

452

# Run Logistic Regression Using Spark MLlib

## Model 1

In [0]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

Assembler = VectorAssembler(inputCols = ["Total_Recommend"], 
                                outputCol ='features')
output = Assembler.transform(pred_reg_model1)
finalData = output.select('features','SuccessCT1')
trainData, testData = finalData.randomSplit([0.7, 0.3], seed=1)
lr = LogisticRegression(labelCol='SuccessCT1', featuresCol='features')
# Fit the model
lrModel = lr.fit(trainData)
# predict the model
predict_train=lrModel.transform(trainData)
predict_test=lrModel.transform(testData)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="SuccessCT1")
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set is 0.4634233103159657
The area under ROC for test set is 0.4223856209150327


## Model 2

In [0]:
pred_reg_model2.columns

['movie_title', 'SuccessCT1', 'Group1', 'Group2', 'Group3', 'Group4', 'Other']

In [0]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

Assembler = VectorAssembler(inputCols = ["Group1", "Group2", "Group3", "Group4", "Other"], 
                                outputCol ='features')
output = Assembler.transform(pred_reg_model2)
finalData = output.select('features','SuccessCT1')
trainData, testData = finalData.randomSplit([0.7, 0.3], seed=1)
lr = LogisticRegression(labelCol='SuccessCT1', featuresCol='features')
# Fit the model
lrModel2 = lr.fit(trainData)
# predict the model
predict_train2 = lrModel2.transform(trainData)
predict_test2 = lrModel2.transform(testData)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="SuccessCT1")
# predict_test2.select("SuccessCT1", "rawPrediction", "prediction","probability").show(5)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train2)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test2)))

The area under ROC for train set is 0.7611424984306346
The area under ROC for test set is 0.748059640522876


# Perform Likelihood Ratio Test
Unfortunately, likelihood ratio test is not available in PySpark. Therefore we use the scipy package to redo the two logistic regression models and conduct a likelihood ratio test to see if the new predictors: harbinger groups can statistically significantly increase the prediction power.

In [0]:
y_m1 = pred_reg_model1.select("SuccessCT1").toPandas()
x_m1 = pred_reg_model1.select("Total_Recommend").toPandas()
y_m2 = pred_reg_model2.select("SuccessCT1").toPandas()
x_m2 = pred_reg_model2.select("Group1", "Group2", "Group3", "Group4", "Other").toPandas()

In [0]:
import statsmodels.api as sm
m1 = sm.Logit(y_m1, x_m1)
m1_result = m1.fit()
m1_llf = m1_result.llf # generate the log likelihood statistic of null model
m2 = sm.Logit(y_m2, x_m2)
m2_result = m2.fit()
m2_llf = m2_result.llf # generate the log likelihood statistic of alternative model

  import pandas.util.testing as tm


Optimization terminated successfully.
         Current function value: 0.691974
         Iterations 3
Optimization terminated successfully.
         Current function value: 0.587061
         Iterations 6


In [0]:
from scipy import stats
stats.chisqprob = lambda chisq, df: stats.chi2.sf(chisq, df)
def lrtest(llmin, llmax):
    lr = 2 * (llmax - llmin)
    p = stats.chisqprob(lr, 4) # llmax has 4 dof more than llmin
    return lr, p

In [0]:
lr, p = lrtest(m1_llf, m2_llf)
print('LR test, p value: {:.2f}, {:.4f}'.format(lr, p))

LR test, p value: 94.84, 0.0000


In [0]:
m1_result.summary()

0,1,2,3
Dep. Variable:,SuccessCT1,No. Observations:,452.0
Model:,Logit,Df Residuals:,451.0
Method:,MLE,Df Model:,0.0
Date:,"Wed, 03 Jun 2020",Pseudo R-squ.:,-0.005836
Time:,02:37:00,Log-Likelihood:,-312.77
converged:,True,LL-Null:,-310.96
Covariance Type:,nonrobust,LLR p-value:,

0,1,2,3,4,5,6
,coef,std err,z,P>|z|,[0.025,0.975]
Total_Recommend,-0.0026,0.003,-1.024,0.306,-0.008,0.002


In [0]:
m2_result.summary()

0,1,2,3
Dep. Variable:,SuccessCT1,No. Observations:,452.0
Model:,Logit,Df Residuals:,447.0
Method:,MLE,Df Model:,4.0
Date:,"Wed, 03 Jun 2020",Pseudo R-squ.:,0.1467
Time:,02:37:00,Log-Likelihood:,-265.35
converged:,True,LL-Null:,-310.96
Covariance Type:,nonrobust,LLR p-value:,7.278e-19

0,1,2,3,4,5,6
,coef,std err,z,P>|z|,[0.025,0.975]
Group1,0.0928,0.047,1.977,0.048,0.001,0.185
Group2,0.0816,0.039,2.079,0.038,0.005,0.159
Group3,-0.0758,0.043,-1.761,0.078,-0.160,0.009
Group4,-0.2298,0.054,-4.256,0.000,-0.336,-0.124
Other,-0.0130,0.075,-0.172,0.863,-0.161,0.135
