# Meetup Outreach

This notebook shows how you can use Spark streaming to acquire and process data, then display it in an actionable dashboard using Declarative Widgets.

You have a product or service that you want to promote. One way of doing this is through local Meetups. There are already some affiliated Meetups, but you want to increase attendance. By analyzing the RSVP stream, we can find members who are going to similarly themed Meetups and contact them, to let them know about one of our affiliated Meetups. The notebook also keeps track of who you have contacted and visualizes such “conversions” in the dashboard.

On your first visit to this notebook, we recommend that you execute one cell at a time as you read along. Later, if you  just want to see the demo, select *Cell > Run All* from the menu bar. Once you've run all of the cells, select *View > View Dashboard* and then click on the **Stream** toggle to start the data stream.

**Table of Contents**

1. [Configuration](#Configuration)
2. [Upcoming Meetups](#Upcoming-Meetups)
3. [RSVPs Stream](#RSVPs-Stream)
    1. [Conversions](#Conversions)
4. [Widgets](#Widgets)
    1. [Meetups List](#Meetups-List)
    2. [Total RSVPs Plot](#Total-RSVPs-Plot)
    3. [Candidates List](#Candidates-List)
5. [Setup Streaming](#Setup-streaming)
6. [Demo Mode](#Demo-Mode)

In [None]:
%matplotlib inline

In [None]:
import shutil
import tempfile
import os
import time
import json
import sys
from tornado.websocket import websocket_connect
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from functools import reduce, partial

from urth.widgets.widget_channels import channel

##Configuration

The Meetup API requires authentication. You can get your API key [here](https://secure.meetup.com/meetup_api/key/). Enter the key below:

In [None]:
API_KEY = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'

The topic on which to search for our upcoming meetups.

In [None]:
PROD_TOPIC = 'bluemix'

How often (in seconds) to update the upcoming meetups list.

In [None]:
PROD_TIMER = 3600

To determine potential matches, define a relevant topic list.

In [None]:
RELEVANT_TOPICS = ['paas', 'cloud', 'cloud-computing', 'cloud-foundry', 'devops', 'internet-of-things', 'data-analytics', 'saas-software-as-a-service', 'iaas-infrastructure-as-a-service', 'openstack', 'docker', 'nodejs']

Threshold (in km). Candidates farther away than this distance from one of the upcoming meetups are discarded:

In [None]:
DIST_THRESHOLD = 50

Span of time over which to calculate change in meetups RSVP counts.

In [None]:
MU_CHANGE_CUTOFF = timedelta(weeks=1)
MU_CHANGE_LABEL = 'Change in last week'

For **demo** purposes, define this to `True`. The notebook will randomly select to "contact" a candidate from the list.

In [None]:
DEMO = False

##Upcoming Meetups

In [None]:
import requests
from html.parser import HTMLParser
import tornado.ioloop

# list of Meetup Event records; result of search on PROD_TOPIC keywords
upcoming_meetups = []

# dict indexed by Meetup Event ID; each entry is a list of
# [datetime, total count of "yes" RSVPs, total count of conversions]
rsvp_count_history = {}

# list of tuples containing the "yes" RSVP counts for all of the
# upcoming meetups, as well as the total conversions
total_rsvp_count = []

mu_events_url = "https://api.meetup.com/2/open_events"
params = {'topic':PROD_TOPIC, 'key':API_KEY}
loop = tornado.ioloop.IOLoop.current()

class MLStripper(HTMLParser):
    def __init__(self):
        super().__init__()
        self.reset()
        self.fed = []
    def handle_data(self, d):
        self.fed.append(d)
    def get_data(self):
        return ''.join(self.fed)

def strip_tags(html):
    s = MLStripper()
    s.feed(html)
    return s.get_data()

def get_venue(x):
    if 'venue' not in x:
        return '-'

    v = x['venue']
    city = v['city']
    if v['country'] == 'us':
        return '{}, {}'.format(city, v['state'])
    else:
        return '{} ({})'.format(city, v['country'].upper())

def map_mus(x):
    x['description'] = strip_tags(x['description']) if 'description' in x else ''
    x['group']['url'] = "http://meetup.com/{}".format(x['group']['urlname'])
    x['venue_loc'] = get_venue(x)
    return x

def update_meetups_values():
    global upcoming_meetups
    global rsvp_count_history
    global total_rsvp_count

    now = datetime.now()
    cutoff = now - MU_CHANGE_CUTOFF
    
    for mu in upcoming_meetups:
        id = mu['id']
        prev_conv_count = 0 # running total for conversions
        if id not in rsvp_count_history:
            rsvp_count_history[id] = []
        else:
            # carry previous conversions count forward
            prev_conv_count = rsvp_count_history[id][-1][2]

        # store the RSVP count for each event at this time
        rsvp_count_history[id].append([now, mu['yes_rsvp_count'], prev_conv_count])
        # cleanup old rsvp history values
        rsvp_count_history[id] = list(filter(lambda x: x[0] > cutoff, rsvp_count_history[id]))

        # save change in rsvp count over time span
        mu['change_over_time'] = rsvp_count_history[id][-1][1] - rsvp_count_history[id][0][1]

    # update the total number of RSVPs for all of upcoming meetups
    total_rsvp_count.append((now,
            reduce(lambda x, y: x + y['yes_rsvp_count'], upcoming_meetups, 0),
            reduce(lambda x, y: x + y[-1][2], rsvp_count_history.values(), 0)
    ))
    # cleanup old values
    list(filter(lambda x: x[0] > cutoff, total_rsvp_count))

def refresh_meetups_list():
    '''
    Update the data for the upcoming meetup events. Once invoked, will run every
    PROD_TIMER seconds.
    
    Publishes data on the 'meetups' and 'plots' channels.
    '''
    global upcoming_meetups

    # request updated meetups data
    r = requests.get(mu_events_url, params=params)
    upcoming_meetups = list(map(map_mus, r.json()['results']))
    # update values, comparing old vs new data
    update_meetups_values()
    # reschedule
    loop.call_later(PROD_TIMER, refresh_meetups_list)

    # broadcast the updated list
    channel('meetups').set('list', upcoming_meetups)
    fig = plot_total_rsvps(total_rsvp_count)
    channel('plots').set('total_rsvp_plot', fig)


##RSVPs Stream

In [None]:
conversions = [];

def process_rsvps(ssc, queue):
    '''Initiates processing of the RSVPs stream.'''
    msgs = ssc.textFileStream(queue)
    
    # Each event is a JSON blob. Parse it. Filter it.
    rsvps = msgs.map(lambda json_str: json.loads(json_str))

    process_candidates(rsvps)
    process_conversions(rsvps)

In [None]:
def process_candidates(rsvps):
    '''
    Generate a list of candidates from the incoming stream, filtering on
    relevant topics and distance.
    '''
    filtered_rsvps = filter_rsvps(rsvps)
    candidate_rsvps = compute_closeness(filtered_rsvps)
    member_data = get_members(candidate_rsvps)
    gen_candidate_list(member_data)

In [None]:
def filter_rsvps(rsvps):
    '''
    Filter out RSVPs that don't have our relevant topics,
    contain PROD_TOPIC or are not "yes" responses.
    '''
    return rsvps.filter(topic_filter)

def topic_filter(rsvp):
    return (rsvp['response'] == 'yes' and
            any(topic['urlkey'] in RELEVANT_TOPICS and topic['urlkey'] != PROD_TOPIC
                for topic in rsvp['group']['group_topics']))

In [None]:
# from http://www.johndcook.com/blog/python_longitude_latitude/
import math

def distance_on_unit_sphere(lat1, long1, lat2, long2):
    '''Returns distance (in km) between two points on globe'''
 
    # Convert latitude and longitude to 
    # spherical coordinates in radians.
    degrees_to_radians = math.pi/180.0
         
    # phi = 90 - latitude
    phi1 = (90.0 - lat1)*degrees_to_radians
    phi2 = (90.0 - lat2)*degrees_to_radians
         
    # theta = longitude
    theta1 = long1*degrees_to_radians
    theta2 = long2*degrees_to_radians
         
    # Compute spherical distance from spherical coordinates.
         
    # For two locations in spherical coordinates 
    # (1, theta, phi) and (1, theta', phi')
    # cosine( arc length ) = 
    #    sin phi sin phi' cos(theta-theta') + cos phi cos phi'
    # distance = rho * arc length
     
    cos = (math.sin(phi1)*math.sin(phi2)*math.cos(theta1 - theta2) + 
           math.cos(phi1)*math.cos(phi2))
    arc = math.acos( cos )
 
    # Remember to multiply arc by the radius of the earth 
    # in your favorite set of units to get length.
    #return arc
    return arc * 6371  # radius earth = 6,371 km

In [None]:
def compute_closeness(rsvps):
    '''
    Compute how close the candidate is to our list of upcoming events. If they
    are farther away than DIST_THRESHOLD, filter them out.
    '''
    return (rsvps
            .map(add_closest_mu)
            .filter(lambda x: x['cmu_dist'] < DIST_THRESHOLD))

def add_closest_mu(rsvp):
    # save closest upcoming meetup
    cmu = None
    cmu_dist = sys.maxsize
    # loop through our upcoming events, looking for closest
    for mu in upcoming_meetups:
        m = mu['group']
        r = rsvp['group']
        dist = distance_on_unit_sphere(m['group_lat'], m['group_lon'], r['group_lat'], r['group_lon'])
        if dist < cmu_dist:
            cmu = mu
            cmu_dist = dist
    # store this data in the RSVP record
    rsvp['cmu'] = cmu
    rsvp['cmu_dist'] = cmu_dist
    return rsvp

In [None]:
def get_members(rsvps):
    '''Create a member record from the RSVP record.'''
    return rsvps.map(map_rsvp_to_member)

def map_rsvp_to_member(rsvp):
    member = ({
        'id': rsvp['member']['member_id'],
        'member': rsvp['member'],
        'cmu': ({
            'id': rsvp['cmu']['id'],
            'name': rsvp['cmu']['name'],
            'event_url': rsvp['cmu']['event_url']
        }),
        'cmu_dist': rsvp['cmu_dist']
    })
    # use a fallback photo for those members without one
    if 'photo' not in member['member'] or member['member']['photo'] is None:
        member['member']['photo'] = 'http://photos4.meetupstatic.com/img/noPhoto_50.png'
    return member

In [None]:
def gen_candidate_list(rsvps):
    '''Publish the candidates list (member records), which notifies the UI.'''
    rsvps.foreachRDD(update_cand_list)

def update_cand_list(rsvps_rdd):
    channel('candidates').set('cand_list', rsvps_rdd.collect())

### Conversions

We keep track of candidates that we have contacted. If we later see that such a candidate has made an RSVP for one of our upcoming events, we count this as a "conversion".

In [None]:
def process_conversions(rsvps):
    '''Check the RSVP stream for "converted" members, updating our running total.'''
    conversion_stream = rsvps.filter(lambda x: x['response'] == 'yes' and 
            any(topic['urlkey'] == PROD_TOPIC for topic in x['group']['group_topics']) and
            x['member']['member_id'] in conversions)
    conversion_stream.foreachRDD(rdd_update_conversions_count)

def rdd_update_conversions_count(rdd):
    for rsvp in rdd.collect():
        muid = rsvp['event']['event_id']
        if muid in rsvp_count_history:
            # increment conversions counter in *latest* entry
            rsvp_count_history[muid][-1][2] += 1
        # TODO what do we do if `rsvp_count_history` doesn't have `muid`?

In [None]:
def candidate_contacted(id):
    '''
    Callback which can be used by the UI to update the list of candidates
    we have contacted.
    Works in conjunction with the <template> which follows.
    '''
    conversions.append(id)

In [None]:
%%html
<template is="urth-core-bind">
    <urth-core-function id="candidateContacted" ref="candidate_contacted" arg-id="{{id}}"></urth-core-function>
</template

## Widgets

In [None]:
%%html
<link rel="import" href="urth_components/iron-list/iron-list.html"
    is="urth-core-import" package="PolymerElements/iron-list">
<link rel="import" href="urth_components/paper-toggle-button/paper-toggle-button.html"
    is="urth-core-import" package="PolymerElements/paper-toggle-button#v1.0.10">
<link rel="import" href="urth_components/paper-material/paper-material.html"
    is="urth-core-import" package="PolymerElements/paper-material">

In [None]:
# There is a timing issue with Urth Polymer imports. This works around that issue
# so that the following Polymer element will load correctly.
import time
time.sleep(4)

### Meetups List

In [None]:
%%html
<style is="custom-style">
    .meetups-list iron-list {
        height: 400px;
    }

    .meetups-list .header .row,
    .meetups-list .meetup-row {
        display: flex;
        justify-content: space-between;
        padding: 0.3em 1em;
    }
    .meetups-list .header .row {
        align-items: flex-end;
    }
    .meetups-list .event {
        flex: 1 1 45%;
    }
    .meetups-list .date {
        flex: 1 1 20%;
    }
    .meetups-list .city {
        flex: 1 1 15%;
    }
    .meetups-list .rsvp-count {
        flex: 1 1 10%;
    }
    .meetups-list .change {
        flex: 1 1 10%;
    }

    .meetups-list .header {
        background-color: #305b81;
        color: white;
        height: 95px;
        padding: 10px;
    }
    .meetups-list .header h2 {
        position: absolute;
        margin-top: 0.83em;
        font-size: 1.5em;
    }
    .meetups-list .header .row {
        height: 75px;
    }
    .meetups-list .header .change {
        text-align: center;
    }
    
    .meetups-list .meetup-row-wrapper {
        padding: 0.3em 0.2em 0 0.2em;
    }

    .meetups-list .meetup-row .event a {
        display: block;
    }
    .meetups-list .meetup-row .event .group-name {
        font-size: smaller;
        margin-left: 1em;
    }
    .meetups-list .meetup-row .change.positive {
        color: green;
    }
    .meetups-list .meetup-row .change.positive .arrow:before {
        content: "\f176";
    }
    .meetups-list .meetup-row .change.negative {
        color: red;
    }
    .meetups-list .meetup-row .change.negative .arrow:before {
        content: "\f175";
    }
</style>

<template is="urth-core-bind" channel="meetups" id="meetups-list-tmpl">
    <script>
        (function() {
            var dateStringOptions = {weekday:'short', year:'numeric', month: 'short', day:'numeric'};
            var locale = navigator.language || navigator.browserLanguage || navigator.systemLanguage || navigator.userLanguage;

            var scope = document.getElementById('meetups-list-tmpl');
            scope.getISOString = function(time) {
                return (new Date(time)).toISOString();
            };
            scope.getLocaleDateString = function(time) {
                return (new Date(time)).toLocaleDateString(locale, dateStringOptions);
            };
            scope.getChangeOverTime = function(change) {
                if (change === 0) {
                    return '-';
                }
                return Math.abs(change);
            };
            scope.getChangeOverTimeClass = function(change) {
                var cls = 'change';
                if (change > 0) {
                    cls += ' positive'
                } else if (change < 0) {
                    cls += ' negative'
                }
                // zero is also a valid value, but has no class
                return cls;
            };
        })();
    </script>

    <div class="meetups-list">
        <paper-material class="header">
            <h2>Upcoming Meetups</h2>
            <div class="row">
                <span class="event">Meetup Event</span>
                <span class="date">Date</span>
                <span class="city">City</span>
                <span class="rsvp-count">RSVPs</span>
                <span class="change">[[change_label]]</span>
            </div>
        </paper-material>
        <iron-list items="[[list]]">
            <template>
                <div class="meetup-row-wrapper">
                    <paper-material class="meetup-row">
                        <span class="event">
                            <a href="[[item.event_url]]" target="_blank">[[item.name]]</a>
                            <a class="group-name" href="[[item.group.url]]" target="_blank">[[item.group.name]]</a>
                        </span>
                        <time class="date" datetime="[[getISOString(item.time)]]">[[getLocaleDateString(item.time)]]</time>
                        <span class="city">[[item.venue_loc]]</span>
                        <span class="rsvp-count">[[item.yes_rsvp_count]]</span>
                        <span class$="[[getChangeOverTimeClass(item.change_over_time)]]">
                            <span class="fa arrow"></span>
                            <span>[[getChangeOverTime(item.change_over_time)]]</span>
                        </span>
                    </paper-material>
                </div>
            </template>
        </iron-list>
    </div>
</template>

In [None]:
%%html
<template is="urth-core-bind" channel="meetups">
    <style>
        .topics-desc {
            font-size: larger;
        }
        .topics-desc span {
            font-style: italic;
        }
    </style>
    <div class="topics-desc">
       Showing Upcoming Meetups on <span>[[upcoming_label]]</span> tag. Meetup candidates are generated using the following topic filters: <span>[[cand_topics_label]]</span>
    </div>
</template>

### Total RSVPs Plot

In [None]:
def plot_total_rsvps(list):
    '''
    Create a stacked plot where the total is the number of RSVPs over a period of time,
    and the top layer are "converted" RSVPs.
    '''
    X = [i[0] for i in list]
    Y = [[i[1] - i[2] for i in list],  [i[2] for i in list]]

    fig, ax = plt.subplots(1,1,figsize=(10,7))
    ax.stackplot(X, *Y, baseline='zero', colors=['#2fa6e4','#82d1f5'])
    ax.set_title('Total RSVPs for Upcoming Meetups')
    fig.tight_layout()
    plt.close()
    return fig

In [None]:
%%html
<template is="urth-core-bind" channel="plots">
    <img src={{total_rsvp_plot}}>
</template>

#### Initial call to update Upcoming Meetups list
We have to do this after widgets (Meetups list, RSVPs plot) have been defined, since widgets cannot pull existing data off of channel. See [issue 35](https://github.com/jupyter-incubator/declarativewidgets/issues/35) for more info.

In [None]:
refresh_meetups_list()
channel('meetups').set('change_label', MU_CHANGE_LABEL)
channel('meetups').set('upcoming_label', PROD_TOPIC)
channel('meetups').set('cand_topics_label', ', '.join(RELEVANT_TOPICS))

###Candidates List

In [None]:
%%html
<dom-module id="dynamic-list">
    <template>
        <style>
            :host .header {
                background-color: #305b81;
                color: white;
                padding: 10px;
            }
                
            :host .controls {
                display: flex;
                justify-content: space-between;
            }
                
            :host iron-list {
                height: 850px;
            }
                
            :host .count {
                font-size: smaller;
                text-transform: uppercase;
            }                
            :host .count .new-items {
                margin-left: 1em;
                color: white;
            }
                
            :host paper-toggle-button {
                --paper-toggle-button-unchecked-button-color: darkgray;
                --default-primary-color: white;
            }
                
            :host paper-toggle-button:hover {
                cursor: pointer;
            }
            
            .member-card {
                display: flex;
                flex-flow: column wrap;
                align-content: flex-start;
                height: 50px;
                margin: 10px 10px 0 10px;
            }
            .member-card img {
                width: 50px;
                height: 50px;
                flex: 100%;
                margin-right: 10px;
            }
            .member-card .member-name {
                font-size: larger;
                line-height: 25px;
            }
            .member-card .member-name,
            .member-card .cmu {
                flex: 50%;
                white-space: nowrap;
                overflow: hidden;
                text-overflow: ellipsis;
                width: calc(100% - 70px);
            }
            .member-card a {
                color: #305b81;
                text-decoration: none;
            }
            .member-card a:hover {
                text-decoration: underline;
            }
            .member-card .contact-button {
                text-transform: uppercase;
                font-size: 0.6em;
                line-height: 1em;
                margin-left: 1em;
                padding: 0.3em;
                border-radius: 0.2em;
                background-color: #305b81;
                color: white;
            }
            .member-card:not(:hover) .contact-button {
                display: none;
            }
                
            :host .member-card[data-converted] {
                background-color: rgba(130, 209, 245, 0.2);
            }
        </style>
        
        <paper-material class="header">
            <h2>[[heading]]</h2>
            <div class="controls">
                <div class="count">
                    <span>[[getVisibleItemsCount(visibleItems.*)]]</span> items
                    <template is="dom-if" if="[[hasNewItems(newItems.*)]]">
                        <a class="new-items" on-click="flush" href="javascript:"><span>[[getNewItemsCount(newItems.*)]]</span> updates available</a>
                    </template>
                </div>
                <div class="toggle-btn-container">
                    <paper-toggle-button checked="{{liveUpdate}}"></paper-toggle-button>
                    <label id="stream-label">Live Update</label>
                </div>
            </div>
        </paper-material>
        <template is="dom-if" if="[[getVisibleItemsCount(visibleItems.*)]]">
            <iron-list items="[[visibleItems]]" id="list">
                <template>
                    <div>
                        <paper-material class="member-card" data-converted$="[[item.converted]]">
                            <img src="[[item.member.photo]]">
                            <a class="member-name" href="[[getMemberUrl(item)]]" target="_blank">
                                <span>[[item.member.member_name]]</span>
                                <span class="contact-button" data-contact-id$="[[item.member.member_id]]" on-click="_onContactClick">Contact</span>
                            </a>
                            <div class="cmu"><span>[[toFixed(item.cmu_dist)]]</span> km from <a href="[[item.cmu.event_url]]" target="_blank">[[item.cmu.name]]</a></div>
                        </paper-material>
                    </div>
                </template>
            </iron-list>
        </template>
    </template>

    <script>
    (function () {
        'use strict';

        Polymer({
            is: 'dynamic-list',

            properties: {
                heading: {
                    type: String,
                    value: 'Dynamic List'
                },
                items: {
                    type: Array,
                    value: function() { return []; }
                },

                visibleItems: {
                    type: Array,
                    value: function() { return []; }
                },

                newItems: {
                    type: Array,
                    value: function() { return []; }
                },

                liveUpdate: {
                    type: Boolean,
                    value: true,
                    observer: '_liveUpdateChanged'
                },

                /**
                 * If specified, ensures that visible items are unique
                 * based on the given property name.
                 */
                idProp: {
                    type: String,
                    value: ''
                }
            },

            _keys: null, // unique candidate member IDs

            observers: [
                '_itemsChanged(items.*)'
            ],

            created: function() {
                this._keys = {};
            },
                    
            attached: function() {
                this._contactedFunc = document.getElementById('candidateContacted');
            },

            _itemsChanged: function(change) {
                if (this.items.length === 0) {
                    return;
                }

                if (change.path === 'items') {
                    if (this.idProp) {
                        for (var i = 0, len = this.items.length; i < len; i++) {
                            var item = this.items[i];
                            var id = item[this.idProp];
                            if (!this._keys.hasOwnProperty(id)) {
                                this.unshift('newItems', item);
                                this._keys[id] = 1;
                            }
                        }
                    } else {
                        this.unshift.apply(this, ['newItems'].concat(this.items));
                    }
                } else if (change.path === 'items.splices') {
                    // item was added/removed
                    console.error('Not implemented');
                } else {
                    // individual item or its sub-props changed
                    console.error('Not implemented');
                }

                if (this.liveUpdate) {
                    this.flush(); // update main list with new items
                }
            },
                    
            _liveUpdateChanged: function(newVal, oldVal) {
                console.log('live update:',this.liveUpdate);
                if (newVal) {
                    this.flush();
                }
            },
                    
            _onContactClick: function(e) {
                if (e.model.get('item.converted')) {
                    // already contacted this candidate; nothing to do here
                    return;
                }

                // update global conversion list in python
                var id = parseInt(e.target.getAttribute('data-contact-id'), 10);
                this._contactedFunc.args = {id: id};
                this._contactedFunc.invoke();

                // update model for this element
                e.model.set('item.converted', true);
            },

            getVisibleItemsCount: function(e) {
                return this.visibleItems.length;
            },

            getNewItemsCount: function(e) {
                return this.newItems.length;
            },

            getMemberUrl: function(item) {
                return 'http://www.meetup.com/members/' + item.member.member_id + '/';
            },
              
            // only show new items count if *not* streaming
            hasNewItems: function(e) {
                return this.getNewItemsCount() !== 0 && !this.liveUpdate;
            },
                    
            toFixed: function(dist) {
                return parseFloat(dist).toFixed(2);
            },

            // Copies new (hidden) items into the visible list.
            flush: function(e) {
                this.unshift.apply(this, ['visibleItems'].concat(this.newItems));
                this.set('newItems', []);
                if (e) {
                    e.preventDefault();
                }
            },

            // For demo purposes only.
            // Simulate a mouse click on a random element in the list.
            _simulateContactClick: function() {
                if (!this.visibleItems.length) {
                    return;
                }
                var el = this.$$('#list');
                var visibleNodes = Array.prototype.filter.call(el.children, function(item) {
                    return item.tagName !== 'TEMPLATE';
                });
                var idx = Math.floor(Math.random() * visibleNodes.length);
                var node = visibleNodes[idx];
                node.querySelector('.contact-button').dispatchEvent(new MouseEvent('click', {bubbles:true}));
            }
        });
    })();
    </script>
</dom-module>

<template is="urth-core-bind" channel="candidates">
    <dynamic-list items="[[cand_list]]" id-prop="id" heading="Meetup Candidates"></dynamic-list>
</template>

##Setup streaming

In [None]:
def create_streaming_context(checkpoint_dir, sample_rate):
    '''
    Creates a new SparkContext and SparkStreamingContext. Done in a function
    to allow repeated start/stop of the streaming. Returns the streaming
    context instance.
    
    :param checkpoint_dir: Directory to use to track Spark job state
    :param sample_rate: Stream sampling rate in seconds
    '''
    # create a local SparkContext to start using as many CPUs as we can
    sc = SparkContext('local[*]')
    
    # wrap it in a StreamingContext that collects from the stream
    ssc = StreamingContext(sc, sample_rate)

    # Setup a checkpoint directory to keep total counts over time.
    ssc.checkpoint(os.path.join(checkpoint_dir, '.checkpoint'))
    
    return ssc

In [None]:
class FileRingReceiver(object):
    '''
    Hack around lack of custom DStream receivers in Python: 
    Create a ring buffer of UTF-8 text files on disk.
    '''
    def __init__(self, max_batches=10):
        self.queue = tempfile.mkdtemp()
        self.batch_count = 0
        self.max_batches = max_batches
        
    def __del__(self):
        self.destroy()
        
    def put(self, text):
        # ignore sentinels
        if text is None: return
        with open(os.path.join(self.queue, str(self.batch_count)), 'w', encoding='utf-8') as f:
            f.write(text)
        if self.batch_count >= self.max_batches:
            oldest = str(self.batch_count - self.max_batches)
            os.remove(os.path.join(self.queue, str(oldest)))
        self.batch_count += 1
        
    def destroy(self):
        shutil.rmtree(self.queue, ignore_errors=True)

In [None]:
conn_future = None
ssc = None
receiver = None

In [None]:
def start_stream():
    '''
    Creates a websocket client that pumps events into a ring buffer queue. Creates
    a SparkStreamContext that reads from the queue. Creates the events, topics, and
    venues DStreams, setting the widget channel publishing functions to iterate over
    RDDs in each. Starts the stream processing.
    '''
    global conn_future
    global ssc
    global receiver
    
    receiver = FileRingReceiver(max_batches=100)  
    conn_future = websocket_connect('ws://stream.meetup.com/2/rsvps', on_message_callback=receiver.put)
    ssc = create_streaming_context(receiver.queue, 5)
    process_rsvps(ssc, receiver.queue)
    ssc.start()
    
def shutdown_stream():
    '''
    Shuts down the websocket, stops the streaming context, and cleans up the file ring.
    '''
    global conn_future
    global ssc
    global receiver
    
    conn_future.result().close()
    ssc.stop()
    receiver.destroy()

In [None]:
%%html
<template is="urth-core-bind">
    <urth-core-function id="streamFunc" ref="start_stream"></urth-core-function>
    <urth-core-function id="shutdownFunc" ref="shutdown_stream"></urth-core-function>
</template>

<style is="custom-style">
    paper-toggle-button {
        --default-primary-color: green;
    }
    
    paper-toggle-button:hover {
        cursor: pointer;
    }
        
    .toggle-btn-container {
        margin: 1em 0;
        text-align: right;
    }
    
    #stream-label {
        font-size: larger;
        margin: 0;
        padding: 0 0.5em;
    }
</style>

<div class="toggle-btn-container">
    <paper-toggle-button id="stream-btn"></paper-toggle-button>
    <label id="stream-label">Stream</label>
</div>

<script>
    $('#stream-btn').on('change', function() {
        if ($(this).attr('checked')) {
            // start streaming
            console.warn('Starting Spark Streaming');
            $('#streamFunc').get(0).invoke();
        } else {
            // stop streaming
            console.warn('Stopping Spark Streaming');
            $('#shutdownFunc').get(0).invoke();
        }
    });
</script>

##Demo Mode

In order to run a standalone demo, the following code will (if `DEMO` is set to `True`) periodically simulate a mouse click on the contact button for a random candidate in the list. This will make it so that the rest of the code considers this as a "conversion". Over time, as "converted" members also sign up for one of the upcoming events in our list, the plot will show two different colors, the top one signifying the number of "converted" candidates.

In [None]:
%%html
<template is="urth-core-bind" channel="demo" id="demo-tmpl">
    <script>
        var startDemoMode = function(e) {
            if (e.detail.channel !== 'demo' || e.detail.key !== 'on') {
                return;
            }
            console.log('STARTING DEMO MODE');
            setInterval(function() {
                var list = document.querySelector('dynamic-list');
                if (list) {
                    list._simulateContactClick();
                }
            }, 60000);
            demoTmpl.removeEventListener('channelSetItem', startDemoMode);
        }

        var demoTmpl = document.getElementById('demo-tmpl');
        demoTmpl.register(demoTmpl, '*');
        demoTmpl.addEventListener('channelSetItem', startDemoMode);
    </script>
</template>

In [None]:
if DEMO:
    channel('demo').set('on', 1)