In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install spark-nlp
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash
!cd ~/.ivy2/cache/com.johnsnowlabs.nlp/spark-nlp_2.11/jars && ls -lt

Collecting spark-nlp
[?25l  Downloading https://files.pythonhosted.org/packages/6c/35/3d06b93fefdeab0f6f544b1fc48e5e49c049697c38611ef870383031380b/spark_nlp-3.0.3-py2.py3-none-any.whl (43kB)
[K     |████████████████████████████████| 51kB 1.6MB/s 
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-3.0.3
--2021-05-21 21:09:33--  http://setup.johnsnowlabs.com/colab.sh
Resolving setup.johnsnowlabs.com (setup.johnsnowlabs.com)... 51.158.130.125
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|51.158.130.125|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh [following]
--2021-05-21 21:09:34--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubu

In [None]:
# Enter this in console:
#
# The triple-quoted string below is JavaScript, not Python: it is meant to be
# pasted into the browser's developer console, and running this cell executes
# none of it. The script looks up the Colab connect button and clicks it every
# 60000 ms (via setInterval), logging "Connect pushed" on each click -
# presumably to keep an idle session connected during long runs.

''' 

function ConnectButton(){
    console.log("Connect pushed"); 
    document.querySelector("#top-toolbar > colab-connect-button").shadowRoot.querySelector("#connect").click() 
}
setInterval(ConnectButton,60000);

'''

In [2]:
import os
# Tell this process where the JDK and the unpacked Spark distribution live.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
# Prepend the JDK's bin directory so its executables come first on PATH.
os.environ["PATH"] = f'{os.environ["JAVA_HOME"]}/bin:{os.environ["PATH"]}'
import findspark
# Must run after SPARK_HOME is set above and before the pyspark/sparknlp imports
# below - presumably this is what makes the unpacked Spark distribution importable.
findspark.init()
import pandas as pd

import sparknlp
from pyspark.sql import functions as F
import pyspark.sql.types as T
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner
from sparknlp.annotator import PerceptronModel, Chunker
from nltk.corpus import stopwords
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA

In [3]:
import nltk
# Fetch the NLTK stop-word corpus; the pipeline cell reads it through
# `stopwords.words('english')`.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [4]:
# Create the Spark session through Spark NLP's helper; the data-loading cell
# reads its inputs through this `spark` object.
spark = sparknlp.start()

In [5]:
# Load the two input tables from parquet. The paths are relative, so both files
# must be present in the notebook's working directory.
movies = spark.read.parquet("movies.snappy.parquet")
reviews = spark.read.parquet("reviews.snappy.parquet")

# Identifying most popular genres

In [6]:
# Split the comma-separated `genres` string into an array column so that
# individual genres can be exploded and matched later on.
movies = movies.withColumn(
    "genres_list",
    F.split("genres", ', '),
)

In [7]:
# One row per (movie, genre) pair, then the mean of the numeric audience columns
# for every genre. The aggregated columns are named "avg(<column>)".
movies_by_genre = (
    movies
    .select(
        F.explode("genres_list").alias("genre"),
        "audience_rating",
        "audience_count",
    )
    .groupBy("genre")
    .mean()
)

In [8]:
# Names of the ten genres with the highest mean audience_count, largest first.
# limit(10) is applied before collect() so that only the ten rows that are
# actually used are transferred to the driver.
top_genres = [
    row['genre']
    for row in (
        movies_by_genre
        .sort("avg(audience_count)", ascending=False)
        .limit(10)
        .collect()
    )
]

In [9]:
# Display the selected genres. Their order matters: the merge cell below picks
# genres by position (indices 0-9) in this list.
top_genres

['Science Fiction & Fantasy',
 'Kids & Family',
 'Animation',
 'Romance',
 'Action & Adventure',
 'Comedy',
 'Sports & Fitness',
 'Mystery & Suspense',
 'Drama',
 'Horror']

# Merging datasets

In [10]:
def reviews_for_genre(genre):
    """Return the reviews of every movie tagged with ``genre``.

    Filters ``movies`` to the rows whose ``genres_list`` array contains
    ``genre``, keeps the link, release-date and audience columns, and joins the
    result with the selected review columns of ``reviews`` on
    ``rotten_tomatoes_link`` (Spark's default join type, i.e. an inner join).

    Args:
        genre: Genre label matched against the elements of ``genres_list``.

    Returns:
        A Spark DataFrame holding the selected movie and review columns.
    """
    movie_columns = [
        "rotten_tomatoes_link",
        "original_release_date",
        "streaming_release_date",
        "audience_rating",
        "audience_count",
    ]
    review_columns = [
        'rotten_tomatoes_link',
        'critic_name',
        'review_type',
        'review_date',
        'review_content',
    ]
    return (
        movies
        # array_contains already yields a boolean column; comparing it with
        # True is redundant (null results are filtered out either way).
        .where(F.array_contains(F.col("genres_list"), genre))
        .select(*movie_columns)
        .join(reviews.select(*review_columns), on="rotten_tomatoes_link")
    )


# NOTE: each variable is bound to a *position* in `top_genres`, not to a genre
# label. The names match the genre list displayed above; if the ranking changes
# with new data, the names will no longer describe their contents.
sci_fi = reviews_for_genre(top_genres[0])
kids = reviews_for_genre(top_genres[1])
animation = reviews_for_genre(top_genres[2])
romance = reviews_for_genre(top_genres[3])
action = reviews_for_genre(top_genres[4])
comedy = reviews_for_genre(top_genres[5])
sports = reviews_for_genre(top_genres[6])
mystery = reviews_for_genre(top_genres[7])
drama = reviews_for_genre(top_genres[8])
horror = reviews_for_genre(top_genres[9])

In [11]:
# Per-genre review frames keyed by a short name; the topic-modelling loop
# iterates over this mapping and uses each key in its output file names.
genre_dfs = dict(
    sci_fi=sci_fi,
    kids=kids,
    animation=animation,
    romance=romance,
    action=action,
    comedy=comedy,
    sports=sports,
    mystery=mystery,
    drama=drama,
    horror=horror,
)

# Text Analysis

In [13]:
# Spark NLP pipeline: raw review text -> tokens -> normalized -> lemmas ->
# POS tags -> tag-pattern chunks ("ngrams") -> plain string arrays.

# Wrap the raw `review_content` column into Spark NLP documents.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("review_content")
    .setOutputCol('document')
)

tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('tokenized')
)

