In [None]:
#! /usr/bin/python2

from pyspark import SparkConf, SparkContext
# Spark context shared by every cell below: master "local[2]", application name "MyApp".
sc = SparkContext(
    conf=SparkConf()
    .setMaster("local[2]")
    .setAppName("MyApp")
)

import sys
import re

# Python 3 has no separate `unicode` builtin; alias it to `str` so the code
# below can call unicode() on either major version.
if sys.version_info >= (3,):
    unicode = str

In [None]:
def parse_article(line):
    """Turn one tab-separated ``article_id<TAB>text`` line into a list of words.

    The article id is discarded. Leading and trailing non-word characters are
    trimmed from the text, which is then split on whitespace together with any
    non-word characters adjacent to that whitespace.

    Returns an empty list when the line cannot be unpacked into an id and a
    text (a ``ValueError``). A text that is empty after trimming yields
    ``['']``, because ``re.split`` on an empty string returns one empty token.
    """
    try:
        # Split on the first tab only, so tabs inside the text are preserved.
        _, body = unicode(line.rstrip()).split('\t', 1)
    except ValueError:
        # The line did not unpack into (id, text): treat it as having no words.
        return []

    trimmed = re.sub(r"^\W+|\W+$", "", body, flags=re.UNICODE)
    return re.split(r"\W*\s+\W*", trimmed, flags=re.UNICODE)
    
def pairs_starting_from_word(words, first_word='word'):
    """Return ``(bigram, 1)`` tuples for every bigram whose first word is ``first_word``.

    Args:
        words: sequence of tokens in reading order.
        first_word: token that must open the bigram (default ``'word'``).

    Returns:
        A list of ``("<first>_<second>", 1)`` tuples, one per occurrence of
        ``first_word`` that is followed by another token. The last token of
        ``words`` never opens a bigram because it has no successor.
    """
    # The unicode format string keeps this working on Python 2 when tokens
    # contain non-ASCII characters; a byte-string template would try to encode
    # them as ASCII and raise UnicodeEncodeError. It also matches the u'_'
    # separator used by create_bigrams.
    return [
        (u'{}_{}'.format(current, following), 1)
        for current, following in zip(words, words[1:])
        if current == first_word
    ]

def create_bigrams(words):
    """Return a ``(bigram, 1)`` tuple for every pair of adjacent words.

    Each bigram is the two neighbouring words joined by an underscore. Inputs
    with fewer than two words produce an empty list.
    """
    # Pair each word with its successor; zip stops at the shorter sequence,
    # so the final word is never used as a left-hand element.
    return [
        (u'_'.join(neighbours), 1)
        for neighbours in zip(words, words[1:])
    ]

from math import log
def calc_npmi(pair, cnt, words_occurrences_dict, total_num_of_words, total_num_of_pairs):
    """Compute the normalized pointwise mutual information (NPMI) of a bigram.

    With ``P(a) = count(a) / total_num_of_words`` and
    ``P(ab) = cnt / total_num_of_pairs`` the result is::

        NPMI(a, b) = ln(P(ab) / (P(a) * P(b))) / -ln(P(ab))

    Args:
        pair: bigram as ``"<word1>_<word2>"``.
        cnt: number of occurrences of the bigram.
        words_occurrences_dict: mapping from word to its occurrence count.
        total_num_of_words: total number of words in the corpus.
        total_num_of_pairs: total number of bigrams in the corpus.

    Returns:
        ``(pair, npmi)`` with ``npmi`` as a float.

    Raises:
        ValueError: if ``pair`` does not contain exactly one underscore.
        KeyError: if either word is missing from ``words_occurrences_dict``.
        ZeroDivisionError: if ``P(ab)`` equals 1, since ``-ln(1)`` is 0.
    """
    word1, word2 = pair.split('_')

    # Convert the denominators to float so that `/` is true division on
    # Python 2 as well; integer counts would otherwise floor every
    # probability to 0 and the logarithms below would fail.
    total_words = float(total_num_of_words)
    p_a = words_occurrences_dict[word1] / total_words
    p_b = words_occurrences_dict[word2] / total_words
    p_ab = cnt / float(total_num_of_pairs)

    pmi_a_b = log(p_ab / (p_a * p_b))
    npmi_a_b = pmi_a_b / -log(p_ab)
    return (pair, npmi_a_b)
    

In [None]:
# Read the article dump and tokenise every line into its list of words.
# 16 is forwarded as textFile's second positional argument
# (presumably the minimum partition count; confirm against the PySpark docs).
wiki = (
    sc.textFile("/data/wiki/en_articles_part/articles-part", 16)
    .map(parse_article)
)

# Stop words are stored as whitespace-separated tokens in a local file.
stop_words = set()
with open('/datasets/stop_words_en.txt', 'r') as f:
    for line in f:
        stop_words.update(line.split())
    

# Lowercase every word so that matching against the stop list is case-insensitive.
wiki_lower = wiki.map(lambda words: [word.lower() for word in words])

# Keep the lowercased stop words in a set: the membership test below runs once
# per token of the corpus, and a list would cost a linear scan each time.
stop_words_lower = {word.lower() for word in stop_words}

# Drop every word that appears in the stop list.
wiki_filt = wiki_lower.map(
    lambda words: [word for word in words if word not in stop_words_lower]
)

# Emit a (bigram, 1) record for every pair of adjacent words in each article.
wiki_bigrams = wiki_filt.flatMap(create_bigrams)

# Add up the ones per bigram to get its number of occurrences.
wiki_red = wiki_bigrams.reduceByKey(lambda left, right: left + right)

# Keep only bigrams seen at least 500 times.
wiki_red_filt = wiki_red.filter(lambda entry: 500 <= entry[1])

# Total number of words left after stop-word removal.
# sum() returns 0 for an empty RDD, where reduce() would raise.
tot_num_words = wiki_filt.map(len).sum()

# Total number of adjacent word pairs. wiki_filt can contain empty word lists
# (parse_article returns [] for unparsable lines, and stop-word filtering may
# remove every word), so clamp at 0: an empty list must contribute no pairs,
# not -1.
tot_num_pairs = wiki_filt.map(lambda words: max(len(words) - 1, 0)).sum()

# Count how often each word occurs across all filtered articles and bring
# the (word, count) records back to the driver.
words_occ = (
    wiki_filt
    .flatMap(lambda words: [(word, 1) for word in words])
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

# Word -> occurrence count lookup used by calc_npmi.
words_occ_dict = dict(words_occ)

# Score every frequent bigram, order by NPMI from highest to lowest and keep
# the first 39 (pair, npmi) records.
pairs_npmi = (
    wiki_red_filt
    .map(lambda entry: calc_npmi(entry[0], entry[1], words_occ_dict, tot_num_words, tot_num_pairs))
    .sortBy(lambda scored: scored[1], ascending=False)
    .take(39)
)

# Only the bigram is printed; the score is used for ranking alone.
for pair, npmi in pairs_npmi:
    print(unicode(pair))