Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand Oban usage for logging, error reporting, and periodic jobs #378

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ config :changelog,

config :changelog, Oban,
repo: Changelog.Repo,
plugins: [Oban.Plugins.Pruner, Oban.Plugins.Stager],
queues: [comment_notifier: 10]
queues: [comment_notifier: 10, scheduled: 5],
plugins: [Oban.Plugins.Pruner, Oban.Plugins.Stager]

config :changelog, Changelog.Mailer, adapter: Bamboo.LocalAdapter

Expand Down
18 changes: 11 additions & 7 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ config :changelog, Changelog.Mailer,

config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase

config :changelog, Changelog.Scheduler,
global: true,
timezone: "US/Central",
jobs: [
{"0 4 * * *", {Changelog.Stats, :process, []}},
{"0 3 * * *", {Changelog.Slack.Tasks, :import_member_ids, []}},
{"* * * * *", {Changelog.NewsQueue, :publish, []}}
config :changelog, Oban,
plugins: [
Oban.Plugins.Pruner,
Oban.Plugins.Stager,
{Oban.Plugins.Cron,
timezone: "US/Central",
crons: [
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically these would go in the primary config, but to mimic the "prod only tasks" that were already set up I've only defined them in prod.exs.

{"0 4 * * *", Changelog.ObanWorkers.StatsProcessor},
{"0 3 * * *", Changelog.ObanWorkers.SlackImporter},
{"* * * * *", Changelog.ObanWorkers.NewsPublisher}
]}
]

config :changelog, Changelog.PromEx,
Expand Down
1 change: 0 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ config :changelog, Changelog.Repo,
username: System.get_env("DB_USER", "postgres")

config :changelog, Oban,
crontab: false,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hasn't been necessary for a while now. The top-level crontab option is deprecated, as CRON is implemented as a plugin now.

queues: false,
plugins: false
13 changes: 8 additions & 5 deletions lib/changelog/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ defmodule Changelog.Application do
global_ttl: :timer.minutes(5),
touch_on_read: false
),
Changelog.Scheduler,
Changelog.EpisodeTracker,
Changelog.Metacasts.Filterer.Cache,
{Oban, oban_config()}
]

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Changelog.Supervisor]
Supervisor.start_link(children, opts)
# Only attach the telemetry logger when we aren't in an IEx shell
unless Code.ensure_loaded?(IEx) && IEx.started?() do
Oban.Telemetry.attach_default_logger(:info)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets start/stop/error type span logging for all jobs. Rather than sprinkling custom logging into the workers you can debug and get timing automatically.


Changelog.ObanReporter.attach()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets native sentry errors when a job encounters an error. Without this, the errors are silent and only visible in the errors column stored on the job record in the DB.

end

Supervisor.start_link(children, strategy: :one_for_one, name: Changelog.Supervisor)
end

defp oban_config do
Expand Down
23 changes: 23 additions & 0 deletions lib/changelog/oban_reporter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Changelog.ObanReporter do
  @moduledoc false

  # Subscribes to Oban's job-exception telemetry event so every job failure
  # is forwarded to Sentry. Without this, errors are only visible in the
  # errors column stored on the job record in the database.
  def attach do
    :telemetry.attach("oban-errors", [:oban, :job, :exception], &handle_event/4, [])
  end

  # Reports the raised error to Sentry, tagging it with the job's
  # identifying fields plus the telemetry measurements (timings).
  def handle_event([:oban, :job, _], measure, meta, _config) do
    job_details =
      Map.take(meta.job, [:id, :args, :meta, :queue, :worker, :attempt, :max_attempts])

    Sentry.capture_exception(meta.error,
      stacktrace: meta.stacktrace,
      extra: Map.merge(job_details, measure)
    )
  end

  # Ignore any other telemetry event we may be handed.
  def handle_event(_event, _measure, _meta, _config), do: :ok
end
4 changes: 1 addition & 3 deletions lib/changelog/oban_workers/comment_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ defmodule Changelog.ObanWorkers.CommentNotifier do

alias Changelog.{NewsItemComment, Notifier, Repo}

@five_mins 60 * 5

