#Author : Yogiraj Awati
from pyspark.ml.feature import PCA,Tokenizer,RegexTokenizer
from pyspark.ml.linalg import Vectors
import codecs

In [None]:
from pyspark import SparkContext

# One local-mode Spark context shared by every cell below.
sc = SparkContext(master="local", appName="simple app")

In [None]:
from pyspark.sql import SQLContext

# SQL entry point on top of `sc`; presumably needed so RDD.toDF() (used in
# later cells) has an active SQL session -- confirm if this cell is removed.
sqlContext = SQLContext(sparkContext=sc)

In [None]:
# Line prefix marking a publication *title* in publications.txt
# (despite its name; venue lines use "#c").
CONFERENCE_PREFIX = "#*"

# Input files for this homework, all under one directory.
_DATA_DIR = "/home/yogi/Desktop/hw3"
publicationPath = _DATA_DIR + "/publications.txt"
stopWordsPath = _DATA_DIR + "/stopwords_english.txt"

In [None]:
# Read the raw publication dump as an RDD of lines.
destinationFile = sc.textFile(publicationPath)

# Keep only title lines and strip the two-character prefix. An empty line can
# never start with the prefix, so no separate emptiness test is needed.
titlesRdd = (
    destinationFile
    .filter(lambda line: line.startswith(CONFERENCE_PREFIX))
    .map(lambda line: line[2:])
)

Helper Functions

In [None]:
# Tokenize words: one DataFrame row per title, then split each title on
# non-word characters.
from pyspark.sql import Row

row = Row("val")  # single-column row factory; the title lands in column "val"
publicationTitles = titlesRdd.map(row).toDF()

tokenizer = RegexTokenizer(
    inputCol="val",
    outputCol="words",
    pattern=r"\W",
)
# Each resulting row holds the title ("val") and its token list ("words").
tokenizedWords = tokenizer.transform(publicationTitles)
# we get list of rows where each row is a sentence and list of words in that sentence

In [None]:
# Count words and create sparse vectors
from pyspark.ml.feature import CountVectorizer

TOP_FREQUENT_WORDS = 1000  # default vocabulary size (most frequent terms kept)


def perform_countvector_operation(tokenizedWords, inputColName, vocab_size=None):
    """Fit a CountVectorizer on one token column and return the model and counts.

    Args:
        tokenizedWords: DataFrame whose ``inputColName`` column holds token lists.
        inputColName: name of the token-list column to vectorize.
        vocab_size: optional vocabulary size. Defaults to ``TOP_FREQUENT_WORDS``,
            read at call time so reassigning the constant still takes effect.

    Returns:
        ``(cv_model, count_vectors)``. ``count_vectors`` is ``tokenizedWords``
        with an added ``"features"`` column of term-count vectors.
    """
    if vocab_size is None:
        vocab_size = TOP_FREQUENT_WORDS
    cv = CountVectorizer(inputCol=inputColName, outputCol="features", vocabSize=vocab_size)
    # fit a CountVectorizerModel from the corpus.
    cv_model = cv.fit(tokenizedWords)
    count_vectors = cv_model.transform(tokenizedWords)
    print("Count Vectors: ")
    count_vectors.show(truncate=True)
    return cv_model, count_vectors

def perfom_pca(count_vectors, NUMBER_OF_COMPONENTS):
    """Project the "features" column onto its top principal components.

    Shows the projected vectors and returns ``(pca_model, pca_result)``.
    ``pca_result`` keeps only the ``"pcaFeatures"`` column.
    """
    estimator = PCA(k=NUMBER_OF_COMPONENTS, inputCol="features", outputCol="pcaFeatures")
    fitted = estimator.fit(count_vectors)
    projected = fitted.transform(count_vectors)
    projected = projected.select("pcaFeatures")
    projected.show(truncate=True)
    return fitted, projected

import matplotlib.pyplot as plt


def render_graph(pca_model):
    """Line-plot the per-component ``explainedVariance`` of a fitted PCA model.

    NOTE(review): Spark documents ``explainedVariance`` as proportions of
    variance (normalized eigenvalues); the y-axis label is kept as-is.
    """
    variances = pca_model.explainedVariance
    plt.plot(variances)
    plt.xlabel("Principal Component")
    plt.ylabel("Eigen Value")
    print("Drawing plot:")
    plt.show()
    
import numpy as np


