In [11]:
# standard imports
import pandas as pd, numpy as np 

# prefect  
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

# NLP 
from sklearn.feature_extraction import text 
from sklearn.feature_extraction.text import CountVectorizer 
from sklearn.decomposition import TruncatedSVD


# Dask 
from dask.distributed import Client
from dask_saturn import SaturnCluster

#misc files/text processing
import pickle
import re
import collections
import pprint
import re
import string


In [2]:
# ! pip install prefect 
# ! pip install bson
# ! pip install jupyter_nbextensions_configurator
# ! jupyter nbextensions_configurator enable --user


## EDA 

In [24]:
# load data
with open('./data/twenty16_debate_corpus.pkl', 'rb') as f:
       corpus = pickle.load(f)

In [4]:
corpus[50]

{'Speaker': 'Woodruff',
 'Text': 'Secretary Clinton, your campaign -- you and your campaign have made a clear appeal to women voters. You have talked repeatedly about the fact, we know you would be, if elected, the first woman president. But in New Hampshire 55 percent of the women voters supported and voted for Senator Sanders. What are women missing about you?',
 'Date': '2/11/2016',
 'Party': 'Democratic',
 'Location': 'Milwaukee, Wisconsin',
 'URL': 'http://www.presidency.ucsb.edu/ws/index.php?pid=111520',
 '_id': ObjectId('5dc1cc86619bc07aa810ddee')}

# Stop words

In [25]:
stop_words = text.ENGLISH_STOP_WORDS.union(['im','dont','need','want','senator','governor','know',
                                           'come','theyre','youre','going','think','said','thats',
                                           'just','make','did','got','mr','ms','ive','audience'])

# assign stop words 
common_debate_words = stop_words

# Dict to dataframe

In [21]:
def dict_to_df(rawtext):
    '''transform dictionary of raw text to dataframe'''
    text_df = pd.DataFrame.from_dict(rawtext)
    text_df = text_df.drop(['URL', '_id'], axis = 1)
    return text_df

# Clean text   

In [22]:
def clean_text_round1(text):
    '''make text lowercase, remove text in parantheses, remove punctuation and remove words containing numbers.'''
    text = text.lower()
    text = re.sub('\(.*?\)', '', text)
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    text = re.sub('\w*\d\w*', '', text)
    
    text = re.sub('[‘’“”…]', '', text)
    text = re.sub('\n', '', text)
    return text

round1 = lambda x: clean_text_round1(x)

# Fit vectorizer 

In [26]:
vectorizer = CountVectorizer(stop_words=common_debate_words,min_df=10, max_df=8.5)
def count_vectorize(series):
    '''create document term matrix'''
    doc_word = vectorizer.fit_transform(series)
    return doc_word

# Fit LSA model 

In [27]:
lsa = TruncatedSVD(10)
def fit_model(doc_word):
    '''fit topic model'''
    doc_topic = lsa.fit_transform(doc_word)
    return doc_topic

# Topics by word

In [28]:
def topic_word(model):
    '''create topic by word dataframe'''
    topic_word_df = pd.DataFrame(model.components_.round(10),
             index = ["component_1","component_2","component_3","component_4","component_5",
                     "component_6", "component_7","component_8","component_9","component_10"],
             columns = vectorizer.get_feature_names())
    return topic_word_df

# Display topics 

In [29]:
def display_topics(model, feature_names, no_top_words, topic_names=None):
    '''print topics outputted from model to stdout'''
    for ix, topic in enumerate(model.components_):
        if not topic_names or not topic_names[ix]:
            print("\nTopic ", ix)
        else:
            print("\nTopic: '",topic_names[ix],"'")
        print(", ".join([feature_names[i]
                        for i in topic.argsort()[:-no_top_words - 1:-1]]))

# Without Prefect 

In [54]:
    
# topic model preprocessing pipeline

# dict to dataframe
corpus_df = dict_to_df(corpus)
print('dict to dataframe : step successful')

# clean text 
corpus_df['Text'] = pd.DataFrame(corpus_df['Text'].apply(round1))
print('clean : step successful')

# fit vectorizer 
doc_w = count_vectorize(corpus_df['Text'])
print('fit vectorizer : step successful')

# fit LSA model 
lsa_model = fit_model(doc_w)
print('fit lsa : step successful')

# topics by words matrix 
topic_word_mtx = topic_word(lsa)
print('topic by words : step successful')

# display topics 
display_topics(lsa, vectorizer.get_feature_names(), 10)



