In [1]:
# Import all libraries used below: PySpark (SQL, ML pipeline, clustering), TensorFlow / TensorFlow Hub, NumPy and pandas
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.clustering import KMeans
from pyspark.sql.window import Window  
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType,StructField,StringType
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import pandas as pd

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1589975122815_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Reuse (or create) the session for this exploration, then echo the executor core count.
spark = (
    SparkSession.builder
    .appName("SentenceVectorExploration")
    .getOrCreate()
)
spark.sparkContext.getConf().get('spark.executor.cores')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'4'

In [3]:
def processSentenceVectorExploration(method_to_apply):
    """Run the whole sentence-vector clustering exploration for one method.

    Steps: read and clean the training data, vectorise each joined sentence,
    cluster the vectors with k-means, label each cluster with its majority
    genre, then print per-genre percentages and the confusion table.

    Args:
        method_to_apply: "TDIDF" for the TF-IDF pipeline or "GoogleEncoder"
            for the Universal Sentence Encoder embeddings.

    Raises:
        ValueError: if ``method_to_apply`` is not one of the supported names.
            This is checked before any data is read, so a typo fails fast.
    """
    supported_methods = ("TDIDF", "GoogleEncoder")
    if method_to_apply not in supported_methods:
        raise ValueError(
            "Unknown method_to_apply {!r}; expected one of {}".format(
                method_to_apply, supported_methods))

    # step 0 - read file, clean data and return it as a data frame
    cleaned_data_to_process = readFileAndGetCleanedDataTable()

    # step 1 - apply the requested vectorisation method
    if method_to_apply == "TDIDF":
        genre_features_mapped_data = getTDIDFMappedData(cleaned_data_to_process)
    else:
        genre_features_mapped_data = getGoogleEncoderMappedData(cleaned_data_to_process)

    # step 2 - get k-means predictions
    predictions = getKmeansPredictions(genre_features_mapped_data)

    # step 3 - keep only prediction and genre; cached because steps 4-7 reuse it
    prediction_genre = predictions.select('prediction', 'genre').cache()
    try:
        # step 4 - label each cluster with its majority genre
        labeled_cluster_rdd_map = getLabeledClusters(prediction_genre)
        print(" Labeled Clusters ", labeled_cluster_rdd_map)

        # step 5 - replace cluster ids with their labels in the cached table
        updated_prediction_genre_df = updatePredictionsWithLabeledClusters(
            labeled_cluster_rdd_map, prediction_genre)

        # step 6 - count sentences per genre in the pre-processed table
        cleaned_data_count_map = (
            cleaned_data_to_process.groupBy('genre').count().rdd.collectAsMap())

        # step 7 - compute percentages of the clustered data against those totals
        calculatePercentages(cleaned_data_count_map, updated_prediction_genre_df)
    finally:
        # All actions above have completed, so cached data can be released.
        # unpersist() on a DataFrame that was never cached is a no-op.
        prediction_genre.unpersist()
        genre_features_mapped_data.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# read tab delimited train data and clean the data and convert into a data frame
