In [1]:
import numpy
import pandas
import matplotlib.pyplot as pyplot
import json
import arrow
import pytz

from sentenai import *

%matplotlib inline

In [32]:
_API_KEY = 'YOUR_API_KEY_HERE'

In [31]:
# Create a client
sentenai = Sentenai(auth_key=_API_KEY)

# Load Data
Data from the MBTA visualization

http://mbtaviz.github.io/#trains

https://github.com/mbtaviz/mbtaviz.github.io/tree/master/data

In [332]:
with open('../mbtaviz.github.io/data/marey-trips.json', 'r') as file:
    raw_data = json.load(file)
print raw_data[0]

{u'begin': 1391422380, u'line': u'blue', u'end': 1391423647, u'trip': u'B98A375E5', u'stops': [{u'stop': u'place-wondl', u'time': 1391422380}, {u'stop': u'place-rbmnl', u'time': 1391422433}, {u'stop': u'place-bmmnl', u'time': 1391422578}, {u'stop': u'place-sdmnl', u'time': 1391422686}, {u'stop': u'place-orhte', u'time': 1391422806}, {u'stop': u'place-wimnl', u'time': 1391422951}, {u'stop': u'place-aport', u'time': 1391423059}, {u'stop': u'place-mvbcl', u'time': 1391423221}, {u'stop': u'place-aqucl', u'time': 1391423392}, {u'stop': u'place-state', u'time': 1391423467}, {u'stop': u'place-gover', u'time': 1391423555}, {u'stop': u'place-bomnl', u'time': 1391423647}]}


In [333]:
# Each trip is defined via a nested json object with timestamps
# for stops made. Transform this into an event for each stop
events = []
for trip in raw_data:
    for stop in trip['stops']:
        event = {
            'line': trip['line'],
            'stop': stop['stop'],
            'trip': trip['trip'],
            'time': stop['time']
        }
        events.append(event)


In [334]:
stations = set()
for event in events:
    if event['line'] == 'red':
        stations.add(event['stop'])
print stations

set([u'place-chmnl', u'place-brdwy', u'place-asmnl', u'place-andrw', u'place-davis', u'place-cntsq', u'place-jfk', u'place-qnctr', u'place-dwnxg', u'place-smmnl', u'place-sstat', u'place-wlsta', u'place-brntn', u'place-portr', u'place-pktrm', u'place-alfcl', u'place-nqncy', u'place-qamnl', u'place-fldcr', u'place-shmnl', u'place-harsq', u'place-knncl'])


# Do this in pure pandas

In [222]:
df = pandas.DataFrame(events)
df['timestamp'] = df.time.apply(arrow.get)
df.head()

Unnamed: 0,line,stop,time,trip,timestamp
0,blue,place-wondl,1391422380,B98A375E5,2014-02-03T10:13:00+00:00
1,blue,place-rbmnl,1391422433,B98A375E5,2014-02-03T10:13:53+00:00
2,blue,place-bmmnl,1391422578,B98A375E5,2014-02-03T10:16:18+00:00
3,blue,place-sdmnl,1391422686,B98A375E5,2014-02-03T10:18:06+00:00
4,blue,place-orhte,1391422806,B98A375E5,2014-02-03T10:20:06+00:00


In [223]:
df = df.sort_values(['line', 'trip', 'time'])
df.head()

Unnamed: 0,line,stop,time,trip,timestamp
12,blue,place-orhte,1391422380,B98A375BB,2014-02-03T10:13:00+00:00
13,blue,place-wimnl,1391422560,B98A375BB,2014-02-03T10:16:00+00:00
14,blue,place-aport,1391422651,B98A375BB,2014-02-03T10:17:31+00:00
15,blue,place-mvbcl,1391422792,B98A375BB,2014-02-03T10:19:52+00:00
16,blue,place-aqucl,1391422926,B98A375BB,2014-02-03T10:22:06+00:00


