In [1]:
from dask_yarn import YarnCluster
from dask.distributed import Client

In [15]:
! pip install nltk spacy



In [2]:
!python3 -m spacy download en_core_web_sm

Collecting en-core-web-sm==3.0.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0-py3-none-any.whl (13.7 MB)
[K     |████████████████████████████████| 13.7 MB 13.5 MB/s eta 0:00:01
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [16]:
import dask
import pickle
from collections import Counter
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer
import string
import nltk
import pandas as pd
import spacy
import time

# NOTE: stopwords include pronouns! TODO: make custom stop words list?
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to /home/hadoop/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/hadoop/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /home/hadoop/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [17]:
# Create a cluster where each worker has 1 cores and 4 GiB of memory:
cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz",
                      worker_vcores = 1,
                      worker_memory = "4GiB"
                      )

# Scale cluster out to 8 such workers:
cluster.scale(8)

# Connect to the cluster (before proceeding, you should wait for workers to be registered by the dask scheduler, as below):
client = Client(cluster)

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://172.31.27.2:35153
distributed.scheduler - INFO -   dashboard at:                    :36493
distributed.scheduler - INFO - Receive client connection: Client-fa89df54-c57b-11eb-b56f-0a52978c0d3f
distributed.core - INFO - Starting established connection


In [None]:
client

In [19]:
# text cleaning adatped from: https://github.com/xssChauhan/Blog-Posts/blob/master/dask-text-processing/Dask%20Text%20Processing.ipynb

import string
import unicodedata
import re

@dask.delayed
def clean_comments(comments):
    '''
    '''
    comment_lst = []
    for comment in comments:
        if not comment:
            comment_lst.append(None)
            continue
        comment = comment.replace('\n','')
        comment = nltk.word_tokenize(comment)
        comment = list(filter(lambda word: word.isalnum(), comment))
        comment = [word.lower() for word in comment]
        comment_lst.append(comment)
    
    # comment = unicodedata.normalize('NFKD', comment).encode('ascii', 'ignore').decode('ascii') # I dont know if this is working, and we have to remove links
    #             comment = re.sub('\d', '', comment)
    #             comment = re.sub(r"\W+|_", " ", comment)

    return comment_lst
        
            

In [20]:
from nltk.corpus import stopwords

stop_words = stopwords.words("english")
#stop_words.remove(['she', 'her', 'hers', 'herself']) # for topic clustering?

@dask.delayed
def remove_stopwords(comments):
    '''
    '''
    comment_lst = []
    for comment in comments:
        if not comment:
            comment_lst.append(None)
            continue
        no_stop_words = list(filter(lambda comment: comment not in stop_words, comment))
        comment_lst.append(no_stop_words)
    return comment_lst


In [21]:
import spacy

nlp = spacy.load('en_core_web_sm')

@dask.delayed
def lemmatize(comments):
    '''
    '''
    comment_lst = []
    for comment in comments:
        if not comment:
            comment_lst.append(None)
        word_join = nlp(' '.join(comment))
        lemmatized = [word.lemma_ for word in word_join]
        comment_lst.append(lemmatized)
    
    return comment_lst

In [22]:
def clean_text(df):
    '''
    '''
    
    df["cleaned_text"] = clean_comments(df['comment'])
    df["cleaned_text"] = remove_stopwords(df['comment'])
    df["cleaned_text"] = lemmatize(df['comment'])
    
    return df


In [23]:
comments_corpus = pickle.load(open("comments_corpus_RC_2011-08.pickle","rb"))

distributed.utils_perf - INFO - full garbage collection released 16.44 MiB from 6582 reference cycles (threshold: 9.54 MiB)


In [24]:
import dask.dataframe as ddf

# change corpus to dataframe
subreddit_lst = []
comment_lst = []
time_lst = []
for key in comments_corpus:
    for comment, time in comments_corpus[key]:
        subreddit_lst.append(key)
        comment_lst.append(comment)
        time_lst.append(time)
        
