In [1]:
#! /usr/bin/env python

import math
import re

from pyspark import SparkConf, SparkContext

# Spark context for this notebook: master "local[2]", application name "MyApp".
sc = SparkContext(
    conf=SparkConf().setMaster("local[2]").setAppName("MyApp")
)

In [3]:
# Read the stop-word file line by line, keeping each entry with its
# surrounding whitespace (including the newline) removed.
stop_words = []
with open('/datasets/stop_words_en.txt') as stop_file:
    for raw_line in stop_file:
        stop_words.append(raw_line.strip())

In [17]:
def npmi(word1, word2, count, word_count_dict, total_words, total_pairs):
    """Return the normalized pointwise mutual information of a bigram.

    Computes ``ln(P(a, b) / (P(a) * P(b))) / -ln(P(a, b))`` where
    ``P(a) = word_count_dict[word1] / total_words``,
    ``P(b) = word_count_dict[word2] / total_words`` and
    ``P(a, b) = count / total_pairs``.

    Args:
        word1: first word of the bigram; must be a key of word_count_dict.
        word2: second word of the bigram; must be a key of word_count_dict.
        count: number of occurrences of the bigram (word1, word2).
        word_count_dict: mapping from word to its occurrence count.
        total_words: total number of words (int or float).
        total_pairs: total number of bigrams (int or float).

    Returns:
        The NPMI value as a float.

    Raises:
        KeyError: if word1 or word2 is missing from word_count_dict.
        ValueError: if count is 0 (logarithm of zero).
        ZeroDivisionError: if a total or a word count is 0, or if
            count equals total_pairs (ln(1) == 0 in the denominator).
    """
    # Coerce the denominators to float: under Python 2, int / int truncates,
    # which would turn every probability into 0 when integer totals are passed.
    total_words = float(total_words)
    total_pairs = float(total_pairs)

    p_a = word_count_dict[word1] / total_words
    p_b = word_count_dict[word2] / total_words
    p_a_b = count / total_pairs

    pmi = math.log(p_a_b / (p_a * p_b))
    return -pmi / math.log(p_a_b)

In [5]:
def parse_article(line):
    """Split one "<article_id><TAB><text>" line into lower-cased words.

    Leading and trailing non-word characters are stripped from the text,
    which is then split on whitespace together with any non-word characters
    adjacent to that whitespace. Empty tokens are discarded, so a text that
    consists only of punctuation yields an empty list.

    Args:
        line: one input line, either a text string or a byte string.
            Byte strings are decoded as UTF-8 (assumed encoding of the
            input -- confirm against the data source).

    Returns:
        A list of lower-cased words, or [] when the line has no tab
        separator or cannot be decoded.
    """
    try:
        line = line.rstrip()
        if isinstance(line, bytes):
            # Decode explicitly instead of relying on the implicit ASCII
            # codec, which rejects every line containing a non-ASCII byte.
            line = line.decode('utf-8')
        _article_id, text = line.split('\t', 1)
    except ValueError:
        # Raised by the unpacking when there is no tab, and by decode()
        # (UnicodeDecodeError is a ValueError subclass) on invalid bytes.
        return []

    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    tokens = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    # re.split returns [''] for an empty text; skip such empty tokens so
    # they are not counted as words.
    return [token.lower() for token in tokens if token]

In [6]:
# One token list per input line, produced by parse_article.
wiki = sc.textFile("/data/wiki/en_articles_part/articles-part", 16).map(parse_article)

# Set membership is a hash lookup; testing every token against the
# stop_words list would scan the whole list for each token.
stop_word_set = frozenset(stop_words)

# Drop lines that parse_article rejected (returned []), remove stop words,
# and cache the result because several later actions read this RDD and
# would otherwise re-read and re-parse the input each time.
wiki_filtered = (wiki
                 .filter(lambda words: words != [])
                 .map(lambda words: [word for word in words if word not in stop_word_set])
                 .cache())

In [7]:
# Total number of tokens across all filtered articles. RDD.sum() returns 0
# for an empty RDD, whereas reduce() raises ValueError on one.
total_words = wiki_filtered.map(len).sum()

# Adjacent word pairs (bigrams) within each article. zip() stops at the
# shorter argument, so pairing words with words[1:] yields exactly the
# consecutive pairs.
wiki_mapped = wiki_filtered.flatMap(lambda words: zip(words, words[1:]))

# Total number of bigram occurrences.
total_pairs = wiki_mapped.count()

In [8]:
# Count occurrences of every word and bring the (word, count) pairs back
# to the driver as a list.
word_occur = (wiki_filtered
              .flatMap(lambda words: [(word, 1) for word in words])
              .reduceByKey(lambda left, right: left + right)
              .collect())

# Lookup table from word to its occurrence count.
word_count_dict = dict(word_occur)

In [35]:
# Store both totals as floats so that the divisions inside npmi() are true
# divisions rather than truncating integer divisions under Python 2.
total_words, total_pairs = float(total_words), float(total_pairs)

In [9]:
# Count every bigram and keep only those that occur at least 500 times.
wiki_mostoccur = (wiki_mapped
                  .map(lambda pair: (pair, 1))
                  .reduceByKey(lambda left, right: left + right)
                  .filter(lambda pair_count: pair_count[1] >= 500))

# Attach the NPMI score to each remaining bigram: ((word1, word2), npmi).
wiki_npmi = wiki_mostoccur.map(
    lambda pair_count: (
        pair_count[0],
        npmi(pair_count[0][0], pair_count[0][1], pair_count[1],
             word_count_dict, total_words, total_pairs),
    )
)

In [37]:
# Order the bigrams by descending NPMI and format each as "word1_word2".
# NOTE(review): this cell rebinds wiki_npmi to a transformation of itself,
# so re-running it without first re-running the cell that builds the
# (pair, score) RDD operates on the already-formatted strings.
wiki_npmi = (wiki_npmi
             .map(lambda scored: (scored[1], scored[0]))
             .sortByKey(ascending=False)
             .map(lambda ranked: ranked[1][0] + '_' + ranked[1][1]))

# Show the 39 highest-scoring bigrams.
for pair in wiki_npmi.take(39):
    print("%s" % pair)

los_angeles
external_links
united_states
prime_minister
san_francisco
et_al
new_york
supreme_court
19th_century
20th_century
references_external
soviet_union
air_force
baseball_player
university_press
roman_catholic
united_kingdom
references_reading
notes_references
award_best
north_america
new_zealand
civil_war
catholic_church
world_war
war_ii
south_africa
took_place
roman_empire
united_nations
american_singer-songwriter
high_school
american_actor
american_actress
american_baseball
york_city
american_football
years_later
north_american
