In [1]:
from __future__ import division
from pyspark import SparkConf, SparkContext
import re

In [2]:
# Reuse the SparkContext that is already running in this kernel (e.g. when the
# cell is executed a second time) or start a new local one.  getOrCreate()
# covers the "context already exists" case that a bare ``except: pass`` used to
# hide, while letting genuine start-up failures surface here instead of as a
# NameError on ``sc`` in a later cell.
sc = SparkContext.getOrCreate(
    conf=SparkConf().setAppName("MyApp").setMaster("local").set("spark.cores.max", "4")
)

In [3]:
def parse_article(line):
    """Tokenize one line of the article dump.

    A line is expected to hold an article id and the article text separated
    by a single tab character.  Non-word characters are stripped from both
    ends of the text, which is then split on whitespace (together with any
    punctuation adjacent to it).

    Args:
        line: one input line, either text or UTF-8 encoded bytes.

    Returns:
        The list of non-empty tokens, or ``[]`` when the line has no tab
        separator, cannot be decoded, or contains no word characters.
    """
    try:
        if isinstance(line, bytes):
            # Decode explicitly as UTF-8: an implicit ASCII coercion fails on
            # any non-ASCII byte and the article would be dropped silently.
            line = line.decode('utf-8')
        _article_id, text = line.rstrip().split('\t', 1)
    except ValueError:
        # Missing tab separator, or undecodable bytes (UnicodeDecodeError is
        # a subclass of ValueError): treat the line as an empty article.
        return []
    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    # Splitting an empty string yields [''], so drop empty tokens to avoid
    # counting a bogus empty "word" for punctuation-only articles.
    return [word for word in words if word]

In [4]:
# Read the article dump with at least 16 partitions and turn every line into
# its list of tokens.
wiki = (
    sc.textFile("/data/wiki/en_articles_part/articles-part", minPartitions=16)
    .map(parse_article)
)

In [5]:
# Load the whitespace-separated stop-word list into a set for O(1) lookups.
with open('/datasets/stop_words_en.txt', 'r') as f:
    stop_words = {token for token in f.read().split()}

In [6]:
def create_bigrams(words):
    """Build ``(bigram, 1)`` records for every pair of adjacent words.

    Each bigram key is the two words joined by an underscore and encoded as
    UTF-8; the constant ``1`` is the initial count to be summed per key.
    Sequences with fewer than two words produce an empty list.
    """
    return [
        (u'_'.join(neighbours).encode('utf-8'), 1)
        for neighbours in zip(words, words[1:])
    ]

In [7]:
from math import log
def _split_bigram(pair, known_words):
    """Split a ``word1_word2`` key into its two words using the vocabulary."""
    if (b'_' if isinstance(pair, bytes) else u'_') not in pair:
        raise ValueError("bigram key has no '_' separator: %r" % (pair,))

    candidates = [pair]
    if isinstance(pair, bytes):
        # Bigram keys may arrive as UTF-8 bytes while the vocabulary is keyed
        # by text; fall back to the decoded form when the raw form misses.
        try:
            candidates.append(pair.decode('utf-8'))
        except UnicodeDecodeError:
            pass

    for text in candidates:
        sep = b'_' if isinstance(text, bytes) else u'_'
        # A word may itself contain '_' (it is a regex word character), so
        # try every separator position and keep the first split whose halves
        # are both known words.
        pos = text.find(sep)
        while pos != -1:
            left, right = text[:pos], text[pos + 1:]
            if left in known_words and right in known_words:
                return left, right
            pos = text.find(sep, pos + 1)
    raise KeyError(pair)


