In [636]:
from pyspark.sql.types import StringType, IntegerType, DoubleType, StructField, StructType,Row,DateType, LongType
from pyspark.sql.functions import udf, col
from StringIO import StringIO
import csv
import json
import ast
from datetime import datetime

#### Parse a record that contains both JSON and plain fields

In [637]:
def parseRecord(data, schema):
    """Parse one CSV line into a Row keyed by the given column names.

    The line is UTF-8 encoded, stripped, and read with the csv module
    (comma delimiter, double-quote quoting), so a quoted field that
    contains commas is kept as a single field.

    Args:
        data: one CSV record as a text string.
        schema: ordered list of column names; the n-th parsed field is
            stored under the n-th name.

    Returns:
        A Row built from the name -> field mapping. Parsed fields beyond
        len(schema) are dropped; names that have no matching field are
        filled with the string '0'.

    Raises:
        StopIteration: if `data` is empty after stripping, because the
            csv reader then has no record to yield.
    """
    rec = StringIO(data.encode('utf-8').strip())
    reader = csv.reader(rec, delimiter=',', quotechar='"')
    # Builtin next() instead of the Python-2-only reader.next().
    row = next(reader)
    # zip stops at the shorter sequence, which drops surplus fields.
    rdict = dict(zip(schema, row))
    # Short record: pad the columns that received no field.
    for name in schema[len(row):]:
        rdict[name] = '0'
    return Row(**rdict)
    

#### UDFs

In [638]:
def getNames(vec):
    """Join the 'name' values of a stringified list of dicts with '|'.

    `vec` is stripped and evaluated as a Python literal; an empty list
    yields the empty string.
    """
    entries = ast.literal_eval(vec.strip())
    return "|".join(entry['name'] for entry in entries)

def getDate(string):
    """Convert a 'YYYY-MM-DD' string to a date, defaulting to 1900-01-01.

    Args:
        string: the date text to parse.

    Returns:
        A datetime.date. Input that is not in '%Y-%m-%d' format (for
        example '' or '0'), or that is not a string at all (for example
        None), yields date(1900, 1, 1).
    """
    try:
        parsed = datetime.strptime(string, '%Y-%m-%d')
    except (ValueError, TypeError):
        # ValueError: text in the wrong format; TypeError: non-string input.
        parsed = datetime.strptime('1900-01-01', '%Y-%m-%d')
    # The original fell off the end and returned None for every input.
    return parsed.date()

def getActor(jsonData,index):
    """Return the 'name' of the first entry whose 'order' equals `index`.

    `jsonData` is evaluated as a Python literal and iterated in order;
    "unknown" is returned when no entry matches.
    """
    cast = ast.literal_eval(jsonData)
    return next((member['name'] for member in cast if member['order'] == index), "unknown")

def makeGetActor(num):
    """Build a UDF that applies getActor with its index fixed to `num`."""
    return udf(lambda castJson: getActor(castJson, index=num))

def getDirector(stringdata):
    """Return the name of the first entry whose 'job' is 'Director'.

    `stringdata` is evaluated as a Python literal. Every entry is
    inspected; "unknown" is returned when none has the job 'Director'.
    """
    crew = ast.literal_eval(stringdata)
    names = []
    for member in crew:
        if member['job'] == 'Director':
            names.append(member['name'])
    return names[0] if names else "unknown"


#### Register UDFs

In [639]:
# Wrap the plain Python helpers as Spark UDFs. The return type is spelled
# out for each one; StringType is what udf() uses when none is given.
getNames_udf = udf(getNames, StringType())
getDate_udf = udf(getDate, DateType())
getDirector_udf = udf(getDirector, StringType())

#### Preprocess movies

In [640]:
# Read the movies CSV line by line; the first line supplies the column names.
movieRDD = sc.textFile('tmdb_5000_movies.csv')
columns = movieRDD.take(1)[0].split(",")

# Skip lines that start with 'budget' (presumably just the header row) and
# parse every remaining line into a Row.
movieDF = (movieRDD
           .filter(lambda x: not x.startswith('budget'))
           .map(lambda x: parseRecord(x, columns))
           .toDF())

# Derive flat string/date columns from the list-valued and date columns.
movieDF = (movieDF
           .withColumn('genre_string', getNames_udf(movieDF['genres']))
           .withColumn('keywords_string', getNames_udf(movieDF['keywords']))
           .withColumn('prd_companies', getNames_udf(movieDF['production_companies']))
           .withColumn('releasedate', getDate_udf(movieDF['release_date']))
           .withColumn('languages', getNames_udf(movieDF['spoken_languages'])))

In [728]:
# Target schema for the cleaned movie table: one nullable field per kept column.
newSchema = StructType([
    StructField('budget', LongType(), True),
    StructField('id', IntegerType(), True),
    StructField('original_language', StringType(), True),
    StructField('original_title', StringType(), True),
    StructField('popularity', DoubleType(), True),
    StructField('revenue', DoubleType(), True),
    StructField('runtime', IntegerType(), True),
    StructField('status', StringType(), True),
    StructField('vote_average', DoubleType(), True),
    StructField('vote_count', IntegerType(), True),
    StructField('genre_string', StringType(), True),
    StructField('keywords_string', StringType(), True),
    StructField('prd_companies', StringType(), True),
    StructField('releasedate', DateType(), True),
    StructField('languages', StringType(), True),
])

