In [29]:
from __future__ import division
from pyspark import SparkConf, SparkContext
import re

In [2]:
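try:
    sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local").set("spark.cores.max", "4"))
except ValueError:
    # a SparkContext already exists in this session (e.g. the cell was re-run); keep using it
    pass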

## Demo

In [3]:
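def parse_article(line):
    try:
        # each line is "article_id<TAB>text"; only the text is used here
        article_id, text = unicode(line.rstrip()).split('\t', 1)
        # strip non-word characters at both ends, then split on whitespace plus adjacent punctuation
        text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
        words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
        return words
    except ValueError:
        # the line has no tab separator
        return []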

In [6]:
wiki = sc.textFile("/data/wiki/en_articles_part/articles-part", 4).map(parse_article)

In [7]:
result = wiki.take(1)[0]
for word in result[:50]:
    print word

Anarchism
Anarchism
is
often
defined
as
a
political
philosophy
which
holds
the
state
to
be
undesirable
unnecessary
or
harmful
The
following
sources
cite
anarchism
as
a
political
philosophy
Slevin
Carl
Anarchism
The
Concise
Oxford
Dictionary
of
Politics
Ed
Iain
McLean
and
Alistair
McMillan
Oxford
University
Press
2003
However
others
argue


## Pairs
### Intro
In this assignment you will use Spark to compute various statistics for word pairs. At the same time, you will learn some simple techniques of natural language processing.

Dataset location: /data/wiki/en_articles

Format: article_id <tab> article_text

When parsing the articles, handle Unicode correctly (although this is an English Wikipedia dump, it contains many characters from other languages), remove punctuation marks, and convert words to lowercase so that the counts come out right.
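The parsing advice above can be tried on a single string before running it on the cluster. The sketch below reuses the two regular expressions from `parse_article` and adds lowercasing; the `tokenize` helper and the sample sentence are hypothetical and only serve as an illustration.

```python
import re

def tokenize(text):
    """Split text into lowercase words, dropping surrounding punctuation."""
    # Strip non-word characters from both ends, then split on whitespace
    # together with any punctuation adjacent to it.
    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    # Drop the empty string produced by an empty input, then lowercase.
    return [w.lower() for w in words if w]

# Hypothetical sample containing a non-ASCII letter and punctuation.
sample = u"The \u00c9cole Normale, in Paris (France)!"
print(tokenize(sample))
```

With `re.UNICODE`, `\W` treats accented letters as word characters, so a word such as "École" survives intact and is lowercased like any other word.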

### Task
Find all pairs of two consecutive words where the first word is “narodnaya”. For each pair, count the number of occurrences in the Wikipedia dump. Print all the pairs with their counts in lexicographical order. The output format is “word_pair <tab> count”, for example:

```
red_apple	100500
crazy_zoo	42
```

Note that the two words in a pair are joined with the underscore character, and the result is in lowercase.

One motivation for counting these continuations is to get a better understanding of the language. Some words, like “the”, have a lot of continuations, while others, like “San”, have just a few (“San Francisco”, for example). One can build a language model with these statistics. If you are interested in learning more, search for “n-gram language model” on the Internet.
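Before moving to Spark, the counting itself can be sketched in plain Python on a toy word list. The `continuation_counts` helper and the toy text are hypothetical; the sketch only illustrates the counting and the required output format, not the distributed solution.

```python
from collections import Counter

def continuation_counts(words, first_word):
    """Count every pair of consecutive words that starts with first_word."""
    counts = Counter()
    for current, following in zip(words, words[1:]):
        if current == first_word:
            counts["{}_{}".format(current, following)] += 1
    return counts

# Hypothetical toy text, already lowercased and stripped of punctuation.
toy_words = "san francisco is north of san jose and san francisco bay".split()

# Print "word_pair<TAB>count" in lexicographical order of the pair.
for pair, count in sorted(continuation_counts(toy_words, "san").items()):
    print("{}\t{}".format(pair, count))
```

The Spark version follows the same shape: emit `(pair, 1)` for each match, sum the ones per key, and sort by key.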

In [8]:
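def pairs_starting_from_word(words, first_word='word'):
    # emit ("first_next", 1) for every occurrence of first_word that has a successor
    pairs = []
    for i, word in enumerate(words[:-1]):
        if word == first_word:
            # unicode template, so a non-ASCII following word does not raise in Python 2
            pair = u'{}_{}'.format(word, words[i+1])
            pairs.append((pair, 1))
    return pairs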

In [9]:
# lowercase all words
wiki_lower = wiki.map(lambda words: [x.lower() for x in words])

# find pairs starting from defined word
wiki_pairs = wiki_lower.flatMap(lambda x: pairs_starting_from_word(x, 'narodnaya'))

# no extra filtering is needed: flatMap already flattens the per-article lists,
# so articles without a matching pair contribute nothing

# aggregate counters
wiki_red = wiki_pairs.reduceByKey(lambda a, b: a + b, numPartitions=16)

# sort values by key
wiki_red_sorted = wiki_red.sortByKey()

In [10]:
result = wiki_red_sorted.collect()
for pair, cnt in result:
    print '{}\t{}'.format(pair, cnt)

narodnaya_gazeta	1
narodnaya_volya	9


## Collocations
### Intro
PMI of two words, a and b, is defined as “PMI(a, b) = ln (P(ab) / (P(a) * P(b)))”, where P(ab) is the probability of the two words coming one after the other, and P(a) and P(b) are the probabilities of words a and b respectively.

You will estimate probabilities with occurrence counts, that is “P(a) = # of occurrences of word a / total number of words”, and “P(ab) = # of occurrences of words ‘a b’ / total number of word pairs”.

To build an intuition behind the definition, consider the following cases:

- ***“roman empire”;*** assume that this is a unique combination, and every occurrence of “roman” is followed by “empire”, and, vice versa, every occurrence of “empire” is preceded by “roman”. In this case, “P(ab) = P(a) = P(b)”, so “PMI(a, b) = -ln P(a) = -ln P(b)”. This quantity increases when the probability of the collocation is low.

- ***“the doors”;*** let’s assume that “the” may occur with every word, independently. Thus, “P(ab) = P(a)*P(b)”, and “PMI(a, b) = ln 1 = 0”.

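- ***“green idea / sleeps furiously”;*** when two words never occur together, “P(ab) = 0”, and “PMI(a, b) = -inf”.

Taken together, the three cases show that rare but tightly coupled words have a large PMI, independent words have a PMI close to 0, and words that never occur together have a PMI of -inf.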
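The three cases can be checked numerically. The sketch below plugs hypothetical probabilities into the PMI definition; the `pmi` helper and all numbers are assumptions chosen only to mirror the cases, not values from the dataset.

```python
from math import log

def pmi(p_ab, p_a, p_b):
    """PMI(a, b) = ln(P(ab) / (P(a) * P(b))); -inf if the pair never occurs."""
    if p_ab == 0:
        return float("-inf")
    return log(p_ab / (p_a * p_b))

# Hypothetical probabilities, chosen only to mirror the three cases.
coupled = pmi(p_ab=0.001, p_a=0.001, p_b=0.001)            # "roman empire"
independent = pmi(p_ab=0.05 * 0.002, p_a=0.05, p_b=0.002)  # "the doors"
never = pmi(p_ab=0.0, p_a=0.001, p_b=0.001)                # "green idea"

print("coupled:     {:.4f} (equals -ln P(a) = {:.4f})".format(coupled, -log(0.001)))
print("independent: {:.4f}".format(independent))
print("never:       {}".format(never))
```

Lowering the shared probability in the coupled case makes the PMI grow, which is why raw PMI favours rare pairs and why the task normalizes it.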

### Task
In the second part of the assignment, your task is to extract collocations, that is, word combinations that frequently occur together. For example, “high school” or “roman empire”.

To find collocations, you will use NPMI (normalized pointwise mutual information) metric.

NPMI is computed as “NPMI(a, b) = PMI(a, b) / (-ln P(ab))”. This normalizes the quantity to the range [-1, 1].
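How the three intuition cases from the Intro map onto 1, 0 and -1 follows directly from the definitions. The short derivation below assumes consistent probabilities; with count-based estimates that divide words and pairs by different totals, values on very small samples may fall outside the range.

```latex
\begin{aligned}
\mathrm{PMI}(a,b) &= \ln P(ab) - \ln P(a) - \ln P(b),
\qquad \mathrm{NPMI}(a,b) = \frac{\mathrm{PMI}(a,b)}{-\ln P(ab)} \\
P(ab) = P(a) = P(b) &\;\Rightarrow\; \mathrm{PMI}(a,b) = -\ln P(ab) \;\Rightarrow\; \mathrm{NPMI}(a,b) = 1 \\
P(ab) = P(a)\,P(b) &\;\Rightarrow\; \mathrm{PMI}(a,b) = 0 \;\Rightarrow\; \mathrm{NPMI}(a,b) = 0 \\
P(ab) \to 0 &\;\Rightarrow\; \mathrm{NPMI}(a,b) = -1 + \frac{\ln\big(P(a)\,P(b)\big)}{\ln P(ab)} \to -1
\end{aligned}
```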

Your task is a bit more complicated now:

- Extract all the words, as in the previous task.
- Filter out stopwords using the dictionary (/datasets/stop_words_en.txt); do not forget to convert words to lowercase first!
- Compute all bigrams (that is, pairs of consecutive words)
- Keep only bigrams with at least 500 occurrences
- Compute NPMI for every remaining bigram (note: when computing probabilities, you need unpruned counts!)
- Sort word pairs by NPMI in descending order
- Print the top 39 word pairs, with words delimited by the underscore “_”

For example,

```
roman_empire
south_africa
```

***Hint:*** if you did everything right, “roman_empire” and “south_africa” are going to be in the result.
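The steps listed above can be sketched end to end in plain Python on a toy corpus. Everything here is an assumption made for illustration: the `top_collocations` helper, the toy articles, the two-word stop list, and the threshold of 2 instead of 500. On a corpus this small the NPMI values themselves are not meaningful; the sketch only shows the order of operations.

```python
from collections import Counter
from math import log

def top_collocations(articles, stop_words, min_count, top_n):
    """Rank bigrams by NPMI; articles is a list of lowercase word lists."""
    word_counts, pair_counts = Counter(), Counter()
    for words in articles:
        kept = [w for w in words if w not in stop_words]
        word_counts.update(kept)
        # Bigrams are built after stop word removal and never span two articles.
        pair_counts.update(zip(kept, kept[1:]))

    # Totals come from the unpruned counts, before the min_count filter.
    total_words = float(sum(word_counts.values()))
    total_pairs = float(sum(pair_counts.values()))

    scored = []
    for (a, b), count in pair_counts.items():
        if count < min_count:
            continue
        p_ab = count / total_pairs
        p_a = word_counts[a] / total_words
        p_b = word_counts[b] / total_words
        npmi = log(p_ab / (p_a * p_b)) / -log(p_ab)
        scored.append((npmi, "{}_{}".format(a, b)))

    scored.sort(reverse=True)
    return [pair for _, pair in scored[:top_n]]

# Hypothetical toy corpus and stop list.
toy_articles = [
    "the roman empire was large".split(),
    "the roman empire fell".split(),
    "the large house fell".split(),
]
toy_stop_words = {"the", "was"}
print(top_collocations(toy_articles, toy_stop_words, min_count=2, top_n=39))
```

Keeping each bigram as a tuple until the final formatting step avoids having to split the joined string again, which would be ambiguous for words that themselves contain an underscore.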

In [11]:
with open('/datasets/stop_words_en.txt', 'r') as f:
    stop_words = set(f.read().split())

print(len(stop_words))

319


In [12]:
stop_words

{'a',
 'about',
 'above',
 'across',
 'after',
 'afterwards',
 'again',
 'against',
 'all',
 'almost',
 'alone',
 'along',
 'already',
 'also',
 'although',
 'always',
 'am',
 'among',
 'amongst',
 'amoungst',
 'amount',
 'an',
 'and',
 'another',
 'any',
 'anyhow',
 'anyone',
 'anything',
 'anyway',
 'anywhere',
 'are',
 'around',
 'as',
 'at',
 'back',
 'be',
 'became',
 'because',
 'become',
 'becomes',
 'becoming',
 'been',
 'before',
 'beforehand',
 'behind',
 'being',
 'below',
 'beside',
 'besides',
 'between',
 'beyond',
 'bill',
 'both',
 'bottom',
 'but',
 'by',
 'call',
 'can',
 'cannot',
 'cant',
 'co',
 'computer',
 'con',
 'could',
 'couldnt',
 'cry',
 'de',
 'describe',
 'detail',
 'do',
 'done',
 'down',
 'due',
 'during',
 'each',
 'eg',
 'eight',
 'either',
 'eleven',
 'else',
 'elsewhere',
 'empty',
 'enough',
 'etc',
 'even',
 'ever',
 'every',
 'everyone',
 'everything',
 'everywhere',
 'except',
 'few',
 'fifteen',
 'fify',
 'fill',
 'find',
 'fire',
 'first',
 'f

In [13]:
def create_bigrams(words):
    bigrams = []
    for i, word in enumerate(words[:-1]):
        pair = u'_'.join((word, words[i+1])).encode('utf-8')
        cnt = 1
        bigrams.append((pair, cnt))
    return bigrams

In [41]:
from math import log

def calc_npmi(pair, cnt, words_occurrences_dict, total_num_of_words, total_num_of_pairs):
    word1, word2 = pair.split('_')
    # word probabilities P(a) and P(b), estimated from occurrence counts
    p_a = words_occurrences_dict[word1] / total_num_of_words
    p_b = words_occurrences_dict[word2] / total_num_of_words

    # bigram probability P(ab)
    p_ab = cnt / total_num_of_pairs
    pmi_a_b = log(p_ab / (p_a * p_b))

    # normalize PMI by -ln P(ab)
    npmi_a_b = pmi_a_b / -log(p_ab)
    return (pair, npmi_a_b)

In [15]:
# lowercase all words
wiki_lower = wiki.map(lambda words: [x.lower() for x in words])

# words not in stop_words_en.txt
wiki_filt = wiki_lower.map(lambda words: [x for x in words if x not in stop_words])

# create bigrams
wiki_bigrams = wiki_filt.flatMap(create_bigrams)

# aggregate counters
wiki_red = wiki_bigrams.reduceByKey(lambda a, b: a + b)

# filter values by counter
wiki_red_filt = wiki_red.filter(lambda (pair, cnt): cnt >= 500)

In [16]:
# total number of words
tot_num_words = wiki_filt.map(lambda words: len(words))
tot_num_words = tot_num_words.reduce(lambda a, b: a + b)
tot_num_words

6971026

In [17]:
# total number of word pairs (an empty word list contributes 0 pairs, not -1)
tot_num_pairs = wiki_filt.map(lambda words: max(len(words) - 1, 0))
tot_num_pairs = tot_num_pairs.reduce(lambda a, b: a + b)
tot_num_pairs

6966926

In [18]:
# number of occurrences of each word
words_occ = wiki_filt.flatMap(lambda words: [(x, 1) for x in words])
words_occ = words_occ.reduceByKey(lambda a, b: a + b)
# the counts stay unpruned: any word in a bigram seen >= 500 times itself occurs >= 500 times,
# so this filter only shrinks the lookup table collected to the driver
words_occ = words_occ.filter(lambda (word, cnt): cnt >= 500)
words_occ_dict = dict(words_occ.collect())

In [49]:
pairs_npmi = wiki_red_filt\
    .map(lambda (pair, cnt): calc_npmi(pair, cnt, words_occ_dict, tot_num_words, tot_num_pairs))\
    .map(lambda (a, b): (b, a))\
    .sortByKey(False)\
    .map(lambda (a, b): (b, a))\
    .take(39)

In [51]:
print(len(pairs_npmi))

39


In [50]:
for pair, npmi in pairs_npmi:
    print pair

los_angeles
external_links
united_states
prime_minister
san_francisco
et_al
new_york
supreme_court
19th_century
20th_century
references_external
soviet_union
air_force
baseball_player
university_press
roman_catholic
united_kingdom
references_reading
notes_references
award_best
north_america
new_zealand
civil_war
catholic_church
world_war
war_ii
south_africa
took_place
roman_empire
united_nations
american_singer-songwriter
high_school
american_actor
american_actress
american_baseball
york_city
american_football
years_later
north_american
