In [83]:
import os
import re
import json
import matplotlib.pyplot as plt
import numpy as np
import string
import pandas as pd
import pyspark.sql.functions as F
import nltk
import cPickle as pickle

from os import listdir
from os.path import isfile, join
from scipy.sparse import csr_matrix
from sklearn.metrics import mean_squared_error
from sklearn.manifold import TSNE
from itertools import chain
from scipy.sparse import coo_matrix
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry, IndexedRow, IndexedRowMatrix
from pyspark.sql import Row
from sklearn.ensemble import AdaBoostClassifier
from sklearn.linear_model import SGDClassifier, RidgeClassifierCV
from operator import add
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
from __future__ import print_function, unicode_literals, with_statement, absolute_import, generators, nested_scopes, division
from pyspark.sql.functions import mean, stddev
%matplotlib notebook

In [2]:
# Local Spark
# spark session is now the entry point of spark program
# for line 8, can use local[n] for run spark locally with n cores

runLocal = True
if runLocal == True:
    spark = SparkSession.builder \
        .master('local[10]') \
        .appName("Word Count") \
        .config("spark.driver.maxResultSize", "15g")\
        .getOrCreate()
else:
    #p = subprocess.Popen(['sinteractive', '--partition=broadwl', '--cpus-per-task=10', '--time=00:30:00'], stdout=subprocess.PIPE, shell = True)
    #print(p.stdout.read())
    # start spark master program on this machine and register worker nodes
    os.system('start-spark-slurm.sh&') # use & to put it into background
    # get ip address of this machine
    ip = os.popen('hostname -i').read().strip('\n')
    # change 'local' to be ip of master node
    spark = SparkSession.builder \
        .master('spark://' + ip + ':7077') \
        .appName('Word Count') \
        .config("spark.driver.maxResultSize", "15g")\
        .getOrCreate()
    print('Spark Session web UI: http://{}:4040'.format(ip))
    

In [3]:
wiki_file = open('/project/cmsc25025/wikipedia/wiki-text.txt','r').readlines()[0]
num_chunks = 10 #make it multiple of number of your cores
chunks, chunk_size = len(wiki_file), int(len(wiki_file)/num_chunks)
wiki_chunks = [wiki_file[i:i+chunk_size] for i in range(0, chunks, chunk_size)]
wiki_data = spark.sparkContext.parallelize(wiki_chunks)

# Get vocab

In [4]:
def trans(t):
    return t.lower().encode('utf-8').translate(string.maketrans("",""), string.punctuation).strip().split()

def normlizeStr(text, stopwordLst = None, stemmer = None):
    
    sents = nltk.sent_tokenize(text)
    tl = [nltk.word_tokenize(s) for s in sents]
    tokenLst = list(chain(*tl))
    
    #Lowering the case and removing non-words
    workingIter = (w.lower() for w in tokenLst if w.isalpha())

    #Now we can use the semmer, if provided
    if stemmer is not None:
        workingIter = (stemmer.stem(w) for w in workingIter)
        
    #And remove the stopwords
    if stopwordLst is not None:
        workingIter = (w for w in workingIter if w not in stopwordLst)
            
    return list(workingIter)

#initialize our stemmer and our stop words
stop_words_nltk = nltk.corpus.stopwords.words('english')
snowball = nltk.stem.snowball.SnowballStemmer('english')
wordnet = nltk.stem.WordNetLemmatizer()

# def rmv(l):
#     l = filter(lambda a: (a in vocab)==True, l)

In [8]:
words = wiki_data.flatMap(lambda x: trans(x)).collect()
word_dict = nltk.FreqDist(words)
for key in word_dict.keys():
    if word_dict[key] <= 500:
        word_dict.pop(key, None)
#len(word_dict): 13343
vocab = sorted(word_dict.keys())
vocab_dim = len(vocab)

In [82]:
wiki = wiki_data.flatMap(lambda x: trans(x)).filter(lambda a: (a in vocab)==True).collect()

In [11]:
#wikiRDD_ = wikiRDD.filter(lambda a: (a in vocab)==True).cache()

In [91]:
wiki[:5]