comments_df = pd.DataFrame(list(zip(subreddit_lst, comment_lst, time_lst)),
                           columns = ['subreddit', 'comment', 'time created'])

#convert to dask dataframe
dask_dataframe = ddf.from_pandas(comments_df, npartitions=12)

In [25]:
#t0 = time.time()
result = dask_dataframe.map_partitions(clean_text, meta=comments_df)
df = result.compute()
#t1 = time.time()
#print("Time to process with Dask {}".format(t1-t0))


KeyboardInterrupt: 

### testing

In [51]:
subreddit_lst = []
comment_lst = []
time_lst = []
for key in comments_corpus_test:
    for comment, time in comments_corpus_test[key]:
        subreddit_lst.append(key)
        comment_lst.append(comment)
        time_lst.append(time)
        
comments_df = pd.DataFrame(list(zip(subreddit_lst, comment_lst, time_lst)),
                           columns = ['subreddit', 'comment', 'time created'])

comments_df

Unnamed: 0,subreddit,comment,time created
0,gameofthrones,So I should post a picture of the insides of t...,1312156828
1,gameofthrones,It's inside Daenerys the whole time.,1312156896
2,gameofthrones,The Wall wasn't built to keep *wildlings* out...,1312157007
3,gameofthrones,Yeah it's probably the episode that will need ...,1312157061
4,gameofthrones,"Hah, yeah this is the guy that I saw when rese...",1312157198
5,gameofthrones,That one and [this one](http://www.youtube.com...,1312157591
6,gameofthrones,"[aCoK](/b ""It is Theons uncle, he is the one w...",1312157889
7,gameofthrones,What do we get more of? People posting their b...,1312157980
8,gameofthrones,"with hints of stratego, looks to be rather fun.",1312158136
9,gameofthrones,i hope he budgeted in a spray tan.,1312158180


In [15]:
counter = 0
for comment in comments_df['comment']:
    print(comment)
    counter += 1
    if counter == 10:
        break

So I should post a picture of the insides of the books all lined up?
It's inside Daenerys the whole time.
The Wall wasn't built to keep *wildlings* out...
Yeah it's probably the episode that will need to be the more massively modified (in regards of what we will see) so it's a good thing that GRRM is the one calling the shots.
Hah, yeah this is the guy that I saw when researching making my costume. This guy is going to have an amazing costume... I would like to steal it. 
That one and [this one](http://www.youtube.com/watch?v=2T5_0AGdFic) are way better cut and narrated though...
[aCoK](/b "It is Theons uncle, he is the one who meets Theon when he returns to Pyke and he takes him back to their castle to meet his father. He is usually called Damphair and he is a priest")
What do we get more of? People posting their books or people posting about people posting their books?

How about we stop both.
with hints of stratego, looks to be rather fun.
i hope he budgeted in a spray tan.


In [21]:
counter = 0
for key,comment in comments_corpus['gameofthrones']:
    print(key.lower().split(' '))
    
    counter += 1
    if counter == 5:
        break

['so', 'i', 'should', 'post', 'a', 'picture', 'of', 'the', 'insides', 'of', 'the', 'books', 'all', 'lined', 'up?']
["it's", 'inside', 'daenerys', 'the', 'whole', 'time.']
['the', 'wall', "wasn't", 'built', 'to', 'keep', '*wildlings*', 'out...']
['yeah', "it's", 'probably', 'the', 'episode', 'that', 'will', 'need', 'to', 'be', 'the', 'more', 'massively', 'modified', '(in', 'regards', 'of', 'what', 'we', 'will', 'see)', 'so', "it's", 'a', 'good', 'thing', 'that', 'grrm', 'is', 'the', 'one', 'calling', 'the', 'shots.']
['hah,', 'yeah', 'this', 'is', 'the', 'guy', 'that', 'i', 'saw', 'when', 'researching', 'making', 'my', 'costume.', 'this', 'guy', 'is', 'going', 'to', 'have', 'an', 'amazing', 'costume...', 'i', 'would', 'like', 'to', 'steal', 'it.', '']


In [10]:
comments_corpus_test = {'gameofthrones' : comments_corpus['gameofthrones'][0:20]}

In [193]:
comments_corpus_test = clean_comments(comments_corpus_test)
comments_corpus_test

{'gameofthrones': [(['so',
    'i',
    'should',
    'post',
    'a',
    'picture',
    'of',
    'the',
    'insides',
    'of',
    'the',
    'books',
    'all',
    'lined',
    'up'],
   '1312156828'),
  (['it', 'inside', 'daenerys', 'the', 'whole', 'time'], '1312156896'),
  (['the', 'wall', 'was', 'built', 'to', 'keep', 'wildlings', 'out'],
   '1312157007'),
  (['yeah',
    'it',
    'probably',
    'the',
    'episode',
    'that',
    'will',
    'need',
    'to',
    'be',
    'the',
    'more',
    'massively',
    'modified',
    'in',
    'regards',
    'of',
    'what',
    'we',
    'will',
    'see',
    'so',
    'it',
    'a',
    'good',
    'thing',
    'that',
    'grrm',
    'is',
    'the',
    'one',
    'calling',
    'the',
    'shots'],
   '1312157061'),
  (['hah',
    'yeah',
    'this',
    'is',
    'the',
    'guy',
    'that',
    'i',
    'saw',
    'when',
    'researching',
    'making',
    'my',
    'costume',
    'this',
    'guy',
    'is',
    '

In [194]:
comments_corpus_test = remove_stopwords(comments_corpus_test)
comments_corpus_test

['post', 'picture', 'insides', 'books', 'lined']
['inside', 'daenerys', 'whole', 'time']
['wall', 'built', 'keep', 'wildlings']
['yeah', 'probably', 'episode', 'need', 'massively', 'modified', 'regards', 'see', 'good', 'thing', 'grrm', 'one', 'calling', 'shots']
['hah', 'yeah', 'guy', 'saw', 'researching', 'making', 'costume', 'guy', 'going', 'amazing', 'costume', 'would', 'like', 'steal']
['one', 'one', 'http', 'way', 'better', 'cut', 'narrated', 'though']
['acok', 'theons', 'uncle', 'one', 'meets', 'theon', 'returns', 'pyke', 'takes', 'back', 'castle', 'meet', 'father', 'usually', 'called', 'damphair', 'priest']
['get', 'people', 'posting', 'books', 'people', 'posting', 'people', 'posting', 'books', 'stop']
['hints', 'stratego', 'looks', 'rather', 'fun']
['hope', 'budgeted', 'spray', 'tan']
['mother', 'child', 'milk', 'action', 'might', 'work', 'see', 'dragons', 'fit', 'rest', 'qualification']
['think', 'find', 'anyone', 'subreddit', 'disagree']
['suspicious', 'bewbs', 'also']
['spec

{'gameofthrones': [(['post', 'picture', 'insides', 'books', 'lined'],
   '1312156828'),
  (['inside', 'daenerys', 'whole', 'time'], '1312156896'),
  (['wall', 'built', 'keep', 'wildlings'], '1312157007'),
  (['yeah',
    'probably',
    'episode',
    'need',
    'massively',
    'modified',
    'regards',
    'see',
    'good',
    'thing',
    'grrm',
    'one',
    'calling',
    'shots'],
   '1312157061'),
  (['hah',
    'yeah',
    'guy',
    'saw',
    'researching',
    'making',
    'costume',
    'guy',
    'going',
    'amazing',
    'costume',
    'would',
    'like',
    'steal'],
   '1312157198'),
  (['one', 'one', 'http', 'way', 'better', 'cut', 'narrated', 'though'],
   '1312157591'),
  (['acok',
    'theons',
    'uncle',
    'one',
    'meets',
    'theon',
    'returns',
    'pyke',
    'takes',
    'back',
    'castle',
    'meet',
    'father',
    'usually',
    'called',
    'damphair',
    'priest'],
   '1312157889'),
  (['get',
    'people',
    'posting',
    '