def calculateKofTotalVariance(pca_model, k):
    """Report how many leading components explain ``k`` percent of the total variance.

    Spark documents ``pca_model.explainedVariance`` as the proportion of
    variance explained by each principal component, i.e. a fraction of the
    *total* variance. The target is therefore ``k / 100`` directly.
    Renormalizing by the sum of only the retained components (as a previous
    version did) answers "k% of what these components capture" instead.

    Args:
        pca_model: fitted PCA model exposing ``explainedVariance``.
        k: target percentage of the total variance, e.g. 50.

    Returns:
        The smallest number of leading components whose cumulative explained
        variance reaches ``k`` percent, or ``None`` if all retained components
        together explain less than that.
    """
    explained = np.fromiter(pca_model.explainedVariance, dtype=float)
    cumulative = np.cumsum(explained)  # non-decreasing: proportions are >= 0
    required = k / 100.0
    # First position where the running total reaches the target.
    index = int(np.searchsorted(cumulative, required, side="left"))
    if index >= len(cumulative):
        reached = float(cumulative[-1]) if len(cumulative) else 0.0
        print("The first %d components explain only %.2f%% of the total variance; "
              "more are needed for %s%%." % (len(cumulative), 100.0 * reached, k))
        return None
    numberofComponents = index + 1
    print("Number of components required for %s%% of the total variance: " % k,
          numberofComponents)
    return numberofComponents
def calculateImportantWords(cv_model, pca_model, threshold=0.2):
    """Print (and return) the vocabulary words that dominate any principal component.

    A word counts as important for a component when the absolute value of its
    loading exceeds ``threshold`` times that component's Euclidean norm (the
    square root of the sum of squared loadings). Words are pooled across all
    components.

    Args:
        cv_model: fitted CountVectorizer model; ``vocabulary[i]`` names feature ``i``.
        pca_model: fitted PCA model. ``pc.toArray()`` has one column per
            principal component and one row per vocabulary feature.
        threshold: fraction of the component norm a loading must exceed.

    Returns:
        The set of unique important words.
    """
    vocab = cv_model.vocabulary
    # Rows of the transpose are the individual principal components.
    components = pca_model.pc.toArray().transpose()
    words = set()  # unique words whose |loading| exceeds threshold * norm
    # Visit every component. The previous while-loop stopped one short (the
    # last component was never examined) and indexed past the end when k == 1.
    for component in components:
        norm = np.linalg.norm(component)
        for index, value in enumerate(component):
            if np.abs(value) > threshold * norm:
                # fetch corresponding word from the vocab
                words.add(vocab[index])
    print("Important Unique Words: ", len(words))
    print(words)
    return words

def render_scatter_plot(pca_result):
    """Scatter the first two PCA coordinates of (up to) the first 1000 rows."""
    rows = pca_result.take(1000)
    # One PCA vector (each row's first column) per matrix row; transposing puts
    # coordinate i of every point in coords[i].
    coords = np.array([r[0] for r in rows]).T
    plt.scatter(coords[0], coords[1])
    print("Scatter plot: ")
    plt.show()

def performOperation(tokenizedWords, inputColName):
    """Run the word-count + PCA analysis (parts b-d) on one token column.

    Vectorizes ``inputColName``, fits a 50-component PCA, plots the explained
    variance, reports how many components reach 50% of the variance, lists
    the important words and draws the scatter plot.
    """
    s_cv_model, s_count_vectors = perform_countvector_operation(tokenizedWords, inputColName)
    s_pca_model, s_pca_result = perfom_pca(s_count_vectors, 50)  # do pca analysis
    render_graph(s_pca_model)
    calculateKofTotalVariance(s_pca_model, 50)
    # Use the models fitted just above. The previous code passed the
    # notebook-global cv_model/pca_model and so reported words for another dataset.
    calculateImportantWords(s_cv_model, s_pca_model)
    render_scatter_plot(s_pca_result)

2A) Transform titles to word count vectors. Truncate your (sparse) vectors to the 1000 most frequent words and perform PCA with 50 components on the counts.

In [None]:
cv_model, count_vectors = perform_countvector_operation(tokenizedWords, inputColName="words")  # 2A) counts

In [None]:
# 2A) PCA with 50 components on the word-count vectors.
pca_model, pca_result = perfom_pca(count_vectors, NUMBER_OF_COMPONENTS=50)

2B) Plot the eigenvalues of the principal components. Calculate how many components
are needed to explain 50% of the total variance.

In [None]:
render_graph(pca_model=pca_model)  # 2B) explained-variance curve