In [224]:
df['timediff'] = df.groupby('trip')['time'].diff()
df['prev_stop'] = df.groupby('trip')['stop'].shift()
df.head()

Unnamed: 0,line,stop,time,trip,timestamp,timediff,prev_stop
12,blue,place-orhte,1391422380,B98A375BB,2014-02-03T10:13:00+00:00,,
13,blue,place-wimnl,1391422560,B98A375BB,2014-02-03T10:16:00+00:00,180.0,place-orhte
14,blue,place-aport,1391422651,B98A375BB,2014-02-03T10:17:31+00:00,91.0,place-wimnl
15,blue,place-mvbcl,1391422792,B98A375BB,2014-02-03T10:19:52+00:00,141.0,place-aport
16,blue,place-aqucl,1391422926,B98A375BB,2014-02-03T10:22:06+00:00,134.0,place-mvbcl


In [226]:
df[(df.timediff > 5 * 60.) & (df.stop == 'place-harsq') & (df.prev_stop == 'place-portr')]

Unnamed: 0,line,stop,time,trip,timestamp,timediff,prev_stop
15538,red,place-harsq,1391471870,R98338237,2014-02-03T23:57:50+00:00,333.0,place-portr
15576,red,place-harsq,1391472409,R98338251,2014-02-04T00:06:49+00:00,507.0,place-portr
14973,red,place-harsq,1391466980,R9833827D,2014-02-03T22:36:20+00:00,593.0,place-portr
15696,red,place-harsq,1391473748,R98338307,2014-02-04T00:29:08+00:00,309.0,place-portr
15043,red,place-harsq,1391467130,R98338349,2014-02-03T22:38:50+00:00,436.0,place-portr
15558,red,place-harsq,1391472032,R983383CE,2014-02-04T00:00:32+00:00,369.0,place-portr


# Create a stream and insert events

In [7]:
# Check what streams I have so far
sentenai.streams()

<type 'list'>


[Stream(name="mbta-blue"),
 Stream(name="weather-daily-boston"),
 Stream(name="test-stream"),
 Stream(name="mbta-orange"),
 Stream(name="mbta-vis"),
 Stream(name="mbta-red")]

In [27]:
mbta_stream = stream('mbta-vis')

In [None]:
# Make a single stream
mbta_stream = stream('mbta-vis')

for idx, event in enumerate(events):
    if idx % 500 == 0:
        print '%d of %d' % (idx, len(events))
    event_id = sentenai.put(mbta_stream, event, timestamp=arrow.get(event['time']))

In [28]:
# Make a stream for each line
for idx, event in enumerate(events):
    if idx % 500 == 0:
        print '%d of %d' % (idx, len(events))
    mbta_stream = stream('mbta-%s' % event['line'])
    event_id = sentenai.put(mbta_stream, event, timestamp=arrow.get(event['time']))

0 of 17718
500 of 17718
1000 of 17718
1500 of 17718
2000 of 17718
2500 of 17718
3000 of 17718
3500 of 17718
4000 of 17718
4500 of 17718
5000 of 17718
5500 of 17718
6000 of 17718
6500 of 17718
7000 of 17718
7500 of 17718
8000 of 17718
8500 of 17718
9000 of 17718
9500 of 17718
10000 of 17718
10500 of 17718
11000 of 17718
11500 of 17718
12000 of 17718
12500 of 17718
13000 of 17718
13500 of 17718
14000 of 17718
14500 of 17718
15000 of 17718
15500 of 17718
16000 of 17718
16500 of 17718
17000 of 17718
17500 of 17718


In [231]:
# Create a stream per trip
trips = set()

