In [110]:
%load_ext autoreload
%autoreload 2
!pip install plotly

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
[33mYou are using pip version 9.0.3, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [111]:
####-----------------Common Stuff-------------------####

In [112]:
# Common
# Import necessary libraries and packages

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import pandas as pd
import matplotlib.mlab as mlab
import numpy as np
import plotly.plotly as py
import plotly.figure_factory as ff
import plotly
import plotly.graph_objs as go
plotly.tools.set_credentials_file(username='pbd', api_key='nrkkjdZy9uNG8DaoOM2l')

%matplotlib inline

In [113]:
from plotly import __version__
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
# requires version >= 1.9.0
print (__version__)
init_notebook_mode(connected=True)

2.5.1


In [114]:
## Common
## Perform pre processing on data i.e. StringIndexing -> OneHotEncoding -> VectorAssembler 

categorical_columns= ['directorName', 'writeName', 'genre', 'runtime', 'year']

### sringIndexer

indexers = [
            StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
            for c in categorical_columns
]

### oneHotEncoder

encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
            for indexer in indexers
]

### vectorAssembler

assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

In [115]:
####-----------------Common Stuff-------------------####

In [116]:
####-----------------Start Training-------------------####

In [117]:
##Training
## Load training data with header

data = spark.read.options(header='true', inferschema='true', delimiter=',').csv("2000-2018.csv")  
display(data)
data.cache()
data.show(5)
data.printSchema()




DataFrame[title: string, year: int, avgRating: double, votes: int, directorName: string, writeName: string, runtime: int, genre: string]

+-------------+----+---------+-----+--------------+-----------------+-------+--------------------+
|        title|year|avgRating|votes|  directorName|        writeName|runtime|               genre|
+-------------+----+---------+-----+--------------+-----------------+-------+--------------------+
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|    Roman Coppola|    101|Adventure,Animati...|
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|   Kunichi Nomura|    101|Adventure,Animati...|
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|     Wes Anderson|    101|Adventure,Animati...|
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|Jason Schwartzman|    101|Adventure,Animati...|
|A Quiet Place|2018|      8.1|68050|John Krasinski|      Bryan Woods|     90| Drama,Horror,Sci-Fi|
+-------------+----+---------+-----+--------------+-----------------+-------+--------------------+
only showing top 5 rows

root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- 

In [118]:
## Training

## Feed the training data to the pipeline i.e. VectorAssembler -> Pipeline

pipeline = Pipeline(stages=indexers + encoders+[assembler])
model=pipeline.fit(data)
transformed = model.transform(data)

#transformed.orderBy('votes').show(5)
#titles_df = transformed.select("title")
#tr = transformed.select("title", "votes")
transformed.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- votes: integer (nullable = true)
 |-- directorName: string (nullable = true)
 |-- writeName: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- directorName_indexed: double (nullable = false)
 |-- writeName_indexed: double (nullable = false)
 |-- genre_indexed: double (nullable = false)
 |-- runtime_indexed: double (nullable = false)
 |-- year_indexed: double (nullable = false)
 |-- directorName_indexed_encoded: vector (nullable = true)
 |-- writeName_indexed_encoded: vector (nullable = true)
 |-- genre_indexed_encoded: vector (nullable = true)
 |-- runtime_indexed_encoded: vector (nullable = true)
 |-- year_indexed_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



In [119]:
## Training
## Select required columns from the resulting dataframe (training data)

final = transformed.select("features", "avgRating")
final.printSchema()
final.show()

root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)

