In [14]:
from pyspark import SparkConf, SparkContext 
from pyspark.ml.feature import HashingTF,IDF,Tokenizer
from pyspark.sql import SQLContext
import tensorflow as tf
import tensorflow_hub as hub
from pyspark.ml.clustering import KMeans,KMeansModel
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import udf

def deleteFirstRow(record):
    try:
        index, promptID, pairID, genre, sentence1_binary_parse, sentence2_binary_parse, \
        sentence1_parse, sentence2_parse, sentence1, sentence2, label1, label2, label3, \
        label4, label5, gold_label = record.split("\t")
        if(index == "index"):
        	return False
        else:
        	return True
    except:
        try:
            index, promptID, pairID, genre, sentence1_binary_parse, \
    		sentence2_binary_parse, sentence1_parse, sentence2_parse, sentence1, \
    		sentence2 = record.split("\t")
            if(index == "index"):
                return False
            else:
                return True
        except:
            try:
                index, promptID, pairID, genre, sentence1_binary_parse, \
                sentence2_binary_parse, sentence1_parse, sentence2_parse, sentence1, \
                sentence2, label1, gold_label = record.split("\t")
                if(index == "index"):
                    return False
                else:
                    return True
            except:
                return ()
                
def extractGenreAndSentencesForFlatmap(record):
    try:
        index, promptID, pairID, genre, sentence1_binary_parse, sentence2_binary_parse, \
        sentence1_parse, sentence2_parse, sentence1, sentence2, label1, \
        gold_label = record.split("\t")
        genre_sentences = [(genre, sentence1), (genre, sentence2)]
        return (genre_sentences)
    except:
        return()
    
def emb(record):
    url = "https://tfhub.dev/google/universal-sentence-encoder/2"
    embed = hub.Module(url)
    data = [row for row in record]
    sentence_list = [row[1] for row in data]
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        embeddings = session.run(embed(sentence_list))
    return [(data[i][0], embeddings[i]) for i in range(len(data))]

def toPrint(result):
    sum_list = [0,0,0,0,0]
    genres = []
    for item in result:
        predict, genre, count = item
        sum_list[predict] += count
        if genre not in genres:
            genres.append(genre)

    cluster = {}
    for item in result:
        predict, genre, count = item
        index = genres.index(genre)
        if predict not in cluster.keys():
            cluster[predict] = [0]*5
        cluster[predict][index] = count/sum_list[predict]*100
    print('{:15s}'.format(""), end = "")
    for g in genres:
        print('{:15s}'.format(g), end = "")
    print()
    i=0
    while i<len(cluster):
        pos = cluster[i].index(max(cluster[i]))
        print('{:15s}'.format(genres[pos]), end = "")
        for p in cluster[i]:
            print('{:15s}'.format(str(round(p,2)) + "%"), end = "")
        i+=1
        print()
        
def toList(record):
    genre, features = record
    return(genre, features.tolist())

input_file_train = 's3://comp5349-slia7223/e-4AT2I9YSEAYUVL0BCTAB2KP5Y/train.tsv'

spark_conf = SparkConf().setAppName("Comp 5349 Assignment 2 Sentence Vector Exploaration")
sc=SparkContext.getOrCreate(spark_conf) 
sqlContext = SQLContext(sc)

text_train = sc.textFile(input_file_train)

pure_text_train = text_train.filter(deleteFirstRow)
genre_and_sentences_after_flatmap = pure_text_train.flatMap(extractGenreAndSentencesForFlatmap)
genre_and_sentences_after_flatmap.persist()

# TFIDF
tfidf_dataFrame = genre_and_sentences_after_flatmap.toDF(["genre","sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
tfidf_words_data = tokenizer.transform(tfidf_dataFrame)

hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=512)
tfidf_featurized_data = hashing_tf.transform(tfidf_words_data)

idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tfidf_featurized_data)
tfidf_rescaled_data = idf_model.transform(tfidf_featurized_data)
tfidf_genre_features = tfidf_rescaled_data.select("genre", "features")

# Confusion matrix for TFIDF
tfidf_kmeansmodel = KMeans().setK(5).setFeaturesCol('features').setPredictionCol('prediction').fit(tfidf_genre_features)
tfidf_predictions = tfidf_kmeansmodel.transform(tfidf_genre_features).select("prediction", "genre")
tfidf_res = tfidf_predictions.groupBy(['prediction', 'genre']).count().collect()
print("Confusion matrix for TFIDF:")
toPrint(tfidf_res)
print()



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Confusion matrix for TFIDF:
               travel         telephone      government     slate          fiction        
fiction        20.09%         19.39%         19.19%         20.28%         21.05%         
telephone      0%             100.0%         0%             0%             0%             
travel         50.0%          25.0%          0%             25.0%          0%             
telephone      0%             93.9%          2.44%          2.44%          1.22%          
telephone      0%             100.0%         0%             0%             0%

In [2]:
print("dd")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

dd

In [15]:
# pretrained
pretrained_genre_features = genre_and_sentences_after_flatmap.mapPartitions(emb)
pretrained_dataFrame = pretrained_genre_features.map(toList).toDF(["genre","features"])

new_schema = ArrayType(DoubleType(), containsNull=False)
udf_foo = udf(lambda x:x, new_schema)
pretrained_dataFrame = pretrained_dataFrame.withColumn("features",udf_foo("features"))

# Confusion matrix for pretrained
pretrained_kmeansmodel = KMeans().setK(5).setFeaturesCol('features').setPredictionCol('prediction').fit(pretrained_dataFrame)
pretrained_predictions = pretrained_kmeansmodel.transform(pretrained_dataFrame).select("prediction", "genre")
pretrained_res = pretrained_predictions.groupBy(['prediction', 'genre']).count().collect()
print("Confusion matrix for pretrained:")
toPrint(pretrained_res)

genre_and_sentences_after_flatmap.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Confusion matrix for pretrained:
               travel         telephone      fiction        government     slate          
fiction        3.09%          30.67%         46.65%         3.87%          15.72%         
travel         62.97%         21.84%         5.38%          3.48%          6.33%          
slate          29.21%         12.37%         11.0%          3.44%          43.99%         
government     3.09%          15.72%         0.77%          62.89%         17.53%         
telephone      2.97%          36.8%          36.06%         8.18%          15.99%         
PythonRDD[562] at RDD at PythonRDD.scala:53