### import packages

In [2]:
import pyspark
import numpy as np
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, lit, size, explode, monotonically_increasing_id, split, udf
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, FloatType, ArrayType
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, VectorAssembler, PCA, Normalizer
from pyspark.ml.clustering import KMeans, LDA
from pyspark.ml.linalg import Vectors
import tensorflow as tf
import tensorflow_hub as hub

ModuleNotFoundError: No module named 'pyspark'

### create new Spark session

In [1]:
sc = SparkSession \
    .builder \
    .appName("Assignment") \
    .getOrCreate()
sqlContext = SQLContext(sparkContext=sc.sparkContext, sparkSession=sc)

NameError: name 'SparkSession' is not defined

### file path


In [1]:
dev_matched_path = "s3://mnliassignment/dev_matched.tsv"
dev_mismatched_path = "s3://mnliassignment/dev_mismatched.tsv"
test_matched_path = "s3://mnliassignment/test_matched.tsv"
test_mismatched_path = "s3://mnliassignment/test_mismatched.tsv"
train_path = "s3://mnliassignment/train.tsv"

### schema for all files

In [None]:
data_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("promptID", IntegerType(), True),
    StructField("pairID", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("sentence1_binary_parse", StringType(), True),
    StructField("sentence2_binary_parse", StringType(), True),
    StructField("sentence1_parse", StringType(), True),
    StructField("sentence2_parse", StringType(), True),
    StructField("sentence1", StringType(), True),
    StructField("sentence2", StringType(), True),
    StructField("label1", StringType(), True),
    StructField("label2", StringType(), True),
    StructField("label3", StringType(), True),
    StructField("label4", StringType(), True),
    StructField("label5", StringType(), True),
    StructField("gold_label", StringType(), True)])

### load files 

In [None]:
dev_matched_df= sqlContext.read.format("com.databricks.spark.csv")\
                    .option("delimiter", "\t")\
                    .option("header", "true")\
                    .schema(data_schema)\
                    .load(dev_matched_path)\
                    .repartition(5)\
                    .select("genre", "sentence1", "sentence2")

dev_mismatched_df= sqlContext.read.format("com.databricks.spark.csv")\
                    .option("delimiter", "\t")\
                    .option("header", "true")\
                    .schema(data_schema)\
                    .load(dev_mismatched_path)\
                    .repartition(5)\
                    .select("genre", "sentence1", "sentence2")

test_matched_df= sqlContext.read.format("com.databricks.spark.csv")\
                    .option("delimiter", "\t")\
                    .option("header", "true")\
                    .schema(data_schema)\
                    .load(test_matched_path)\
                    .repartition(5)\
                    .select("genre", "sentence1", "sentence2")

test_mismatched_df= sqlContext.read.format("com.databricks.spark.csv")\
                    .option("delimiter", "\t")\
                    .option("header", "true")\
                    .schema(data_schema)\
                    .load(test_mismatched_path)\
                    .repartition(5)\
                    .select("genre", "sentence1", "sentence2")

train_df= sqlContext.read.format("com.databricks.spark.csv")\
                    .option("delimiter", "\t")\
                    .option("header", "true")\
                    .schema(data_schema)\
                    .load(train_path)\
                    .repartition(5)\
                    .select("genre", "sentence1", "sentence2")

### 3.1 Vocab Exploration - Matched and Mismatched Sets

In [None]:
#tokenizer to split the sentence to list of words
tokenizer = Tokenizer(inputCol="combine_sentence", outputCol="words")

#dev_mismatched
dev_mismatched_combine_sentence = dev_mismatched_df.select(concat(col("sentence1"), lit(" "), col("sentence2")).alias("combine_sentence"))
dev_mismatched_transform = tokenizer.transform(dev_mismatched_combine_sentence.na.fill({'combine_sentence': 'the' })).filter(size('words') > 0)