# Lets limit this to only a few dozen trips
trip_limit = 500
for idx, evt in enumerate(events):
    if idx % 1000 == 0:
        print '%d of %d' % (idx, len(events))
    if evt['line'] != 'red':
        continue
    accept_trip = (
        (len(trips) <= trip_limit) or 
        (len(trips) > trip_limit and evt['trip'] in trips)
    )
    if accept_trip:
        trips.add(evt['trip'])
        mbta_stream = stream('mbta-%s' % evt['trip'])
        event_id = sentenai.put(
            mbta_stream, 
            evt, 
            timestamp=arrow.get(evt['time'])
        )
        

0 of 17718
500 of 17718
1000 of 17718
1500 of 17718
2000 of 17718
2500 of 17718
3000 of 17718
3500 of 17718
4000 of 17718
4500 of 17718
5000 of 17718
5500 of 17718
6000 of 17718
6500 of 17718
7000 of 17718
7500 of 17718
8000 of 17718
8500 of 17718
9000 of 17718
9500 of 17718
10000 of 17718
10500 of 17718
11000 of 17718
11500 of 17718
12000 of 17718
12500 of 17718
13000 of 17718
13500 of 17718
14000 of 17718
14500 of 17718
15000 of 17718
15500 of 17718
16000 of 17718
16500 of 17718
17000 of 17718
17500 of 17718


# Journey to work.
In this example lets assume I live near the Porter Square subway stop and commute to Downtown Crossing in Boston for work. I don't like getting stuck on the subway and would like to find out how often my commute is longer than 20 minutes.

In [29]:
# Query a single stream to get all events in a single trip.
s = stream('mbta-R98338292')

sentenai.query(select(), returning={s: True}).dataframe(s)

SentenaiException: Something went wrong.

In [28]:
# Get all events in a trip if the trip is more than 20 minutes

s = stream('mbta-R983382C1')
query = select(
).span(s.stop == 'place-portr'
).then(s.stop == 'place-dwnxg', within=delta(minutes=20))
res = sentenai.query(
    query,
    returning={s: True}
).dataframe(s)
print query
res

SentenaiException: Something went wrong.

In [25]:
s

Stream(name="mbta-R983382C1")

In [24]:
# Now lets apply this query to all trip streams to find our
# which ones have long commutes.

def find_long_commutes(strm, duration=20):
    return sentenai.query(
        select(
        ).span(
            strm.stop == 'place-portr'
        ).then(
            strm.stop == 'place-dwnxg', within=delta(minutes=duration)
        ))

# Query all streams based on meta data to find the streams
# corresponding to redline trips. A regular expression is
# used.
streams = sentenai.streams(name=r'mbta-R.+')
print 'RedLine streams found: %d' % len(streams)

# Apply the query to all streams.
results = map(lambda x: find_long_commutes(x), [s])

<type 'list'>
RedLine streams found: 441


SentenaiException: Something went wrong.

In [None]:
# We can also write the same query using switch statements.
# Here we define the beginning and end of a span by switches between
# event states, or stations in this case.
s = stream('mbta-R983382C1')
transition1 = event(V.stop == 'place-davis') >> event(V.stop == 'place-portr')# >> event(V.stop == 'place-harsq')
transition2 = event(V.stop == 'place-pktrm') >> event(V.stop == 'place-dwnxg') >> event(V.stop == 'place-sstat')
transition3 = event(V.stop == 'place-davis') >> event(V.stop == 'place-portr') >> event(V.stop == 'place-harsq')
query = select(
).span(s(transition1), min=delta(minutes=1)
)#.then(s(transition2), within=delta(minutes=100))
res = sentenai.query(
    query,
    returning={s: True}
).dataframe(s)
print query
res

In [441]:
s = stream('mbta-red')
query = select(
).span(s.stop == 'place-davis'
).then(s.stop == 'place-sstat', max=delta(days=1))
res = sentenai.query(
    query,
    returning={s: True}
).dataframe(s)
print query
res