['anarchism', 'originated', 'as', 'a', 'term']

In [90]:
with open("vocab.txt", "w") as v_file:
    for item in vocab:
        v_file.write(item + ", ")

In [89]:
with open("wiki.txt", "w") as text_file:
    for item in wiki:
        text_file.write(item + ", ")

# PMI Embeddings

In [64]:
M = csr_matrix((vocab_dim, vocab_dim), dtype=np.int8)

In [78]:
vect_dict = {}
for i in range(vocab_dim):
    data = np.array([1])
    row = np.array([i])
    col = np.array([0])
    vect_dict[vocab[i]] = csr_matrix((data, (row, col)), (vocab_dim, 1))

In [92]:
text_len = len(wiki)

In [None]:
for i in range(text_len):
    this_word = wiki[i]
    index = vocab.index(this_word)
    if i<5:
        context = wiki[:i+5]
    elif i>text_len-5:
        context = wiki[i-5:]
    else:
        context = wiki[i-5:i+5]
    count_vect = csr_matrix((vocab_dim, 1), dtype=np.int8)
    for word in context:
        if word != wiki[i]:
            count_vect += vect_dict[word]
    #print()
    M[index] += count_vect.T

In [106]:
vocab.index("aa")

1

In [102]:
count_vect.T.shape

(1, 13343)

(b)Factorize M to obtain word embeddings

In [None]:
U, s, V = scipy.sparse.linalg.svds(scipy.sparse.csr(M), k=50)

# Glove

In [42]:
#pretrained 
p_vectors_file = 'glove.6B.50d.txt'

with open(p_vectors_file, 'r') as f:
    p_vectors = {}
    for line in f:
        try:
            vals = line.rstrip().split(' ')
            p_vectors[vals[0]] = [float(x) for x in vals[1:]]
        except ValueError:
            continue
p_vectsRDD = spark.sparkContext.parallelize(p_vectors.items())

In [88]:
len(p_vectors)

390034

# Experiments

- the PMI embeddings
- the local GloVe embeddings
- the downloaded pretrained GloVe embeddings

## (a) Read your local GloVe embeddings

In [43]:
vocab_file   = 'GloVe/wiki-vocab.txt'
vectors_file = 'GloVe/wiki-vectors.txt'

with open(vocab_file, 'r') as f:
    words = [x.rstrip().split(' ')[0] for x in f.readlines()]

with open(vectors_file, 'r') as f:
    vectors = {}
    for line in f:
        vals = line.rstrip().split(' ')
        vectors[vals[0]] = [float(x) for x in vals[1:]]
        
vocabulary  = {w: idx for idx, w in enumerate(words)}
ivocabulary = {idx: w for idx, w in enumerate(words)}

vectsRDD = spark.sparkContext.parallelize(vectors.items())

## (b) For each of the following words, find the 5 closest words in the embedding space: 
physics, republican, einstein, algebra, fish. 

Report and comment on your results and the differences between the three sets of embeddings.

In [44]:
def find_closest(v, n, label):
    if label == 'local':
        vect = vectors
        RDD = vectsRDD
    elif label == "pre":
        vect = p_vectors
        RDD = p_vectsRDD        
    
    if type(v)== type(' '):
        v = np.array(vect[v])
        
    rdd = RDD.map(lambda x: (x[0], mean_squared_error(v,np.array(x[1]))))
    tuples = sorted(rdd.collect(), key=lambda x: x[1])[1:1+n]
    words = [x[0] for x in tuples]
    return words

In [93]:
word_list = ['physics', 'republican', 'einstein', 'algebra', 'fish']
print("Results from local GloVe embeddings:")
for word in word_list:
    print(word, ": ", find_closest(word,5, 'local'), sep = '')

print("\nResults from pretrained GloVe embeddings:")
for word in word_list:
    print(word, ": ", find_closest(word,5, 'pre'), sep = '')

Results from local GloVe embeddings:
physics: [u'mechanics', u'chemistry', u'mathematics', u'quantum', u'theoretical']
republican: [u'democrat', u'senator', u'representative', u'republicans', u'democrats']
einstein: [u'relativity', u'maxwell', u'bose', u'formulated', u'condensate']
algebra: [u'algebraic', u'algebras', u'mathematics', u'finite', u'boolean']
fish: [u'feed', u'fresh', u'hunting', u'salmon', u'wild']