In [None]:
calculateKofTotalVariance(pca_model, k=50)  # components needed for 50% of the variance

2C) Identify which words are important in each of the principal components. To do so, take the sum of squares of each of the component vectors to check how they are normalized. Then, for each component, print out the words whose entry in the component
has an absolute value larger than 0.20 of the component's norm.

In [None]:
calculateImportantWords(cv_model=cv_model, pca_model=pca_model)  # 2C)

2D) Make a scatter plot of some reasonably sized sample (1k-10k titles). 
Explain the structure (or lack thereof) you see based on your results from item b-c.

In [None]:
render_scatter_plot(pca_result=pca_result)  # 2D)

2E) Run a preprocessing step to remove stop words (the provided stop-word list is identical to the list used in Spark). Rerun steps b-d and evaluate whether this has improved your representation.

In [None]:
from pyspark.ml.feature import StopWordsRemover

# Load the provided stop-word list (one entry per line) onto the driver.
stopWordsPointer = sc.textFile(stopWordsPath)
stopWordsList = stopWordsPointer.collect()
stopWordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="reducedWords",
    stopWords=stopWordsList,
)

# "reducedWords" is "words" with the stop words removed.
validWords = stopWordsRemover.transform(tokenizedWords)

# Rerun parts b-d on the reduced token lists.
s_cv_model, s_count_vectors = perform_countvector_operation(validWords, inputColName="reducedWords")
s_pca_model, s_pca_result = perfom_pca(s_count_vectors, NUMBER_OF_COMPONENTS=50)
render_graph(s_pca_model)
calculateKofTotalVariance(s_pca_model, k=50)
calculateImportantWords(s_cv_model, s_pca_model)
render_scatter_plot(s_pca_result)

2F) Calculate TF-IDF features for all titles and rerun the operations in parts b-d of this exercise. How have your results changed?

In [None]:
from pyspark.mllib.feature import HashingTF,IDF


def performTF_IDF(dataFrame, inputCol="features", outputCol="idfFeatures"):
    """Rescale the term-count vectors in ``inputCol`` by inverse document frequency.

    Uses the DataFrame-based ``pyspark.ml.feature.IDF``. The RDD-based
    ``pyspark.mllib`` ``IDF`` imported above fits on an RDD of vectors and
    rejects a DataFrame.

    Returns ``dataFrame`` with an added ``outputCol`` column of TF-IDF vectors.
    """
    from pyspark.ml.feature import IDF as MLIDF

    idf = MLIDF(inputCol=inputCol, outputCol=outputCol)
    model = idf.fit(dataFrame)
    return model.transform(dataFrame)


tfIdfResult = performTF_IDF(s_count_vectors)
# Parts b-d on the TF-IDF vectors. They are already vectors, so skip the
# CountVectorizer step (it only accepts token lists) and expose them under the
# "features" column name that perfom_pca reads.
tfIdfFeatures = tfIdfResult.drop("features").withColumnRenamed("idfFeatures", "features")
tfidf_pca_model, tfidf_pca_result = perfom_pca(tfIdfFeatures, 50)
render_graph(tfidf_pca_model)
calculateKofTotalVariance(tfidf_pca_model, 50)
# IDF rescales each count in place, so the stop-word vocabulary still names the features.
calculateImportantWords(s_cv_model, tfidf_pca_model)
render_scatter_plot(tfidf_pca_result)

Construct two lists of titles: one for the venue NIPS and one for VLDB.

2I) Merge the two sets of titles. Construct both word count vectors and TF-IDF features. Repeat steps b-d and compare word count results to TF-IDF results.

In [None]:
import io

# Collect the titles published at NIPS and at VLDB. Records are separated by
# blank lines; "#*" lines carry the title and "#c" lines the venue.
NIPS_title_set = set()
VLDB_title_set = set()
# Explicit UTF-8 rather than the platform locale default.
# NOTE(review): assumes publications.txt is UTF-8 -- confirm.
with io.open(publicationPath, "r", encoding="utf-8") as fileReader:
    PUBLICATION_VENUE_PREFIX = "#c"
    TITLE_PREFIX = "#*"
    # Initialise record state so a venue line seen before any title line does
    # not hit an unassigned name.
    readingPublication = False
    title = ""

    for eachLine in fileReader:
        if eachLine.startswith(TITLE_PREFIX):
            readingPublication = True
            title = eachLine[2:].strip()
        elif eachLine.startswith(PUBLICATION_VENUE_PREFIX) and readingPublication:
            venue = eachLine[2:].strip()
            if title != "":
                # Exact match. The old `venue in "NIPS"` test was reversed: it
                # matched any substring of "NIPS", including an empty venue.
                if venue == "NIPS":
                    NIPS_title_set.add(title)
                elif venue == "VLDB":
                    VLDB_title_set.add(title)
            title = ""
        elif not eachLine.strip():
            # A blank line (also "\r\n" or whitespace-only) ends the record.
            readingPublication = False
            title = ""