select
    s:stop == "place-davis";
    s:stop == "place-sstat" within  for at most 1d


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,line,stop,time,trip
.ts,.stream,.span,.delta,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2014-02-03 10:46:29+00:00,mbta-red,0,00:00:00,red,place-davis,1391424389,R9833829B
2014-02-03 10:47:06+00:00,mbta-red,0,00:00:37,red,place-sstat,1391424426,R983381D8
2014-02-03 10:53:06+00:00,mbta-red,1,00:00:00,red,place-davis,1391424786,R983381D0
2014-02-03 10:53:08+00:00,mbta-red,1,00:00:02,red,place-sstat,1391424788,R983382D4
2014-02-03 10:55:19+00:00,mbta-red,2,00:00:00,red,place-davis,1391424919,R983383B4
2014-02-03 10:55:29+00:00,mbta-red,2,00:00:10,red,place-sstat,1391424929,R98338200
2014-02-03 11:46:31+00:00,mbta-red,3,00:00:00,red,place-davis,1391427991,R9833822D
2014-02-03 11:46:37+00:00,mbta-red,3,00:00:06,red,place-sstat,1391427997,R9833835F
2014-02-03 12:05:04+00:00,mbta-red,4,00:00:00,red,place-davis,1391429104,R9833834B
2014-02-03 12:05:15+00:00,mbta-red,4,00:00:11,red,place-sstat,1391429115,R983382B6


In [10]:
# Query all stream meta data to find only the streams
# corresponding to the red line trips.
streams = sentenai.streams(name=r'mbta-R.+')[:10] #sentenai.streams(name=r'mbta-[A-Z].+')
print len(streams), streams[:10]

<type 'list'>
10 [Stream(name="mbta-R9833839A"), Stream(name="mbta-R983382C1"), Stream(name="mbta-R9833839B"), Stream(name="mbta-R9833838A"), Stream(name="mbta-R983382FF"), Stream(name="mbta-R9833827E"), Stream(name="mbta-R9833839C"), Stream(name="mbta-R9833838D"), Stream(name="mbta-R983382A6"), Stream(name="mbta-R9833826B")]


In [256]:
def find_delays(strm):
    '''In this case `after` will key off of the state of the stream change.
    So the stream changes from porter to harvard starting the after clock at
    the end of the porter window not when the train first arrives at porter'''
    return sentenai.query(
        select(
        ).span(
            strm.stop == 'place-portr', min=delta(minutes=5)
        ).then(
            strm.stop == 'place-harsq'
        ))

results = map(lambda x: find_delays(x), streams)

In [257]:
dfs = pandas.concat([r.dataframe(s) for r, s in zip(results, streams)], axis=0)
dfs.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,line,stop,time,trip
.ts,.stream,.span,.delta,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2014-02-04 00:23:59+00:00,mbta-R98338307,0,00:00:00,red,place-portr,1391473439,R98338307
2014-02-04 00:29:08+00:00,mbta-R98338307,0,00:05:09,red,place-harsq,1391473748,R98338307
2014-02-03 23:52:17+00:00,mbta-R98338237,0,00:00:00,red,place-portr,1391471537,R98338237
2014-02-03 23:57:50+00:00,mbta-R98338237,0,00:05:33,red,place-harsq,1391471870,R98338237
2014-02-03 22:31:34+00:00,mbta-R98338349,0,00:00:00,red,place-portr,1391466694,R98338349


In [11]:
def find_long_commutes(strm):
    return sentenai.query(
        select(
        ).span(
            strm.stop == 'place-portr' # find a state change from davis which will be porter
        ).then(
            strm.stop == 'place-dwnxg', within=delta(minutes=20) # look for changes to DTX more than 20 minutes after not being davis anymore
        ))

results = map(lambda x: find_long_commutes(x), streams)

