In [7]:
from __future__ import print_function

import os, pandas as pd, numpy as np, dateutil.parser

from kafka import KafkaConsumer
from operator import concat
from json import loads
from urth.widgets.widget_channels import channel
from datetime import datetime as dt, timedelta as td
from dateutil import tz
from functools import reduce

In [8]:
def flatten(iterable):
    """Concatenates the sequences contained in `iterable` into one new list.

    Runs in a single pass instead of repeatedly concatenating intermediate
    lists, and returns an empty list for an empty `iterable`.
    """
    return [item for chunk in iterable for item in chunk]


def counts():
    """Polls `consumer` and returns the JSON-decoded value of every record.

    NOTE(review): relies on a module-level name `consumer` that is not defined
    in the visible cells -- confirm where it is meant to come from.
    """
    return [loads(record.value) for record in flatten(consumer.poll(100).values())]


def flip(tpl):
    """Returns the items of `tpl` in reverse order, as a tuple."""
    return tuple(reversed(tpl))

In [9]:
base = '../data'
data_f = os.path.join(base, 'airline-twitter-sentiment', 'airline-handles')

# Each non-blank line is split on ',' and reversed by flip(), so the fields end
# up in the opposite order to the one used in the file.
# The file is read inside a `with` block so its handle is closed right away.
with open(data_f) as handles_file:
    airline_handles = pd.DataFrame(
        [flip(line.rstrip().split(',')) for line in handles_file if line.strip()],
        columns=['Tweeter Handle', 'Airline Name'])

In [18]:
%%html

<!-- Web components used by the dashboard cells, loaded through urth-core-import. -->
<link rel='import'
      href='urth_components/urth-viz-table/urth-viz-table.html'
      is='urth-core-import'>

<link rel='import'
      href='urth_components/urth-viz-chart/urth-viz-chart.html'
      is='urth-core-import'>

<link rel='import'
      href='urth_components/paper-slider/paper-slider.html'
      package='PolymerElements/paper-slider'
      is='urth-core-import'>

<link rel='import'
      href='urth_components/paper-button/paper-button.html'
      package='PolymerElements/paper-button'
      is='urth-core-import'>

<link rel='import'
      href='urth_components/paper-progress/paper-progress.html'
      package='PolymerElements/paper-progress'
      is='urth-core-import'>

<link rel='import'
      href='urth_components/paper-toggle-button/paper-toggle-button.html'
      package='PolymerElements/paper-toggle-button'
      is='urth-core-import'>

<!-- <paper-item> is used by the banner and chart templates, so import it explicitly. -->
<link rel='import'
      href='urth_components/paper-item/paper-item.html'
      package='PolymerElements/paper-item'
      is='urth-core-import'>

In [19]:
%%html
<!-- Element ids must be unique in a document: the first element keeps id 'w',
     the second gets its own id. -->
<!-- NOTE(review): the Python cells call channel('w').set('counts' / 'total_counts', ...),
     i.e. they treat 'w' as the channel name; confirm whether these elements
     should rather declare name='w'. -->
<urth-core-channel id='w' name='counts'></urth-core-channel>
<urth-core-channel id='w-total' name='total_counts'></urth-core-channel>

In [11]:
%%html        
<!-- Table of airline handles: the urth-core-dataframe element references the
     kernel name "airline_handles" and binds its value to `hs`; the
     urth-viz-table below reads hs.data and hs.columns. -->
<template is='urth-core-bind'>
    <urth-core-dataframe id="handles" 
                         ref="airline_handles" 
                         value="{{ hs }}" 
                         auto></urth-core-dataframe>
            
    <urth-viz-table datarows='[[ hs.data ]]' 
                    columns='[[ hs.columns ]]' 
                    selection='[[ sel ]]'></urth-viz-table>
</template>

In [12]:
def parse_timestamp(ts):
    """Converts a POSIX timestamp (seconds) into a timezone-aware UTC datetime.
    """
    naive_utc = dt.utcfromtimestamp(ts)
    utc_zone = tz.gettz('UTC')
    # Attach the UTC zone without shifting the wall-clock fields.
    return naive_utc.replace(tzinfo=utc_zone)