dict to dataframe : step successful
clean : step successful
fit vectorizer : step successful
fit lsa : step successful
topic by words : step successful

Topic  0
people, country, president, america, say, right, american, states, years, government

Topic  1
president, states, united, clinton, obama, isis, world, secretary, america, iran

Topic  2
tax, percent, jobs, taxes, government, plan, pay, money, economy, cut

Topic  3
country, america, president, jobs, world, tax, united, states, trade, percent

Topic  4
president, tax, people, states, united, isis, obama, taxes, war, plan

Topic  5
president, care, health, government, insurance, clinton, states, act, federal, affordable

Topic  6
government, care, health, federal, states, years, weve, things, big, deal

Topic  7
care, health, isis, states, world, insurance, united, tax, war, affordable

Topic  8
government, states, clinton, united, america, world, secretary, hillary, economy, american

Topic  9
government, president, clinton, co

**Not bad--the LSA model ran and we have some topics, but what if we want this model to rerun on new data at a certain time in the future? First we'd need to make our function for loading data into function, but additionally, there's no real way to do this from within our notebook. This is where Prefect comes in**

# Authenticate Prefect Cloud

[Authenticate]('https://docs.prefect.io/orchestration/tutorial/configure.html#authenticating-with-prefect-cloudyour') your local machine to leverage Prefect Cloud. 

1. Sign up for the free tier of Prefect Cloud 
2. Open menu (three lines top right of dashboard page) 
3. Click API tokens 
4. Create 1 token for the tenant (save it somewhere on your computer) 
5. Create anoter token for the runner (save it somewhere on your computer) 
6. From the dashboard select new project and create a new project 

Login to Prefect: 

In [38]:
! prefect auth login -t eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ0ZW5hbnRfaWQiOiI5N2FmZTgyMC02MTFlLTRmMDEtYjEzOC0zNDhkYjAwZWY1MjQiLCJ1c2VyX2lkIjoiMDhlODgyOTctNDI5MS00Yjc0LWFjNWYtOTA1ODZlY2RiZGRiIiwiYXBpX3Rva2VuX3Njb3BlIjoiVEVOQU5UIiwiaWF0IjoxNTg5MzExMDUwLCJleHAiOjQxMDI0NDQ4MDAsImp0aSI6ImM0MjliMjZmLTdjNTQtNDFmZi1hZDEzLTJmMjdmOGM2MmU1OCIsImlzcyI6IlByZWZlY3QgQ2xvdWQiLCJhdWQiOiJQcmVmZWN0IENsb3VkIEFQSSIsInR5cCI6IkFQSSJ9.KeZY-ztg9alOlY4ZzXVQYAeDdkYYpPEwhUccfIdBSNWG_6XGhr6Nh9oKCyllu0jC3bEYB3qmEu7FLXxcEDdqsRLt91jrWGGl4ULyEPsjfRQfFdU8OQE0Z8AfFmsGMWKFpZ-Ce8Wo6_yO1ERkQHv2-Rh34L6-JKJyQj0JP-QB5kl3Cr435T0kwu65vRzVS-PdDwN698hNnf_alMD7RrOTWV0ZNbl-qnPB9hI2ZG4e2232ejyZh97fa6jNHTxlw3uxAiJSuaH1YIiRE8_AbSL436KFkqOzNMTC7jxHVip_TH09r9Eirau0GpUnKoMOe9Km62Zan4zYg8SqHUSqscs81Q

