Import PySpark modules

In [1]:
from pyspark import SparkConf
from pyspark import SparkContext

Import the NLTK package

In [2]:
import nltk

Create a SparkContext that uses the YARN resource manager and specifies a name for the application

In [3]:
conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('spark-nltk')
sc = SparkContext(conf=conf)

### RDD Creation

Specify the location of the data on the cluster nodes

In [40]:
data = sc.wholeTextFiles('file:///usr/share/nltk_data/corpora/state_union/*-*.txt')

In [41]:
# data = sc.textFile('file:///usr/share/nltk_data/corpora/state_union/')

In [42]:
data.take(1)

[(u'file:/usr/share/nltk_data/corpora/state_union/1961-Kennedy.txt',
  u'PRESIDENT JOHN F. KENNEDY\'S SPECIAL MESSAGE TO THE CONGRESS ON URGENT NATIONAL NEEDS\n \nMay 25, 1961\n\nMr. Speaker, Mr. Vice President, my copartners in Government, gentlemen-and ladies:\nThe Constitution imposes upon me the obligation to "from time to time give to the Congress information of the State of the Union." While this has traditionally been interpreted as an annual affair, this tradition has been broken in extraordinary times.\nThese are extraordinary times. And we face an extraordinary challenge. Our strength as well as our convictions have imposed upon this nation the role of leader in freedom\'s cause.\nNo role in history could be more difficult or more important. We stand for freedom.\nThat is our conviction for ourselves--that is our only commitment to others. No friend, no neutral and no adversary should think otherwise. We are not against any man--or any nation--or any system--except as it is h

### Word count

In [47]:
year_rdd = data.map(lambda x: x[0].split('-')[0].split('state_union/')[1])
year = year_rdd.collect()
year = map(int,year)

Map the text data to the word_tokenize function in NLTK

In [54]:
words_rdd = data.map(lambda x: len(nltk.word_tokenize(x[1])))
words = words_rdd.collect()

In [56]:
print(words)

[6550, 3578, 5896, 5510, 8432, 1857, 7130, 4701, 4555, 4587, 5577, 3808, 7640, 10418, 9056, 6199, 4987, 6018, 5286, 4836, 7714, 5611, 8383, 4617, 30004, 5600, 6462, 6230, 5534, 5757, 7485, 8089, 8586, 3769, 3715, 4511, 5552, 4472, 4415, 1860, 2108, 4189, 6623, 5043, 5582, 3626, 5172, 6653, 5684, 6213, 8150, 4450, 4668, 4922, 5614, 6133, 5369, 10387, 6139, 7810, 4012, 4392, 6310, 5911, 3292]


Return ten of the words in the dataset

In [35]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

In [38]:
sqlContext.createDataFrame(words, ["id","words"])

TypeError: Can not infer schema for type: <type 'int'>

NTLK’s POS-tagger can be used to find the part of speech for each word

In [10]:
pos_word = words.map(lambda x: nltk.pos_tag([x]))

Return five of the words in the dataset

### Named Entity Recognition

Import named-entity recognition library.

In [12]:
import geograpy

In [13]:
places = data.map(lambda x: x[0] + "\t\t" + str(len(x[1])))

In [14]:
places.take(5)

[u'file:/usr/share/nltk_data/corpora/state_union/1961-Kennedy.txt\t\t35090',
 u'file:/usr/share/nltk_data/corpora/state_union/2001-GWBush-2.txt\t\t18036',
 u'file:/usr/share/nltk_data/corpora/state_union/1992-Bush.txt\t\t28674',
 u'file:/usr/share/nltk_data/corpora/state_union/1958-Eisenhower.txt\t\t30324',
 u'file:/usr/share/nltk_data/corpora/state_union/1994-Clinton.txt\t\t42133']

In [15]:
text = "My name is Zach Carwile and I live in the United States."
test = geograpy.get_place_context(text=unicode(text,encoding='utf-8', errors='ignore'))
print(test)

<geograpy.places.PlaceContext object at 0x7f4eded2b490>


In [16]:
def getPlaces(text):
    places = geograpy.get_place_context(text=text) # WHAT'S WRONG WITH THIS ONE LINE??!??
    return places

In [23]:
places = data.map(lambda x: getPlaces(x[1]))
p = places.take(1)

In [28]:
import time
start_time = time.time()
p = places.collect()
print("--- %s seconds ---" % (time.time() - start_time))

--- 320.555058002 seconds ---


In [31]:
p[3].country_mentions

[(u'United Kingdom', 1),
 (u'United States', 1),
 (u'Russian Federation', 1),
 (u'Turkey', 1)]

### Visualization

In [50]:
# acluster conda install pandas bokeh

In [51]:
from bokeh.charts import Bar
from bokeh.plotting import ColumnDataSource
from bokeh.io import output_notebook, show
from bokeh.models import HoverTool
output_notebook()

In [57]:
p = Bar(words)
show(p)