In [1]:
from numpy import log
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

import re

def parse_article(line):
    try:
        article_id, text = unicode(line.rstrip()).split('\t', 1)
        text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
        words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
        return [w.lower() for w in words]
    except ValueError as e:
        return []

wiki = sc.textFile("/data/wiki/en_articles_part/articles-part", 16).map(parse_article)
stop_words = open("/datasets/stop_words_en.txt", "r").read()

# Get the word counts and find their probability of occurance

In [2]:
wiki_counts = wiki.flatMap(lambda x: x[1:]).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)

In [3]:
# Filter out the stop words
wiki_counts = wiki_counts.filter(lambda x: x[0] not in stop_words)

In [4]:
n_words = wiki_counts.map(lambda x: x[1]).sum()

In [5]:
words_prob = wiki_counts.map(lambda x: (x[0], float(x[1]) / n_words))

# Get the bigrams and their probability

In [6]:
wiki_bigrams = wiki.flatMap(lambda x: zip(x[1:], x[2:]+[''])).map(lambda x: (x[0]+'_'+x[1], 1))

In [7]:
bigram_counts = wiki_bigrams.reduceByKey(lambda x, y: x+y)

In [8]:
n_bigrams = bigram_counts.map(lambda x: x[1]).sum()

In [9]:
bigram_counts_limited = bigram_counts.filter(lambda x: x[1] >= 500)

In [10]:
bigram_prob = bigram_counts_limited.map(lambda x: (x[0], float(x[1])/n_bigrams))

# Calculate the NPMI for the bigrams

In [11]:
# Split the bigrams into their words
bigram_split = bigram_prob.map(lambda x: (x[0], x[0].split('_')[0], x[0].split('_')[1], x[1]))

In [12]:
# Add the probabilities of each word occuring (I would love to see a cleaner way to do this...)
# Note that the final order is (bigram a_b, P(ab), P(a), P(b))
all_probs = bigram_split.map(lambda x: (x[1], (x[0], x[2], x[3]))).join(words_prob)
all_probs = all_probs.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][0][2], x[1][1]))).join(words_prob)
all_probs = all_probs.map(lambda x: (x[1][0][0], x[1][0][1], x[1][0][2], x[1][1]))

In [13]:
# Now we can caluclate the NPMI for each row
npmi = all_probs.map(lambda x: (-log(x[1] / (x[2]*x[3]))/x[1], x[0])).sortByKey(ascending=False).map(lambda x: x[1])

In [14]:
npmi.take(39)

[u'united_states',
 u'new_york',
 u'external_links',
 u'university_press',
 u'united_kingdom',
 u'american_actor',
 u'references_external',
 u'19th_century',
 u'air_force',
 u'20th_century',
 u'years_later',
 u'american_actress',
 u'north_american',
 u'north_america',
 u'york_city',
 u'new_zealand',
 u'prime_minister',
 u'american_baseball',
 u'soviet_union',
 u'united_nations',
 u'high_school',
 u'american_football',
 u'baseball_player',
 u'american_singer-songwriter',
 u'south_africa',
 u'catholic_church',
 u'roman_catholic',
 u'notes_references',
 u'took_place',
 u'roman_empire',
 u'supreme_court',
 u'los_angeles',
 u'san_francisco']