def parse_date(date_string):
    """Parses an ISO date string and returns a naive datetime expressed in UTC.

    Returns np.nan when `date_string` is None. A string that carries no UTC
    offset is interpreted in the local timezone by `datetime.timestamp()`.
    """
    if date_string is None:
        return np.nan
    date = dateutil.parser.parse(date_string)
    # `datetime` is imported as `dt` in this notebook, and the parsed value
    # lives in `date`.
    return dt.utcfromtimestamp(date.timestamp())


class DefaultDict(dict):
    """A dict that derives the value for a missing key from the key itself.

    Unlike collections.defaultdict, the factory `from_key` receives the key
    that was looked up.
    """

    def __init__(self, from_key):
        # Callable invoked as from_key(key) on the first lookup of each key.
        self.from_key = from_key

    def __missing__(self, key):
        """Builds, stores and returns the value for a key not yet present."""
        self[key] = self.from_key(key)
        return self[key]

# One KafkaConsumer per key, created the first time that key is looked up.
consumers = DefaultDict(lambda topic: KafkaConsumer(topic, consumer_timeout_ms=100))

# Lower-cased first column of airline_handles plus a zero-initialised counter column.
top_counts_df = airline_handles.iloc[:, 0].str.lower().to_frame()
top_counts_df['Tweet Count'] = 0

def get_top_counts():
    """Polls the 'top-counts' consumer and publishes tweet counts per handle.

    Every polled record value is decoded as UTF-8 JSON and read through its
    'value' and 'count' keys. The sum of all counts is always set as
    'total_counts' on channel 'w' (0 when nothing was polled). When records
    were received, the counts are summed per lower-cased handle and the
    resulting frame is set as 'counts' on channel 'w'.

    Returns:
        A DataFrame with columns 'Airline Handle' and 'Tweet Count', or None
        when the poll returned no records.
    """
    consumer = consumers['top-counts']
    messages = [loads(record.value.decode('utf-8'))
                for record in flatten(consumer.poll(timeout_ms=500).values())]
    channel('w').set('total_counts', sum(m['count'] for m in messages))
    if not messages:
        return None

    rows = [(m['value'].lower(), m['count']) for m in messages]
    df = (pd.DataFrame(rows, columns=['Airline Handle', 'Tweet Count'])
          .groupby('Airline Handle', as_index=False)
          .sum())
    channel('w').set('counts', df)
    return df

def message_to_rec(m):
    """Flattens one sentiment message dict into a 6-tuple.

    The tuple holds mention, category, polarity, tweet id, tweet creation time
    (via parse_timestamp) and tweet text, in that order.
    """
    mention, category, polarity = m['mention'], m['cat'], m['polarity']
    tweet = m['tweet']
    tweet_id = tweet['id']
    # 'created_at' is divided by 1000 before being handed to parse_timestamp.
    created_at = parse_timestamp(tweet['created_at'] / 1000.0)
    return (mention, category, polarity, tweet_id, created_at, tweet['text'])

In [13]:
# FIXME: running the code in this cell causes the kernel to die -- likely a bug; the cause has not been identified yet.

def get_sentiment_messages():
    """Polls the 'sentiment' consumer and publishes the tweets as a DataFrame.

    Each record value is decoded as UTF-8 JSON and treated as a sequence of
    messages; every message becomes one row via message_to_rec. The frame is
    set as 'messages' on channel 'v' and returned. Returns None when the poll
    yields no records.
    """
    consumer = consumers['sentiment']
    polled = consumer.poll(timeout_ms=1000)
    batches = [loads(record.value.decode('utf-8')) for record in flatten(polled.values())]
    print('vs:', batches[0] if batches else None)
    if not batches:
        return None

    rows = (message_to_rec(message) for batch in batches for message in batch)
    column_names = ['Mention', 'Category', 'Polarity',
                    'Tweet id', 'Tweet time', 'Tweet text']
    tweets_df = pd.DataFrame(rows, columns=column_names)
    channel('v').set('messages', tweets_df)
    return tweets_df