def calc_npmi(pair, cnt, words_occurrences_dict, total_num_of_words, total_num_of_pairs):
    """Compute the normalized pointwise mutual information of a bigram.

    With P(a) and P(b) the word probabilities and P(ab) the bigram
    probability::

        PMI(a, b)  = ln(P(ab) / (P(a) * P(b)))
        NPMI(a, b) = PMI(a, b) / -ln(P(ab))

    Args:
        pair: bigram key of the form ``word1_word2`` (text or UTF-8 bytes).
        cnt: number of occurrences of the bigram.
        words_occurrences_dict: mapping from word to its occurrence count.
        total_num_of_words: total number of words in the corpus.
        total_num_of_pairs: total number of bigrams in the corpus.

    Returns:
        ``(pair, npmi)`` with ``pair`` returned unchanged.

    Raises:
        ValueError: if ``pair`` contains no underscore.
        KeyError: if no split of ``pair`` yields two words that are both
            present in ``words_occurrences_dict``.
    """
    word1, word2 = _split_bigram(pair, words_occurrences_dict)
    # True division is expected here (the notebook enables it through
    # ``from __future__ import division``).
    p_a = words_occurrences_dict[word1] / total_num_of_words
    p_b = words_occurrences_dict[word2] / total_num_of_words
    p_ab = cnt / total_num_of_pairs

    pmi = log(p_ab / (p_a * p_b))
    normalizer = -log(p_ab)
    if normalizer == 0:
        # P(ab) == 1: the bigram accounts for every pair in the corpus and the
        # ratio is undefined (division by zero); report the NPMI upper bound.
        return (pair, 1.0)
    return (pair, pmi / normalizer)

In [8]:
# Normalize case so that differently capitalized forms count as one word.
wiki_lower = wiki.map(lambda words: [x.lower() for x in words])

# Drop stop words.  This RDD is the input of several independent jobs (bigram
# counts, total word count, total pair count, per-word counts); cache it so
# the dump is not re-read and re-tokenized for each of them.
wiki_filt = wiki_lower.map(lambda words: [x for x in words if x not in stop_words]).cache()

# Emit one ("word1_word2", 1) record per adjacent word pair ...
wiki_bigrams = wiki_filt.flatMap(create_bigrams)

# ... and sum the records into a per-bigram occurrence count.
wiki_red = wiki_bigrams.reduceByKey(lambda a, b: a + b)

# Keep only bigrams seen at least 500 times.  The record is indexed instead of
# tuple-unpacked in the lambda signature, which is Python 2-only syntax.
wiki_red_filt = wiki_red.filter(lambda pair_cnt: pair_cnt[1] >= 500)

In [9]:
# Total number of words left after stop-word removal: per-article lengths
# summed across the whole corpus.
tot_num_words = wiki_filt.map(len).reduce(lambda left, right: left + right)

In [None]:
# Total number of adjacent word pairs in the corpus.  An article with n words
# yields n - 1 bigrams, but an empty article yields 0 rather than -1, so the
# per-article count is clamped at zero to avoid undercounting the total.
tot_num_pairs = wiki_filt.map(lambda words: max(len(words) - 1, 0))
tot_num_pairs = tot_num_pairs.reduce(lambda a, b: a + b)

In [None]:
# Count the occurrences of every word, keep the words seen at least 500 times
# and bring the resulting (word, count) records back to the driver.
words_occ = (
    wiki_filt
    .flatMap(lambda words: [(x, 1) for x in words])
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda word_cnt: word_cnt[1] >= 500)
    .collect()
)

# Word -> occurrence count lookup table used when scoring bigrams.
words_occ_dict = dict(words_occ)

In [None]:
# Score every frequent bigram, order the bigrams by descending NPMI and keep
# the first 39.  The records are flipped to (npmi, pair) so that sortByKey
# orders by the score, then flipped back to (pair, npmi).
pairs_npmi = (
    wiki_red_filt
    .map(lambda pair_cnt: calc_npmi(pair_cnt[0], pair_cnt[1], words_occ_dict, tot_num_words, tot_num_pairs))
    .map(lambda scored: (scored[1], scored[0]))
    .sortByKey(ascending=False)
    .map(lambda keyed: (keyed[1], keyed[0]))
    .take(39)
)

In [None]:
# Print the selected bigrams, one per line, highest NPMI first; the score
# itself is not printed.
for pair, npmi in pairs_npmi:
    print(pair)