@impl Oban.Worker
def perform(%Oban.Job{args: %{"comment_id" => comment_id}}) do
comment = Repo.get(NewsItemComment, comment_id)
Expand All @@ -22,7 +20,7 @@ defmodule Changelog.ObanWorkers.CommentNotifier do
"""
def schedule_notification(%NewsItemComment{id: id}) do
%{comment_id: id}
|> __MODULE__.new(schedule_in: @five_mins)
|> new(schedule_in: {5, :minutes})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I've switched to the brand new time unit syntax.

|> Oban.insert()
end
end
10 changes: 10 additions & 0 deletions lib/changelog/oban_workers/news_publisher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Changelog.ObanWorkers.NewsPublisher do
  @moduledoc false

  use Oban.Worker, queue: :scheduled

  alias Changelog.NewsQueue

  # Cron-invoked (every minute in prod config); publishes whatever the
  # news queue has ready and reports the result back to Oban.
  @impl Oban.Worker
  def perform(_job), do: {:ok, NewsQueue.publish()}
end
28 changes: 28 additions & 0 deletions lib/changelog/oban_workers/slack_importer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Changelog.ObanWorkers.SlackImporter do
  @moduledoc false

  use Oban.Worker, queue: :scheduled

  import Ecto.Query

  alias Changelog.{Person, Repo}
  alias Changelog.Slack.Client

  # Cron-invoked (daily in prod config); walks the full Slack member list
  # and records each member's Slack id on the matching Person record.
  @impl Oban.Worker
  def perform(_job) do
    %{"members" => members} = Client.list()

    Enum.each(members, &import_member/1)

    :ok
  end

  # Stamps `id` onto every Person with this email whose slack_id is still
  # unset (or "pending"). A single update_all avoids a read/write cycle.
  # Also called directly from SlackController on member-join events.
  def import_member_id(id, email) do
    query =
      from p in Person,
        where: p.email == ^email,
        where: is_nil(p.slack_id) or p.slack_id == "pending"

    Repo.update_all(query, set: [slack_id: id])
  end

  # Members lacking an email in their profile fall back to "" — presumably
  # no Person row carries a blank email, so those are effectively skipped.
  defp import_member(%{"id" => id, "profile" => profile}) do
    import_member_id(id, Map.get(profile, "email", ""))
  end

  # Entries without the expected shape are ignored, mirroring the original
  # comprehension's non-matching-pattern skip behavior.
  defp import_member(_member), do: :ok
end
Original file line number Diff line number Diff line change
@@ -1,59 +1,57 @@
defmodule Changelog.Stats do
import Ecto
import Ecto.Changeset
defmodule Changelog.ObanWorkers.StatsProcessor do
use Oban.Worker, queue: :scheduled, unique: [period: 600]

import Ecto.Query, only: [select: 3]

alias Changelog.{Cache, Podcast, Repo, Episode, EpisodeStat}
alias Changelog.Stats.{Analyzer, Parser, S3}
alias Ecto.Changeset

require Logger

def process do
end_date = Timex.today()
start_date = Timex.shift(end_date, days: -2)
@impl Oban.Worker
def perform(%Job{args: %{"date" => date, "podcast_id" => podcast_id}}) do
date = Date.from_iso8601!(date)
podcast = Repo.get!(Podcast, podcast_id)

for time <- Timex.Interval.new(from: start_date, until: end_date) do
time |> Timex.to_date() |> process()
end
end

def process(date) do
Logger.info("Stats: Start processing for #{date}")
podcasts = Repo.all(Podcast.public())
process(date, podcasts)
Cache.delete_prefix("stats-")
Logger.info("Stats: Finished processing for #{date}")
end
processed =
date
|> S3.get_logs(podcast.slug)
|> Parser.parse()
|> Enum.group_by(& &1.episode)
|> Enum.map(fn {slug, entries} -> process_episode(date, podcast, slug, entries) end)

def process(date, podcast) when not is_list(podcast), do: process(date, [podcast])
Podcast.update_stat_counts(podcast)

def process(date, podcasts) do
podcasts
|> Enum.map(&Task.async(fn -> process_podcast(date, &1) end))
# 10 minutes
|> Enum.flat_map(&Task.await(&1, 600_000))
{:ok, processed}
end

defp process_podcast(date, podcast) do
Logger.info("Stats: Processing #{podcast.name}")
def perform(_job) do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clause is the "fall-through" that's triggered by cron. In this case, we don't process anything, we only generate one job per date/podcast pair. Those jobs then run independently and in isolation.

today = Date.utc_today()
range = Date.range(Date.add(today, -2), Date.add(today, -1))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newer versions of Elixir have native Date range iteration, which allows us to skip the Timex interval with date conversion step. Fun fact, the use of -2 and -1 is because Timex and Date ranges differed by one day.


processed =
S3.get_logs(date, podcast.slug)
|> Parser.parse()
|> Enum.group_by(& &1.episode)
|> Enum.map(fn {slug, entries} ->
process_episode(date, podcast, slug, entries)
end)
podcast_ids =
Podcast.public()
|> select([p], p.id)
|> Repo.all()

Podcast.update_stat_counts(podcast)
Logger.info("Stats: Finished Processing #{podcast.name}")
jobs =
for(date <- range, pid <- podcast_ids, do: %{date: date, podcast_id: pid})
|> Enum.map(&new/1)
|> Oban.insert_all()

Cache.delete_prefix("stats-")

processed
{:ok, jobs}
end

@impl Oban.Worker
def timeout(_job), do: 600_000
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout/1 callback will prevent any individual job from running more than 10 minutes, like the previous task async/await did.


defp process_episode(date, podcast, slug, entries) do
if episode = Repo.get_by(assoc(podcast, :episodes), slug: slug) do
if episode = Repo.get_by(Ecto.assoc(podcast, :episodes), slug: slug) do
stat =
case Repo.get_by(assoc(episode, :episode_stats), date: date) do
case Repo.get_by(Ecto.assoc(episode, :episode_stats), date: date) do
nil ->
%EpisodeStat{
episode_id: episode.id,
Expand All @@ -67,7 +65,7 @@ defmodule Changelog.Stats do
end

stat =
change(stat, %{
Changeset.change(stat, %{
total_bytes: Analyzer.bytes(entries),
downloads: Analyzer.downloads(entries, stat.episode_bytes),
uniques: Analyzer.uniques_count(entries),
Expand Down
3 changes: 0 additions & 3 deletions lib/changelog/scheduler.ex

This file was deleted.

27 changes: 0 additions & 27 deletions lib/changelog/slack/tasks.ex

This file was deleted.

5 changes: 3 additions & 2 deletions lib/changelog_web/controllers/slack_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ defmodule ChangelogWeb.SlackController do
use ChangelogWeb, :controller

alias Changelog.Episode
alias Changelog.Slack.{Client, Countdown, Messages, Tasks}
alias Changelog.Slack.{Client, Countdown, Messages}
alias Changelog.ObanWorkers.SlackImporter

require Logger

Expand Down Expand Up @@ -32,7 +33,7 @@ defmodule ChangelogWeb.SlackController do
id = Map.get(member, "id")
email = get_in(member, ["profile", "email"]) || ""
Client.im(id, Messages.welcome())
Tasks.import_member_id(id, email)
SlackImporter.import_member_id(id, email)
json(conn, %{})
end

Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/slack.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ defmodule Mix.Tasks.Changelog.Slack do

def run(_) do
Mix.Task.run("app.start")
Changelog.Slack.Tasks.import_member_ids()
Changelog.ObanWorkers.SlackImporter.perform(%Oban.Job{})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronous invocation using an empty job struct.

end
end
21 changes: 19 additions & 2 deletions lib/mix/tasks/stats/process.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
defmodule Mix.Tasks.Changelog.Stats.Process do
use Mix.Task

alias Changelog.{Podcast, Repo}
alias Changelog.ObanWorkers.StatsProcessor

@shortdoc "Processes stats for given date, or all missing dates"

def run(args) when is_nil(args), do: run([])
Expand All @@ -9,8 +12,22 @@ defmodule Mix.Tasks.Changelog.Stats.Process do
Mix.Task.run("app.start")

case Timex.parse(List.first(args), "{YYYY}-{0M}-{D}") do
{:ok, time} -> Changelog.Stats.process(Timex.to_date(time))
{:error, _message} -> Changelog.Stats.process()
{:ok, time} ->
date = Timex.to_date(time)

Podcast.public()
|> Repo.all()
|> Enum.map(&StatsProcessor.new(%{date: date, podcast_id: &1.id}))
|> Oban.insert_all()

{:error, _message} ->
%{}
|> StatsProcessor.new()
|> Oban.insert!()
end

results = Oban.drain_queue(queue: :scheduled, with_recursion: true, with_safety: false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drain_queue/1 function will execute all of the jobs in this process. The with_recursion option means that it will keep executing new jobs after the previous one finishes, which is required to process the sub-jobs generated by the default case. The with_safety flag disables "safe execution", any errors that are encountered will bubble up to the caller so you can see them from the CLI.


Mix.shell().info("Stats processed for #{results.success - 1} dates/podcasts")
end
end
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule Changelog.Mixfile do
{:plug_cowboy, "~> 2.0"},
# Leaving this here for future dev loops with @akoutmos
# {:prom_ex, github: "akoutmos/prom_ex", branch: "master"},
{:oban, "~> 2.7.2"},
{:oban, "~> 2.8"},
{:prom_ex, "~> 1.3.0"},
{:unplug, "~> 0.2.1"},
{:postgrex, ">= 0.0.0"},
Expand All @@ -66,7 +66,6 @@ defmodule Changelog.Mixfile do
{:nimble_csv, "~> 1.1.0"},
{:sweet_xml, "~> 0.6"},
{:user_agent_parser, "~> 1.0"},
{:quantum, ">= 2.1.0"},
{:oauth, github: "tim/erlang-oauth"},
{:ueberauth_github, "~> 0.4"},
{:ueberauth_twitter, github: "jerodsanto/ueberauth_twitter"},
Expand Down
Loading