In [3]:
import pandas as pd
import numpy as np
from pprint import pprint
import pyspark

In [30]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [31]:
# reading in data
# data = spark.read.format("csv").option("header", "true").load("/FileStore/tables/enxedclg1508119193983/movies729.csv")
data = spark.read.format("csv").option("header", "true").load("movies729.csv")

In [32]:
# take a look at our data in dataframe form
# function to select a few rows of data, convert to a Pandas dataframe, and transpose
def preview(df, n=5):
    return pd.DataFrame(df.take(n), columns=df.columns)

preview(data)

Unnamed: 0,title,release_date,mpaa,runtime,rating,genres,number_reviews,production_budget,opening_weekend,gross,fb_likes
0,Star Wars: The Force Awakens,18-Dec-15,PG-13,136,8.1,"Action,Adventure,Fantasy,Sci-Fi",670255,245000000,247966675,936627416,267000
1,The Wedding Ringer,16-Jan-15,R,101,6.7,Comedy,59500,23000000,24500000,64460211,74000
2,The Visit,11-Sep-15,PG-13,94,6.2,"Horror,Thriller",83345,5000000,25427560,65069140,29000
3,Magic Mike XXL,1-Jul-15,R,115,5.7,"Comedy,Drama,Music",42965,14800000,11600000,66009973,41000
4,War Room,28-Aug-15,PG,120,6.4,Drama,10010,3000000,11351389,67790117,30000


In [33]:
# practice using sql queries in Spark
# some EDA with mpaa counts

data.registerTempTable('data')
mpaa_count = spark.sql(r"""SELECT mpaa, COUNT(title) AS count
                      FROM data
                      GROUP BY mpaa
                      ORDER BY count DESC""")
mpaa_count.show()

+---------+-----+
|     mpaa|count|
+---------+-----+
|        R|  306|
|    PG-13|  278|
|       PG|  103|
|Not Rated|   23|
|        G|   10|
|  Unrated|    8|
|    NC-17|    1|
+---------+-----+



### Preprocessing
1. Drop columns that are not eventually going into the predictive model
2. Cast the columns into the appropriate data type
3. Vectorize our features
4. Transform and scale our features and target

In [34]:
from pyspark.sql.functions import udf

# columns not used in analysis
for field in ['title', 'release_date', 'mpaa', 'runtime', 'genres','number_reviews']:
    data = data.drop(field)

# convert numerical value strings to double    
for column in data.schema.names:
    data = data.withColumn(column, data[column].cast('double'))

In [35]:
# train test split
train, test = data.randomSplit([0.7, 0.3], seed=5555)

In Spark, before running the model, we need to combine the features into one vector. This can be done using the VectorAssembler

In [36]:
from pyspark.ml.feature import VectorAssembler

# the feature columns
features = [f for f in data.schema.names if f != 'gross']

assembler = VectorAssembler(inputCols=features, outputCol='features')

train_pack = assembler.transform(train)
test_pack = assembler.transform(test)

# remove the columns packed into the feature vector
for field in features:
    train_pack = train_pack.drop(field)
    test_pack  = test_pack.drop(field)

# see what's returned
train_pack.show(10)