#test_mismatched
test_mismatched_combine_sentence = test_mismatched_df.select(concat(col("sentence1"), lit(" "), col("sentence2")).alias("combine_sentence"))
test_mismatched_transform = tokenizer.transform(test_mismatched_combine_sentence.na.fill({'combine_sentence': 'the' })).filter(size('words') > 0)

#dev_matched
dev_matched_combine_sentence = dev_matched_df.select(concat(col("sentence1"), lit(" "), col("sentence2")).alias("combine_sentence"))
dev_matched_transform = tokenizer.transform(dev_matched_combine_sentence.na.fill({'combine_sentence': 'the' })).filter(size('words') > 0)

#test_matched
test_matched_combine_sentence = test_matched_df.select(concat(col("sentence1"), lit(" "), col("sentence2")).alias("combine_sentence"))
test_matched_transform = tokenizer.transform(test_matched_combine_sentence.na.fill({'combine_sentence': 'the' })).filter(size('words') > 0)

In [None]:
dev_match = dev_matched_transform.select(explode("words").alias("words")).distinct()
test_match = test_matched_transform.select(explode("words").alias("words")).distinct()
match_concat = dev_match.union(test_match).distinct()

dev_mismatch = dev_mismatched_transform.select(explode("words").alias("words")).distinct()
test_mismatch = test_mismatched_transform.select(explode("words").alias("words")).distinct()
mismatch_concat = dev_mismatch.union(test_mismatch).distinct()


In [None]:
dev_match = dev_matched_transform.select(explode("words").alias("words")).distinct()
test_match = test_matched_transform.select(explode("words").alias("words")).distinct()
match_concat = dev_match.union(test_match).distinct()

dev_mismatch = dev_mismatched_transform.select(explode("words").alias("words")).distinct()
test_mismatch = test_mismatched_transform.select(explode("words").alias("words")).distinct()
mismatch_concat = dev_mismatch.union(test_mismatch).distinct()


#### 3.1.1 number of common words

In [None]:
intersect = match_concat.intersect(mismatch_concat)
intersect.count()

#### 3.1.2 unique words to matched sets

In [None]:
unique_matched = match_concat.subtract(mismatch_concat)
unique_matched.count()

#### 3.1.3 unique words to mismatched sets


In [None]:
unique_mismatched = mismatch_concat.subtract(match_concat)
unique_mismatched.count()

### 3.2 Vocab Exploration - Training Sets

#### 3.2.1 percentage of words in 5,4,3,2,1 genre

In [None]:
tokenizer = Tokenizer(inputCol="combine_sentence", outputCol="words")

train_sentence_genre = train_df \
                  .select(concat(col("sentence1"), lit(" "), col("sentence2")) \
                  .alias("combine_sentence"), "genre")
                  
train_tokenized = tokenizer \
                  .transform(train_sentence_genre.na.fill({'combine_sentence': 'the' })) \
                  .filter(size('words') > 0).cache()

train_word_genre = train_tokenized \
                  .select(explode("words").alias("word"), "genre") \
                  .distinct()

train_word_appear = train_word_genre \
                  .groupBy('word').count() \
                  .select('word', col('count').alias('appear')).cache()

train_five_genre = train_word_appear.filter("appear = 5")
train_four_genre = train_word_appear.filter("appear = 4")
train_three_genre = train_word_appear.filter("appear = 3")
train_two_genre = train_word_appear.filter("appear = 2")
train_one_genre = train_word_appear.filter("appear = 1")

In [None]:
train_all_count = train_word_appear.count()
train_five_genre_count = train_five_genre.count()
train_four_genre_count = train_four_genre.count()
train_three_genre_count = train_three_genre.count()
train_two_genre_count = train_two_genre.count()
train_one_genre_count = train_one_genre.count()

fivegenre = train_five_genre_count/train_all_count*100
fourgenre = train_four_genre_count/train_all_count*100
threegenre = train_three_genre_count/train_all_count*100
twogenre = train_two_genre_count/train_all_count*100
onegenre = train_one_genre_count/train_all_count*100

