Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter to fill in stop times from schedules #243

Merged
merged 6 commits into from Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -7,7 +7,7 @@ single output files.

## Configuration

Concentrate can either be configured via `config/config.exs` or a JSON environment variable as `CONCENTRATE_JSON`: more details are available in [configuration.md](doc/configuration.md).
Concentrate can either be configured via `config/config.exs` or a JSON environment variable as `CONCENTRATE_JSON`: more details are available in [configuration.md](guides/configuration.md).

## Architecture

Expand Down
7 changes: 7 additions & 0 deletions config/config.exs
Expand Up @@ -4,6 +4,8 @@ use Mix.Config

config :logger, level: :debug

config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase

config :ex_aws, json_codec: Jason

# per https://github.com/edgurgel/httpoison/issues/130, set the SSL version to pick a better default
Expand All @@ -30,6 +32,11 @@ config :concentrate,
Concentrate.Filter.IncludeStopID
],
group_filters: [
{
Concentrate.GroupFilter.ScheduledStopTimes,
# https://github.com/mbta/commuter_rail_boarding/blob/79a493f/config/config.exs#L34-L63
on_time_statuses: ["All aboard", "Now boarding", "On time", "On Time"]
},
Concentrate.GroupFilter.TimeOutOfRange,
Concentrate.GroupFilter.RemoveUnneededTimes,
Concentrate.GroupFilter.Shuttle,
Expand Down
4 changes: 4 additions & 0 deletions config/test.exs
Expand Up @@ -4,4 +4,8 @@ config :logger,
level: :info,
backends: []

config :concentrate, :group_filters, [
{Concentrate.GroupFilter.ScheduledStopTimes, on_time_statuses: ["on time"]}
]

config :concentrate, :sink_s3, ex_aws: Concentrate.TestExAws
87 changes: 0 additions & 87 deletions lib/concentrate/filter/gtfs/pickup_drop_off.ex

This file was deleted.

65 changes: 0 additions & 65 deletions lib/concentrate/filter/gtfs/stop_ids.ex

This file was deleted.

2 changes: 1 addition & 1 deletion lib/concentrate/filter/include_route_direction.ex
Expand Up @@ -3,7 +3,7 @@ defmodule Concentrate.Filter.IncludeRouteDirection do
Adds route/direction ID for TripUpdates.
"""
@behaviour Concentrate.Filter
alias Concentrate.Filter.GTFS.Trips
alias Concentrate.GTFS.Trips
alias Concentrate.TripDescriptor

@impl Concentrate.Filter
Expand Down
12 changes: 6 additions & 6 deletions lib/concentrate/filter/include_stop_id.ex
Expand Up @@ -3,28 +3,28 @@ defmodule Concentrate.Filter.IncludeStopID do
Adds missing stop IDs to StopTimeUpdates.
"""
@behaviour Concentrate.Filter
alias Concentrate.Filter.GTFS.StopIDs
alias Concentrate.GTFS.StopTimes
alias Concentrate.StopTimeUpdate

@impl Concentrate.Filter
def filter(item, stop_ids \\ StopIDs)
def filter(item, stop_times \\ StopTimes)

def filter(%StopTimeUpdate{} = stu, stop_ids) do
def filter(%StopTimeUpdate{} = stu, stop_times) do
{:cont,
maybe_add_stop_id(
stu,
StopTimeUpdate.stop_id(stu),
StopTimeUpdate.trip_id(stu),
StopTimeUpdate.stop_sequence(stu),
stop_ids
stop_times
)}
end

def filter(other, _stop_ids), do: {:cont, other}

defp maybe_add_stop_id(stu, nil, trip_id, stop_sequence, stop_ids)
defp maybe_add_stop_id(stu, nil, trip_id, stop_sequence, stop_times)
when is_binary(trip_id) and is_integer(stop_sequence) do
case stop_ids.stop_id(trip_id, stop_sequence) do
case stop_times.stop_id(trip_id, stop_sequence) do
:unknown -> stu
stop_id -> StopTimeUpdate.update_stop_id(stu, stop_id)
end
Expand Down
36 changes: 11 additions & 25 deletions lib/concentrate/group_filter/remove_unneeded_times.ex
Expand Up @@ -2,17 +2,17 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do
@moduledoc """
Removes arrival times from the first stop on a trip, and the departure time from the last stop on a trip.
"""
alias Concentrate.Filter.GTFS.PickupDropOff
alias Concentrate.GTFS.StopTimes
alias Concentrate.{StopTimeUpdate, TripDescriptor}
@behaviour Concentrate.GroupFilter

@impl Concentrate.GroupFilter
def filter(trip_group, module \\ PickupDropOff)
def filter(trip_group, stop_times \\ StopTimes)

def filter({%TripDescriptor{} = td, vps, stus} = group, module) do
def filter({%TripDescriptor{} = td, vps, stus} = group, stop_times) do
if TripDescriptor.schedule_relationship(td) == :SCHEDULED do
trip_id = TripDescriptor.trip_id(td)
stus = ensure_all_correct_times(stus, module, trip_id)
stus = ensure_all_correct_times(stus, stop_times, trip_id)
{td, vps, stus}
else
group
Expand All @@ -21,32 +21,20 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do

def filter(other, _module), do: other

defp ensure_all_correct_times([_ | _] = stus, module, trip_id) do
defp ensure_all_correct_times([_ | _] = stus, stop_times, trip_id) do
[last | rest] = Enum.reverse(stus)
last = ensure_correct_times_for_last_stu(last, module, trip_id)
rest = Enum.map(rest, &ensure_correct_times(&1, module, trip_id))
last = ensure_correct_times_for_last_stu(last, stop_times, trip_id)
rest = Enum.map(rest, &ensure_correct_times(&1, stop_times, trip_id))
Enum.reverse(rest, [last])
end

defp ensure_all_correct_times([], _, _) do
[]
end

defp stop_sequence_or_stop_id(stu) do
case StopTimeUpdate.stop_sequence(stu) do
sequence when is_integer(sequence) ->
sequence

_ ->
StopTimeUpdate.stop_id(stu)
end
end

defp ensure_correct_times_for_last_stu(stu, module, trip_id) do
defp ensure_correct_times_for_last_stu(stu, stop_times, trip_id) do
# we only remove the departure time from the last stop (excepting SKIPPED stops)
key = stop_sequence_or_stop_id(stu)

case module.pickup_drop_off(trip_id, key) do
case stop_times.pick_up_drop_off(trip_id, StopTimeUpdate.stop_sequence(stu)) do
{true, true} ->
ensure_both_times(stu)

Expand All @@ -65,10 +53,8 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do
end
end

defp ensure_correct_times(stu, module, trip_id) do
key = stop_sequence_or_stop_id(stu)

case module.pickup_drop_off(trip_id, key) do
defp ensure_correct_times(stu, stop_times, trip_id) do
case stop_times.pick_up_drop_off(trip_id, StopTimeUpdate.stop_sequence(stu)) do
{_, true} ->
ensure_both_times(stu)

Expand Down
78 changes: 78 additions & 0 deletions lib/concentrate/group_filter/scheduled_stop_times.ex
@@ -0,0 +1,78 @@
defmodule Concentrate.GroupFilter.ScheduledStopTimes do
@moduledoc """
Uses the static GTFS schedule to fill in missing arrival/departure times on stop time updates
that have specific `status` values.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should this note that it needs to come before RemoveUnneededTimes in the filter list, or not include arrival/departure times in cases when we'd drop them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it needs to, strictly speaking. It does need to if you want RemoveUnneededTimes to remove times it generates, but I think that's a logical consequence of its position in the filter list. If it's generally understood that filters are run in the order they're configured, I think we don't have to specifically call it out here. (Maybe we do want to call that out somewhere in the general documentation?)


The desired status values must be set in app configuration at compile time. For example:

config :concentrate,
group_filters: [
{
Concentrate.GroupFilter.ScheduledStopTimes,
on_time_statuses: ["status 1", "status 2", "status 3"]
}
]

If no status values are configured, enabling this filter has no effect.
"""
@behaviour Concentrate.GroupFilter
alias Concentrate.{StopTimeUpdate, TripDescriptor}, warn: false

@impl Concentrate.GroupFilter
def filter(trip_group, gtfs_stop_times \\ Concentrate.GTFS.StopTimes)

config_path = [:group_filters, __MODULE__, :on_time_statuses]
@on_time_statuses Application.compile_env(:concentrate, config_path, [])

if @on_time_statuses == [] do
def filter(trip_group, _), do: trip_group
else
def filter({trip_descriptor, vehicle_positions, [_ | _] = stop_time_updates}, gtfs_stop_times)
when not is_nil(trip_descriptor) do
{
trip_descriptor,
vehicle_positions,
filter_stop_time_updates(
stop_time_updates,
TripDescriptor.start_date(trip_descriptor),
gtfs_stop_times
)
}
end

def filter(trip_group, _), do: trip_group

defp filter_stop_time_updates(stop_time_updates, nil, _), do: stop_time_updates

defp filter_stop_time_updates(stop_time_updates, trip_date, gtfs_stop_times) do
Enum.map(stop_time_updates, fn stop_time_update ->
fill_in_arrival_departure(
stop_time_update,
trip_date,
StopTimeUpdate.arrival_time(stop_time_update),
StopTimeUpdate.departure_time(stop_time_update),
StopTimeUpdate.status(stop_time_update),
gtfs_stop_times
)
end)
end

defp fill_in_arrival_departure(stop_time_update, trip_date, nil, nil, status, gtfs_stop_times)
when status in @on_time_statuses do
trip_id = StopTimeUpdate.trip_id(stop_time_update)
stop_sequence = StopTimeUpdate.stop_sequence(stop_time_update)

case gtfs_stop_times.arrival_departure(trip_id, stop_sequence, trip_date) do
{arrival_time, departure_time} ->
stop_time_update
|> StopTimeUpdate.update_arrival_time(arrival_time)
|> StopTimeUpdate.update_departure_time(departure_time)

:unknown ->
stop_time_update
end
end

defp fill_in_arrival_departure(stop_time_update, _, _, _, _, _), do: stop_time_update
end
end
2 changes: 1 addition & 1 deletion lib/concentrate/group_filter/vehicle_stop_match.ex
Expand Up @@ -8,7 +8,7 @@ defmodule Concentrate.GroupFilter.VehicleStopMatch do
of the StopTimeUpdate with the same stop sequence (if they share a parent).
"""
@behaviour Concentrate.GroupFilter
alias Concentrate.Filter.GTFS.Stops
alias Concentrate.GTFS.Stops
alias Concentrate.{StopTimeUpdate, VehiclePosition}

@impl Concentrate.GroupFilter
Expand Down