🧠 Ingest OIO trigger and suppress relevant predictions #351

Merged May 14, 2024 (7 commits)
10 changes: 7 additions & 3 deletions config/config.exs
@@ -40,6 +40,9 @@ config :concentrate,
trip_updates: "https://cdn.mbta.com/realtime/TripUpdates.pb"
]
],
signs_stops_config: [
url: "s3://mbta-signs-dev/stops-config.json"
],
alerts: [
url: "https://cdn.mbta.com/realtime/Alerts.pb"
],
@@ -74,8 +77,8 @@ config :concentrate,
Concentrate.GroupFilter.VehicleStopMatch,
Concentrate.GroupFilter.SkippedStopOnAddedTrip,
Concentrate.GroupFilter.TripDescriptorTimestamp,
Concentrate.GroupFilter.UncertaintyValue
# Concentrate.GroupFilter.SuppressStopTimeUpdate Enable once fully implemented
Contributor: nit: I'm ambivalent personally, but there seems to be a preference here toward avoiding commented-out code. Maybe this should just be whacked, and then added back when it's actually in use.

Contributor (author): This was actually already merged in and is being updated in this PR.

Concentrate.GroupFilter.UncertaintyValue,
Concentrate.GroupFilter.SuppressStopTimeUpdate
],
source_reporters: [
Concentrate.SourceReporter.Basic,
@@ -114,7 +117,8 @@ config :concentrate,
"http" => Concentrate.Producer.HTTPoison,
"mqtt" => Concentrate.Producer.Mqtt,
"mqtts" => Concentrate.Producer.Mqtt,
"mqtt+ssl" => Concentrate.Producer.Mqtt
"mqtt+ssl" => Concentrate.Producer.Mqtt,
"s3" => Concentrate.Producer.S3
}

import_config "#{config_env()}.exs"
4 changes: 4 additions & 0 deletions lib/concentrate.ex
@@ -85,6 +85,10 @@ defmodule Concentrate do
[gtfs: [url: url]]
end

defp decode_json_key_value({"signs_stops_config", %{"url" => url}}) do
[signs_stops_config: [url: url]]
end

defp decode_json_key_value({"sinks", sinks_object}) do
sinks = decode_sinks_object(sinks_object, %{})

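For reference, a hedged sketch of the JSON pair this new clause accepts and the keyword list it produces (the URL is the one from config/config.exs; the surrounding document shape is assumed to follow the other decode_json_key_value/1 clauses in this module):

# input: one key/value pair from the decoded JSON config document
{"signs_stops_config", %{"url" => "s3://mbta-signs-dev/stops-config.json"}}

# output: the keyword form consumed by Concentrate.Supervisor
[signs_stops_config: [url: "s3://mbta-signs-dev/stops-config.json"]]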
4 changes: 3 additions & 1 deletion lib/concentrate/filter/suppress/stop_prediction_status.ex
@@ -27,6 +27,8 @@ defmodule Concentrate.Filter.Suppress.StopPredictionStatus do
{:noreply, [], state, :hibernate}
end

defp store_new_state([:empty]), do: store_new_state([])

defp store_new_state(events) do
currently_suppressed_stops = :ets.tab2list(@table) |> Keyword.get(:entries, [])
:ets.delete_all_objects(@table)
@@ -57,7 +59,7 @@ defmodule Concentrate.Filter.Suppress.StopPredictionStatus do
if route_id != nil and direction_id != nil do
@table
|> :ets.tab2list()
|> Keyword.get(:entries)
|> Keyword.get(:entries, [])
|> Enum.filter(fn %{route_id: r, direction_id: d} ->
r == route_id and d == direction_id
end)
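The added default matters on first read: before any events arrive, the ETS table has no :entries row, so :ets.tab2list/1 returns [] and the un-defaulted Keyword.get/2 would yield nil, crashing the Enum.filter/2 that follows. A quick sketch:

Keyword.get([], :entries)      #=> nil  (Enum.filter/2 would raise on this)
Keyword.get([], :entries, [])  #=> []   (safe: no stops suppressed yet)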
43 changes: 43 additions & 0 deletions lib/concentrate/filter/suppress/supervisor.ex
@@ -0,0 +1,43 @@
defmodule Concentrate.Filter.Suppress.Supervisor do
@moduledoc """
Supervisor for the extra servers needed for suppressing predictions based on signs config.

* HTTP producer to fetch the signs config
* Consumer / map of suppressed stops
"""
@one_day 86_400_000

require Logger

def start_link(config) do
if config[:url] do
Supervisor.start_link(
[
{
Concentrate.producer_for_url(config[:url]),
Contributor: Would it be worth adding a test to confirm that it actually starts pulling this file?

Contributor (author): Sure thing. I'm focusing on getting the Terraform changes in so that I can put this on dev for more thorough testing, but I can add a test for this.

{
config[:url],
parser: Concentrate.Parser.SignsConfig,
fetch_after: 1_000,
content_warning_timeout: @one_day,
name: :stop_prediction_status_producer
}
},
{Concentrate.Filter.Suppress.StopPredictionStatus,
subscribe_to: [:stop_prediction_status_producer]}
],
strategy: :rest_for_one
)
else
:ignore
end
end

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :supervisor
}
end
end
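A sketch of the two start paths this module provides (the URL is the one configured in config/config.exs; the return values follow from Supervisor.start_link/2 and the :ignore branch above). Because start_link/1 returns :ignore when no URL is configured, the child can be listed unconditionally in Concentrate.Supervisor, which records it as :undefined rather than crashing:

Concentrate.Filter.Suppress.Supervisor.start_link(url: "s3://mbta-signs-dev/stops-config.json")
#=> {:ok, pid}  producer and consumer running under :rest_for_one

Concentrate.Filter.Suppress.Supervisor.start_link([])
#=> :ignore     nothing to supervise without a configured URL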
7 changes: 4 additions & 3 deletions lib/concentrate/parser/signs_config.ex
@@ -5,9 +5,10 @@ defmodule Concentrate.Parser.SignsConfig do
@behaviour Concentrate.Parser

@impl Concentrate.Parser
def parse(binary, opts) when is_binary(binary) and is_list(opts) do
json = Jason.decode!(binary, strings: :copy)
map_entities(json)
def parse(binary, _opts) when is_binary(binary) do
binary
|> Jason.decode!(strings: :copy)
|> map_entities()
end

defp map_entities(%{"stops" => items}) do
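One behavioral note: an empty stops document parses to an empty list, which the S3 producer below turns into the [:empty] sentinel. A hedged sketch (map_entities/1's per-item output isn't shown in this diff, so only the empty case is assumed):

Concentrate.Parser.SignsConfig.parse(~s({"stops": []}), [])
#=> []  (assumed; becomes [:empty] via parse_response/2 in Concentrate.Producer.S3)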
148 changes: 148 additions & 0 deletions lib/concentrate/producer/s3.ex
@@ -0,0 +1,148 @@
defmodule Concentrate.Producer.S3 do
@moduledoc """
GenStage Producer for s3.
"""

use GenStage
require Logger

@start_link_opts [:name]

defmodule State do
@moduledoc false
defstruct [
:bucket,
:etag,
:ex_aws,
:fetch_after,
:last_fetch,
:last_modified,
:next_fetch_ref,
:object,
:parser_opts,
:parser,
:url
]
end

alias __MODULE__.State

def start_link({url, opts}) when is_binary(url) and is_list(opts) do
start_link_opts = Keyword.take(opts, @start_link_opts)
opts = Keyword.drop(opts, @start_link_opts)
GenStage.start_link(__MODULE__, {url, opts}, start_link_opts)
end

@impl GenStage
def init({url, opts}) do
{parser, parser_opts} =
case Keyword.fetch!(opts, :parser) do
module when is_atom(module) ->
{&module.parse/2, []}

{module, opts} when is_atom(module) and is_list(opts) ->
{&module.parse/2, opts}

fun when is_function(fun, 2) ->
{fun, []}
end

{bucket, object} = parse_s3_url(url)

fetch_after = Keyword.get(opts, :fetch_after)
ex_aws = Keyword.get(opts, :ex_aws, ExAws)

{
:producer,
%State{
bucket: bucket,
ex_aws: ex_aws,
fetch_after: fetch_after,
last_fetch: monotonic_now() - fetch_after - 1,
object: object,
parser_opts: parser_opts,
parser: parser,
url: url
},
dispatcher: GenStage.BroadcastDispatcher
}
end

defp parse_s3_url(url) do
%URI{host: bucket, path: object} = URI.parse(url)

{bucket, object}
end

@impl GenStage
def handle_demand(_, state) do
state = schedule_fetch(state)

{:noreply, [], state}
end

@impl GenStage
def handle_info(:fetch, state) do
state = %{state | next_fetch_ref: nil, last_fetch: monotonic_now()}
state = schedule_fetch(state)

case state.ex_aws.request(
ExAws.S3.get_object(state.bucket, state.object,
if_none_match: state.etag,
if_modified_since: state.last_modified
)
) do
{:ok, %{status_code: 200, body: body, headers: headers}} ->
state = %{
state
| last_modified: get_header(headers, "last-modified"),
etag: get_header(headers, "etag")
}

{:noreply, parse_response(body, state), state}

{:ok, %{status_code: 304}} ->
{:noreply, [], state}

{_, error} ->
Logger.warn(
"#{__MODULE__} error fetching s3 url=#{state.url}} error=#{inspect(error, limit: :infinity)}"
)

{:noreply, [], state}
end
end

defp schedule_fetch(%{next_fetch_ref: nil} = state) do
next_fetch_after = max(state.last_fetch + state.fetch_after - monotonic_now(), 0)
next_fetch_ref = Process.send_after(self(), :fetch, next_fetch_after)

%{state | next_fetch_ref: next_fetch_ref}
end

# coveralls-ignore-start
defp schedule_fetch(%{next_fetch_ref: _} = state) do
# already scheduled! this isn't always hit during testing (but it is
# sometimes) so we skip the coverage check.
state
end

# coveralls-ignore-stop

defp monotonic_now do
System.monotonic_time(:millisecond)
end

defp get_header(headers, header) do
Enum.find_value(headers, fn {key, value} ->
String.downcase(key) == header and value
end)
end

defp parse_response(body, state) do
Contributor (author): @nlwstein this was added so that the events list isn't empty when the new state should be empty; otherwise GenStage does not actually pass along the empty list as a message.

case state.parser.(body, state.parser_opts) do
[] -> [:empty]
events -> events
end
end
end
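The [:empty] sentinel is worth dwelling on: a GenStage producer that emits [] dispatches nothing, so a consumer could never observe that the upstream config became empty. A minimal sketch of the round trip, using the shapes from this PR:

# producer side: parse_response/2 above never emits an empty event list
#   parser returns []        -> producer emits [:empty]
#   parser returns [e1, e2]  -> producer emits [e1, e2]

# consumer side: Concentrate.Filter.Suppress.StopPredictionStatus (earlier in
# this diff) normalizes the sentinel back to an empty list before storing
defp store_new_state([:empty]), do: store_new_state([])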
9 changes: 8 additions & 1 deletion lib/concentrate/supervisor.ex
@@ -16,10 +16,11 @@ defmodule Concentrate.Supervisor do
def children(config) do
pool = pool()
alerts = alerts(config[:alerts])
signs_stops_config = signs_stops_config(config[:signs_stops_config])
gtfs = gtfs(config[:gtfs])
pipeline = pipeline(config)
health = health()
Enum.concat([pool, alerts, gtfs, pipeline, health])
Enum.concat([pool, alerts, signs_stops_config, gtfs, pipeline, health])
end

def pool do
@@ -34,6 +35,12 @@
]
end

def signs_stops_config(config) do
[
{Concentrate.Filter.Suppress.Supervisor, config}
]
end

def gtfs(config) do
[
{Concentrate.GTFS.Supervisor, config}
40 changes: 40 additions & 0 deletions test/concentrate/filter/suppress/stop_prediction_status_test.exs
@@ -20,6 +20,9 @@ defmodule Concentrate.Filter.Suppress.StopPredictionStatusTest do
handle_events(@entries, :from, :state)

assert MapSet.new([1, 2]) == flagged_stops_on_route("Red", 0)
assert MapSet.new([3, 4]) == flagged_stops_on_route("Red", 1)
assert MapSet.new([5]) == flagged_stops_on_route("Blue", 0)
assert MapSet.new([6]) == flagged_stops_on_route("Blue", 1)
end

test "returns nil if missing stop_id or direction_id" do
@@ -43,6 +46,43 @@ defmodule Concentrate.Filter.Suppress.StopPredictionStatusTest do

assert log =~
"Cleared prediction suppression for stop_id=2 route_id=Red direction_id=0 based on RTS feed"

assert MapSet.new([]) == flagged_stops_on_route("Red", 0)
assert MapSet.new([3, 4]) == flagged_stops_on_route("Red", 1)
assert MapSet.new([5]) == flagged_stops_on_route("Blue", 0)
assert MapSet.new([6]) == flagged_stops_on_route("Blue", 1)
end

test "empties state when supplied with :empty list" do
handle_events(@entries, :from, :state)

log =
capture_log(fn ->
handle_events([:empty], :from, :state)
end)

assert log =~
"Cleared prediction suppression for stop_id=1 route_id=Red direction_id=0 based on RTS feed"

assert log =~
"Cleared prediction suppression for stop_id=2 route_id=Red direction_id=0 based on RTS feed"

assert log =~
"Cleared prediction suppression for stop_id=3 route_id=Red direction_id=1 based on RTS feed"

assert log =~
"Cleared prediction suppression for stop_id=4 route_id=Red direction_id=1 based on RTS feed"

assert log =~
"Cleared prediction suppression for stop_id=5 route_id=Blue direction_id=0 based on RTS feed"

assert log =~
"Cleared prediction suppression for stop_id=6 route_id=Blue direction_id=1 based on RTS feed"

assert MapSet.new([]) == flagged_stops_on_route("Red", 0)
assert MapSet.new([]) == flagged_stops_on_route("Red", 1)
assert MapSet.new([]) == flagged_stops_on_route("Blue", 0)
assert MapSet.new([]) == flagged_stops_on_route("Blue", 1)
end
end

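From the assertions above, one can infer the shape of the @entries fixture. It isn't shown in this diff, so this reconstruction is an assumption based on the map keys StopPredictionStatus filters on:

@entries [
  %{route_id: "Red", direction_id: 0, stop_id: 1},
  %{route_id: "Red", direction_id: 0, stop_id: 2},
  %{route_id: "Red", direction_id: 1, stop_id: 3},
  %{route_id: "Red", direction_id: 1, stop_id: 4},
  %{route_id: "Blue", direction_id: 0, stop_id: 5},
  %{route_id: "Blue", direction_id: 1, stop_id: 6}
]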