In [1]:
import pyspark

In [2]:
from pyspark import SparkContext
from pyspark import SQLContext
import pandas as pd

In [3]:
# Start (or reuse) the Spark context and the SQL context used to build DataFrames.
# getOrCreate() hands back the already-running context when this cell is re-run,
# instead of failing because only one SparkContext may be active at a time.
# 'local' runs Spark in-process on this machine.
sc = SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local').setAppName('MovieSpark'))
sqlContext = SQLContext(sc)


In [4]:
my_bucket = 'rogers-bucket-test-814'
my_key = 'movie-corpus-CSV/movie_characters_metadata.csv'
my_key2 = 'movie-corpus-CSV/movie_titles_metadata.csv'

from pyspark.sql.types import *
from s3fs.core import S3FileSystem

# The data lives in S3, so loading is two steps: pandas reads the CSV through
# an s3fs file handle here, and the next cell converts the pandas frame into a
# Spark DataFrame.
# anon=False: authenticated (non-anonymous) S3 access.
s3 = S3FileSystem(anon=False)
# Context manager so the S3 file handle is closed once pandas has read it.
with s3.open('{}/{}'.format(my_bucket, my_key), mode='rb') as characters_file:
    df = pd.read_csv(characters_file)


In [5]:
# Turn the pandas character table into a distributed Spark DataFrame.
# An explicit schema pins each column's name and type, because letting Spark
# infer types from pandas can go wrong. All six columns are declared as
# non-nullable strings.
movieSchema = StructType([
    StructField(column_name, StringType(), False)
    for column_name in (
        "character_id",
        "character_name",
        "movie_id",
        "movie_name",
        "character_gender",
        "position_in_credits",
    )
])

pyDF = sqlContext.createDataFrame(df, schema=movieSchema)

In [6]:
# Read the movie-title metadata CSV from S3 and convert it to a Spark DataFrame.
# No explicit schema is given, so Spark infers the column types; the printed
# schema lets us check what it chose.
# Context manager so the S3 file handle is closed once pandas has read it.
with s3.open('{}/{}'.format(my_bucket, my_key2), mode='rb') as titles_file:
    dfTitle = pd.read_csv(titles_file)
pyDFTitle = sqlContext.createDataFrame(dfTitle)
pyDFTitle.printSchema()


root
 |-- movie_id: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- movie_year: string (nullable = true)
 |-- IMDB_rating: double (nullable = true)
 |-- IMDB_votes: long (nullable = true)
 |-- genres: string (nullable = true)



In [7]:
# Sanity-check the character load: print the declared schema, then the first
# five rows.
pyDF.printSchema()
pyDF.show(n=5)

root
 |-- character_id: string (nullable = false)
 |-- character_name: string (nullable = false)
 |-- movie_id: string (nullable = false)
 |-- movie_name: string (nullable = false)
 |-- character_gender: string (nullable = false)
 |-- position_in_credits: string (nullable = false)

+------------+--------------+--------+--------------------+----------------+-------------------+
|character_id|character_name|movie_id|          movie_name|character_gender|position_in_credits|
+------------+--------------+--------+--------------------+----------------+-------------------+
|          u0|        BIANCA|      m0|10 things i hate ...|               f|                  4|
|          u1|         BRUCE|      m0|10 things i hate ...|               ?|                  ?|
|          u2|       CAMERON|      m0|10 things i hate ...|               m|                  3|
|          u3|      CHASTITY|      m0|10 things i hate ...|               ?|                  ?|
|          u4|          JOEY|      m0|

In [8]:
# Add an IMDB_Avg_Rating column computed as IMDB_rating / IMDB_votes.
# NOTE(review): IMDB_rating (a double per the printed schema) already looks like
# an averaged score, so dividing it by the vote count is unlikely to be an
# "average vote" — confirm what this column is meant to hold.
# NOTE(review): rows with IMDB_votes == 0 divide by zero; Spark yields null for
# this unless ANSI mode is enabled — verify for the Spark version in use.
pyDFTitle = pyDFTitle.withColumn('IMDB_Avg_Rating',pyDFTitle['IMDB_rating'] / pyDFTitle['IMDB_votes'])

In [9]:
# Keep only movies released after 1990.
# movie_year is a string column (see the printed schema). Comparing it to the
# string '1990' is lexicographic, so a value like '1990/I' would sort after
# '1990' and wrongly pass. Instead, compare the leading four characters as an
# integer.
# Assumes every year value starts with four digits — TODO confirm against the
# data. Non-numeric prefixes cast to null (and are dropped) with ANSI mode off.
pyDFTitle = pyDFTitle.filter(pyDFTitle['movie_year'].substr(1, 4).cast('int') > 1990)

In [10]:
# Inner-join the characters onto the (filtered) titles by movie_id.
# Passing the key by name makes Spark keep a single movie_id column.
# Joining on an expression (pyDFTitle.movie_id == pyDF.movie_id) keeps both
# copies instead: two identically named columns that are ambiguous to select
# and that end up duplicated in any CSV export.
pyDFCharMovie = pyDFTitle.join(pyDF, on='movie_id')

In [11]:
# Newest movies first: sort descending on movie_year. It is a string column,
# so the ordering is lexicographic.
pyDFCharMovie = pyDFCharMovie.orderBy('movie_year', ascending=False)

In [12]:
# Write the joined, sorted result back to S3 as one CSV, in the same bucket as
# the inputs.
# toPandas() collects every row onto the driver, so this assumes the result
# fits in driver memory.
my_out_key = 'movie-corpus-CSV/fused_data.csv'
# index=False: the default integer index that toPandas() produces carries no
# information, so don't emit it as an unnamed leading column.
bytes_to_write = pyDFCharMovie.toPandas().to_csv(None, index=False).encode()
with s3.open('s3://{}/{}'.format(my_bucket, my_out_key), 'wb') as f:
    f.write(bytes_to_write)