In [None]:
# Poll once and display the result (a DataFrame, or None when nothing was polled).
get_sentiment_messages()

vs: [{'polarity': 2.0, 'tweet': {'id': 717180043588296704, 'created_at': 1459824021000, 'text': '@VirginAmerica and @AlaskaAir ??? Nooooooooooo. #virginsacrifice.'}, 'cat': '=', 'mention': 'virginamerica'}]


In [None]:
def dashboard_data():
    """Returns the individual sentiment metrics and the total sentiment.

    Progress is reported by setting 'show-progress', 'progress-end',
    'progress' and 'progress-message' through channel().

    NOTE(review): CURRENT_WINDOW_SIZE_IN_DAYS and get_sentiment_counts_during
    are not defined in the visible cells -- confirm where they come from.

    Returns:
        A dict with keys 'individual' and 'total'; each value is a dict with
        'rows', 'columns' and 'metadata'.
    """
    channel().set('show-progress', True)
    channel().set('progress-end', 2)

    # The notebook imports datetime/timedelta under the aliases dt/td.
    end_date = dt.now()
    start_date = end_date - td(days=CURRENT_WINDOW_SIZE_IN_DAYS)
    channel().set('progress-message', 'Collecting individual sentiment counts.')
    channel().set('progress', 1)
    metrics = get_sentiment_counts_during(start_date, end_date)
    individual_sentiment = {
        'rows': metrics,
        'columns': ['Index', 'Positive', 'Negative'],
        'metadata': {'interval': CURRENT_WINDOW_SIZE_IN_DAYS}
    }

    channel().set('progress-message', 'Calculating total sentiment counts.')
    channel().set('progress', 2)

    # Each row is indexed as (index, positive, negative), matching 'columns'.
    rows = individual_sentiment['rows']
    total_positive = sum(metric[1] for metric in rows)
    total_negative = sum(metric[2] for metric in rows)

    total_sentiment = {
        'rows': [
            ['Total', total_positive, total_negative]
        ],
        'columns': ['Index', 'Positive', 'Negative'],
        'metadata': {'interval': CURRENT_WINDOW_SIZE_IN_DAYS}
    }

    channel().set('show-progress', False)
    return {
        'individual': individual_sentiment,
        'total': total_sentiment
    }

In [14]:
%%html
<!-- Dashboard banner. The dom-if section is rendered while `show-progress` is
     truthy and shows `progress`, `progress-end` and `progress-message`;
     dashboard_data() sets those four names through channel().set(...). -->
<template id='banner' is="urth-core-bind">
    <h1>Twitter Spark Dashboard</h1>
    
    <paper-item>
        <p>
            This dashboard shows a few statistics collected during the sentiment analysis.
        </p>
    </paper-item>
    
    <template is='dom-if' if='{{show-progress}}'>
        <paper-progress value="{{progress}}" min='0' max='{{progress-end}}'></paper-progress>
        <paper-item>
            <p class='center fill'>{{progress-message}}</p>
        </paper-item>
    </template>
</template>

In [15]:
%%html
<!-- Bar chart of tweet counts per handle, bound to channel "w". get_top_counts()
     sets `counts` and `total_counts` on that channel; the toggle's on-change
     handler (toggleStream) is attached to this template from a %%javascript cell. -->
<template id="countGraph" is="urth-core-bind" channel="w">

    <paper-item>
        <h2>Number of tweets in the current time window, by handle</h2>
    </paper-item>

    <urth-viz-chart type='bar'
                    datarows='{{ counts.data }}'
                    columns='{{ counts.columns }}'
                    rotatelabels='30'
                    margin='{"bottom": 100, "right":100}'></urth-viz-chart>

    <urth-core-function id="topCounts1"
                        ref="get_top_counts"
                        result="{{ counts }}"></urth-core-function>

    <paper-item>
    <div>

    <paper-toggle-button id='stream-toggle'
                         on-change="toggleStream">Stream</paper-toggle-button>

    <!-- Bound to the `total_counts` value that get_top_counts() publishes on channel "w". -->
    <p>Current counts: {{ total_counts }}</p>
    </div>
    </paper-item>

