In [20]:
import re
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from os import environ
environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.10:0.4.1 pyspark-shell'

%matplotlib inline

import findspark
findspark.init()

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.functions import min

from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

DATA_DIR = 'data/'

# Subtitle data

In [21]:
#Read the subtitle file and transform it into a DataFrame
df = sqlContext.read.format('com.databricks.spark.xml').options(rootTag='document',rowTag='s').option("valueTag", "content").load(DATA_DIR+'6653249.xml')
df.printSchema()

root
 |-- _emphasis: boolean (nullable = true)
 |-- _id: long (nullable = true)
 |-- time: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: string (nullable = true)
 |    |    |-- _value: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- w: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _emphasis: boolean (nullable = true)
 |    |    |-- _id: double (nullable = true)
 |    |    |-- content: string (nullable = true)



In [22]:
#We select only time and w columns, keeping only the (word) value of w
data = df.select(col('time._value').alias('time'),explode('w.content').alias('word'))
data.show()

+--------------------+--------------+
|                time|          word|
+--------------------+--------------+
|[00:01:40,634, 00...|      Terminal|
|[00:01:40,634, 00...|             A|
|[00:01:40,634, 00...|            of|
|[00:01:40,634, 00...|        Boston|
|[00:01:40,634, 00...| international|
|[00:01:40,634, 00...|       airport|
|[00:01:40,634, 00...|             .|
|[00:01:43,670, 00...|Transportation|
|[00:01:43,670, 00...|       between|
|[00:01:43,670, 00...|     terminals|
|[00:01:43,670, 00...|           ...|
|[00:01:53,612, 00...|          Take|
|[00:01:53,612, 00...|          your|
|[00:01:53,612, 00...|        laptop|
|[00:01:53,612, 00...|           out|
|[00:01:53,612, 00...|            of|
|[00:01:53,612, 00...|          your|
|[00:01:53,612, 00...|           bag|
|[00:01:53,612, 00...|             .|
|[00:02:14,264, 00...|           The|
+--------------------+--------------+
only showing top 20 rows



In [23]:
#We keep the starting time of the subtitle only
clean_data = data.withColumn('startingTime',data['time'].getItem(0)).select(col('startingTime'), col('word'))
clean_data.show()

+------------+--------------+
|startingTime|          word|
+------------+--------------+
|00:01:40,634|      Terminal|
|00:01:40,634|             A|
|00:01:40,634|            of|
|00:01:40,634|        Boston|
|00:01:40,634| international|
|00:01:40,634|       airport|
|00:01:40,634|             .|
|00:01:43,670|Transportation|
|00:01:43,670|       between|
|00:01:43,670|     terminals|
|00:01:43,670|           ...|
|00:01:53,612|          Take|
|00:01:53,612|          your|
|00:01:53,612|        laptop|
|00:01:53,612|           out|
|00:01:53,612|            of|
|00:01:53,612|          your|
|00:01:53,612|           bag|
|00:01:53,612|             .|
|00:02:14,264|           The|
+------------+--------------+
only showing top 20 rows



In [24]:
#We filter out the stop words and punctuation
#We use the stop-words list from NLTK
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import string
import nltk

nltk.download('stopwords')

stopWords = set(stopwords.words('english'))
alphabet = list(string.ascii_lowercase)

#Checks whether a word is a stop-word or a sequence of non-alphabetic characters and sets them to None
def isAWord(x):
    if(len(x)==0 or x.lower() in stopWords or not any(c.isalpha() for c in x)):
        return None
    else:
        return x
    
udf_is_a_word = udf(isAWord, StringType())

newData = clean_data.select(('startingTime'),udf_is_a_word('word').alias('word'))
cleanData = newData.na.drop(subset=["word"])
cleanData.show()

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/michal/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
+------------+--------------+
|startingTime|          word|
+------------+--------------+
|00:01:40,634|      Terminal|
|00:01:40,634|        Boston|
|00:01:40,634| international|
|00:01:40,634|       airport|
|00:01:43,670|Transportation|
|00:01:43,670|     terminals|
|00:01:53,612|          Take|
|00:01:53,612|        laptop|
|00:01:53,612|           bag|
|00:02:14,264|        Boston|
|00:02:14,264|        police|
|00:02:14,264|    department|
|00:02:14,264|      requests|
|00:02:14,264|        safety|
|00:02:14,264|      security|
|00:02:14,264|           per|
|00:02:14,264|           FAA|
|00:02:14,264|   regulations|
|00:02:41,656|           'll|
|00:02:41,656|          home|
+------------+--------------+
only showing top 20 rows



### Metadata

In [25]:
from pyspark.sql.types import DateType

In [26]:
metadata = sqlContext.read.format('com.databricks.spark.xml').options(rootTag='metadata',rowTag='subtitle').load(DATA_DIR+'6653249.xml')
metadata.show()

+------+---+----------+----------+------------+--------+
|blocks|cds|confidence|      date|    duration|language|
+------+---+----------+----------+------------+--------+
|   870|1/1|       1.0|2016-06-10|01:32:10,143| English|
+------+---+----------+----------+------------+--------+



We'll only use the date of the subtitles therefore we define a method to just get this one single value

In [27]:
def getDate(filename):
    metadata = sqlContext.read.format('com.databricks.spark.xml').options(rootTag='metadata',rowTag='subtitle').load(DATA_DIR+filename)
    date = metadata.select('date').head()[0]
    return date

In [28]:
getDate('6653249.xml')

'2016-06-10'

# IMDB Data 

We'll be using four of the IMDB datasets: 
    1. basics - for various metadata about the movie/tv series
    2. episode - to match the subtitles tv series episodes with the overall series in the database
    3. principals - for the character name info
    4. ratings - for the ratings and views info
All of them are described here: https://www.imdb.com/interfaces/
All of the datasets fit on our local machines, however we'll need to use the cluster when joining this data with the subtitle files

In [29]:
#Loading datasets

basics = spark.read.csv(DATA_DIR+'title.basics.tsv',sep='\t',header=True)
episode = spark.read.csv(DATA_DIR+'title.episode.tsv',sep='\t',header=True)
principals = spark.read.csv(DATA_DIR+'title.principals.tsv',sep='\t',header=True)
ratings = spark.read.csv(DATA_DIR+'title.ratings.tsv',sep='\t',header=True)

We only need the following fields from every table:
    - basics:
        - all fields withouth isAdult, originalTitle
    - episode:
        - all fields
    - principals:
        - tconst, ordering, nconst, characters
    - ratings:
        - all fields

In [30]:
#remove the originalTitle and isAdult columns
basics_clean = basics.drop('isAdult', 'originalTitle')

#remove the rows where the category of a person is not 'actor' and where the character name is empty
#remove the category and job columns, since category is always 'actor' now
principals_clean = principals.drop('category','job').filter(col('category')!='actor').filter(col('characters') != '\\N')

episode_clean = episode
ratings_clean = ratings

In [31]:
#Note that we leave the endYear column because it is valid for the TV Series
basics_clean.show()

+---------+---------+--------------------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|     1892|     \N|            \N|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|     1893|     \N|             1|        Comedy,Short|
|tt0000006|    short|   Chinese Opium Den|     1894|     \N|             1|               Short|
|tt0000007|    short|Corbett and Court...|     1894|     \N|             1|         Short,Sport|
|tt0000008|    short|Edison Ki

In [32]:
principals_clean.show()

+---------+--------+---------+--------------------+
|   tconst|ordering|   nconst|          characters|
+---------+--------+---------+--------------------+
|tt0000001|       1|nm1588970|         ["Herself"]|
|tt0000009|       1|nm0063086|["Miss Geraldine ...|
|tt0000009|       3|nm1309758|["Himself - the D...|
|tt0000012|       1|nm2880396|         ["Herself"]|
|tt0000012|       2|nm9735580|         ["Himself"]|
|tt0000012|       3|nm0525900|         ["Herself"]|
|tt0000012|       4|nm9735581|         ["Herself"]|
|tt0000012|       7|nm9735579|         ["Herself"]|
|tt0000012|       8|nm9653419|         ["Herself"]|
|tt0000013|       1|nm0525908|         ["Himself"]|
|tt0000013|       2|nm1715062|         ["Himself"]|
|tt0000016|       1|nm0525900|["Herself (on the...|
|tt0000016|       2|nm9735581|["Herself (on the...|
|tt0000017|       2|nm3692829|        ["The girl"]|
|tt0000024|       1|nm0256651|["Herself - Empre...|
|tt0000024|       2|nm0435118|["Himself - Emper...|
|tt0000028| 

In [33]:
episode_clean.show()

+---------+------------+------------+-------------+
|   tconst|parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt0041951|   tt0041038|           1|            9|
|tt0042816|   tt0989125|           1|           17|
|tt0042889|   tt0989125|          \N|           \N|
|tt0043426|   tt0040051|           3|           42|
|tt0043631|   tt0989125|           2|           16|
|tt0043693|   tt0989125|           2|            8|
|tt0043710|   tt0989125|           3|            3|
|tt0044093|   tt0959862|           1|            6|
|tt0044901|   tt0989125|           3|           46|
|tt0045519|   tt0989125|           4|           11|
|tt0045960|   tt0044284|           2|            3|
|tt0046135|   tt0989125|           4|            5|
|tt0046150|   tt0341798|          \N|           \N|
|tt0046855|   tt0046643|           1|            4|
|tt0046864|   tt0989125|           5|           20|
|tt0047810|   tt0914702|           3|           36|
|tt0047852| 