In [None]:
# Number of distinct titles found for NIPS, then for VLDB.
for venue_titles in (NIPS_title_set, VLDB_title_set):
    print(len(venue_titles))

In [None]:
# Merge both venues' titles and tokenize them the same way as the full corpus.
titles = NIPS_title_set | VLDB_title_set
publicationTitles = sc.parallelize(titles).map(row).toDF()
tokenizer = RegexTokenizer(inputCol="val", outputCol="words", pattern=r"\W")
tokenizedWords = tokenizer.transform(publicationTitles)
performOperation(tokenizedWords, "words")  # performs all the operation


In [None]:
print(len(titles))  # size of the merged NIPS + VLDB title set

In [None]:
# Remove stop words from the merged titles, then recount.
validWords = stopWordsRemover.transform(tokenizedWords)
s_cv_model, s_count_vectors = perform_countvector_operation(validWords, inputColName="reducedWords")

In [None]:
# TF-IDF on the merged, stop-word-filtered counts, then parts b-d.
tfIdfResult = performTF_IDF(s_count_vectors)
# The TF-IDF column already holds vectors, so run PCA on it directly instead
# of sending it through CountVectorizer (which requires token lists).
# perfom_pca reads the "features" column, hence the rename.
tfIdfFeatures = tfIdfResult.drop("features").withColumnRenamed("idfFeatures", "features")
tfidf_pca_model, tfidf_pca_result = perfom_pca(tfIdfFeatures, 50)
render_graph(tfidf_pca_model)
calculateKofTotalVariance(tfidf_pca_model, 50)
calculateImportantWords(s_cv_model, tfidf_pca_model)
render_scatter_plot(tfidf_pca_result)

2J) Now make a scatter plot of these two principal components, showing the titles from each subset in different colors. Again compare word counts and TF-IDF. Did PCA succeed in uncovering the differences between the communities?

In [None]:
# 2J) Word counts: project the merged titles onto the first two principal
# components and colour each point by venue.
s_cv_model, count_vectors = perform_countvector_operation(tokenizedWords, "words")

pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(count_vectors)
pca_result = pca_model.transform(count_vectors)
allTitles = pca_result.collect()

# Look columns up by name: positional indices shift whenever extra columns
# (e.g. "reducedWords") are present.
nips = [t["pcaFeatures"] for t in allTitles if t["val"] in NIPS_title_set]
vldb = [t["pcaFeatures"] for t in allTitles if t["val"] in VLDB_title_set]

for points, colour, label in ((nips, "red", "NIPS"), (vldb, "blue", "VLDB")):
    if points:  # an empty venue would otherwise fail when indexing the transpose
        coords = np.array(points).transpose()
        plt.scatter(coords[0], coords[1], color=colour, label=label)
plt.legend()
plt.show()

In [None]:
# 2J) Same venue-coloured scatter after stop-word removal.
s_cv_model, s_count_vectors = perform_countvector_operation(validWords, "reducedWords")

pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
# Fit on the stop-word-filtered counts. The previous version reused the
# unfiltered `count_vectors`, so this plot duplicated the one above.
pca_model = pca.fit(s_count_vectors)
pca_result = pca_model.transform(s_count_vectors)
allTitles = pca_result.collect()

# Look columns up by name: here validWords carries an extra "reducedWords"
# column, so positional index 3 would be "features", not "pcaFeatures".
nips = [t["pcaFeatures"] for t in allTitles if t["val"] in NIPS_title_set]
vldb = [t["pcaFeatures"] for t in allTitles if t["val"] in VLDB_title_set]

for points, colour, label in ((nips, "red", "NIPS"), (vldb, "blue", "VLDB")):
    if points:  # an empty venue would otherwise fail when indexing the transpose
        coords = np.array(points).transpose()
        plt.scatter(coords[0], coords[1], color=colour, label=label)
plt.legend()
plt.show()