print("Total number of words in training set: "+ str(train_all_count))
print("Percentage of words appearing in 5 genres: "+ str(round(fivegenre, 3))+ "%")
print("Percentage of words appearing in 4 genres: "+ str(round(fourgenre, 3)) + "%")
print("Percentage of words appearing in 3 genres: "+ str(round(threegenre, 3)) + "%")
print("Percentage of words appearing in 2 genres: "+ str(round(twogenre, 3)) + "%")
print("Percentage of words appearing in 1 genres: "+ str(round(onegenre, 3)) + "%")

#### 3.2.2 same percentage of words after removing stop words

In [None]:
def clean_str(x):
  punc='!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
  j=[]
  for i in x:
    for ch in punc:
      i = i.replace(ch, '')
    j.append(i)
  return j

clean_str_udf = udf(lambda z: clean_str(z) if z is not None else None, ArrayType(StringType()))

#stopword from CoreNLP
stopwordList = ['!!', '?!', '??', '!?', '`', '``', "''", '-lrb-', '-rrb-', '-lsb-', '-rsb-', ',', '.', ':', ';', '"', "'", '?', '<', '>', '{', '}', '[', ']', '+', '-', '(', ')', '&', '%', '$', '@', '!', '^', '#', '*', '..', '...', "'ll", "'s", "'m", 'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', "aren't", 'as', 'at', 'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by', 'can', "can't", 'cannot', 'could', "couldn't", 'did', "didn't", 'do', 'does', "doesn't", 'doing', "don't", 'down', 'during', 'each', 'few', 'for', 'from', 'further', 'had', "hadn't", 'has', "hasn't", 'have', "haven't", 'having', 'he', "he'd", "he'll", "he's", 'her', 'here', "here's", 'hers', 'herself', 'him', 'himself', 'his', 'how', "how's", 'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is', "isn't", 'it', "it's", 'its', 'itself', "let's", 'me', 'more', 'most', "mustn't", 'my', 'myself', 'no', 'nor', 'not', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'ought', 'our', 'ours', 'ourselves', 'out', 'over', 'own', 'same', "shan't", 'she', "she'd", "she'll", "she's", 'should', "shouldn't", 'so', 'some', 'such', 'than', 'that', "that's", 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', "there's", 'these', 'they', "they'd", "they'll", "they're", "they've", 'this', 'those', 'through', 'to', 'too', 'under', 'until', 'up', 'very', 'was', "wasn't", 'we', "we'd", "we'll", "we're", "we've", 'were', "weren't", 'what', "what's", 'when', "when's", 'where', "where's", 'which', 'while', 'who', "who's", 'whom', 'why', "why's", 'with', "won't", 'would', "wouldn't", 'you', "you'd", "you'll", "you're", "you've", 'your', 'yours', 'yourself', 'yourselves', '###', 'return', 'arent', 'cant', 'couldnt', 'didnt', 'doesnt', 'dont', 'hadnt', 'hasnt', 'havent', 'hes', 'heres', 'hows', 'im', 'isnt', 'its', 'lets', 'mustnt', 'shant', 'shes', 'shouldnt', 'thats', 'theres', 'theyll', 'theyre', 'theyve', 'wasnt', 'were', 'werent', 'whats', 'whens', 'wheres', 'whos', 'whys', 'wont', 'wouldnt', 'youd', 'youll', 'youre', 'youve']

tokenizer = Tokenizer(inputCol="combine_sentence", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stopwordList)

In [None]:
#train_tokenized from 3.2.1

train_remover = remover \
                .transform(train_tokenized) \
                .select('filtered', 'genre')

train_cleaned = train_remover \
            .withColumn('words', clean_str_udf(train_remover.filtered)) \
            .select('words', 'genre')

train_cleaned_word_genre = train_cleaned \
                        .select(explode('words').alias('word'), 'genre') \
                        .distinct()

train_cleaned_word_appear = train_cleaned_word_genre \
                            .groupBy('word').count() \
                            .select('word', col('count').alias('appear')).cache()

train_cleaned_five_genre = train_cleaned_word_appear.filter("appear = 5")
train_cleaned_four_genre = train_cleaned_word_appear.filter("appear = 4")
train_cleaned_three_genre = train_cleaned_word_appear.filter("appear = 3")
train_cleaned_two_genre = train_cleaned_word_appear.filter("appear = 2")
train_cleaned_one_genre = train_cleaned_word_appear.filter("appear = 1")

In [None]:
train_cleaned_count = train_cleaned_word_appear.count()
train_cleaned_five_genre_count = train_cleaned_five_genre.count() 
train_cleaned_four_genre_count = train_cleaned_four_genre.count() 
train_cleaned_three_genre_count = train_cleaned_three_genre.count() 
train_cleaned_two_genre_count = train_cleaned_two_genre.count() 
train_cleaned_one_genre_count = train_cleaned_one_genre.count() 

cleaned_fivegenre = train_cleaned_five_genre_count/train_cleaned_count*100
cleaned_fourgenre = train_cleaned_four_genre_count/train_cleaned_count*100
cleaned_threegenre = train_cleaned_three_genre_count/train_cleaned_count*100
cleaned_twogenre = train_cleaned_two_genre_count/train_cleaned_count*100
cleaned_onegenre = train_cleaned_one_genre_count/train_cleaned_count*100

print("Total number of words in training set (after removing stopwords): "+ str(train_cleaned_count))
print("Percentage of words appearing in 5 genres: "+ str(round(cleaned_fivegenre, 3)) + "%")
print("Percentage of words appearing in 4 genres: "+ str(round(cleaned_fourgenre, 3)) + "%")
print("Percentage of words appearing in 3 genres: "+ str(round(cleaned_threegenre, 3)) + "%")
print("Percentage of words appearing in 2 genres: "+ str(round(cleaned_twogenre, 3)) + "%")
print("Percentage of words appearing in 1 genres: "+ str(round(cleaned_onegenre, 3)) + "%")

### 4 Sentence Vector Exploration

#### 4.1 TD IDF

In [None]:
sen1 = train_df.drop("sentence2")
sen2 = train_df.drop("sentence1")
combine_train_df = sen1.union(sen2).withColumnRenamed("sentence1","sentence")
combine_train_df= combine_train_df.na.fill({'sentence': '' })

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# wordsData = tokenizer.transform(combine_train_df)

#.filter(size('words') > 0)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
# featurizedData = hashingTF.transform(wordsData)


idf = IDF(inputCol="rawFeatures", outputCol="features")
# idfModel = idf.fit(featurizedData)
# rescaledData = idfModel.transform(featurizedData)

kmeans = KMeans(featuresCol='features',k=5, seed=23, maxIter=100)
# model = kmeans.fit(rescaledData)
# results = model.transform(rescaledData)

pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, kmeans])
model = pipeline.fit(combine_train_df)
results = model.transform(combine_train_df).cache()