[32mLogin successful![0m


You can store the runner token as environment variable, or paste it into the run_agent command at the bottom of the flow block in the code below. 


In [71]:
# ! export PREFECT__CLOUD__AGENT__AUTH_TOKEN=<COPIED_RUNNER_TOKEN>

# Execute Prefect Flow 

Now if you run our NLP flow below, it will be registered with the prefect API (through the ```.register``` command). Now we can use an agent to watch for flow runs that are scheduled by the Prefect API and execute them accordingly. 

Uncomment the ```.run_agent``` line after the register command to start a Local Agent. Note -- the Local Agent will use the RUNNER token stored in your environment but if you want to manually pass it a token you may do so with ```run_agent(token=<YOUR_RUNNER_TOKEN>)```.

In [80]:
with Flow("NLP : Debate Transcripts") as nlp_flow:
    
# topic model preprocessing pipeline

    # dict to dataframe
    corpus_df = dict_to_df(corpus)

    # clean text 
    corpus_df['Text'] = pd.DataFrame(corpus_df['Text'].apply(round1))

    # fit vectorizer 
    doc_w = count_vectorize(corpus_df['Text'])

    # fit LSA model 
    lsa_model = fit_model(doc_w)

    # topics by words matrix 
    topic_word_mtx = topic_word(lsa)

    # display topics 
    display_topics(lsa, vectorizer.get_feature_names(), 10)

nlp_flow.run()
nlp_flow.register(project_name='nlp_demo')
nlp_flow.run_agent('eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ0ZW5hbnRfaWQiOiI5N2FmZTgyMC02MTFlLTRmMDEtYjEzOC0zNDhkYjAwZWY1MjQiLCJ1c2VyX2lkIjpudWxsLCJhcGlfdG9rZW5fc2NvcGUiOiJSVU5ORVIiLCJyb2xlIjoiUlVOTkVSIiwiaWF0IjoxNTg5MzExNzQ2LCJleHAiOjQxMDI0NDQ4MDAsImp0aSI6ImRkYzBmY2JkLTAzN2ItNGUzZS05MTk1LWNlYmQ4ODNhYjQyNiIsImlzcyI6IlByZWZlY3QgQ2xvdWQiLCJhdWQiOiJQcmVmZWN0IENsb3VkIEFQSSIsInR5cCI6IkFQSSJ9.FeoGCZ_tKtM81u3pzP6YjjIVz1VrUsxLptpPx1wUpishylu2oEe7neZb90iuldRODMo-glFzS2gc55f8gNfdLiShD8ESd3ahxqjvjL2JCy-UtwCexskKs1k0jPTKxvyuGRVGqVTe3tecZQk97jRDKEiI6PtH8Tg9U7MqjdJM_YSPskril1qlOihbRRM-na2blPr4hR8ppu3HZtUw4bPCt-k1ZvEhh9tWLcSMKWLSE-q1X74mrPOMZ8iydbn3gT9h3V9NVznSLNplWqrKdv9bkfn52yt5XFhZozwHtDw3piQ2k-OQtmu_zuQXV84g7lcwYuRDMmeC8QIHWFfVxnHLyg')


Topic  0
people, country, president, america, say, right, american, states, years, government

Topic  1
president, states, united, clinton, obama, isis, world, secretary, america, iran

Topic  2
tax, percent, jobs, taxes, government, plan, money, pay, economy, cut

Topic  3
country, america, president, jobs, world, tax, united, states, trade, percent

Topic  4
president, tax, people, states, united, isis, obama, taxes, plan, war

Topic  5
president, care, health, government, insurance, clinton, state, federal, money, obama

Topic  6
government, care, health, states, united, federal, years, deal, things, way

Topic  7
care, health, states, isis, tax, united, insurance, world, secretary, clinton

Topic  8
states, united, government, america, clinton, world, secretary, hillary, new, economy

Topic  9
government, clinton, wall, street, country, secretary, president, big, obama, isis
[2020-05-12 20:49:20] INFO - prefect.FlowRunner | Beginning Flow run for 'NLP : Debate Transcripts'
[2020-0

# What just happened? 

**You might notice your cell is still running (*), You can interrupt your kernel to stop the agent from continually querying. 
The agent should by default stop querying after a minute (check your dashboard to make sure, you can just remove the agent from your dashboard after that).**

**With extremely minimal code changes we now have a prefect flow setup which allows us to do really powerful things like execute this entire flow with a click of a button from the prefect UI, or schedule runs for a specific time of day, below is how you schedule this run from the notebook directly** 

In [79]:
#! prefect run cloud --name 'NLP : Debate Transcripts' --project 'nlp_demo'

# Prefect flow on Saturn Dask cluster

Now what if theoretically, we have a much larger corpus that cannot be fit into memory -- we can run the flow on a dask cluster

Start Saturn Dask cluster

In [12]:
cluster = SaturnCluster()
cluster

VBox(children=(HTML(value='<h2>SaturnCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n   …

In [14]:
client = Client(cluster)

In [16]:
cluster.status

'running'

Then simply point the prefec dask executor to url of saturn dask cluster

In [31]:
executor = DaskExecutor('tcp://sami-energy-dask.main-namespace:8786')
nlp_flow.run(executor=executor)

[2020-05-12 16:52:31] INFO - prefect.FlowRunner | Beginning Flow run for 'NLP : Debate Transcripts'
[2020-05-12 16:52:31] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-12 16:52:31] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


<Success: "All reference tasks succeeded.">