+--------------------+---------+
|            features|avgRating|
+--------------------+---------+
|(9275,[140,3317,8...|      8.2|
|(9275,[140,6879,8...|      8.2|
|(9275,[140,2696,8...|      8.2|
|(9275,[140,4528,8...|      8.2|
|(9275,[1478,6743,...|      8.1|
|(9275,[1478,8394,...|      8.1|
|(9275,[1478,4899,...|      8.1|
|(9275,[930,7441,8...|      8.1|
|(9275,[930,8139,8...|      8.1|
|(9275,[930,7093,8...|      8.1|
|(9275,[2074,2588,...|      7.9|
|(9275,[16,4695,87...|      7.8|
|(9275,[16,2678,87...|      7.8|
|(9275,[522,2431,8...|      7.7|
|(9275,[522,2445,8...|      7.7|
|(9275,[522,7165,8...|      7.7|
|(9275,[522,3746,8...|      7.7|
|(9275,[954,3271,8...|      7.4|
|(9275,[1021,3271,...|      7.4|
|(9275,[1182,7189,...|      7.2|
+--------------------+---------+
only showing top 20 rows



In [120]:
##Training
# To train the model
## Randomly split the data set for testing and training

lr_train,lr_test = final.randomSplit([0.8, 0.2])

## Create a Linear Regression Model with required parameters

lr = LinearRegression(labelCol="avgRating",
                      fitIntercept=True, 
                      maxIter=100, 
                      regParam=0.02, 
                      elasticNetParam=0.02)

## fit the Model to the training dataset

lrModel = lr.fit(lr_train)


In [121]:
##Training
### Collect information about the trained model
## Training summary 

trainingSummary = lrModel.summary

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

print("Intercept: %s" % str(lrModel.intercept))
print("numIterations: %d" % trainingSummary.totalIterations)

#print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#print("Coefficients: %s" % str(lrModel.coefficients))
trainingSummary.predictions.printSchema()
trainingSummary.predictions.show()
training_residuals = trainingSummary.residuals


RMSE: 0.232170
r2: 0.953972
Intercept: 6.55913030077
numIterations: 101
root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- prediction: double (nullable = false)

+--------------------+---------+------------------+
|            features|avgRating|        prediction|
+--------------------+---------+------------------+
|(9275,[0,2821,877...|      6.5| 6.453075406425931|
|(9275,[0,2821,881...|      7.0| 7.015970245970519|
|(9275,[0,2821,881...|      7.1| 7.036552340206393|
|(9275,[0,3127,877...|      6.5|    6.393244905562|
|(9275,[0,3127,881...|      7.0|6.9561397451065865|
|(9275,[0,3127,881...|      7.1| 6.976721839342462|
|(9275,[0,3127,881...|      6.6| 6.787823977078457|
|(9275,[0,3201,877...|      6.5|6.4585175965392185|
|(9275,[0,3201,881...|      7.0| 7.021412436083805|
|(9275,[0,3201,881...|      7.1|7.0419945303196805|
|(9275,[0,3323,881...|      7.0| 6.952174996852843|
|(9275,[0,3323,881...|      7.1|6.9727570910887176|
|(9275,[0,3323,881

In [122]:
##Training
## Model Evaluation

prediction_and_labels = lrModel.evaluate(final)

print("RMSE: %f" % prediction_and_labels.rootMeanSquaredError)
print("r2: %f" % prediction_and_labels.r2)

prediction_and_labels.predictions.show(5000)

RMSE: 0.332595
r2: 0.904892
+--------------------+---------+------------------+
|            features|avgRating|        prediction|
+--------------------+---------+------------------+
|(9275,[140,3317,8...|      8.2| 7.811530483192627|
|(9275,[140,6879,8...|      8.2| 8.148927978437355|
|(9275,[140,2696,8...|      8.2| 7.846447484070745|
|(9275,[140,4528,8...|      8.2| 7.698443734401689|
|(9275,[1478,6743,...|      8.1| 8.046913152139265|
|(9275,[1478,8394,...|      8.1| 8.046913152139265|
|(9275,[1478,4899,...|      8.1| 7.198633808646184|
|(9275,[930,7441,8...|      8.1| 8.051623738451998|
|(9275,[930,8139,8...|      8.1| 8.051623738451998|
|(9275,[930,7093,8...|      8.1| 8.051623738451998|
|(9275,[2074,2588,...|      7.9| 7.826483457074563|
|(9275,[16,4695,87...|      7.8|  7.06947841044505|
|(9275,[16,2678,87...|      7.8| 7.371625597687209|
|(9275,[522,2431,8...|      7.7| 7.677566291097426|
|(9275,[522,2445,8...|      7.7| 7.678846979722312|
|(9275,[522,7165,8...|      7.7| 7.6

In [123]:
##Training
## Training and Testing Residuals

training_residuals = trainingSummary.residuals
training_residuals.toDF('residuals')
train_resi = training_residuals.write.csv("train.csv")

prediction_residuals = prediction_and_labels.residuals
prediction_residuals.toDF('residuals')
predict_resi = prediction_residuals.write.csv("predict.csv")

In [124]:
####-----------------Training Done-------------------####

In [125]:
## Bridge
## Save the trained Model then load it

lrModel.save("pythonLinearRegression")
Linear_Regression_model = LinearRegressionModel.load("pythonLinearRegression/")

In [126]:
####-----------------Start Prediction-------------------####

In [127]:
## Prediction

## Load new data to be predicted by our model

new_data = spark.read.options(header='true', inferschema='true', delimiter=',').csv("2000-2018.csv")  
display(new_data)
new_data.cache()
new_data.show(500)
new_data.printSchema()

DataFrame[title: string, year: int, avgRating: double, votes: int, directorName: string, writeName: string, runtime: int, genre: string]

+--------------------+----+---------+------+--------------------+--------------------+-------+--------------------+
|               title|year|avgRating| votes|        directorName|           writeName|runtime|               genre|
+--------------------+----+---------+------+--------------------+--------------------+-------+--------------------+
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|       Roman Coppola|    101|Adventure,Animati...|
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|      Kunichi Nomura|    101|Adventure,Animati...|
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|        Wes Anderson|    101|Adventure,Animati...|
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|   Jason Schwartzman|    101|Adventure,Animati...|
|       A Quiet Place|2018|      8.1| 68050|      John Krasinski|         Bryan Woods|     90| Drama,Horror,Sci-Fi|
|       A Quiet Place|2018|      8.1| 68050|      John Krasinski|       

In [128]:
## Prediction

## Feed the New data to the pipeline i.e. VectorAssembler -> Pipeline
pipeline = Pipeline(stages=indexers + encoders+[assembler])
model_new=pipeline.fit(new_data)
transformed_new = model_new.transform(new_data)

#transformed_new.orderBy('votes').show(5)
#titles_df_new = transformed_new.select("title")
#tr_new = transformed_new.select("title", "votes")
transformed_new.printSchema()


root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- votes: integer (nullable = true)
 |-- directorName: string (nullable = true)
 |-- writeName: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- directorName_indexed: double (nullable = false)
 |-- writeName_indexed: double (nullable = false)
 |-- genre_indexed: double (nullable = false)
 |-- runtime_indexed: double (nullable = false)
 |-- year_indexed: double (nullable = false)
 |-- directorName_indexed_encoded: vector (nullable = true)
 |-- writeName_indexed_encoded: vector (nullable = true)
 |-- genre_indexed_encoded: vector (nullable = true)
 |-- runtime_indexed_encoded: vector (nullable = true)
 |-- year_indexed_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



In [129]:
## Prediction
## Select required columns from the resulting dataframe (New data)

final_new = transformed_new.select("features", "avgRating")
final_new.printSchema()
final_new.show()

root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)

+--------------------+---------+
|            features|avgRating|
+--------------------+---------+
|(9275,[140,3317,8...|      8.2|
|(9275,[140,6879,8...|      8.2|
|(9275,[140,2696,8...|      8.2|
|(9275,[140,4528,8...|      8.2|
|(9275,[1478,6743,...|      8.1|
|(9275,[1478,8394,...|      8.1|
|(9275,[1478,4899,...|      8.1|
|(9275,[930,7441,8...|      8.1|
|(9275,[930,8139,8...|      8.1|
|(9275,[930,7093,8...|      8.1|
|(9275,[2074,2588,...|      7.9|
|(9275,[16,4695,87...|      7.8|
|(9275,[16,2678,87...|      7.8|
|(9275,[522,2431,8...|      7.7|
|(9275,[522,2445,8...|      7.7|
|(9275,[522,7165,8...|      7.7|
|(9275,[522,3746,8...|      7.7|
|(9275,[954,3271,8...|      7.4|
|(9275,[1021,3271,...|      7.4|
|(9275,[1182,7189,...|      7.2|
+--------------------+---------+
only showing top 20 rows



In [130]:
## Prediction
## Use the loaded model to predict new data

new_data_predict = Linear_Regression_model.transform(lr_test)
new_data_predict.printSchema()

new_data_predict.show()

root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- prediction: double (nullable = false)

+--------------------+---------+------------------+
|            features|avgRating|        prediction|
+--------------------+---------+------------------+
|(9275,[0,2821,881...|      6.6| 6.847654477942388|
|(9275,[0,3201,881...|      6.6|6.8530966680556755|
|(9275,[0,3426,881...|      7.1| 6.935759534406529|
|(9275,[0,3437,881...|      6.6| 6.840027830502715|
|(9275,[0,3614,881...|      7.1| 6.977588911570789|
|(9275,[0,3614,881...|      6.6|6.7886910493067845|
|(9275,[0,3615,881...|      6.6| 6.840027830502715|
|(9275,[0,3747,881...|      7.1| 6.935759534406529|
|(9275,[0,4053,881...|      7.1| 6.834315675818278|
|(9275,[0,4867,881...|      7.0| 6.952174996852843|
|(9275,[0,4867,881...|      7.1|6.9727570910887176|
|(9275,[0,6549,881...|      7.0| 6.952174996852843|
|(9275,[0,8104,881...|      7.1|6.9727570910887176|
|(9275,[0,8446,881...|      7.0| 6.952

In [131]:
####-----------------Prediction Done-------------------####

In [132]:
## Extract relevant information for visualization

actual_and_predicted = new_data_predict.select("avgRating", "prediction")


In [133]:
## Store the predicted values

actual_and_predicted.write.csv("out1.csv")

In [134]:
## Load the stored csv file for visulization

out = spark.read.csv("out1.csv").withColumnRenamed('_c0','label').withColumnRenamed('_c1','prediction')
#out.orderBy('_c2').show(3)
out.show()
out.printSchema()

+-----+------------------+
|label|        prediction|
+-----+------------------+
|  6.6| 6.847654477942388|
|  6.6|6.8530966680556755|
|  7.1| 6.935759534406529|
|  6.6| 6.840027830502715|
|  7.1| 6.977588911570789|
|  6.6|6.7886910493067845|
|  6.6| 6.840027830502715|
|  7.1| 6.935759534406529|
|  7.1| 6.834315675818278|
|  7.0| 6.952174996852843|
|  7.1|6.9727570910887176|
|  7.0| 6.952174996852843|
|  7.1|6.9727570910887176|
|  7.0| 6.952174996852843|
|  7.6|7.5168330727154595|
|  7.8| 7.297435468107926|
|  7.5| 6.964382963616419|
|  8.1| 7.593668035721258|
|  7.5| 6.935281092277366|
|  7.3|  7.31871744693613|
+-----+------------------+
only showing top 20 rows

root
 |-- label: string (nullable = true)
 |-- prediction: string (nullable = true)



In [135]:
##Convert the dataframe to numpy array

#x = np.array(out.select('label', 'prediction').collect())

In [136]:
## Take first few elements of the array
#x = x[1:500]
#display(x)
#print x.dtype
## Cast x to float to properly plot
#y = x.astype(float)
#display(y)

In [137]:
## Plot avgRatings vs Predictions

#plt.plot(y, '.')
#plt.axis([2000, 2016, 0, 10])
#plt.show()

In [138]:
## Plot residuals for training and testing data

#t_r = np.array(training_residuals.select('residuals').collect())
#plt.plot(t_r, '.')

#p_r = np.array(prediction_residuals.select('residuals').collect())
#plt.plot(p_r, '.')

In [139]:
df = pd.read_csv("out.csv")
df1 = df.iloc[500:540,:]

In [140]:
table = ff.create_table(df1)
py.iplot(table, filename='table1')

In [141]:
predited_rating = Bar(y=df1.Prediction,
                  name='Predicted Rating',
                  marker=dict(color='#206ec1'))

actual_rating = Bar(y=df1.Label,
                name='Actual Rating',
                marker=dict(color='#8db72c'))

data = [predited_rating, actual_rating]

layout = Layout(title="Actual and Predicted Ratings",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))
fig = Figure(data=data, layout=layout)

py.iplot(fig, filename='styled_bar')

In [142]:
trace_actual = go.Scatter(
    y = df.Label,
    name='Actual Rating',
    mode = 'markers',
    marker=dict(color='#8db72c'))


trace_predicted = go.Scatter(
    y = df.Prediction,
    name='Predicted Rating',
    mode = 'line',
    marker=dict(color='#206ec1'))

layout = Layout(title="Scatter Plot",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))

fig = Figure(data=data, layout=layout)

data = [trace_actual, trace_predicted]
py.iplot(data, filename='basic-scatter')

In [143]:
residual_df = pd.read_csv("train1.csv")

In [144]:
trace_actual = go.Scatter(
    y = residual_df.Residual,
    name='Actual Rating',
    mode = 'line',
    marker=dict(color='#8db72c'))


layout = Layout(title="Scatter Plot",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))

fig = Figure(data=data, layout=layout)

data = [trace_actual]
py.iplot(data, filename='basic-scatter')

In [145]:
residual_df = pd.read_csv("predict1.csv")

In [146]:
trace_actual = go.Scatter(
    y = residual_df.Residual,
    name='Actual Rating',
    mode = 'line',
    marker=dict(color='#206ec1'))


layout = Layout(title="Scatter Plot",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))

fig = Figure(data=data, layout=layout)

data = [trace_actual]
py.iplot(data, filename='basic-scatter')