In [None]:
# calculate the cluster label name
results.filter(results["prediction"]==0).groupBy("genre").count().show()
results.filter(results["prediction"]==1).groupBy("genre").count().show()
results.filter(results["prediction"]==2).groupBy("genre").count().show()
results.filter(results["prediction"]==3).groupBy("genre").count().show()
results.filter(results["prediction"]==4).groupBy("genre").count().show()

In [None]:
# calculation of the percentage
# cluster label name by the maximum genre (duplicate allowed)
'''
from the above result, we compute the label name for each row:
cluster 0 = fiction
cluster 1 = government
cluster 2 = telephone
cluster 3 = telephone
cluster 4 = telephone

In the confusion matrix, the column is as followed:
true label 0 = travel
true label 1 = slate
true label 2 = fiction
true label 3 = government
true label 4 = telephone
'''
# row cluster 0: total = 542038
travel_0 = 108250 / 542038
slate_0 = 118928 / 542038
fiction_0 = 123198 / 542038
gov_0 = 105072 / 542038
tele_0 = 86590 / 542038

# row cluster 1: total = 134066
travel_1 = 46145 / 134066
slate_1 = 27015 / 134066
fiction_1 = 8432 / 134066
gov_1 = 47880 / 134066
tele_1 = 4594 / 134066

