
feat: SIGNAL-5811 add time interval bucket feature to ThrottleAndTimed and make loop_interval optional #39

Conversation

@seungjinstord (Contributor) commented Feb 26, 2024

Changes Buffy.ThrottleAndTimed so that data in state can be modified for each throttle() invocation.
This is additive, via additional defoverridable functions; the original API is intact.
The :loop_interval option now has a looser requirement: it is optional.
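For illustration, here is a minimal sketch of both configurations (assuming `:loop_interval` is passed alongside the other `use` options shown in the moduledoc example below; the 30-second value is arbitrary):

    # With an explicit loop interval (arbitrary value):
    use Buffy.ThrottleAndTimed,
      throttle: 100,
      loop_interval: :timer.seconds(30)

    # Without :loop_interval - now allowed, since the option is optional:
    use Buffy.ThrottleAndTimed,
      throttle: 100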

Example use case: suppose that for some event handler operation, we process an id by querying the DB.

If that happens a thousand times per second against a connection that is CPU/memory heavy, like a Postgres DB, then making a thousand query connections per second would be significantly more expensive than one connection with a list of a thousand ids - assuming the same query returns a list whether it is given one id or a list of ids (the logic only differs by the size of the id list).

In that case we need a way to collect ids across some set of events.

Buffy.ThrottleAndTimed already has the bulk of the timing mechanism in place:

  • it is a process that is not killed once the throttle work is done
  • it has throttling logic
  • it can be extended with three additional defoverridable functions for key generation, args updating, and state updates upon successful work operations
  • it can be modified so that the loop interval feature is optional

Example use (also noted in moduledoc):

    defmodule MyElasticEventThrottler do
      use Buffy.ThrottleAndTimed,
        throttle: 100,
        supervisor_module: DynamicSupervisor,
        supervisor_name: MyDynamicSupervisor

      def handle_throttle(%{test_pid: test_pid, values: values} = args) do
        Process.sleep(200)
        send(test_pid, {:ok, args, System.monotonic_time()})
        values
      end

      def args_to_key(%{key: key}), do: key |> :erlang.term_to_binary() |> :erlang.phash2()

      def update_args(%{values: values} = old_arg, %{values: new_values} = _new_arg)
          when is_list(values) and is_list(new_values) do
        %{old_arg | values: Enum.sort(values ++ new_values)}
      end

      def update_state_with_work_result(%{args: %{values: _state_values} = args} = state, _result_values) do
        # Because `handle_throttle/1` runs in the GenServer's `handle_continue/2` callback,
        # inbox processing is paused until it completes. The inbox keeps receiving new messages
        # from `throttle/1` calls, which are processed only after `handle_throttle/1` finishes.
        %{state | args: %{args | values: []}}
      end
    end

    # ... and using it:
    MyElasticEventThrottler.throttle(%{key: "my_key", test_pid: test_pid, values: [0, 1, 2, 3]})

Also see the added test for a more thorough use case.

@seungjinstord requested review from kinson, btkostner and a team on February 26, 2024 at 18:36
@seungjinstord self-assigned this on Feb 26, 2024
@kinson (Contributor) left a comment:
A few suggestions and one question about avoiding a potential race condition

lib/buffy/throttle_and_timed.ex: three resolved (outdated) review threads
Comment on lines +357 to 359
def handle_cast({:throttle, new_args}, %{args: args} = state) do
{:noreply, %{state | args: update_args(args, new_args)}}
end
Contributor:

Something I'm thinking about is whether there might be issues with using handle_cast in a situation where we're continuously getting throttle messages and the timeout occurs in the middle of a burst of them. Is there a chance we could drop some of the new things being added to state, or overwrite them, when we call update_state_with_work_result/2?

I'd have to think about it and maybe test it out to know for sure whether that's possible (or not), but maybe you've already considered this 🙂 and done the heavy mental lifting for me.

Contributor Author:

I was trying to test it and was wondering why I wasn't getting any new data into the state.

Then I realized that the beauty of handle_continue is that incoming messages keep getting stored in the inbox until handle_continue finishes running.

After handle_continue finishes, the GenServer starts ingesting the backed-up messages in the inbox - this was confirmed by checking Process.info(pid, :messages).
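A minimal standalone sketch of that behavior (a plain GenServer, not Buffy code; the module and message names are made up for illustration):

    defmodule ContinueDemo do
      use GenServer

      def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

      @impl true
      def init(:ok), do: {:ok, [], {:continue, :slow_work}}

      @impl true
      def handle_continue(:slow_work, state) do
        # While this sleeps, incoming casts pile up in the mailbox unhandled.
        Process.sleep(200)
        {:noreply, state}
      end

      @impl true
      def handle_cast({:push, value}, state), do: {:noreply, [value | state]}
    end

    {:ok, pid} = ContinueDemo.start_link()
    GenServer.cast(pid, {:push, 1})
    GenServer.cast(pid, {:push, 2})
    # While handle_continue is still sleeping, both casts are visible here:
    Process.info(pid, :messages)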

Contributor:

> Then I realized that the beauty of handle_continue is that incoming messages keep getting stored in the inbox until handle_continue finishes running.

❤️

Contributor:

> After handle_continue finishes, the GenServer starts ingesting the backed-up messages in the inbox - this was confirmed by checking Process.info(pid, :messages).

Could we measure/monitor the size of the inbox queue? That could be valuable in case we run into an issue where messages get dropped. I'm trying to find documentation on the behavior as the mailbox size grows, but I'm having trouble for some reason 🙃

Contributor:

Thinking about it more, we could probably implement this in the handler itself during handle_throttle if we want to capture it.
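For example, a rough sketch of what that could look like in the handler (the telemetry event name and `do_work/1` are hypothetical, and this assumes the `:telemetry` dependency is available):

    def handle_throttle(%{values: values}) do
      # Sample this GenServer's own mailbox size before doing the batched work.
      {:message_queue_len, queued} = Process.info(self(), :message_queue_len)

      # Hypothetical event name; attach a handler or reporter to it elsewhere.
      :telemetry.execute([:my_app, :throttler, :mailbox], %{length: queued}, %{})

      do_work(values)
    end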

@kinson (Contributor) left a comment:

💥 I'll try plugging this into the wms-service and testing it on a branch, unless you already have one you're working on that I could use.

@seungjinstord (Contributor Author) commented Mar 11, 2024

@kinson I'll set one up, gimme a sec.

UPDATE: here it is: https://github.com/stordco/wms-service/tree/SIGNAL-5811-with-buffy-pr - I tweaked mix.exs to use this branch

@kinson (Contributor) commented Mar 12, 2024

@seungjinstord this worked like a charm ✨

@seungjinstord merged commit 3d48d04 into main on Mar 12, 2024
10 checks passed
@seungjinstord deleted the SIGNAL-5811-add-time-interval-bucket-feature-to-throttle-and-timed branch on March 12, 2024 at 23:43
seungjinstord pushed a commit that referenced this pull request Mar 13, 2024
An automated release has been created for you.
---


## [2.2.0](v2.1.1...v2.2.0) (2024-03-13)


### Features

* SIGNAL-5811 add time interval bucket feature to ThrottleAndTimed and make loop_interval optional ([#39](#39)) ([3d48d04](3d48d04))


### Miscellaneous

* Sync files with stordco/common-config-elixir ([#27](#27)) ([d7cffde](d7cffde))
* Sync files with stordco/common-config-elixir ([#38](#38)) ([c127668](c127668))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).