In [12]:
dfs = pandas.concat([r.dataframe(s) for r, s in zip(results, streams)], axis=0)
dfs.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,line,stop,time,trip
.ts,.stream,.span,.delta,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2014-02-03 20:59:53+00:00,mbta-R983382C1,0,00:00:00,red,place-portr,1391461193,R983382C1
2014-02-03 21:02:45+00:00,mbta-R983382C1,0,00:02:52,red,place-harsq,1391461365,R983382C1
2014-02-03 21:06:19+00:00,mbta-R983382C1,0,00:06:26,red,place-cntsq,1391461579,R983382C1
2014-02-03 21:09:07+00:00,mbta-R983382C1,0,00:09:14,red,place-knncl,1391461747,R983382C1
2014-02-03 21:13:34+00:00,mbta-R983382C1,0,00:13:41,red,place-chmnl,1391462014,R983382C1


In [13]:
dfs.trip.unique().shape

(4,)

# What Won't Work

In [322]:
mbta_stream = stream('mbta-red')

In [22]:
# This doesn't work because it doesn't group things within
# the same trip
query = select(
).span(
    mbta_stream.stop == 'place-portr'
).then(
    mbta_stream.stop == 'place-harsq',
)

print query

select
    mbta_stream:stop == "place-portr";
    mbta_stream:stop == "place-harsq" within 


In [323]:
# Lets try a switch statement
transition = event(V.stop == 'place-davis') >> event(V.stop == 'place-portr') >> event(V.stop == 'place-harsq')

query = select(
).span(
    mbta_stream(transition)
).then(
    mbta_stream.stop == 'place-cntsq', after=delta(minutes=2)
)
print query

select
    mbta_stream:(stop == "place-davis" -> stop == "place-portr" -> stop == "place-harsq");
    mbta_stream:stop == "place-cntsq" after 2m


In [86]:
#
query = select(
).span(
    mbta_stream.stop == 'place-portr'
).then(
    mbta_stream.stop == 'place-harsq', after=delta(minutes=2), within=delta(minutes=20)
)
print query

select
    mbta_stream:stop == "place-portr";
    mbta_stream:stop == "place-harsq" after 2m within 20m


In [324]:
mbta_stream = stream('mbta-red')
results = sentenai.query(query,
    returning = {mbta_stream: True}
)
print results.stats()

{}


In [150]:
query = select(
).span(
    (s1.stop == 'place-harsq') | (s2.stop == 'place-harsq')
).then(
    (s1.stop == 'place-cntsq') | (s2.stop == 'place-cntsq')
)

results = sentenai.query(
    query,
    returning={s1: True, s2: True}
)

In [151]:
df = results.dataframe()

In [152]:
pandas.concat(df.values())

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,line,stop,time,trip
.ts,.stream,.span,.delta,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2014-02-03 10:23:06+00:00,mbta-R98338241,0,00:00:00,red,place-harsq,1391422986,R98338241
2014-02-03 10:27:09+00:00,mbta-R98338241,0,00:04:03,red,place-cntsq,1391423229,R98338241
2014-02-03 10:35:49+00:00,mbta-R98338241,1,00:00:00,red,place-dwnxg,1391423749,R98338241
2014-02-03 10:37:55+00:00,mbta-R98338241,1,00:02:06,red,place-sstat,1391423875,R98338241
2014-02-03 10:35:34+00:00,mbta-R98338254,1,00:00:00,red,place-harsq,1391423734,R98338254
2014-02-03 10:39:26+00:00,mbta-R98338254,1,00:03:52,red,place-cntsq,1391423966,R98338254


In [41]:
c = s.stop > 2
print type(c)

<class 'sentenai.flare.Cond'>


In [43]:
from sentenai import flare

In [44]:
flare.Cond(s.stop, '>', 2)

(stream "mbta-R983382C1"):stop > 2

In [48]:
s = flare.Stream('hey', {})
s('hey')

TypeError: ("A stream should not be called with <type 'str'>", AttributeError("'str' object has no attribute '_bind'",))

In [53]:
flare.EventPath(['lol', 'bar'])

lol.bar

In [54]:
import time, datetime

In [None]:
datetime.timedelta()