# row cluster 2: total = 29488
travel_2 = 21 / 29488
slate_2 = 228 / 29488
fiction_2 = 163 / 29488
gov_2 = 58 / 29488
tele_2 = 29018 / 29488

# row cluster 3: total = 75165
travel_3 = 284 / 75165
slate_3 = 8441 / 75165
fiction_3 = 22903 / 75165
gov_3 = 58 / 75165
tele_3 = 41847 / 75165

# row cluster 4: total = 4647
travel_4 = 0 / 4647
slate_4 = 0 / 4647
fiction_4 = 0 / 4647
gov_4 = 0 / 4647
tele_4 = 4647 / 4647

#### 4.2 Universal Sentence Encoder

In [None]:
def review_embed(rev_text_partition):
    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
    embed = hub.Module(module_url)
    # mapPartition would supply element inside a partition using generator stype
    # this does not fit tensorflow stype
    rev_text_list = [text for text in rev_text_partition]
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        message_embeddings = session.run(embed(rev_text_list))
    return message_embeddings


In [None]:
#trabel df to rdd
travel_rdd = combine_train_df.filter(combine_train_df["genre"]=="travel")\
                                    .select('sentence').rdd.map(lambda row: str(row[0])).filter(lambda data: data is not None).cache()
travel_embedding = travel_rdd.mapPartitions(review_embed).cache()

#telephone df to rdd
telephone_rdd = combine_train_df.filter(combine_train_df["genre"]=="telephone")\
                                    .select('sentence').rdd.map(lambda row: str(row[0])).filter(lambda data: data is not None).cache()
telephone_embedding = telephone_rdd.mapPartitions(review_embed).cache()

#slate df to rdd
slate_rdd = combine_train_df.filter(combine_train_df["genre"]=="slate")\
                                    .select('sentence').rdd.map(lambda row: str(row[0])).filter(lambda data: data is not None).cache()
slate_embedding = slate_rdd.mapPartitions(review_embed).cache()

#government df to rdd
government_rdd = combine_train_df.filter(combine_train_df["genre"]=="government")\
                                    .select('sentence').rdd.map(lambda row: str(row[0])).filter(lambda data: data is not None).cache()
government_embedding = government_rdd.mapPartitions(review_embed).cache()

#fiction df to rdd
fiction_rdd = combine_train_df.filter(combine_train_df["genre"]=="fiction")\
                                    .select('sentence').rdd.map(lambda row: str(row[0])).filter(lambda data: data is not None).cache()
fiction_embedding = fiction_rdd.mapPartitions(review_embed).cache()


In [None]:
#fiction embed
fiction_embedding_df = spark.createDataFrame(fiction_embedding.map(lambda v: v.tolist()))
fiction_assembler = VectorAssembler(inputCols=fiction_embedding_df.columns,outputCol="features")
fiction_embedding_vectors = fiction_assembler.transform(fiction_embedding_df).select("features")

#travel embed
travel_embedding_df = spark.createDataFrame(travel_embedding.map(lambda v: v.tolist()))
travel_assembler = VectorAssembler(inputCols=travel_embedding_df.columns,outputCol="features")
travel_embedding_vectors = travel_assembler.transform(travel_embedding_df).select("features")

#telephone embed
tele_embedding_df = spark.createDataFrame(telephone_embedding.map(lambda v: v.tolist()))
tele_assembler = VectorAssembler(inputCols=tele_embedding_df.columns,outputCol="features")
tele_embedding_vectors = tele_assembler.transform(tele_embedding_df).select("features")