# Normalize the tokens and lowercase them.
normalizer = (
    Normalizer()
    .setInputCols(['tokenized'])
    .setOutputCol('normalized')
    .setLowercase(True)
)

# Default pretrained lemmatizer (downloaded on first use).
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['normalized'])
    .setOutputCol('lemmatized')
)

# English stop words from NLTK. Note that the cleaner's output column,
# `no_stop_lemmatized`, is not consumed by any later stage defined in this cell.
eng_stopwords = stopwords.words('english')
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['lemmatized'])
    .setOutputCol('no_stop_lemmatized')
    .setStopWords(eng_stopwords)
)

# Pretrained 'pos_anc' tagger, applied to the lemmas (stop words included).
pos_tagger = (
    PerceptronModel.pretrained('pos_anc')
    .setInputCols(['document', 'lemmatized'])
    .setOutputCol('pos')
)

# Tag patterns kept as chunks: one or more <JJ> followed by <NN>, and one or
# more <NN> followed by <NN>.
allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']
chunker = (
    Chunker()
    .setInputCols(['document', 'pos'])
    .setOutputCol('ngrams')
    .setRegexParsers(allowed_tags)
)

# Convert the chunk annotations to plain values; the topic-modelling loop reads
# the result as `finished_ngrams`.
finisher = Finisher().setInputCols(['ngrams'])

pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    pos_tagger,
    chunker,
    finisher,
])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]


In [21]:
# Topic modelling per genre and review type:
#   review text -> n-gram chunks (NLP pipeline) -> term counts -> TF-IDF -> LDA.
# For every (genre, rating) pair two parquet outputs are written to the working
# directory: the topic descriptions and the TF-IDF feature table.

# Only "Rotten" reviews are processed - the same selection the previous
# `["Rotten", "Fresh"][:1]` slice produced. Add "Fresh" to cover both types.
ratings = ["Rotten"]

num_topics = 6      # number of LDA topics (k)
max_iter = 10       # LDA iterations
num_top_words = 7   # terms reported for each topic
lda_seed = 42       # fixed seed so that repeated runs yield the same topics

for key, genre_df in genre_dfs.items():
  for rating in ratings:
    df = genre_df.where(F.col("review_type") == rating)

    review_text = df.select('review_content').filter(F.col("review_content").isNotNull())
    # The pipeline output feeds two fits, the iterative LDA training and a
    # write; cache it so the NLP stages are not recomputed for each of them.
    processed_review = pipeline.fit(review_text).transform(review_text).cache()

    # Term counts over the finished n-gram arrays.
    tfizer = CountVectorizer(inputCol='finished_ngrams',
                            outputCol='tf_features')
    tf_model = tfizer.fit(processed_review)
    tf_result = tf_model.transform(processed_review)

    # Re-weight the counts by inverse document frequency.
    idfizer = IDF(inputCol='tf_features',
                  outputCol='tf_idf_features')
    idf_model = idfizer.fit(tf_result)
    tfidf_result = idf_model.transform(tf_result)

    lda = LDA(k=num_topics,
              maxIter=max_iter,
              seed=lda_seed,
              featuresCol='tf_idf_features')
    lda_model = lda.fit(tfidf_result)
    vocab = tf_model.vocabulary

    # describeTopics() reports terms as vocabulary indices; map them back to
    # the n-gram strings. Defined per iteration because `vocab` changes with
    # every fitted CountVectorizer model.
    def get_words(token_list):
        return [vocab[token_id] for token_id in token_list]
    udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

    topics = lda_model\
        .describeTopics(num_top_words)\
        .withColumn('topicWords', udf_to_words(F.col('termIndices')))

    # mode("overwrite") keeps the cell re-runnable: the default save mode raises
    # an error as soon as an output path from an earlier run already exists.
    topics.coalesce(1).write.mode("overwrite").save(f'{key}_{rating}_topics.snappy.parquet')
    tfidf_result.coalesce(1).write.mode("overwrite").save(f'{key}_{rating}_tfidf.snappy.parquet')

    # Release the cached pipeline output before moving on to the next genre.
    processed_review.unpersist()