In [729]:
# Keep the columns listed in newSchema, casting the numeric ones, then
# rebuild the DataFrame from its RDD with newSchema applied.
movies = movieDF.select([
    movieDF['budget'].cast(LongType()),
    movieDF['id'].cast(IntegerType()),
    'original_language',
    'original_title',
    movieDF['popularity'].cast(DoubleType()),
    movieDF['revenue'].cast(DoubleType()),
    movieDF['runtime'].cast(IntegerType()),
    'status',
    movieDF['vote_average'].cast(DoubleType()),
    movieDF['vote_count'].cast(IntegerType()),
    'genre_string',
    'keywords_string',
    'prd_companies',
    'releasedate',
    'languages',
]).rdd.toDF(schema=newSchema)

#### Preprocess credits

In [730]:
# Read the credits CSV line by line; the first line supplies the column names.
creditsRDD = sc.textFile('tmdb_5000_credits.csv')
columns = creditsRDD.take(1)[0].split(",")

# Skip lines that start with 'movie_id' (presumably just the header row) and
# parse every remaining line into a Row.
creditsDF = (creditsRDD
             .filter(lambda x: not x.startswith('movie_id'))
             .map(lambda x: parseRecord(x, columns))
             .toDF())

# Extract the cast entries with order 0, 1 and 2, plus the director, as columns.
creditsDF = (creditsDF
             .withColumn('actor1', makeGetActor(0)(creditsDF['cast']))
             .withColumn('actor2', makeGetActor(1)(creditsDF['cast']))
             .withColumn('actor3', makeGetActor(2)(creditsDF['cast']))
             .withColumn('director', getDirector_udf(creditsDF['crew'])))

In [731]:
# Reduce the credits to the join key (cast to int), the title and the derived names.
movieCredits = creditsDF.select(
    creditsDF['movie_id'].cast(IntegerType()),
    creditsDF['title'],
    creditsDF['director'],
    creditsDF['actor1'],
    creditsDF['actor2'],
    creditsDF['actor3'],
)

#### Join movies and movieCredits on movieID to get director and actor names

In [732]:
# Inner join (the default join type, written out here) of movies with their credits.
combined = movies.join(movieCredits, on=movies['id'] == movieCredits['movie_id'], how='inner')

In [733]:
from pyspark.ml.feature import StringIndexer,VectorAssembler,StandardScaler

In [734]:
# Columns that are still strings after the join.
strcols = [k for k, v in combined.dtypes if v == 'string']

# Add a numeric "<name>_val" index column for each string column.
# The loop variable is not named `col`: that would overwrite the `col`
# function imported from pyspark.sql.functions for the rest of the session.
# NOTE(review): `combined` is overwritten here, so running this cell twice on
# the same kernel likely fails because the "_val" columns already exist.
for strcol in strcols:
    indexer = StringIndexer(inputCol=strcol, outputCol=strcol + "_val")
    combined = indexer.fit(combined).transform(combined)

# Replace nulls in numeric columns with 0.
combined = combined.na.fill(0)

# Every non-string, non-date column except the label becomes a feature.
numerics = [k for k, v in combined.dtypes if v not in ['string', 'date']]
numerics.remove('revenue')

# Pack the features into one vector column and keep it next to the label.
assembler = VectorAssembler(inputCols=numerics, outputCol='features')
final_data = assembler.transform(combined).select('features', 'revenue')

# Add a scaled copy of the feature vector.
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
final_data = scaler.fit(final_data).transform(final_data)

In [745]:
from pyspark.ml.regression import RandomForestRegressor

In [746]:
# Fixed seed so the train/test split and the forest do not change between runs.
SEED = 42

# Random forest regressor predicting 'revenue' from the scaled feature vector.
rfc = RandomForestRegressor(labelCol='revenue', numTrees=150, featuresCol='scaledFeatures', seed=SEED)

# 70/30 train/test split.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SEED)

# Fit on the training part and predict on the held-out part.
rfcModel = rfc.fit(train_data)
rfcpreds = rfcModel.transform(test_data)

In [750]:
# Print the leading rows of the test-set predictions next to the actual revenue.
rfcpreds.show()

+--------------------+-------------+--------------------+--------------------+
|            features|      revenue|      scaledFeatures|          prediction|
+--------------------+-------------+--------------------+--------------------+
|[0.0,1591.0,3.481...|          0.0|[0.0,0.0179379550...|1.0006087427440144E7|
|[0.0,63574.0,5.75...|          0.0|[0.0,0.7167740757...|   5843089.128004328|
|[0.0,125052.0,0.3...|          0.0|[0.0,1.4099165024...|   5512645.393062637|
|[4.0,68202.0,0.28...|          0.0|[9.82260588432448...|    4036847.31713399|
|[2000000.0,92182....|  1.3101672E7|[0.04911302942162...|1.1125657196603218E7|
|[6500000.0,11033....|     3.1899E7|[0.15961734562027...|1.6692682731684936E7|
|[2.4E7,4935.0,49....| 2.34710455E8|[0.58935635305946...|1.3325200120975776E8|
|[2.8E7,11141.0,6....|  3.0016165E7|[0.68758241190271...|2.6272622002093524E7|
|[4.7E7,8592.0,7.8...| 1.03738726E8|[1.15415619140812...| 4.310695453091238E7|
|[2.8E8,99861.0,13...|1.405403694E9|[6.8758241190271

In [757]:
# Per-feature importances of the fitted forest; indices presumably follow
# the order of `numerics` given to the VectorAssembler — TODO confirm.
rfcModel.featureImportances

SparseVector(19, {0: 0.2859, 1: 0.0153, 2: 0.1695, 3: 0.0355, 4: 0.0157, 5: 0.3477, 6: 0.0104, 7: 0.0001, 8: 0.0133, 10: 0.0085, 11: 0.0085, 12: 0.0227, 13: 0.0079, 14: 0.0123, 15: 0.0168, 16: 0.0117, 17: 0.0091, 18: 0.0091})