Skip to content

Commit

Permalink
Add signs config fetching from live data
Browse files Browse the repository at this point in the history
  • Loading branch information
bfauble committed May 8, 2024
1 parent cde9d25 commit e5df291
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
9 changes: 6 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ config :concentrate,
gtfs_realtime: [
vehicle_positions: "https://cdn.mbta.com/realtime/VehiclePositions.pb",
trip_updates: "https://cdn.mbta.com/realtime/TripUpdates.pb"
]
],
],
signs_stops_config: [
url: "https://mbta-signs.s3.amazonaws.com/stops-config.json"
],
alerts: [
url: "https://cdn.mbta.com/realtime/Alerts.pb"
Expand Down Expand Up @@ -74,8 +77,8 @@ config :concentrate,
Concentrate.GroupFilter.VehicleStopMatch,
Concentrate.GroupFilter.SkippedStopOnAddedTrip,
Concentrate.GroupFilter.TripDescriptorTimestamp,
Concentrate.GroupFilter.UncertaintyValue
# Concentrate.GroupFilter.SuppressStopTimeUpdate Enable once fully implemented
Concentrate.GroupFilter.UncertaintyValue,
Concentrate.GroupFilter.SuppressStopTimeUpdate
],
source_reporters: [
Concentrate.SourceReporter.Basic,
Expand Down
4 changes: 4 additions & 0 deletions lib/concentrate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ defmodule Concentrate do
[gtfs: [url: url]]
end

defp decode_json_key_value({"signs_stops_config", %{"url" => url}}) do
[signs_stops_config: [url: url]]
end

defp decode_json_key_value({"sinks", sinks_object}) do
sinks = decode_sinks_object(sinks_object, %{})

Expand Down
42 changes: 42 additions & 0 deletions lib/concentrate/filter/suppress/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Concentrate.Filter.Suppress.Supervisor do
@moduledoc """
Supervisor for the extra servers needed for suppressing predictions based on signs config.
* HTTP producer to fetch the signs config
* Consumer / map of suppressed stops
"""
@one_day 86_400_000

require Logger

def start_link(config) do
if config[:url] do
Supervisor.start_link(
[
{
Concentrate.producer_for_url(config[:url]),
{
config[:url],
parser: Concentrate.Parser.SignsConfig,
fetch_after: 10_000,
content_warning_timeout: @one_day,
name: :stop_prediction_status_producer
}
},
{Concentrate.Filter.Suppress.StopPredictionStatus, subscribe_to: [:stop_prediction_status_producer]}
],
strategy: :rest_for_one
)
else
:ignore
end
end

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :supervisor
}
end
end
9 changes: 8 additions & 1 deletion lib/concentrate/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ defmodule Concentrate.Supervisor do
def children(config) do
pool = pool()
alerts = alerts(config[:alerts])
signs_stops_config = signs_stops_config(config[:signs_stops_config])
gtfs = gtfs(config[:gtfs])
pipeline = pipeline(config)
health = health()
Enum.concat([pool, alerts, gtfs, pipeline, health])
Enum.concat([pool, alerts, signs_stops_config, gtfs, pipeline, health])
end

def pool do
Expand All @@ -34,6 +35,12 @@ defmodule Concentrate.Supervisor do
]
end

def signs_stops_config(config) do
[
{Concentrate.Filter.Suppress.Supervisor, config}
]
end

def gtfs(config) do
[
{Concentrate.GTFS.Supervisor, config}
Expand Down

0 comments on commit e5df291

Please sign in to comment.