</template>

In [16]:
%%html
<!-- Manual trigger for get_top_counts: the button invokes the urth-core-function
     with id "topCountsFunction" and the bound `counts` result is rendered as
     text. The %%javascript cell's topCounts() invokes the same element. -->
<template is="urth-core-bind">
    <urth-core-function id="topCountsFunction" 
                        ref="get_top_counts" 
                        result="{{ counts }}"></urth-core-function>
    <button onClick="topCountsFunction.invoke()">invoke</button><br/>
    <span>{{ counts }}</span>
</template> 

In [17]:
%%javascript
// Handle of the polling timer; null while streaming is off.
var streamInterval = null;

// Invokes the urth-core-function element with id "topCountsFunction".
var topCounts = function() {
    console.debug('in topCounts');
    return $('#topCountsFunction').get(0).invoke();
};

// on-change handler of the #stream-toggle button: polls every 5000 ms while
// the toggle is active and stops polling when it is switched off.
countGraph.toggleStream = function(e) {
    // Drop any timer that is still running first, so repeated "on" events
    // cannot stack up several pollers whose handles would be lost.
    if (streamInterval !== null) {
        clearInterval(streamInterval);
        streamInterval = null;
    }

    if (document.getElementById('stream-toggle').active) {
        console.debug('stream-toggle on!');
        streamInterval = setInterval(topCounts, 5000);
    }
    else {
        console.debug('stream-toggle off');
    }
};

<IPython.core.display.Javascript object>

In [None]:
# Consumer stored under the 'sentiment' key of `consumers` (created on first lookup).
sent_c = consumers['sentiment']

In [None]:
# Poll once and keep (offset, decoded JSON payload) for every record received.
# Values are decoded as UTF-8 first, as in get_top_counts / get_sentiment_messages.
vs = [(v.offset, loads(v.value.decode('utf-8')))
      for v in flatten(sent_c.poll(timeout_ms=500).values())]

# Named `offsets` rather than `os`: rebinding `os` would shadow the imported
# `os` module and break later os.path calls in this kernel.
offsets = [offset for offset, _ in vs]
if offsets:
    print(min(offsets), '-', max(offsets))
else:
    # min()/max() raise ValueError on an empty sequence.
    print('no records polled')

In [None]:
# Decoded payload of the fourth (offset, payload) pair; needs at least four entries in vs.
vs[3][1]

In [None]:
# One row per message: each payload in vs is iterated as a sequence of messages
# and every message is flattened with message_to_rec.
tweets_df = pd.DataFrame((message_to_rec(m) for _, v in vs for m in v), 
                         columns=['mention', 'cat', 'polarity', 
                                  'tweet_id', 'tweet_created_at', 'tweet_text'])
tweets_df

In [None]:
# Display the consumer's `config` attribute.
sent_c.config

In [None]:
# Poll again and keep the raw records (no JSON decoding) in one flat list.
vs2 = flatten(sent_c.poll(timeout_ms=1000).values())

In [None]:
# Highest offset among the records in vs2; max() raises ValueError when vs2 is empty.
max(v.offset for v in vs2)

In [None]:
# committed() expects a TopicPartition, so look up the committed offset of
# every partition currently assigned to the consumer. This also avoids relying
# on IPython's `_` (the previous cell's output), which changes with execution order.
{partition: sent_c.committed(partition) for partition in sent_c.assignment()}

In [None]:
import threading as t
from time import sleep

def test(n):
    for i in range(n):
        print('step:', i)
        sleep(1)

In [None]:
# Run test(10) on a separate thread; start() returns without waiting for it to finish.
task = t.Thread(target=test, args=(10,))
task.start()

In [None]:
# threading.Thread provides no stop() method (calling it raises AttributeError);
# a thread can only be waited on, so block until the worker has finished.
task.join()


In [None]:
# asyncio is not imported by any of the visible cells, so import it here to
# keep this cell runnable on a fresh kernel.
import asyncio

loop = asyncio.new_event_loop()

In [None]:
# Display the event loop object.
loop

In [None]:
# Scratch cell: evaluates to the string 'hello'.
'hello'