#government embed
gov_embedding_df = spark.createDataFrame(government_embedding.map(lambda v: v.tolist()))
gov_assembler = VectorAssembler(inputCols=gov_embedding_df.columns,outputCol="features")
gov_embedding_vectors = gov_assembler.transform(gov_embedding_df).select("features")

#slate embed
slate_embedding_df = spark.createDataFrame(slate_embedding.map(lambda v: v.tolist()))
slate_assembler = VectorAssembler(inputCols=slate_embedding_df.columns,outputCol="features")
slate_embedding_vectors = slate_assembler.transform(slate_embedding_df).select("features")


In [None]:
kmeans = KMeans(featuresCol='features',k=5, seed=6)

#travel
travel_model = kmeans.fit(travel_embedding_vectors)
travel_results = travel_model.transform(travel_embedding_vectors)

#government
gov_model = kmeans.fit(gov_embedding_vectors)
gov_results = gov_model.transform(gov_embedding_vectors)

#telephone
tele_model = kmeans.fit(tele_embedding_vectors)
tele_results = tele_model.transform(tele_embedding_vectors)

#slate
slate_model = kmeans.fit(slate_embedding_vectors)
slate_results = slate_model.transform(slate_embedding_vectors)

#fiction
fiction_model = kmeans.fit(fiction_embedding_vectors)
fiction_results = fiction_model.transform(fiction_embedding_vectors)

In [None]:
travel_results.groupBy("prediction").count().show()
gov_results.groupBy("prediction").count().show()
tele_results.groupBy("prediction").count().show()
slate_results.groupBy("prediction").count().show()
fiction_results.groupBy("prediction").count().show()

In [None]:
#calculate cluster label name
# cluter label name by maximum genre, if genre exists in other cluster, then choose the second mximum one (duplicate not allowed)
'''
from the above result, we conclude the cluster label for the row:
cluster 0 = telephone
cluster 1 = fiction
cluster 2 = travel
cluster 3 = slate
cluster 4 = government

In the confusion matrix, the column is as followed:
true label 0 = travel
true label 1 = slate
true label 2 = fiction
true label 3 = government
true label 4 = telephone
'''
# column travel: total = 154700
travel_1 = 31708 / 154700
travel_3 = 25026 / 154700
travel_4 = 26783 / 154700
travel_2 = 39131 / 154700
travel_0 = 32052 / 154700

# column government: total = 154700
gov_1 = 32314 / 154700
gov_3 = 17367 / 154700
gov_4 = 38390 / 154700
gov_2 = 37254 / 154700
gov_0 = 29375 / 154700

# column telephone: total = 166696
tele_1 = 21026 / 166696
tele_3 = 37211 / 166696
tele_4 = 38160 / 166696
tele_2 = 30525 / 166696
tele_0 = 39774 / 166696

# column slate: total = 154612
slate_1 = 38203 / 154612
slate_3 = 37185 / 154612
slate_4 = 31379 / 154612
slate_2 = 24075 / 154612
slate_0 = 23770 / 154612

# column fiction: total = 154696
fiction_1 = 37610 / 154696
fiction_3 = 17164 / 154696
fiction_4 = 40297 / 154696
fiction_2 = 21912 / 154696
fiction_0 = 37713 / 154696


#### Performance Analysis

In [None]:
#Find a K-Means Cluster in TF-IDF

sen1 = train_df.drop("sentence2")
sen2 = train_df.drop("sentence1")
combine_train_df = sen1.union(sen2).withColumnRenamed("sentence1","sentence")
combine_train_df= combine_train_df.na.fill({'sentence': '' })

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(combine_train_df)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=300000)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

kmeans = KMeans(featuresCol='features',k=5, seed=500)
model = kmeans.fit(rescaledData)
results = model.transform(rescaledData)

results.filter(results["prediction"]==0).groupBy("genre").count().show()