Graph tests, updates to summarizer functions (#11)
* clean up masks

* parameterize the walk speed

* more tests on the edges of the graph

* typing everywhere

* add some comments, drop out a dataframe append loop

* add some comments, drop out a dataframe append loop

* add typehints

* graph empty create test

* graph empty create test

* drop tuples on list extend

* add check about list lengths

* add test to check summary edge output, comments
kuanb committed Dec 21, 2017
1 parent 3bfc3f4 commit e1ee37d
Showing 2 changed files with 90 additions and 24 deletions.
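
A note on the headline change before the diff: the summarizer previously grew a DataFrame inside the trip loop with DataFrame.append, which copies every accumulated row on each call and is quadratic overall; this commit swaps that for plain list accumulation plus a single DataFrame construction at the end. A minimal sketch of the two patterns, with a toy column and chunks invented for illustration (modern pandas has since removed DataFrame.append, so pd.concat stands in for the old call):

import pandas as pd

# Toy stand-ins for the per-direction edge cost chunks built in the loop
chunks = [pd.DataFrame({'edge_cost': [60, 90]}),
          pd.DataFrame({'edge_cost': [120]})]

# Before: grow a frame each iteration - every pass copies all prior rows
grown = pd.DataFrame()
for chunk in chunks:
    grown = pd.concat([grown, chunk])

# After: accumulate plain lists, build the frame exactly once at the end
all_edge_costs = []
for chunk in chunks:
    all_edge_costs += list(chunk['edge_cost'].values)
built_once = pd.DataFrame({'edge_cost': all_edge_costs})

assert grown['edge_cost'].tolist() == built_once['edge_cost'].tolist()
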
75 changes: 52 additions & 23 deletions peartree/summarizer.py
@@ -7,7 +7,7 @@
 from .utilities import log
 
 
-def calculate_average_wait(direction_times: List) -> float:
+def calculate_average_wait(direction_times: pd.DataFrame) -> float:
     first = direction_times.arrival_time[1:].values
     second = direction_times.arrival_time[:-1].values
     wait_seconds = (first - second)
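
The offset slices above pair each arrival with the one before it, so the differences are the observed headways between consecutive vehicles. A toy illustration with invented arrival times; the halving of headway into an expected rider wait happens later, in summarize_waits_at_one_stop:

import numpy as np

# Hypothetical arrivals at one stop, seconds after midnight, 15 min apart
arrivals = np.array([25200, 26100, 27000, 27900])

headways = arrivals[1:] - arrivals[:-1]  # consecutive-pair differences
print(headways.mean())                   # 900.0 seconds between vehicles
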
@@ -47,14 +47,17 @@ def generate_wait_times(trips_and_stop_times: pd.DataFrame

 def generate_all_observed_edge_costs(trips_and_stop_times: pd.DataFrame
                                      ) -> Union[None, pd.DataFrame]:
-    all_edge_costs = None
+    all_edge_costs = []
+    all_from_stop_ids = []
+    all_to_stop_ids = []
     for trip_id in trips_and_stop_times.trip_id.unique():
         tst_mask = (trips_and_stop_times.trip_id == trip_id)
         tst_sub = trips_and_stop_times[tst_mask]
 
         # Just in case both directions are under the same trip id
         for direction in [0, 1]:
-            tst_sub_dir = tst_sub[tst_sub.direction_id == direction]
+            dir_mask = (tst_sub.direction_id == direction)
+            tst_sub_dir = tst_sub[dir_mask]
 
             tst_sub_dir = tst_sub_dir.sort_values('stop_sequence')
             deps = tst_sub_dir.departure_time[:-1]
@@ -63,20 +66,33 @@ def generate_all_observed_edge_costs(trips_and_stop_times: pd.DataFrame
             # Use .values to strip existing indices
             edge_costs = np.subtract(arrs.values, deps.values)
 
-            # Now place results in data frame
-            new_edges = pd.DataFrame({
-                'edge_cost': edge_costs,
-                'from_stop_id': tst_sub_dir.stop_id[:-1].values,
-                'to_stop_id': tst_sub_dir.stop_id[1:].values})
+            # Add each resulting list to the running array totals
+            all_edge_costs += list(edge_costs)
+
+            fr_ids = tst_sub_dir.stop_id[:-1].values
+            all_from_stop_ids += list(fr_ids)
+
+            to_ids = tst_sub_dir.stop_id[1:].values
+            all_to_stop_ids += list(to_ids)
 
-            if all_edge_costs is None:
-                all_edge_costs = new_edges
-            elif not new_edges.empty:
-                all_edge_costs = all_edge_costs.append(new_edges)
-    return all_edge_costs
+    # Only return a dataframe if there is content to populate
+    # it with
+    if len(all_edge_costs) > 0:
+        # Now place results in data frame
+        return pd.DataFrame({
+            'edge_cost': all_edge_costs,
+            'from_stop_id': all_from_stop_ids,
+            'to_stop_id': all_to_stop_ids})
+
+    # Otherwise a None value should be returned
+    else:
+        return None
 
 
 def summarize_edge_costs(df: pd.DataFrame) -> pd.DataFrame:
+    # Used as a function applied to a grouping
+    # operation, pulls out the mean edge cost for each
+    # unique edge pair (from node and to node)
     from_stop_id = df.from_stop_id.values[0]
     results_mtx = []
     for to_stop_id in df.to_stop_id.unique():
@@ -89,18 +105,26 @@ def summarize_edge_costs(df: pd.DataFrame) -> pd.DataFrame:


 def generate_summary_edge_costs(all_edge_costs: pd.DataFrame) -> pd.DataFrame:
+    # Given a dataframe of edge costs, get the average for each
+    # from node - to node pair
     summary_groupings = all_edge_costs.groupby('from_stop_id')
     summary = summary_groupings.apply(summarize_edge_costs)
     summary = summary.reset_index(drop=True)
     return summary
 
 
 def summarize_waits_at_one_stop(stop_df: pd.DataFrame) -> float:
-    # Calculate average wait time at this stop, given all observed
-    # wait times
-    divide_by = len(stop_df) * 2
+    # Calculates average wait time at this stop, given all observed waits
+    # TODO: Simply dividing by two may not be appropriate - it is
+    #       good for estimation purposes, but I could introduce
+    #       more sophisticated wait time calculations here
+    divide_by = (len(stop_df) * 2)
     dir_0_sum = stop_df.wait_dir_0.sum()
     dir_1_sum = stop_df.wait_dir_1.sum()
 
+    # A weighted average is performed, which could inaccurately
+    # portray a wait time at a given stop if one direction has
+    # significantly higher frequency than another
     calculated = ((dir_0_sum + dir_1_sum) / divide_by)
 
     return calculated
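
To make the divide-by-two concrete, a worked example with invented per-direction waits: pooling both directional sums and dividing by twice the row count is equivalent to averaging the two directional means, which is exactly why a much more frequent direction can dominate the result:

import pandas as pd

# Invented: three observation rows for one stop, wait times in seconds
stop_df = pd.DataFrame({'wait_dir_0': [300, 300, 300],
                        'wait_dir_1': [600, 600, 600]})

divide_by = len(stop_df) * 2                                 # 6
total = stop_df.wait_dir_0.sum() + stop_df.wait_dir_1.sum()  # 2700
print(total / divide_by)                                     # 450.0 seconds
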
@@ -184,6 +208,8 @@ def generate_summary_wait_times(df: pd.DataFrame) -> pd.DataFrame:
     log('Original stop id count: {}'.format(len(init_of_stop_ids)))
     log('After cleaning stop id count: {}'.format(len(end_of_stop_ids)))
 
+    # Check for the presence of any unresolved stop ids and
+    # assign them some default boarding cost value
     if len(init_of_stop_ids) > len(end_of_stop_ids):
         a = set(list(init_of_stop_ids))
         b = set(list(end_of_stop_ids))
@@ -193,10 +219,10 @@ def generate_summary_wait_times(df: pd.DataFrame) -> pd.DataFrame:

         # TODO: Perhaps these are start/end stops and should adopt
         #       a cost that is "average" for that route?
-        #       We should think of how to actually do this
-        #       because we do not have enough data, for now let's
-        #       just assign some default high cost connection value
-        #       to these stops
+        #       I should think of how to actually do this
+        #       because we do not have enough data, for now let's
+        #       just assign some default high cost connection value
+        #       to these stops
         sids = list(summed_reset.stop_id)
         acst = list(summed_reset.avg_cost)
         for i in unresolved_ids:
@@ -216,8 +242,10 @@ def generate_edge_and_wait_values(feed: ptg.gtfs.feed,
     all_wait_times = None
     for i, route in feed.routes.iterrows():
         log('Processing on route {}.'.format(route.route_id))
-        # Now get all the trips for that route
-        trips = feed.trips[feed.trips.route_id == route.route_id]
+        # Get just the subset of trips that are related to this route
+        route_match_mask = (feed.trips.route_id == route.route_id)
+        trips = feed.trips[route_match_mask]
 
         # Get just the stop times related to this trip
         st_trip_id_mask = feed.stop_times.trip_id.isin(trips.trip_id)
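
Naming the boolean mask before indexing is the "clean up masks" pattern from the commit message, applied throughout this file. A generic pandas illustration with a toy trips table standing in for the real feed data:

import pandas as pd

trips = pd.DataFrame({'route_id': ['A', 'A', 'B'],
                      'trip_id': ['t1', 't2', 't3']})

# The named mask documents intent and keeps the indexing line short
route_match_mask = (trips.route_id == 'A')
print(trips[route_match_mask].trip_id.tolist())  # ['t1', 't2']
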
@@ -273,6 +301,7 @@ def generate_edge_and_wait_values(feed: ptg.gtfs.feed,
                 'wait_dir_0',
                 'wait_dir_1']]
 
+        # Add to the running total for wait times in this feed subset
         if all_wait_times is None:
             all_wait_times = tst_sub
         else:
@@ -281,7 +310,7 @@ def generate_edge_and_wait_values(feed: ptg.gtfs.feed,
         # Get all edge costs for this route and add to the running total
         edge_costs = generate_all_observed_edge_costs(trips_and_stop_times)
 
-        # Add to the running total
+        # Add to the running total in this feed subset
         if all_edge_costs is None:
             all_edge_costs = edge_costs
         else:
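
The tail of this loop is truncated above, but both running totals follow the same None-seeded accumulation, and generate_all_observed_edge_costs is typed as returning Union[None, pd.DataFrame]. A hedged sketch of a collector that respects that contract, with per-route frames invented and pd.concat in place of the old append:

import pandas as pd

# Stand-ins for per-route results: a frame when edges were observed,
# or None when a route produced no edges in the time window
per_route = [pd.DataFrame({'edge_cost': [60]}),
             None,
             pd.DataFrame({'edge_cost': [90]})]

all_edge_costs = None
for edge_costs in per_route:
    if edge_costs is None:
        continue  # nothing observed for this route
    if all_edge_costs is None:
        all_edge_costs = edge_costs
    else:
        all_edge_costs = pd.concat([all_edge_costs, edge_costs])
print(len(all_edge_costs))  # 2
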
39 changes: 38 additions & 1 deletion tests/test_graph.py
@@ -1 +1,38 @@
-import pytest
+import os
+
+from peartree.graph import (generate_empty_md_graph,
+                            generate_summary_graph_elements)
+from peartree.paths import get_representative_feed
+
+
+def fixture(filename):
+    return os.path.join(os.path.dirname(__file__), 'fixtures', filename)
+
+
+def test_generate_empty_graph():
+    G = generate_empty_md_graph('foo')
+    assert len(G.edges()) == 0
+    assert len(G.nodes()) == 0
+
+
+def test_generate_summary_graph_elements():
+    path_1 = fixture('caltrain-2017-07-24.zip')
+    feed_1 = get_representative_feed(path_1)
+
+    start = 7 * 60 * 60
+    end = 10 * 60 * 60
+
+    (summary_edge_costs,
+     wait_times_by_stop) = generate_summary_graph_elements(feed_1, start, end)
+
+    # Ensure that the summary edge cost dataframe looks as it should
+    ec_cols = ['edge_cost', 'from_stop_id', 'to_stop_id']
+    for c in ec_cols:
+        assert c in summary_edge_costs.columns
+
+    # Make sure that all edges are unique - there are no duplicates
+    # in the returned edge dataframe (each should be its own summary)
+    f = summary_edge_costs.from_stop_id
+    t = summary_edge_costs.to_stop_id
+    z = list(zip(f, t))
+    assert len(list(set(z))) == len(z)
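
The final assertion works because tuples are hashable: converting the zipped (from_stop_id, to_stop_id) pairs to a set silently drops duplicates, so a length mismatch signals a duplicated edge. The same idiom in isolation, with toy ids:

edges = [('a', 'b'), ('b', 'c'), ('a', 'b')]  # one duplicate pair
assert len(set(edges)) != len(edges)          # duplicate detected
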