Results from pretrained GloVe embeddings:
physics: [u'chemistry', u'mathematics', u'theoretical', u'science', u'biology']
republican: [u'gop', u'democrat', u'democratic', u'republicans', u'democrats']
einstein: [u'bohr', u'relativity', u'invented', u'freud', u'gottlieb']
algebra: [u'geometry', u'homological', u'algebraic', u'associative', u'analytic']
fish: [u'salmon', u'shrimp', u'meat', u'bird', u'wild']


## (c) france :  paris ::  england :  ?

In [57]:
def get_analogy(a,b,c,local):
    if local == True:
        v = np.array(vectors[b])-np.array(vectors[a])+np.array(vectors[c])
        print("Result from local GloVe embeddings:", find_closest(v,1,'local')[0])
    p_v = np.array(p_vectors[b])-np.array(p_vectors[a])+np.array(p_vectors[c])
    print("Result from pretrained GloVe embeddings:", find_closest(p_v,1,"pre")[0])

In [53]:
get_analogy('france','paris','england', True)

Result from local GloVe embeddings: edinburgh
Result from pretrained GloVe embeddings: london


## (d) Analogies

In [14]:
path = '/project/cmsc25025/assn4_analogy'
files = [f for f in listdir(path) if isfile(join(path, f))]

In [30]:
def rmv(t):
    words = filter(lambda x: x!= "", re.split('[^a-zA-Z]', t))
    return words

In [37]:
analogies = []
for fi in files:
    with open(path+"/"+fi, 'r') as f:
        words = [rmv(x) for x in f.readlines()]
        analogies+= words

In [61]:
for analogy in analogies:
    try: 
        a, b, c = analogy
        print(a+' : '+b+' = '+c+": ?")
    except: 
        print("Wrong format of the analogy\n")
        
    try:
        get_analogy(a,b,c, False)
    except:
        print("Word not in library.\n")
        continue
    print()

segregate : unify = damage: ?
Result from pretrained GloVe embeddings: overcome

argentina : brazil = iran: ?
Result from pretrained GloVe embeddings: tehran

ashes : urn = money: ?
Result from pretrained GloVe embeddings: paying

telephone : cord = television: ?
Result from pretrained GloVe embeddings: spinal

walk : roam = speak: ?
Result from pretrained GloVe embeddings: resent

novel : epilogue = meal: ?
Result from pretrained GloVe embeddings: meal

chinese : china = french: ?
Result from pretrained GloVe embeddings: french

evening : morning = dinner: ?
Result from pretrained GloVe embeddings: lunch

left : right = horizontal: ?
Result from pretrained GloVe embeddings: vertical

theorem : deduce = hypothesis: ?
Result from pretrained GloVe embeddings: decipher

h : water = bird: ?
Result from pretrained GloVe embeddings: fish

chicken : egg = cow: ?
Result from pretrained GloVe embeddings: cloned

go : green = stop: ?
Result from pretrained GloVe embeddings: yellow

chicago : ill

## (e) Visualize only the pretrained GloVe embeddings in two dimensions

In [96]:
words_ = p_vectors.keys()
W = p_vectsRDD.map(lambda x: np.array(x[1])).collect()

In [98]:
tsne = TSNE(n_components=2, random_state=0)
np.set_printoptions(suppress=True)
Y = tsne.fit_transform(W[:1000])
plt.scatter(Y[:, 0], Y[:, 1])
for label, x, y in zip(words, Y[:, 0], Y[:, 1]):
    plt.annotate(label, xy=(x, y), xytext=(0, 0),textcoords='offset points')
plt.show()

<IPython.core.display.Javascript object>

In this tiny segment of words, we are able to observe that words that surround "parliament" include "go", "territory", "different", "future", "notable", "public", "problem", "independence", "minister" and etc. It's apparent that these words are often used close to "parliament". For example, "go to parliament", "the parliament discuss territory problems", "different parliament could lead to different future".

In [None]:
spark.stop()