def readFileAndGetCleanedDataTable(train_datafile="s3://comp5349-spam9926/assignment/train.tsv"):
    """Read the tab-separated training file and return cleaned (genre, sentence) rows.

    Args:
        train_datafile: path/URI of the TSV file with a header row containing at
            least ``genre``, ``sentence1`` and ``sentence2``. Defaults to the
            previously hard-coded assignment location.

    Returns:
        Spark DataFrame with columns ``genre`` and ``joined_sentence``
        (``sentence1 + ' ' + sentence2``). Rows whose joined sentence is null
        are dropped. ``F.concat`` yields null if any input is null, so this
        removes rows missing either sentence.
    """
    # .csv() fixes the data source itself; the former .format("org.apache.spark.csv")
    # was not a real source name and was silently overridden, so it is dropped.
    train_df = spark.read.csv(train_datafile, header=True, inferSchema=True, sep="\t")
    joined_sentence = F.concat(F.col('sentence1'), F.lit(' '), F.col('sentence2'))
    return (
        train_df
        .withColumn('joined_sentence', joined_sentence)
        .select('genre', 'joined_sentence')
        .filter(F.col('joined_sentence').isNotNull())
    )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def getTDIDFMappedData(train_clean_df):
    """Vectorise ``joined_sentence`` with a tokenize -> hashed TF -> IDF pipeline.

    Args:
        train_clean_df: DataFrame containing a ``joined_sentence`` text column.

    Returns:
        ``train_clean_df`` with extra columns ``words``, ``rawFeatures`` (512
        hashed term-frequency buckets) and ``features`` (IDF-weighted vector).
    """
    stages = [
        Tokenizer(inputCol="joined_sentence", outputCol="words"),
        HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=512),
        IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=1),
    ]
    fitted_pipeline = Pipeline(stages=stages).fit(train_clean_df)
    return fitted_pipeline.transform(train_clean_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
def getGoogleEncoderMappedData(train_clean_df):
    """Embed each joined sentence with the Universal Sentence Encoder.

    Args:
        train_clean_df: DataFrame with ``genre`` and ``joined_sentence`` columns.

    Returns:
        A cached DataFrame with columns ``genre`` (string) and ``features``
        (dense ML vector built from the encoder output).
    """
    schema = StructType([StructField("genre", StringType()), StructField("features", VectorUDT())])
    # Only the final DataFrame is cached. KMeans passes over it repeatedly,
    # whereas also caching the intermediate RDD held every embedding twice
    # and that RDD cache was never released.
    genre_features_mapped_data = (
        train_clean_df.rdd
        .map(lambda row: (row.genre, row.joined_sentence))
        .mapPartitions(applyEncoder)
        .map(lambda pair: (pair[0], Vectors.dense(pair[1])))
        .toDF(schema)
        .cache()
    )
    return genre_features_mapped_data

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
 def applyEncoder(rev_text_partition):
    embed = hub.Module("https://tfhub.dev/google/universal-sentence-encoder/2") 
    genres = []
    sentences = []
    for item in rev_text_partition:
        genres.append(item[0])
        sentences.append(item[1])
    
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        message_embeddings = session.run(embed(sentences))
    return [(genre,vector) for genre,vector in zip(genres,message_embeddings)]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
def getKmeansPredictions(genre_features_mapped_data, k=5, seed=1024):
    """Fit k-means on the ``features`` column and return the transformed data.

    Args:
        genre_features_mapped_data: DataFrame with a vector column ``features``.
        k: number of clusters; defaults to the previously hard-coded 5.
        seed: random seed for reproducible clustering; defaults to the
            previously hard-coded 1024.

    Returns:
        The input DataFrame with an added ``prediction`` (cluster id) column.
    """
    kmeans_model = KMeans(featuresCol='features', k=k, seed=seed).fit(genre_features_mapped_data)
    return kmeans_model.transform(genre_features_mapped_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def getLabeledClusters(prediction_genre):
    """Label each cluster with the genre that occurs most often in it.

    Args:
        prediction_genre: DataFrame with ``prediction`` (cluster id) and
            ``genre`` columns.

    Returns:
        Dict mapping cluster id to its majority genre, e.g.
        ``{0: 'fiction', 1: 'telephone', 2: 'travel', 3: 'government', 4: 'telephone'}``.
        Several clusters can share a label.
    """
    genre_counts = (
        prediction_genre
        .groupBy("prediction", "genre")
        .agg(F.count("genre").alias("genreCount"))
    )
    # Rank genres inside each cluster by frequency, breaking ties alphabetically.
    # Filtering on "count == max" kept every tied genre, and collectAsMap then
    # silently kept whichever row came last, so labels were nondeterministic.
    by_frequency = Window.partitionBy("prediction").orderBy(F.desc("genreCount"), F.asc("genre"))
    labeled_cluster = (
        genre_counts
        .withColumn("genreRank", F.row_number().over(by_frequency))
        .filter(F.col("genreRank") == 1)
        .select("prediction", "genre")
    )
    return labeled_cluster.rdd.collectAsMap()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def updatePredictionsWithLabeledClusters(labeled_cluster_rdd_map,prediction_genre):
    """Replace cluster ids with their genre labels and count (label, genre) pairs.

    Args:
        labeled_cluster_rdd_map: dict mapping cluster id to genre label.
        prediction_genre: DataFrame with ``prediction`` and ``genre`` columns.

    Returns:
        DataFrame with columns ``prediction`` (genre label, null for ids not
        in the map), ``genre`` and ``count``, sorted by ``genre``.
    """
    if labeled_cluster_rdd_map:
        # Native map lookup instead of a Python UDF: same null-for-missing
        # result, but no per-row Python serialization.
        cluster_to_genre = F.create_map(
            [F.lit(value) for pair in labeled_cluster_rdd_map.items() for value in pair])
        label_column = cluster_to_genre[F.col('prediction')]
    else:
        # create_map() with no entries has no usable key type; every lookup
        # would be null anyway.
        label_column = F.lit(None).cast('string')
    return (
        prediction_genre
        .withColumn('prediction', label_column)
        .groupBy(['prediction', 'genre'])
        .count()
        .sort('genre')
    )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
def calculatePercentages(train_df_count_map,updated_prediction_genre_df):
    """Add a formatted percentage column, show it, then print the confusion table.

    Args:
        train_df_count_map: dict mapping genre to its total sentence count.
        updated_prediction_genre_df: DataFrame with ``prediction`` (label),
            ``genre`` and ``count`` columns.
    """
    def _share_of_genre(genre, count):
        # count as a percentage of all sentences of that genre, two decimals
        return '{:.2f}%'.format((count / train_df_count_map.get(genre)) * 100)

    percentage_udf = F.udf(_share_of_genre)
    percentage_column = percentage_udf(F.col('genre'), F.col('count'))
    true_label_percentages = updated_prediction_genre_df.withColumn('percentage', percentage_column)
    true_label_percentages.show()
    plotConfusionMatrix(true_label_percentages, train_df_count_map.keys())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
def plotConfusionMatrix(true_label_percentages,labels):
    """Print (and return) a predicted-label x true-genre percentage table.

    Args:
        true_label_percentages: Spark DataFrame (anything exposing
            ``collect()``) whose rows have ``genre``, ``prediction`` (cluster
            label) and ``percentage`` (formatted string) fields.
        labels: iterable of genre names. It is used as the row index
            (predicted label) and as the column order (true genre).

    Returns:
        The pandas DataFrame that was printed. Cells with no matching row
        hold "0.00%", including genres that have no rows at all.
    """
    # Materialise once: labels is iterated several times and may be a view or a generator.
    labels = list(labels)
    # A single collect() replaces take(count()), which ran two Spark jobs.
    percentage_by_genre = {}
    for row in true_label_percentages.collect():
        percentage_by_genre.setdefault(row.genre, {})[row.prediction] = row.percentage
    matrix = pd.DataFrame(
        [
            {true_genre: percentage_by_genre.get(true_genre, {}).get(predicted, "0.00%")
             for true_genre in labels}
            for predicted in labels
        ],
        index=labels,
    )
    print(matrix)
    return matrix

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Cluster sentences using TF-IDF vectors ("TDIDF" is the method key the driver function checks for).
processSentenceVectorExploration("TDIDF")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Repeat the same exploration using Universal Sentence Encoder embeddings (TF Hub module).
processSentenceVectorExploration("GoogleEncoder")