diff --git a/README.md b/README.md index e2c9b2f4..e9b114df 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ single output files. ## Configuration -Concentrate can either be configured via `config/config.exs` or a JSON environment variable as `CONCENTRATE_JSON`: more details are available in [configuration.md](doc/configuration.md). +Concentrate can either be configured via `config/config.exs` or a JSON environment variable as `CONCENTRATE_JSON`: more details are available in [configuration.md](guides/configuration.md). ## Architecture diff --git a/config/config.exs b/config/config.exs index 4e946d78..7c7285de 100644 --- a/config/config.exs +++ b/config/config.exs @@ -4,6 +4,8 @@ use Mix.Config config :logger, level: :debug +config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase + config :ex_aws, json_codec: Jason # per https://github.com/edgurgel/httpoison/issues/130, set the SSL version to pick a better default @@ -30,6 +32,11 @@ config :concentrate, Concentrate.Filter.IncludeStopID ], group_filters: [ + { + Concentrate.GroupFilter.ScheduledStopTimes, + # https://github.com/mbta/commuter_rail_boarding/blob/79a493f/config/config.exs#L34-L63 + on_time_statuses: ["All aboard", "Now boarding", "On time", "On Time"] + }, Concentrate.GroupFilter.TimeOutOfRange, Concentrate.GroupFilter.RemoveUnneededTimes, Concentrate.GroupFilter.Shuttle, diff --git a/config/test.exs b/config/test.exs index b2205c2c..2ab88138 100644 --- a/config/test.exs +++ b/config/test.exs @@ -4,4 +4,8 @@ config :logger, level: :info, backends: [] +config :concentrate, :group_filters, [ + {Concentrate.GroupFilter.ScheduledStopTimes, on_time_statuses: ["on time"]} +] + config :concentrate, :sink_s3, ex_aws: Concentrate.TestExAws diff --git a/lib/concentrate/filter/gtfs/pickup_drop_off.ex b/lib/concentrate/filter/gtfs/pickup_drop_off.ex deleted file mode 100644 index 4c3dc447..00000000 --- a/lib/concentrate/filter/gtfs/pickup_drop_off.ex +++ /dev/null @@ -1,87 +0,0 @@ -defmodule Concentrate.Filter.GTFS.PickupDropOff do - @moduledoc """ - Server which knows whether riders can be picked up or dropped off at a stop. - """ - use GenStage - alias Concentrate.Filter.GTFS.Helpers - require Logger - import :binary, only: [copy: 1] - @table __MODULE__ - - def start_link(opts) do - GenStage.start_link(__MODULE__, opts, name: __MODULE__) - end - - @spec pickup_drop_off(String.t(), String.t() | non_neg_integer) :: {boolean, boolean} | :unknown - def pickup_drop_off(trip_id, stop_or_stop_sequence) when is_binary(trip_id) do - find_value({trip_id, stop_or_stop_sequence}) - end - - defp find_value(key) do - case :ets.lookup(@table, key) do - [{_, value}] -> value - [] -> :unknown - end - rescue - ArgumentError -> :unknown - end - - @impl GenStage - def init(opts) do - @table = :ets.new(@table, [:named_table, :public, :set]) - {:consumer, [], opts} - end - - @impl GenStage - def handle_events(events, _from, state) do - count = - events - |> List.flatten() - |> Stream.flat_map(fn - {"stop_times.txt", body} -> - Helpers.io_stream(body) - - _ -> - [] - end) - |> CSV.decode(headers: true, num_workers: System.schedulers()) - |> Stream.flat_map(&build_inserts/1) - |> Enum.reduce(0, fn insert, acc -> - if acc == 0 do - true = :ets.delete_all_objects(@table) - end - - :ets.insert(@table, insert) - acc + 1 - end) - - _ = - if count > 0 do - Logger.info(fn -> - "#{__MODULE__}: updated with #{count} records" - end) - end - - {:noreply, [], state, :hibernate} - end - - defp can_pickup_drop_off?("1"), do: false - defp can_pickup_drop_off?(_), do: true - - defp build_inserts({:error, _}) do - [] - end - - defp build_inserts({:ok, row}) do - trip_id = copy(Map.get(row, "trip_id")) - stop_id = copy(Map.get(row, "stop_id")) - stop_sequence = String.to_integer(Map.get(row, "stop_sequence")) - - pickup? = can_pickup_drop_off?(Map.get(row, "pickup_type")) - drop_off? = can_pickup_drop_off?(Map.get(row, "drop_off_type")) - - for stop_key <- [stop_id, stop_sequence] do - {{trip_id, stop_key}, {pickup?, drop_off?}} - end - end -end diff --git a/lib/concentrate/filter/gtfs/stop_ids.ex b/lib/concentrate/filter/gtfs/stop_ids.ex deleted file mode 100644 index 6effa854..00000000 --- a/lib/concentrate/filter/gtfs/stop_ids.ex +++ /dev/null @@ -1,65 +0,0 @@ -defmodule Concentrate.Filter.GTFS.StopIDs do - @moduledoc """ - Server which knows the stop ID for a given trip ID and stop sequence. - """ - use GenStage - alias Concentrate.Filter.GTFS.Helpers - require Logger - import :binary, only: [copy: 1] - @table __MODULE__ - - def start_link(opts) do - GenStage.start_link(__MODULE__, opts, name: __MODULE__) - end - - @spec stop_id(String.t(), non_neg_integer) :: String.t() | :unknown - def stop_id(trip_id, stop_sequence) do - case :ets.lookup(@table, {trip_id, stop_sequence}) do - [{_, value}] -> value - [] -> :unknown - end - rescue - ArgumentError -> :unknown - end - - @impl GenStage - def init(opts) do - @table = :ets.new(@table, [:named_table, :public, :set]) - {:consumer, [], opts} - end - - @impl GenStage - def handle_events(events, _from, state) do - inserts = - events - |> List.flatten() - |> Stream.flat_map(fn - {"stop_times.txt", body} -> - Helpers.io_stream(body) - - _ -> - [] - end) - |> CSV.decode(headers: true, num_workers: System.schedulers()) - |> Enum.map(&build_insert/1) - - if inserts != [] do - true = :ets.delete_all_objects(@table) - :ets.insert(@table, inserts) - - Logger.info(fn -> - "#{__MODULE__}: updated with #{length(inserts)} records" - end) - end - - {:noreply, [], state, :hibernate} - end - - defp build_insert({:ok, row}) do - trip_id = copy(Map.get(row, "trip_id")) - stop_id = copy(Map.get(row, "stop_id")) - stop_sequence = String.to_integer(Map.get(row, "stop_sequence")) - - {{trip_id, stop_sequence}, stop_id} - end -end diff --git a/lib/concentrate/filter/include_route_direction.ex b/lib/concentrate/filter/include_route_direction.ex index e0af2d81..eada4737 100644 --- a/lib/concentrate/filter/include_route_direction.ex +++ b/lib/concentrate/filter/include_route_direction.ex @@ -3,7 +3,7 @@ defmodule Concentrate.Filter.IncludeRouteDirection do Adds route/direction ID for TripUpdates. """ @behaviour Concentrate.Filter - alias Concentrate.Filter.GTFS.Trips + alias Concentrate.GTFS.Trips alias Concentrate.TripDescriptor @impl Concentrate.Filter diff --git a/lib/concentrate/filter/include_stop_id.ex b/lib/concentrate/filter/include_stop_id.ex index 874fd585..9f0fda24 100644 --- a/lib/concentrate/filter/include_stop_id.ex +++ b/lib/concentrate/filter/include_stop_id.ex @@ -3,28 +3,28 @@ defmodule Concentrate.Filter.IncludeStopID do Adds missing stop IDs to StopTimeUpdates. """ @behaviour Concentrate.Filter - alias Concentrate.Filter.GTFS.StopIDs + alias Concentrate.GTFS.StopTimes alias Concentrate.StopTimeUpdate @impl Concentrate.Filter - def filter(item, stop_ids \\ StopIDs) + def filter(item, stop_times \\ StopTimes) - def filter(%StopTimeUpdate{} = stu, stop_ids) do + def filter(%StopTimeUpdate{} = stu, stop_times) do {:cont, maybe_add_stop_id( stu, StopTimeUpdate.stop_id(stu), StopTimeUpdate.trip_id(stu), StopTimeUpdate.stop_sequence(stu), - stop_ids + stop_times )} end def filter(other, _stop_ids), do: {:cont, other} - defp maybe_add_stop_id(stu, nil, trip_id, stop_sequence, stop_ids) + defp maybe_add_stop_id(stu, nil, trip_id, stop_sequence, stop_times) when is_binary(trip_id) and is_integer(stop_sequence) do - case stop_ids.stop_id(trip_id, stop_sequence) do + case stop_times.stop_id(trip_id, stop_sequence) do :unknown -> stu stop_id -> StopTimeUpdate.update_stop_id(stu, stop_id) end diff --git a/lib/concentrate/group_filter/remove_unneeded_times.ex b/lib/concentrate/group_filter/remove_unneeded_times.ex index 1daf0cff..739f4450 100644 --- a/lib/concentrate/group_filter/remove_unneeded_times.ex +++ b/lib/concentrate/group_filter/remove_unneeded_times.ex @@ -2,17 +2,17 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do @moduledoc """ Removes arrival times from the first stop on a trip, and the departure time from the last stop on a trip. """ - alias Concentrate.Filter.GTFS.PickupDropOff + alias Concentrate.GTFS.StopTimes alias Concentrate.{StopTimeUpdate, TripDescriptor} @behaviour Concentrate.GroupFilter @impl Concentrate.GroupFilter - def filter(trip_group, module \\ PickupDropOff) + def filter(trip_group, stop_times \\ StopTimes) - def filter({%TripDescriptor{} = td, vps, stus} = group, module) do + def filter({%TripDescriptor{} = td, vps, stus} = group, stop_times) do if TripDescriptor.schedule_relationship(td) == :SCHEDULED do trip_id = TripDescriptor.trip_id(td) - stus = ensure_all_correct_times(stus, module, trip_id) + stus = ensure_all_correct_times(stus, stop_times, trip_id) {td, vps, stus} else group @@ -21,10 +21,10 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do def filter(other, _module), do: other - defp ensure_all_correct_times([_ | _] = stus, module, trip_id) do + defp ensure_all_correct_times([_ | _] = stus, stop_times, trip_id) do [last | rest] = Enum.reverse(stus) - last = ensure_correct_times_for_last_stu(last, module, trip_id) - rest = Enum.map(rest, &ensure_correct_times(&1, module, trip_id)) + last = ensure_correct_times_for_last_stu(last, stop_times, trip_id) + rest = Enum.map(rest, &ensure_correct_times(&1, stop_times, trip_id)) Enum.reverse(rest, [last]) end @@ -32,21 +32,9 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do [] end - defp stop_sequence_or_stop_id(stu) do - case StopTimeUpdate.stop_sequence(stu) do - sequence when is_integer(sequence) -> - sequence - - _ -> - StopTimeUpdate.stop_id(stu) - end - end - - defp ensure_correct_times_for_last_stu(stu, module, trip_id) do + defp ensure_correct_times_for_last_stu(stu, stop_times, trip_id) do # we only remove the departure time from the last stop (excepting SKIPPED stops) - key = stop_sequence_or_stop_id(stu) - - case module.pickup_drop_off(trip_id, key) do + case stop_times.pick_up_drop_off(trip_id, StopTimeUpdate.stop_sequence(stu)) do {true, true} -> ensure_both_times(stu) @@ -65,10 +53,8 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimes do end end - defp ensure_correct_times(stu, module, trip_id) do - key = stop_sequence_or_stop_id(stu) - - case module.pickup_drop_off(trip_id, key) do + defp ensure_correct_times(stu, stop_times, trip_id) do + case stop_times.pick_up_drop_off(trip_id, StopTimeUpdate.stop_sequence(stu)) do {_, true} -> ensure_both_times(stu) diff --git a/lib/concentrate/group_filter/scheduled_stop_times.ex b/lib/concentrate/group_filter/scheduled_stop_times.ex new file mode 100644 index 00000000..029f32d6 --- /dev/null +++ b/lib/concentrate/group_filter/scheduled_stop_times.ex @@ -0,0 +1,78 @@ +defmodule Concentrate.GroupFilter.ScheduledStopTimes do + @moduledoc """ + Uses the static GTFS schedule to fill in missing arrival/departure times on stop time updates + that have specific `status` values. + + The desired status values must be set in app configuration at compile time. For example: + + config :concentrate, + group_filters: [ + { + Concentrate.GroupFilter.ScheduledStopTimes, + on_time_statuses: ["status 1", "status 2", "status 3"] + } + ] + + If no status values are configured, enabling this filter has no effect. + """ + @behaviour Concentrate.GroupFilter + alias Concentrate.{StopTimeUpdate, TripDescriptor}, warn: false + + @impl Concentrate.GroupFilter + def filter(trip_group, gtfs_stop_times \\ Concentrate.GTFS.StopTimes) + + config_path = [:group_filters, __MODULE__, :on_time_statuses] + @on_time_statuses Application.compile_env(:concentrate, config_path, []) + + if @on_time_statuses == [] do + def filter(trip_group, _), do: trip_group + else + def filter({trip_descriptor, vehicle_positions, [_ | _] = stop_time_updates}, gtfs_stop_times) + when not is_nil(trip_descriptor) do + { + trip_descriptor, + vehicle_positions, + filter_stop_time_updates( + stop_time_updates, + TripDescriptor.start_date(trip_descriptor), + gtfs_stop_times + ) + } + end + + def filter(trip_group, _), do: trip_group + + defp filter_stop_time_updates(stop_time_updates, nil, _), do: stop_time_updates + + defp filter_stop_time_updates(stop_time_updates, trip_date, gtfs_stop_times) do + Enum.map(stop_time_updates, fn stop_time_update -> + fill_in_arrival_departure( + stop_time_update, + trip_date, + StopTimeUpdate.arrival_time(stop_time_update), + StopTimeUpdate.departure_time(stop_time_update), + StopTimeUpdate.status(stop_time_update), + gtfs_stop_times + ) + end) + end + + defp fill_in_arrival_departure(stop_time_update, trip_date, nil, nil, status, gtfs_stop_times) + when status in @on_time_statuses do + trip_id = StopTimeUpdate.trip_id(stop_time_update) + stop_sequence = StopTimeUpdate.stop_sequence(stop_time_update) + + case gtfs_stop_times.arrival_departure(trip_id, stop_sequence, trip_date) do + {arrival_time, departure_time} -> + stop_time_update + |> StopTimeUpdate.update_arrival_time(arrival_time) + |> StopTimeUpdate.update_departure_time(departure_time) + + :unknown -> + stop_time_update + end + end + + defp fill_in_arrival_departure(stop_time_update, _, _, _, _, _), do: stop_time_update + end +end diff --git a/lib/concentrate/group_filter/vehicle_stop_match.ex b/lib/concentrate/group_filter/vehicle_stop_match.ex index 9a0762ed..33b65e9f 100644 --- a/lib/concentrate/group_filter/vehicle_stop_match.ex +++ b/lib/concentrate/group_filter/vehicle_stop_match.ex @@ -8,7 +8,7 @@ defmodule Concentrate.GroupFilter.VehicleStopMatch do of the StopTimeUpdate with the same stop sequence (if they share a parent). """ @behaviour Concentrate.GroupFilter - alias Concentrate.Filter.GTFS.Stops + alias Concentrate.GTFS.Stops alias Concentrate.{StopTimeUpdate, VehiclePosition} @impl Concentrate.GroupFilter diff --git a/lib/concentrate/filter/gtfs/helpers.ex b/lib/concentrate/gtfs/helpers.ex similarity index 82% rename from lib/concentrate/filter/gtfs/helpers.ex rename to lib/concentrate/gtfs/helpers.ex index a599317c..1f40d720 100644 --- a/lib/concentrate/filter/gtfs/helpers.ex +++ b/lib/concentrate/gtfs/helpers.ex @@ -1,5 +1,5 @@ -defmodule Concentrate.Filter.GTFS.Helpers do - @moduledoc "Shared helpers for `Filter.GTFS` modules." +defmodule Concentrate.GTFS.Helpers do + @moduledoc "Shared helpers for `GTFS` modules." @doc "Turns the given binary into a Stream of lines." @spec io_stream(binary) :: Enumerable.t() diff --git a/lib/concentrate/gtfs/stop_times.ex b/lib/concentrate/gtfs/stop_times.ex new file mode 100644 index 00000000..f087cee7 --- /dev/null +++ b/lib/concentrate/gtfs/stop_times.ex @@ -0,0 +1,174 @@ +defmodule Concentrate.GTFS.StopTimes do + @moduledoc """ + Server which stores data related to GTFS stop times, keyed by trip ID and stop sequence. + """ + use GenStage + alias Concentrate.GTFS.Helpers + require Logger + import :binary, only: [copy: 1] + @table __MODULE__ + + # Arrival and departure are stored as an offset in seconds from noon local time, as per GTFS: + # https://gtfs.org/schedule/reference/#field-types + + @type object_key :: {trip_id :: String.t(), stop_sequence :: non_neg_integer} + @type object_value :: { + stop_id :: String.t(), + arrival_offset :: integer, + departure_offset :: integer, + time_zone_name :: String.t(), + pick_up? :: boolean, + drop_off? :: boolean + } + + def start_link(opts) do + GenStage.start_link(__MODULE__, opts, name: __MODULE__) + end + + @doc """ + Given a trip ID, stop sequence, and reference date, returns the scheduled arrival and departure + times for the trip at the stop on the date, as Unix timestamps. (Assumes the given trip runs on + the given date; this is not actually checked against the calendar.) + """ + @spec arrival_departure(String.t(), non_neg_integer, :calendar.date()) :: + {arrival_time :: non_neg_integer, departure_time :: non_neg_integer} | :unknown + def arrival_departure(trip_id, stop_sequence, {_, _, _} = date) + when is_binary(trip_id) and is_integer(stop_sequence) do + case lookup({trip_id, stop_sequence}) do + {_, arrival, departure, time_zone, _, _} -> + {to_unix!(date, arrival, time_zone), to_unix!(date, departure, time_zone)} + + nil -> + :unknown + end + end + + @doc "Returns whether riders can be picked up or dropped off for a given stop time." + @spec pick_up_drop_off(String.t(), non_neg_integer) :: {boolean, boolean} | :unknown + def pick_up_drop_off(trip_id, stop_sequence) + when is_binary(trip_id) and is_integer(stop_sequence) do + case lookup({trip_id, stop_sequence}) do + {_, _, _, _, pick_up?, drop_off?} -> {pick_up?, drop_off?} + nil -> :unknown + end + end + + @doc "Returns the stop ID for a stop time, identified by trip ID and stop sequence." + @spec stop_id(String.t(), non_neg_integer) :: String.t() | :unknown + def stop_id(trip_id, stop_sequence) when is_binary(trip_id) and is_integer(stop_sequence) do + case lookup({trip_id, stop_sequence}) do + {stop_id, _, _, _, _, _} -> stop_id + nil -> :unknown + end + end + + @spec lookup(object_key) :: object_value | nil + defp lookup(key) do + :ets.lookup_element(@table, key, 2) + rescue + ArgumentError -> nil + end + + # Note: we assume, as GTFS does, that 12:00:00 always occurs exactly once per day regardless of + # time change rules. + defp to_unix!({year, month, day}, offset, zone) do + {:ok, noon} = NaiveDateTime.new(year, month, day, 12, 0, 0) + (noon |> DateTime.from_naive!(zone) |> DateTime.to_unix()) + offset + end + + @impl GenStage + def init(opts) do + @table = :ets.new(@table, [:named_table, :public, :set]) + {:consumer, [], opts} + end + + @impl GenStage + def handle_events(events, _from, state) do + events |> List.flatten() |> Map.new() |> handle_files() + {:noreply, [], state, :hibernate} + end + + defp handle_files(%{ + "agency.txt" => agencies, + "routes.txt" => routes, + "stop_times.txt" => stop_times, + "trips.txt" => trips + }) do + trip_time_zones = trip_time_zones(agencies, routes, trips) + inserts = stop_times_inserts(stop_times, trip_time_zones) + true = :ets.delete_all_objects(@table) + :ets.insert(@table, inserts) + + if inserts != [] do + Logger.info(fn -> "#{__MODULE__}: updated with #{length(inserts)} records" end) + end + end + + defp handle_files(_), do: nil + + # Determine the time zone that each trip's stop times should be interpreted in. + defp trip_time_zones(agencies, routes, trips) do + agency_time_zones = csv_into_map(agencies, &{&1["agency_id"], copy(&1["agency_timezone"])}) + route_time_zones = csv_into_map(routes, &{&1["route_id"], agency_time_zones[&1["agency_id"]]}) + csv_into_map(trips, &{&1["trip_id"], route_time_zones[&1["route_id"]]}) + end + + defp csv_into_map(csv, func) do + csv + |> Helpers.io_stream() + |> CSV.decode(headers: true) + |> Stream.flat_map(fn + {:ok, row} -> [func.(row)] + {:error, _} -> [] + end) + |> Enum.into(%{}) + end + + defp stop_times_inserts(stop_times, trip_time_zones) do + stop_times + |> Helpers.io_stream() + |> CSV.decode(headers: true, num_workers: System.schedulers()) + |> Enum.flat_map(fn + {:ok, %{"trip_id" => trip_id} = row} -> build_inserts(row, trip_time_zones[trip_id]) + {:error, _} -> [] + end) + end + + defp build_inserts(row, time_zone) when is_binary(time_zone) do + trip_id = copy(row["trip_id"]) + stop_sequence = String.to_integer(row["stop_sequence"]) + + stop_id = copy(row["stop_id"]) + arrival = time_to_offset(row["arrival_time"]) + departure = time_to_offset(row["departure_time"]) + pick_up? = type_to_boolean(row["pickup_type"]) + drop_off? = type_to_boolean(row["drop_off_type"]) + + [{{trip_id, stop_sequence}, {stop_id, arrival, departure, time_zone, pick_up?, drop_off?}}] + end + + defp build_inserts(_, _), do: [] + + @twelve_hours 12 * 60 * 60 + + # Convert a GTFS "time" to an offset in seconds from noon, which as per the spec is what GTFS + # times actually mean (rather, they are an "offset from 12 hours before noon"; we pre-subtract + # the 12-hour offset as well). + defp time_to_offset( + <> + ) do + String.to_integer(hour) * 60 * 60 + + String.to_integer(minute) * 60 + + String.to_integer(second) - + @twelve_hours + end + + defp time_to_offset( + <<_hour::binary-size(1), ":", _min::binary-size(2), ":", _sec::binary-size(2)>> = time + ) do + time_to_offset("0" <> time) + end + + defp type_to_boolean("1"), do: false + defp type_to_boolean(_), do: true +end diff --git a/lib/concentrate/filter/gtfs/stops.ex b/lib/concentrate/gtfs/stops.ex similarity index 97% rename from lib/concentrate/filter/gtfs/stops.ex rename to lib/concentrate/gtfs/stops.ex index fbb7fcc9..2c1ec550 100644 --- a/lib/concentrate/filter/gtfs/stops.ex +++ b/lib/concentrate/gtfs/stops.ex @@ -1,4 +1,4 @@ -defmodule Concentrate.Filter.GTFS.Stops do +defmodule Concentrate.GTFS.Stops do @moduledoc """ Server which maintains a list of stop id -> parent station ID. """ diff --git a/lib/concentrate/filter/gtfs/supervisor.ex b/lib/concentrate/gtfs/supervisor.ex similarity index 65% rename from lib/concentrate/filter/gtfs/supervisor.ex rename to lib/concentrate/gtfs/supervisor.ex index 8af173b9..b1a68eea 100644 --- a/lib/concentrate/filter/gtfs/supervisor.ex +++ b/lib/concentrate/gtfs/supervisor.ex @@ -1,4 +1,4 @@ -defmodule Concentrate.Filter.GTFS.Supervisor do +defmodule Concentrate.GTFS.Supervisor do @moduledoc """ Supervisor for the extra servers needed for GTFS parsing. @@ -15,16 +15,15 @@ defmodule Concentrate.Filter.GTFS.Supervisor do Application.get_env(:concentrate, :http_producer), { config[:url], - parser: Concentrate.Filter.GTFS.Unzip, + parser: Concentrate.GTFS.Unzip, fetch_after: @one_hour, content_warning_timeout: :infinity, name: :gtfs_producer } }, - {Concentrate.Filter.GTFS.Trips, subscribe_to: [:gtfs_producer]}, - {Concentrate.Filter.GTFS.Stops, subscribe_to: [:gtfs_producer]}, - {Concentrate.Filter.GTFS.StopIDs, subscribe_to: [:gtfs_producer]}, - {Concentrate.Filter.GTFS.PickupDropOff, subscribe_to: [:gtfs_producer]} + {Concentrate.GTFS.Trips, subscribe_to: [:gtfs_producer]}, + {Concentrate.GTFS.Stops, subscribe_to: [:gtfs_producer]}, + {Concentrate.GTFS.StopTimes, subscribe_to: [:gtfs_producer]} ], strategy: :rest_for_one ) diff --git a/lib/concentrate/filter/gtfs/trips.ex b/lib/concentrate/gtfs/trips.ex similarity index 96% rename from lib/concentrate/filter/gtfs/trips.ex rename to lib/concentrate/gtfs/trips.ex index 415e9d1d..ecf97550 100644 --- a/lib/concentrate/filter/gtfs/trips.ex +++ b/lib/concentrate/gtfs/trips.ex @@ -1,4 +1,4 @@ -defmodule Concentrate.Filter.GTFS.Trips do +defmodule Concentrate.GTFS.Trips do @moduledoc """ Server which maintains a list of trip -> {route_id, direction_id} mappings. """ diff --git a/lib/concentrate/filter/gtfs/unzip.ex b/lib/concentrate/gtfs/unzip.ex similarity index 83% rename from lib/concentrate/filter/gtfs/unzip.ex rename to lib/concentrate/gtfs/unzip.ex index 01d58fd0..dac53db2 100644 --- a/lib/concentrate/filter/gtfs/unzip.ex +++ b/lib/concentrate/gtfs/unzip.ex @@ -1,9 +1,9 @@ -defmodule Concentrate.Filter.GTFS.Unzip do +defmodule Concentrate.GTFS.Unzip do @moduledoc """ Unzips the GTFS file into constituent files. """ @behaviour Concentrate.Parser - @file_list ['trips.txt', 'stop_times.txt', 'stops.txt'] + @file_list ['agency.txt', 'routes.txt', 'trips.txt', 'stop_times.txt', 'stops.txt'] def parse(binary, _opts) do {:ok, files} = :zip.unzip(binary, [:memory, file_list: @file_list]) diff --git a/lib/concentrate/merge_filter.ex b/lib/concentrate/merge_filter.ex index 7133f892..7dd63de7 100644 --- a/lib/concentrate/merge_filter.ex +++ b/lib/concentrate/merge_filter.ex @@ -153,6 +153,9 @@ defmodule Concentrate.MergeFilter do filter when is_atom(filter) -> &filter.filter/1 + {filter, _options} when is_atom(filter) -> + &filter.filter/1 + filter when is_function(filter, 1) -> filter end diff --git a/lib/concentrate/supervisor.ex b/lib/concentrate/supervisor.ex index 9c1d58e5..fe8b2ba5 100644 --- a/lib/concentrate/supervisor.ex +++ b/lib/concentrate/supervisor.ex @@ -36,7 +36,7 @@ defmodule Concentrate.Supervisor do def gtfs(config) do [ - {Concentrate.Filter.GTFS.Supervisor, config} + {Concentrate.GTFS.Supervisor, config} ] end diff --git a/mix.exs b/mix.exs index 81130004..982d122e 100644 --- a/mix.exs +++ b/mix.exs @@ -72,7 +72,8 @@ defmodule Concentrate.MixProject do {:gpb, "~> 4.7", only: :dev, runtime: false, only: :dev}, {:httpoison, "~> 1.0"}, {:jason, "~> 1.0"}, - {:stream_data, "~> 0.4", only: :test} + {:stream_data, "~> 0.4", only: :test}, + {:tzdata, "~> 1.1.1"} ] end end diff --git a/mix.lock b/mix.lock index 6b41ee50..8562ce32 100644 --- a/mix.lock +++ b/mix.lock @@ -36,5 +36,6 @@ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, + "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/test/concentrate/filter/gtfs/pickup_drop_off_test.exs b/test/concentrate/filter/gtfs/pickup_drop_off_test.exs deleted file mode 100644 index ea695b05..00000000 --- a/test/concentrate/filter/gtfs/pickup_drop_off_test.exs +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Concentrate.Filter.GTFS.PickupDropOffTest do - @moduledoc false - use ExUnit.Case - import Concentrate.Filter.GTFS.PickupDropOff - - # copied + modified from a recent stop_times.txt - @body """ - "trip_id","arrival_time","departure_time","stop_id","stop_sequence","stop_headsign","pickup_type","drop_off_type","timepoint","checkpoint_id" - "Logan-22-Weekday-trip","08:00:00","08:00:00","Logan-Subway",1,"",0,1,0,"" - "Logan-22-Weekday-trip","08:04:00","08:04:00","Logan-RentalCarCenter",2,"",0,0,0,"" - "Logan-22-Weekday-trip","08:09:00","08:09:00","Logan-A",3,"",1,0,0,"" - "Logan-33-Weekday-trip","08:04:00","08:04:00","Logan-RentalCarCenter",2,"",2,2,0,"" - """ - - defp supervised(_) do - start_supervised(Concentrate.Filter.GTFS.PickupDropOff) - event = [{"stop_times.txt", @body}] - # relies on being able to update the table from a different process - handle_events([event], :ignored, :ignored) - :ok - end - - describe "pickup?" do - setup :supervised - - test "true if there's a pickup at the stop on that trip" do - assert {true, _} = pickup_drop_off("Logan-22-Weekday-trip", "Logan-Subway") - assert {true, _} = pickup_drop_off("Logan-22-Weekday-trip", "Logan-RentalCarCenter") - assert {false, _} = pickup_drop_off("Logan-22-Weekday-trip", "Logan-A") - assert {true, _} = pickup_drop_off("Logan-33-Weekday-trip", "Logan-RentalCarCenter") - end - - test "true if there's a pickup for that stop sequence" do - assert {true, _} = pickup_drop_off("Logan-22-Weekday-trip", 1) - assert {true, _} = pickup_drop_off("Logan-22-Weekday-trip", 2) - assert {false, _} = pickup_drop_off("Logan-22-Weekday-trip", 3) - assert {true, _} = pickup_drop_off("Logan-33-Weekday-trip", 2) - end - - test "unknown for unknown trips/stops" do - assert pickup_drop_off("unknown trip", "unknown stop") == :unknown - assert pickup_drop_off("Logan-33-Weekday-trip", "unknown stop") == :unknown - assert pickup_drop_off("Logan-33-Weekday-trip", 4) == :unknown - end - end - - describe "drop_off?" do - setup :supervised - - test "true if there's a drop_off at the stop on that trip" do - assert {_, false} = pickup_drop_off("Logan-22-Weekday-trip", "Logan-Subway") - assert {_, true} = pickup_drop_off("Logan-22-Weekday-trip", "Logan-RentalCarCenter") - assert {_, true} = pickup_drop_off("Logan-22-Weekday-trip", "Logan-A") - assert {_, true} = pickup_drop_off("Logan-33-Weekday-trip", "Logan-RentalCarCenter") - end - - test "true if there's a drop_off for that stop sequence" do - assert {_, false} = pickup_drop_off("Logan-22-Weekday-trip", 1) - assert {_, true} = pickup_drop_off("Logan-22-Weekday-trip", 2) - assert {_, true} = pickup_drop_off("Logan-22-Weekday-trip", 3) - assert {_, true} = pickup_drop_off("Logan-33-Weekday-trip", 2) - end - - test "unknown for unknown trips" do - assert pickup_drop_off("unknown trip", "unknown stop") == :unknown - end - end - - describe "missing ETS table" do - test "pickup_drop_off is unknown" do - assert pickup_drop_off("trip", 1) == :unknown - end - end -end diff --git a/test/concentrate/filter/gtfs/stop_ids_test.exs b/test/concentrate/filter/gtfs/stop_ids_test.exs deleted file mode 100644 index 5fa5b703..00000000 --- a/test/concentrate/filter/gtfs/stop_ids_test.exs +++ /dev/null @@ -1,38 +0,0 @@ -defmodule Concentrate.Filter.GTFS.StopIDsTest do - @moduledoc false - use ExUnit.Case - import Concentrate.Filter.GTFS.StopIDs - - # copied + modified from a recent stop_times.txt - @body """ - "trip_id","arrival_time","departure_time","stop_id","stop_sequence","stop_headsign","pickup_type","drop_off_type","timepoint","checkpoint_id" - "Logan-22-Weekday-trip","08:00:00","08:00:00","Logan-Subway",1,"",0,1,0,"" - """ - - defp supervised(_) do - start_supervised(Concentrate.Filter.GTFS.StopIDs) - event = [{"stop_times.txt", @body}] - # relies on being able to update the table from a different process - handle_events([event], :ignored, :ignored) - :ok - end - - describe "stop_id" do - setup :supervised - - test "stop ID for the trip/sequence" do - assert stop_id("Logan-22-Weekday-trip", 1) == "Logan-Subway" - end - - test "unknown for unknown trips/stops" do - assert stop_id("unknown trip", 1) == :unknown - assert stop_id("Logan-22-Weekday-trip", 4) == :unknown - end - end - - describe "missing ETS table" do - test "stop_id is unknown" do - assert stop_id("trip", 1) == :unknown - end - end -end diff --git a/test/concentrate/filter/include_route_direction_test.exs b/test/concentrate/filter/include_route_direction_test.exs index ef45d865..745432e2 100644 --- a/test/concentrate/filter/include_route_direction_test.exs +++ b/test/concentrate/filter/include_route_direction_test.exs @@ -4,7 +4,7 @@ defmodule Concentrate.Filter.IncludeRouteDirectionTest do import Concentrate.Filter.IncludeRouteDirection alias Concentrate.TripDescriptor - @module Concentrate.Filter.FakeTrips + @module Concentrate.GTFS.FakeTrips describe "filter/2" do test "a trip update with a route/direction is kept as-is" do diff --git a/test/concentrate/filter/include_stop_id_test.exs b/test/concentrate/filter/include_stop_id_test.exs index d357949e..ced79f10 100644 --- a/test/concentrate/filter/include_stop_id_test.exs +++ b/test/concentrate/filter/include_stop_id_test.exs @@ -1,31 +1,37 @@ defmodule Concentrate.Filter.IncludeStopIDTest do @moduledoc false use ExUnit.Case, async: true - import Concentrate.Filter.IncludeStopID + alias Concentrate.Filter.IncludeStopID alias Concentrate.StopTimeUpdate - @module Concentrate.Filter.FakeStopIDs + defmodule FakeStopTimes do + @moduledoc "Fake implementation of GTFS.StopTimes" + def stop_id("trip", 1), do: "stop" + def stop_id(_, _), do: :unknown + end + + defp filter(stu), do: IncludeStopID.filter(stu, FakeStopTimes) describe "filter/2" do test "a stop time update with a stop_id is kept as-is" do stu = StopTimeUpdate.new(trip_id: "trip", stop_id: "s", stop_sequence: 1) - assert {:cont, ^stu} = filter(stu, @module) + assert {:cont, ^stu} = filter(stu) end test "a stop update without a trip is kept as-is" do stu = StopTimeUpdate.new([]) - assert {:cont, ^stu} = filter(stu, @module) + assert {:cont, ^stu} = filter(stu) end test "a missing stop id is updated" do stu = StopTimeUpdate.new(trip_id: "trip", stop_sequence: 1) - assert {:cont, new_stu} = filter(stu, @module) + assert {:cont, new_stu} = filter(stu) assert StopTimeUpdate.stop_id(new_stu) == "stop" end test "unknown trip IDs are ignored" do stu = StopTimeUpdate.new(trip_id: "unknown") - assert {:cont, ^stu} = filter(stu, @module) + assert {:cont, ^stu} = filter(stu) end test "other values are returned as-is" do diff --git a/test/concentrate/group_filter/remove_unneeded_times_test.exs b/test/concentrate/group_filter/remove_unneeded_times_test.exs index e480eeca..e08996f0 100644 --- a/test/concentrate/group_filter/remove_unneeded_times_test.exs +++ b/test/concentrate/group_filter/remove_unneeded_times_test.exs @@ -1,20 +1,20 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimesTest do @moduledoc false use ExUnit.Case, async: true - import Concentrate.GroupFilter.RemoveUnneededTimes + alias Concentrate.GroupFilter.RemoveUnneededTimes alias Concentrate.{TripDescriptor, StopTimeUpdate} - defmodule FakePickupDropOff do - @moduledoc "Fake implementation of Filter.GTFS.PickupDropOff" - - def pickup_drop_off("trip", 1), do: {true, false} - def pickup_drop_off("trip", 4), do: {true, true} - def pickup_drop_off("trip", 5), do: {false, true} - def pickup_drop_off("trip", 6), do: {false, false} - def pickup_drop_off(_, _), do: :unknown + defmodule FakeStopTimes do + @moduledoc "Fake implementation of GTFS.StopTimes" + def pick_up_drop_off("trip", 1), do: {true, false} + def pick_up_drop_off("trip", 4), do: {true, true} + def pick_up_drop_off("trip", 5), do: {false, true} + def pick_up_drop_off("trip", 6), do: {false, false} + def pick_up_drop_off(_, _), do: :unknown end - @module __MODULE__.FakePickupDropOff + defp filter(stu), do: RemoveUnneededTimes.filter(stu, FakeStopTimes) + @arrival_time 5 @departure_time 500 @tu TripDescriptor.new(trip_id: "trip") @@ -30,21 +30,21 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimesTest do stu = StopTimeUpdate.update(@stu, stop_sequence: 7, arrival_time: nil) stu_2 = StopTimeUpdate.update(@stu, stop_sequence: 8, departure_time: nil) group = {@tu, [], [stu, stu_2]} - assert filter(group, @module) == group + assert filter(group) == group end test "the arrival_time is removed if there's no drop off" do stu = StopTimeUpdate.update(@stu, stop_sequence: 1) group = {@tu, [], [stu]} expected = StopTimeUpdate.update(stu, arrival_time: nil) - assert {_, [], [^expected]} = filter(group, @module) + assert {_, [], [^expected]} = filter(group) end test "the departure_time is removed if there's no pickup" do stu = StopTimeUpdate.update(@stu, stop_sequence: 5) group = {@tu, [], [stu]} expected = StopTimeUpdate.update(stu, departure_time: nil) - assert {_, [], [^expected]} = filter(group, @module) + assert {_, [], [^expected]} = filter(group) end test "a departure_time is not removed if there are stops added afterward" do @@ -56,40 +56,40 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimesTest do group = {@tu, [], stus} # no chanages - assert filter(group, @module) == group + assert filter(group) == group end test "if the departure time is missing from the first stop, use the arrival time" do stu = StopTimeUpdate.update(@stu, stop_sequence: 1, departure_time: nil) group = {@tu, [], [stu]} expected = StopTimeUpdate.update(stu, arrival_time: nil, departure_time: @arrival_time) - assert {_, [], [^expected]} = filter(group, @module) + assert {_, [], [^expected]} = filter(group) end test "if the arrival time is missing from the last stop, use the departure time" do stu = StopTimeUpdate.update(@stu, stop_sequence: 5, arrival_time: nil) group = {@tu, [], [stu]} expected = StopTimeUpdate.update(stu, arrival_time: @departure_time, departure_time: nil) - assert {_, [], [^expected]} = filter(group, @module) + assert {_, [], [^expected]} = filter(group) end test "other stop sequence values are left alone" do stu = StopTimeUpdate.update(@stu, stop_sequence: 3) group = {@tu, [], [stu]} - assert filter(group, @module) == group + assert filter(group) == group end test "arrival time is copied if only the departure time is available" do stu = StopTimeUpdate.update(@stu, arrival_time: nil) group = {@tu, [], [stu]} - {_, [], [stu]} = filter(group, @module) + {_, [], [stu]} = filter(group) assert StopTimeUpdate.arrival_time(stu) == @departure_time end test "departure time is copied if only the arrival time is available" do stu = StopTimeUpdate.update(@stu, departure_time: nil) group = {@tu, [], [stu]} - {_, [], [stu]} = filter(group, @module) + {_, [], [stu]} = filter(group) assert StopTimeUpdate.departure_time(stu) == @arrival_time end @@ -97,18 +97,18 @@ defmodule Concentrate.GroupFilter.RemoveUnneededTimesTest do stu = StopTimeUpdate.update(@stu, stop_sequence: 6) group = {@tu, [], [stu]} expected = StopTimeUpdate.skip(stu) - assert {_, [], [^expected]} = filter(group, @module) + assert {_, [], [^expected]} = filter(group) end test "non-scheduled TripUpdates aren't modified" do td = TripDescriptor.new(trip_id: "added", schedule_relationship: :ADDED) stu = StopTimeUpdate.update(@stu, trip_id: "added", departure_time: nil) group = {td, [], [stu]} - assert filter(group, @module) == group + assert filter(group) == group end test "other values are returned as-is" do - assert filter(:value, @module) == :value + assert filter(:value) == :value end end end diff --git a/test/concentrate/group_filter/scheduled_stop_times_test.exs b/test/concentrate/group_filter/scheduled_stop_times_test.exs new file mode 100644 index 00000000..6b0c1f91 --- /dev/null +++ b/test/concentrate/group_filter/scheduled_stop_times_test.exs @@ -0,0 +1,74 @@ +defmodule Concentrate.GroupFilter.ScheduledStopTimesTest do + @moduledoc false + use ExUnit.Case, async: true + alias Concentrate.GroupFilter.ScheduledStopTimes + alias Concentrate.{TripDescriptor, StopTimeUpdate} + + defmodule FakeStopTimes do + def arrival_departure("trip1", 10, {2022, 1, 1}), do: {1_610_000_000, 1_610_000_001} + def arrival_departure("trip1", 20, {2022, 1, 1}), do: {1_620_000_000, 1_620_000_001} + def arrival_departure("trip1", 30, {2022, 1, 1}), do: {1_630_000_000, 1_630_000_001} + def arrival_departure(_, _, _), do: :unknown + end + + # see config/test.exs + @on_time_status "on time" + @other_status "delayed" + + defp filter(trip_group), do: ScheduledStopTimes.filter(trip_group, FakeStopTimes) + + describe "filter/2" do + test "fills in missing times from the static schedule when status is a specific value" do + trip = TripDescriptor.new(trip_id: "trip1", start_date: {2022, 1, 1}) + stu1 = StopTimeUpdate.new(trip_id: "trip1", stop_sequence: 0, status: @on_time_status) + stu2 = StopTimeUpdate.new(trip_id: "trip1", stop_sequence: 10, status: @on_time_status) + stu3 = StopTimeUpdate.new(trip_id: "trip1", stop_sequence: 20, status: @other_status) + + new_stu2 = + StopTimeUpdate.new( + trip_id: "trip1", + stop_sequence: 10, + status: @on_time_status, + arrival_time: 1_610_000_000, + departure_time: 1_610_000_001 + ) + + assert filter({trip, [], [stu1, stu2, stu3]}) == {trip, [], [stu1, new_stu2, stu3]} + end + + test "does not change stop time updates with existing arrival or departure times" do + trip_group = + {TripDescriptor.new(trip_id: "trip1", start_date: nil), [], + [ + StopTimeUpdate.new( + trip_id: "trip1", + stop_sequence: 10, + status: @on_time_status, + arrival_time: 1_610_000_123 + ), + StopTimeUpdate.new( + trip_id: "trip1", + stop_sequence: 10, + status: @on_time_status, + departure_time: 1_610_000_123 + ) + ]} + + assert filter(trip_group) == trip_group + end + + test "passes through if the trip has no `start_date`" do + trip_group = + {TripDescriptor.new(trip_id: "trip1", start_date: nil), [], + [StopTimeUpdate.new(trip_id: "trip1")]} + + assert filter(trip_group) == trip_group + end + + test "passes through if no trip descriptor is available" do + trip_group = {nil, [], [StopTimeUpdate.new(trip_id: "trip1")]} + + assert filter(trip_group) == trip_group + end + end +end diff --git a/test/concentrate/group_filter/vehicle_stop_match_test.exs b/test/concentrate/group_filter/vehicle_stop_match_test.exs index 18a009db..b47981f0 100644 --- a/test/concentrate/group_filter/vehicle_stop_match_test.exs +++ b/test/concentrate/group_filter/vehicle_stop_match_test.exs @@ -3,7 +3,7 @@ defmodule Concentrate.GroupFilter.VehicleStopMatchTest do use ExUnit.Case, async: true import Concentrate.GroupFilter.VehicleStopMatch alias Concentrate.{StopTimeUpdate, VehiclePosition} - alias Concentrate.Filter.GTFS.Stops + alias Concentrate.GTFS.Stops describe "filter/1" do test "updates the VehiclePosition stop_id to match the StopTimeUpdate" do diff --git a/test/concentrate/filter/gtfs/helpers_test.exs b/test/concentrate/gtfs/helpers_test.exs similarity index 77% rename from test/concentrate/filter/gtfs/helpers_test.exs rename to test/concentrate/gtfs/helpers_test.exs index c01657f9..30de1c1d 100644 --- a/test/concentrate/filter/gtfs/helpers_test.exs +++ b/test/concentrate/gtfs/helpers_test.exs @@ -1,7 +1,7 @@ -defmodule Concentrate.Filter.GTFS.HelpersTest do +defmodule Concentrate.GTFS.HelpersTest do @moduledoc false use ExUnit.Case, async: true - alias Concentrate.Filter.GTFS.Helpers + alias Concentrate.GTFS.Helpers describe "io_stream/1" do test "streams the lines of a binary" do diff --git a/test/concentrate/gtfs/stop_times_test.exs b/test/concentrate/gtfs/stop_times_test.exs new file mode 100644 index 00000000..83446ee0 --- /dev/null +++ b/test/concentrate/gtfs/stop_times_test.exs @@ -0,0 +1,105 @@ +defmodule Concentrate.GTFS.StopTimesTest do + @moduledoc false + use ExUnit.Case + alias Concentrate.GTFS.StopTimes + + @agency """ + agency_id,agency_name,agency_url,agency_timezone,agency_lang,agency_phone + 1,MBTA,http://www.mbta.com,America/New_York,EN,617-222-3200 + """ + + @routes """ + route_id,agency_id,route_short_name,route_long_name,route_desc,route_type,route_url,route_color,route_text_color,route_sort_order + CR-Middleborough,1,,Middleborough/Lakeville Line,Commuter Rail,2,https://www.mbta.com/schedules/CR-Middleborough,80276C,FFFFFF,20009 + CR-Worcester,1,,Framingham/Worcester Line,Commuter Rail,2,https://www.mbta.com/schedules/CR-Worcester,80276C,FFFFFF,20003 + """ + + @trips """ + route_id,service_id,trip_id,trip_headsign,trip_short_name,direction_id,block_id,shape_id,wheelchair_accessible,bikes_allowed + CR-Middleborough,SPRING2022-SOUTHSUN-Sunday-1-S8f,CR-524518-2019,Middleborough/Lakeville,2019,0,,9800002,1,1 + CR-Worcester,SPRING2022-SOUTHSUN-Sunday-1-Sb4,CR-524522-2502,South Station,2502,1,,9850001,1,1 + """ + + @stop_times """ + trip_id,arrival_time,departure_time,stop_id,stop_sequence,stop_headsign,pickup_type,drop_off_type,timepoint,continuous_pickup,continuous_drop_off + CR-524518-2019,20:25:00,20:25:00,NEC-2287,0,,0,1,1,, + CR-524518-2019,20:31:00,20:31:00,MM-0023-S,10,,3,3,1,, + CR-524518-2019,20:38:00,20:38:00,MM-0079-S,20,,3,3,1,, + CR-524522-2502,7:10:00,7:10:00,WML-0442-CS,0,,0,1,1,, + CR-524522-2502,7:23:00,7:23:00,WML-0364,10,,0,0,1,, + CR-524522-2502,7:27:00,7:27:00,WML-0340,20,,0,0,1,, + """ + + defp supervised(_) do + start_supervised(StopTimes) + + event = [ + {"agency.txt", @agency}, + {"routes.txt", @routes}, + {"trips.txt", @trips}, + {"stop_times.txt", @stop_times} + ] + + StopTimes.handle_events([event], :ignored, :ignored) + :ok + end + + describe "arrival_departure/3" do + setup :supervised + + test "returns scheduled arrival/departure times for a stop time with a reference date" do + assert StopTimes.arrival_departure("CR-524518-2019", 10, {2022, 6, 12}) == + {1_655_080_260, 1_655_080_260} + + assert StopTimes.arrival_departure("CR-524518-2019", 10, {2022, 6, 19}) == + {1_655_685_060, 1_655_685_060} + + assert StopTimes.arrival_departure("CR-524522-2502", 0, {2022, 6, 19}) == + {1_655_637_000, 1_655_637_000} + end + + test "returns :unknown for unknown trips or stop sequences" do + assert StopTimes.arrival_departure("CR-524518-0000", 10, {2022, 6, 19}) == :unknown + assert StopTimes.arrival_departure("CR-524518-2019", 30, {2022, 6, 19}) == :unknown + end + end + + describe "pick_up_drop_off/2" do + setup :supervised + + test "returns pick-up/drop-off booleans for a stop time" do + assert StopTimes.pick_up_drop_off("CR-524522-2502", 0) == {true, false} + end + + test "returns true for all pick-up/drop-off types other than 1" do + assert StopTimes.pick_up_drop_off("CR-524518-2019", 10) == {true, true} + end + + test "returns :unknown for unknown trips or stop sequences" do + assert StopTimes.pick_up_drop_off("CR-524518-0000", 10) == :unknown + assert StopTimes.pick_up_drop_off("CR-524518-2019", 30) == :unknown + end + end + + describe "stop_id/2" do + setup :supervised + + test "returns the stop ID for a stop time" do + assert StopTimes.stop_id("CR-524522-2502", 0) == "WML-0442-CS" + assert StopTimes.stop_id("CR-524518-2019", 20) == "MM-0079-S" + end + + test "returns :unknown for unknown trips or stop sequences" do + assert StopTimes.stop_id("CR-524518-0000", 10) == :unknown + assert StopTimes.stop_id("CR-524518-2019", 30) == :unknown + end + end + + describe "missing ETS table" do + test "all functions return :unknown" do + assert StopTimes.arrival_departure("CR-524518-2019", 10, {2022, 6, 19}) == :unknown + assert StopTimes.pick_up_drop_off("CR-524518-2019", 10) == :unknown + assert StopTimes.stop_id("CR-524518-2019", 10) == :unknown + end + end +end diff --git a/test/concentrate/filter/gtfs/stops_test.exs b/test/concentrate/gtfs/stops_test.exs similarity index 90% rename from test/concentrate/filter/gtfs/stops_test.exs rename to test/concentrate/gtfs/stops_test.exs index ef71c481..ae245f36 100644 --- a/test/concentrate/filter/gtfs/stops_test.exs +++ b/test/concentrate/gtfs/stops_test.exs @@ -1,7 +1,7 @@ -defmodule Concentrate.Filter.GTFS.StopsTest do +defmodule Concentrate.GTFS.StopsTest do @moduledoc false use ExUnit.Case - import Concentrate.Filter.GTFS.Stops + import Concentrate.GTFS.Stops @body """ "stop_id","stop_code","stop_name","stop_desc","platform_code","platform_name","stop_lat","stop_lon","stop_address","zone_id","stop_url","level_id","location_type","parent_station","wheelchair_boarding" @@ -10,7 +10,7 @@ defmodule Concentrate.Filter.GTFS.StopsTest do """ defp supervised(_) do - start_supervised!(Concentrate.Filter.GTFS.Stops) + start_supervised!(Concentrate.GTFS.Stops) event = [{"stops.txt", @body}] # relies on being able to update the table from a different process handle_events([event], :ignored, :ignored) diff --git a/test/concentrate/filter/gtfs/trips_test.exs b/test/concentrate/gtfs/trips_test.exs similarity index 87% rename from test/concentrate/filter/gtfs/trips_test.exs rename to test/concentrate/gtfs/trips_test.exs index 67b519d4..13c59339 100644 --- a/test/concentrate/filter/gtfs/trips_test.exs +++ b/test/concentrate/gtfs/trips_test.exs @@ -1,7 +1,7 @@ -defmodule Concentrate.Filter.GTFS.TripsTest do +defmodule Concentrate.GTFS.TripsTest do @moduledoc false use ExUnit.Case - import Concentrate.Filter.GTFS.Trips + import Concentrate.GTFS.Trips @body """ trip_id,route_id,direction_id @@ -9,7 +9,7 @@ defmodule Concentrate.Filter.GTFS.TripsTest do """ defp supervised(_) do - start_supervised(Concentrate.Filter.GTFS.Trips) + start_supervised(Concentrate.GTFS.Trips) event = [{"trips.txt", @body}] # relies on being able to update the table from a different process handle_events([event], :ignored, :ignored) diff --git a/test/concentrate/filter/gtfs/unzip_test.exs b/test/concentrate/gtfs/unzip_test.exs similarity index 92% rename from test/concentrate/filter/gtfs/unzip_test.exs rename to test/concentrate/gtfs/unzip_test.exs index f76cbc34..2de1b0e3 100644 --- a/test/concentrate/filter/gtfs/unzip_test.exs +++ b/test/concentrate/gtfs/unzip_test.exs @@ -1,7 +1,7 @@ -defmodule Concentrate.Filter.GTFS.UnzipTest do +defmodule Concentrate.GTFS.UnzipTest do @moduledoc false use ExUnit.Case, async: true - import Concentrate.Filter.GTFS.Unzip + import Concentrate.GTFS.Unzip describe "parse/1" do test "returns the relevants bodies" do diff --git a/test/support/filter/fakes.ex b/test/support/filter/fakes.ex index ef7c9a11..e0d17526 100644 --- a/test/support/filter/fakes.ex +++ b/test/support/filter/fakes.ex @@ -1,5 +1,5 @@ -defmodule Concentrate.Filter.FakeTrips do - @moduledoc "Fake implementation of Filter.GTFS.Trips" +defmodule Concentrate.GTFS.FakeTrips do + @moduledoc "Fake implementation of GTFS.Trips" def route_id("trip"), do: "route" def route_id(_), do: nil @@ -7,12 +7,6 @@ defmodule Concentrate.Filter.FakeTrips do def direction_id(_), do: nil end -defmodule Concentrate.Filter.FakeStopIDs do - @moduledoc "Fake implementation of Filter.GTFS.StopIDs" - def stop_id("trip", 1), do: "stop" - def stop_id(_, _), do: :unknown -end - defmodule Concentrate.Filter.FakeCancelledTrips do @moduledoc "Fake implementation of Filter.Alerts.CancelledTrips" def route_cancelled?("route", {1970, 1, 2}) do diff --git a/test/test_helper.exs b/test/test_helper.exs index 103b911b..667a8d89 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,2 +1,3 @@ Application.ensure_all_started(:stream_data) +Application.ensure_all_started(:tzdata) ExUnit.start()