+------------+--------------------+
|       gross|            features|
+------------+--------------------+
|   9353573.0|[2.6,3.0E7,636887...|
| 7.4158157E7|[3.4,7.9E7,2.5003...|
| 2.5615792E7|[3.5,2.5E7,1.2622...|
|   8742261.0|[3.7,1.8E7,472111...|
|      3506.0|[3.9,650000.0,70....|
|     14686.0|[4.1,1000000.0,86...|
|   4930798.0|[4.1,1.7E7,295500...|
|1.66147885E8|[4.1,4.0E7,9.4395...|
| 7.9566871E7|[4.3,3.0E7,3.2324...|
| 9.5328937E7|[4.3,1.0E8,937255...|
+------------+--------------------+
only showing top 10 rows



standardizing our features so that they have 0 mean and unit variance

In [37]:
from pyspark.ml.feature import StandardScaler

In [38]:
# standardizing features

standardscale = StandardScaler(withMean=True, withStd=True, inputCol='features', outputCol='features_scaled')
standardscale = standardscale.fit(train_pack)

train_pack = standardscale.transform(train_pack)
train_pack = train_pack.drop('features')
test_pack  = standardscale.transform(test_pack)
test_pack = test_pack.drop('features')

train_pack.show(10)

+------------+--------------------+
|       gross|     features_scaled|
+------------+--------------------+
|   9353573.0|[-4.3720155579867...|
| 7.4158157E7|[-3.4773744484751...|
| 2.5615792E7|[-3.3655443097862...|
|   8742261.0|[-3.1418840324083...|
|      3506.0|[-2.9182237550304...|
|     14686.0|[-2.6945634776525...|
|   4930798.0|[-2.6945634776525...|
|1.66147885E8|[-2.6945634776525...|
| 7.9566871E7|[-2.4709032002747...|
| 9.5328937E7|[-2.4709032002747...|
+------------+--------------------+
only showing top 10 rows



I am also going to add two other columns to my Spark dataframe: 
1. the log transformed gross 
2. standardized gross 

In [42]:
from pyspark.sql.functions import log, mean, stddev

In [43]:
def standardize(df, column):
  average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
  std = df.agg(stddev(df[column]).alias("std")).collect()[0]["std"]
  return (df[column] - average) / std

In [44]:
# log transforming target
train_pack = train_pack.withColumn('log_gross', log(train_pack.gross))
test_pack = test_pack.withColumn('log_gross', log(test_pack.gross))


# standardizing target
# dropping original gross column
train_pack = train_pack.withColumn('gross_scaled', standardize(train_pack,'gross'))
train_pack = train_pack.drop('gross')

test_pack = test_pack.withColumn('gross_scaled', standardize(test_pack,'gross'))
test_pack = test_pack.drop('gross')


train_pack.show(10)

+--------------------+------------------+--------------------+
|     features_scaled|         log_gross|        gross_scaled|
+--------------------+------------------+--------------------+
|[-4.3720155579867...|16.051268967305777| -0.6324265720878373|
|[-3.4773744484751...|18.121710627241004| 0.09077202318335206|
|[-3.3655443097862...| 17.05871959426062|-0.45094536796819584|
|[-3.1418840324083...| 15.98367940982794| -0.6392486200416119|
|[-2.9182237550304...|  8.16223106548118| -0.7367703530785693|
|[-2.6945634776525...| 9.594649938011555| -0.7366455878259272|
|[-2.6945634776525...|15.411011399045726| -0.6817833377635575|
|[-2.6945634776525...|18.928388823222527|  1.1173481997849761|
|[-2.4709032002747...|18.192108370714735| 0.15113155526558278|
|[-2.4709032002747...|18.372843963668053|  0.3270312106012574|
+--------------------+------------------+--------------------+
only showing top 10 rows



### Modeling.

I will build a predicitive model of gross revenue using linear regression.

I will try two models and see which one predicts better:
1. log_gross as the response variable
2. gross_scaled as a the response variable

In [52]:
from pyspark.ml.regression import LinearRegression

In [51]:
lr = LinearRegression(labelCol='log_gross', 
                        featuresCol='features_scaled',
                        predictionCol='prediction')

lr_model = lr.fit(train_pack)
# transform ~ predict
lr_pred = lr_model.transform(test_pack)

In [49]:
print('R2:',lr_model.summary.r2)
print('MSE:',lr_model.summary.meanSquaredError)
print('\n')
print('Estimated Coefficeint:')
pprint(list(zip(data.schema.names, lr_model.coefficients)))

R2: 0.3350991076008999
MSE: 4.991448705041381


Estimated Coefficeint:
[('rating', 0.1771739881851411),
 ('production_budget', 0.87076304274451455),
 ('opening_weekend', 0.68590296489508062),
 ('gross', 0.37035362207759798)]


The model isn't really predicitive at all. Let's try again with the standardized gross value

In [53]:
lr2 = LinearRegression(labelCol='gross_scaled', 
                        featuresCol='features_scaled',
                        predictionCol='prediction')

lr2_model = lr2.fit(train_pack)
# transform ~ predict
lr2_pred = lr2_model.transform(test_pack)


In [54]:
print('R2:',lr2_model.summary.r2)
print('MSE:',lr2_model.summary.meanSquaredError)
print('\n')
print('Estimated Coefficeint:')
pprint(list(zip(data.schema.names, lr2_model.coefficients)))

R2: 0.7564348554482099
MSE: 0.24309220252353414


Estimated Coefficeint:
[('rating', 0.11788177782631756),
 ('production_budget', 0.2570232449241947),
 ('opening_weekend', 0.65400579259557645),
 ('gross', 0.043520333116376868)]


Much better results!