diff --git a/config/config.exs b/config/config.exs index fae1de4534..7f25fbd626 100644 --- a/config/config.exs +++ b/config/config.exs @@ -40,8 +40,8 @@ config :changelog, config :changelog, Oban, repo: Changelog.Repo, - plugins: [Oban.Plugins.Pruner, Oban.Plugins.Stager], - queues: [comment_notifier: 10] + queues: [comment_notifier: 10, scheduled: 5], + plugins: [Oban.Plugins.Pruner, Oban.Plugins.Stager] config :changelog, Changelog.Mailer, adapter: Bamboo.LocalAdapter diff --git a/config/prod.exs b/config/prod.exs index 97d741c4ea..4eca6a957b 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -50,13 +50,17 @@ config :changelog, Changelog.Mailer, config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase -config :changelog, Changelog.Scheduler, - global: true, - timezone: "US/Central", - jobs: [ - {"0 4 * * *", {Changelog.Stats, :process, []}}, - {"0 3 * * *", {Changelog.Slack.Tasks, :import_member_ids, []}}, - {"* * * * *", {Changelog.NewsQueue, :publish, []}} +config :changelog, Oban, + plugins: [ + Oban.Plugins.Pruner, + Oban.Plugins.Stager, + {Oban.Plugins.Cron, + timezone: "US/Central", + crons: [ + {"0 4 * * *", Changelog.ObanWorkers.StatsProcessor}, + {"0 3 * * *", Changelog.ObanWorkers.SlackImporter}, + {"* * * * *", Changelog.ObanWorkers.NewsPublisher} + ]} ] config :changelog, Changelog.PromEx, diff --git a/config/test.exs b/config/test.exs index b57cd007d8..c1879d03de 100644 --- a/config/test.exs +++ b/config/test.exs @@ -22,6 +22,5 @@ config :changelog, Changelog.Repo, username: System.get_env("DB_USER", "postgres") config :changelog, Oban, - crontab: false, queues: false, plugins: false diff --git a/lib/changelog/application.ex b/lib/changelog/application.ex index 51afc0eb4c..0b2cb8196d 100644 --- a/lib/changelog/application.ex +++ b/lib/changelog/application.ex @@ -24,16 +24,19 @@ defmodule Changelog.Application do global_ttl: :timer.minutes(5), touch_on_read: false ), - Changelog.Scheduler, Changelog.EpisodeTracker, Changelog.Metacasts.Filterer.Cache, {Oban, oban_config()} ] - # See https://hexdocs.pm/elixir/Supervisor.html - # for other strategies and supported options - opts = [strategy: :one_for_one, name: Changelog.Supervisor] - Supervisor.start_link(children, opts) + # Only attach the telemetry logger when we aren't in an IEx shell + unless Code.ensure_loaded?(IEx) && IEx.started?() do + Oban.Telemetry.attach_default_logger(:info) + + Changelog.ObanReporter.attach() + end + + Supervisor.start_link(children, strategy: :one_for_one, name: Changelog.Supervisor) end defp oban_config do diff --git a/lib/changelog/oban_reporter.ex b/lib/changelog/oban_reporter.ex new file mode 100644 index 0000000000..b57db73b84 --- /dev/null +++ b/lib/changelog/oban_reporter.ex @@ -0,0 +1,23 @@ +defmodule Changelog.ObanReporter do + @moduledoc false + + def attach do + :telemetry.attach( + "oban-errors", + [:oban, :job, :exception], + &handle_event/4, + [] + ) + end + + def handle_event([:oban, :job, _], measure, meta, _) do + extra = + meta.job + |> Map.take([:id, :args, :meta, :queue, :worker, :attempt, :max_attempts]) + |> Map.merge(measure) + + Sentry.capture_exception(meta.error, stacktrace: meta.stacktrace, extra: extra) + end + + def handle_event(_event, _measure, _meta, _opts), do: :ok +end diff --git a/lib/changelog/oban_workers/comment_notifier.ex b/lib/changelog/oban_workers/comment_notifier.ex index dacee071f2..2cdddb1dd6 100644 --- a/lib/changelog/oban_workers/comment_notifier.ex +++ b/lib/changelog/oban_workers/comment_notifier.ex @@ -7,8 +7,6 @@ defmodule Changelog.ObanWorkers.CommentNotifier do alias Changelog.{NewsItemComment, Notifier, Repo} - @five_mins 60 * 5 - @impl Oban.Worker def perform(%Oban.Job{args: %{"comment_id" => comment_id}}) do comment = Repo.get(NewsItemComment, comment_id) @@ -22,7 +20,7 @@ defmodule Changelog.ObanWorkers.CommentNotifier do """ def schedule_notification(%NewsItemComment{id: id}) do %{comment_id: id} - |> __MODULE__.new(schedule_in: @five_mins) + |> new(schedule_in: {5, :minutes}) |> Oban.insert() end end diff --git a/lib/changelog/oban_workers/news_publisher.ex b/lib/changelog/oban_workers/news_publisher.ex new file mode 100644 index 0000000000..5263891175 --- /dev/null +++ b/lib/changelog/oban_workers/news_publisher.ex @@ -0,0 +1,10 @@ +defmodule Changelog.ObanWorkers.NewsPublisher do + use Oban.Worker, queue: :scheduled + + alias Changelog.NewsQueue + + @impl Oban.Worker + def perform(_job) do + {:ok, NewsQueue.publish()} + end +end diff --git a/lib/changelog/oban_workers/slack_importer.ex b/lib/changelog/oban_workers/slack_importer.ex new file mode 100644 index 0000000000..ed55bc0bb6 --- /dev/null +++ b/lib/changelog/oban_workers/slack_importer.ex @@ -0,0 +1,28 @@ +defmodule Changelog.ObanWorkers.SlackImporter do + use Oban.Worker, queue: :scheduled + + alias Changelog.{Person, Repo} + alias Changelog.Slack.Client + + import Ecto.Query + + @impl Oban.Worker + def perform(_job) do + %{"members" => members} = Client.list() + + for %{"id" => id, "profile" => profile} <- members do + email = Map.get(profile, "email", "") + + import_member_id(id, email) + end + + :ok + end + + def import_member_id(id, email) do + Person + |> where([p], p.email == ^email) + |> where([p], is_nil(p.slack_id) or p.slack_id == "pending") + |> Repo.update_all(set: [slack_id: id]) + end +end diff --git a/lib/changelog/stats/stats.ex b/lib/changelog/oban_workers/stats_processor.ex similarity index 52% rename from lib/changelog/stats/stats.ex rename to lib/changelog/oban_workers/stats_processor.ex index 0d368575b3..4ec7788fa3 100644 --- a/lib/changelog/stats/stats.ex +++ b/lib/changelog/oban_workers/stats_processor.ex @@ -1,59 +1,57 @@ -defmodule Changelog.Stats do - import Ecto - import Ecto.Changeset +defmodule Changelog.ObanWorkers.StatsProcessor do + use Oban.Worker, queue: :scheduled, unique: [period: 600] + + import Ecto.Query, only: [select: 3] alias Changelog.{Cache, Podcast, Repo, Episode, EpisodeStat} alias Changelog.Stats.{Analyzer, Parser, S3} + alias Ecto.Changeset require Logger - def process do - end_date = Timex.today() - start_date = Timex.shift(end_date, days: -2) + @impl Oban.Worker + def perform(%Job{args: %{"date" => date, "podcast_id" => podcast_id}}) do + date = Date.from_iso8601!(date) + podcast = Repo.get!(Podcast, podcast_id) - for time <- Timex.Interval.new(from: start_date, until: end_date) do - time |> Timex.to_date() |> process() - end - end - - def process(date) do - Logger.info("Stats: Start processing for #{date}") - podcasts = Repo.all(Podcast.public()) - process(date, podcasts) - Cache.delete_prefix("stats-") - Logger.info("Stats: Finished processing for #{date}") - end + processed = + date + |> S3.get_logs(podcast.slug) + |> Parser.parse() + |> Enum.group_by(& &1.episode) + |> Enum.map(fn {slug, entries} -> process_episode(date, podcast, slug, entries) end) - def process(date, podcast) when not is_list(podcast), do: process(date, [podcast]) + Podcast.update_stat_counts(podcast) - def process(date, podcasts) do - podcasts - |> Enum.map(&Task.async(fn -> process_podcast(date, &1) end)) - # 10 minutes - |> Enum.flat_map(&Task.await(&1, 600_000)) + {:ok, processed} end - defp process_podcast(date, podcast) do - Logger.info("Stats: Processing #{podcast.name}") + def perform(_job) do + today = Date.utc_today() + range = Date.range(Date.add(today, -2), Date.add(today, -1)) - processed = - S3.get_logs(date, podcast.slug) - |> Parser.parse() - |> Enum.group_by(& &1.episode) - |> Enum.map(fn {slug, entries} -> - process_episode(date, podcast, slug, entries) - end) + podcast_ids = + Podcast.public() + |> select([p], p.id) + |> Repo.all() - Podcast.update_stat_counts(podcast) - Logger.info("Stats: Finished Processing #{podcast.name}") + jobs = + for(date <- range, pid <- podcast_ids, do: %{date: date, podcast_id: pid}) + |> Enum.map(&new/1) + |> Oban.insert_all() + + Cache.delete_prefix("stats-") - processed + {:ok, jobs} end + @impl Oban.Worker + def timeout(_job), do: 600_000 + defp process_episode(date, podcast, slug, entries) do - if episode = Repo.get_by(assoc(podcast, :episodes), slug: slug) do + if episode = Repo.get_by(Ecto.assoc(podcast, :episodes), slug: slug) do stat = - case Repo.get_by(assoc(episode, :episode_stats), date: date) do + case Repo.get_by(Ecto.assoc(episode, :episode_stats), date: date) do nil -> %EpisodeStat{ episode_id: episode.id, @@ -67,7 +65,7 @@ defmodule Changelog.Stats do end stat = - change(stat, %{ + Changeset.change(stat, %{ total_bytes: Analyzer.bytes(entries), downloads: Analyzer.downloads(entries, stat.episode_bytes), uniques: Analyzer.uniques_count(entries), diff --git a/lib/changelog/scheduler.ex b/lib/changelog/scheduler.ex deleted file mode 100644 index e5899e6538..0000000000 --- a/lib/changelog/scheduler.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule Changelog.Scheduler do - use Quantum, otp_app: :changelog -end diff --git a/lib/changelog/slack/tasks.ex b/lib/changelog/slack/tasks.ex deleted file mode 100644 index e12a38dd35..0000000000 --- a/lib/changelog/slack/tasks.ex +++ /dev/null @@ -1,27 +0,0 @@ -defmodule Changelog.Slack.Tasks do - alias Changelog.{Person, Repo} - alias Changelog.Slack.Client - - import Ecto.Query - - def import_member_ids do - list = Client.list() - - for member <- Map.get(list, "members") do - id = Map.get(member, "id") - email = get_in(member, ["profile", "email"]) || "" - import_member_id(id, email) - end - end - - def import_member_id(id, email) do - query = - from p in Person, - where: p.email == ^email, - where: is_nil(p.slack_id) or p.slack_id == "pending" - - if person = Repo.one(query) do - Repo.update(Ecto.Changeset.change(person, slack_id: id)) - end - end -end diff --git a/lib/changelog_web/controllers/slack_controller.ex b/lib/changelog_web/controllers/slack_controller.ex index f97413c3b9..13d61ad1d1 100644 --- a/lib/changelog_web/controllers/slack_controller.ex +++ b/lib/changelog_web/controllers/slack_controller.ex @@ -2,7 +2,8 @@ defmodule ChangelogWeb.SlackController do use ChangelogWeb, :controller alias Changelog.Episode - alias Changelog.Slack.{Client, Countdown, Messages, Tasks} + alias Changelog.Slack.{Client, Countdown, Messages} + alias Changelog.ObanWorkers.SlackImporter require Logger @@ -32,7 +33,7 @@ defmodule ChangelogWeb.SlackController do id = Map.get(member, "id") email = get_in(member, ["profile", "email"]) || "" Client.im(id, Messages.welcome()) - Tasks.import_member_id(id, email) + SlackImporter.import_member_id(id, email) json(conn, %{}) end diff --git a/lib/mix/tasks/slack.ex b/lib/mix/tasks/slack.ex index b5fdd2015c..2df05e5822 100644 --- a/lib/mix/tasks/slack.ex +++ b/lib/mix/tasks/slack.ex @@ -5,6 +5,6 @@ defmodule Mix.Tasks.Changelog.Slack do def run(_) do Mix.Task.run("app.start") - Changelog.Slack.Tasks.import_member_ids() + Changelog.ObanWorkers.SlackImporter.perform(%Oban.Job{}) end end diff --git a/lib/mix/tasks/stats/process.ex b/lib/mix/tasks/stats/process.ex index df7ef4e82e..8654e51ce8 100644 --- a/lib/mix/tasks/stats/process.ex +++ b/lib/mix/tasks/stats/process.ex @@ -1,6 +1,9 @@ defmodule Mix.Tasks.Changelog.Stats.Process do use Mix.Task + alias Changelog.{Podcast, Repo} + alias Changelog.ObanWorkers.StatsProcessor + @shortdoc "Processes stats for given date, or all missing dates" def run(args) when is_nil(args), do: run([]) @@ -9,8 +12,22 @@ defmodule Mix.Tasks.Changelog.Stats.Process do Mix.Task.run("app.start") case Timex.parse(List.first(args), "{YYYY}-{0M}-{D}") do - {:ok, time} -> Changelog.Stats.process(Timex.to_date(time)) - {:error, _message} -> Changelog.Stats.process() + {:ok, time} -> + date = Timex.to_date(time) + + Podcast.public() + |> Repo.all() + |> Enum.map(&StatsProcessor.new(%{date: date, podcast_id: &1.id})) + |> Oban.insert_all() + + {:error, _message} -> + %{} + |> StatsProcessor.new() + |> Oban.insert!() end + + results = Oban.drain_queue(queue: :scheduled, with_recursion: true, with_safety: false) + + Mix.shell().info("Stats processed for #{results.success - 1} dates/podcasts") end end diff --git a/mix.exs b/mix.exs index 3ae2770627..ea8232eb04 100644 --- a/mix.exs +++ b/mix.exs @@ -41,7 +41,7 @@ defmodule Changelog.Mixfile do {:plug_cowboy, "~> 2.0"}, # Leaving this here for future dev loops with @akoutmos # {:prom_ex, github: "akoutmos/prom_ex", branch: "master"}, - {:oban, "~> 2.7.2"}, + {:oban, "~> 2.8"}, {:prom_ex, "~> 1.3.0"}, {:unplug, "~> 0.2.1"}, {:postgrex, ">= 0.0.0"}, @@ -66,7 +66,6 @@ defmodule Changelog.Mixfile do {:nimble_csv, "~> 1.1.0"}, {:sweet_xml, "~> 0.6"}, {:user_agent_parser, "~> 1.0"}, - {:quantum, ">= 2.1.0"}, {:oauth, github: "tim/erlang-oauth"}, {:ueberauth_github, "~> 0.4"}, {:ueberauth_twitter, github: "jerodsanto/ueberauth_twitter"}, diff --git a/mix.lock b/mix.lock index bedc234bf5..46f780a304 100644 --- a/mix.lock +++ b/mix.lock @@ -27,13 +27,11 @@ "ex_aws_s3": {:hex, :ex_aws_s3, "2.3.0", "5dfe50116bad048240bae7cd9418bfe23296542ff72a01b9138113a1cd31451c", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "0b13b11478825d62d2f6e57ae763695331be06f2216468f31bb304316758b096"}, "ex_machina": {:hex, :ex_machina, "2.7.0", "b792cc3127fd0680fecdb6299235b4727a4944a09ff0fa904cc639272cd92dc7", [:mix], [{:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}], "hexpm", "419aa7a39bde11894c87a615c4ecaa52d8f107bbdd81d810465186f783245bf8"}, "excoveralls": {:hex, :excoveralls, "0.14.2", "f9f5fd0004d7bbeaa28ea9606251bb643c313c3d60710bad1f5809c845b748f0", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "ca6fd358621cb4d29311b29d4732c4d47dac70e622850979bc54ed9a3e50f3e1"}, - "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "finch": {:hex, :finch, "0.7.0", "2352962c81fd54952788d66e5eed436a7b734745e0c1cdd5b28a28184fe3e5cd", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.5", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4e046b6a4f010898a036244d680ae212331aab2f0a1005bcded3e3e3364f0025"}, "floki": {:hex, :floki, "0.31.0", "f05ee8a8e6a3ced4e62beeb2c79a63bc8e12ab98fbaaf6e6a3d9b76b1278e23f", [:mix], [{:html_entities, "~> 0.5.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm", "b05afa372f5c345a5bf240ac25ea1f0f3d5fcfd7490ac0beeb4a203f9444891e"}, "gen_smtp": {:hex, :gen_smtp, "1.1.1", "bf9303c31735100631b1d708d629e4c65944319d1143b5c9952054f4a1311d85", [:rebar3], [{:hut, "1.3.0", [hex: :hut, repo: "hexpm", optional: false]}, {:ranch, ">= 1.7.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "51bc50cc017efd4a4248cbc39ea30fb60efa7d4a49688986fafad84434ff9ab7"}, "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"}, - "gen_state_machine": {:hex, :gen_state_machine, "2.1.0", "a38b0e53fad812d29ec149f0d354da5d1bc0d7222c3711f3a0bd5aa608b42992", [:mix], [], "hexpm", "ae367038808db25cee2f2c4b8d0531522ea587c4995eb6f96ee73410a60fa06b"}, "gettext": {:hex, :gettext, "0.18.2", "7df3ea191bb56c0309c00a783334b288d08a879f53a7014341284635850a6e55", [:mix], [], "hexpm", "f9f537b13d4fdd30f3039d33cb80144c3aa1f8d9698e47d7bcbcc8df93b1f5c5"}, "hackney": {:hex, :hackney, "1.17.4", "99da4674592504d3fb0cfef0db84c3ba02b4508bae2dff8c0108baa0d6e0977c", [:rebar3], [{:certifi, "~>2.6.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "de16ff4996556c8548d512f4dbe22dd58a587bf3332e7fd362430a7ef3986b16"}, "hashids": {:hex, :hashids, "2.0.5", "d9839924c8221b954da8b110eda3e59c2c03df0389bac6e7d0e535f937033df1", [:mix], [], "hexpm", "ef47d8679f20d7bea59d0d49c202258c89f61b9b741bd3dceef2c1985cf95554"}, @@ -44,8 +42,6 @@ "icalendar": {:hex, :icalendar, "1.1.0", "898a8640abb32d161d990e419999004718a7a4b48be31f48db248f90ca33fa6e", [:mix], [{:timex, "~> 3.4", [hex: :timex, repo: "hexpm", optional: false]}], "hexpm", "a131f45fbabd2ee5a22e6bc49ea91e81131158394e7169274cee866263640dca"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, - "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm", "fc3499fed7a726995aa659143a248534adc754ebd16ccd437cd93b649a95091f"}, - "libring": {:hex, :libring, "1.5.0", "44313eb6862f5c9168594a061e9d5f556a9819da7c6444706a9e2da533396d70", [:mix], [], "hexpm", "04e843d4fdcff49a62d8e03778d17c6cb2a03fe2d14020d3825a1761b55bd6cc"}, "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, @@ -59,7 +55,7 @@ "oauth": {:git, "https://github.com/tim/erlang-oauth.git", "bd19896e31125f99ff45bb5850b1c0e74b996743", []}, "oauth2": {:hex, :oauth2, "2.0.0", "338382079fe16c514420fa218b0903f8ad2d4bfc0ad0c9f988867dfa246731b0", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "881b8364ac7385f9fddc7949379cbe3f7081da37233a1aa7aab844670a91e7e7"}, "oauther": {:hex, :oauther, "1.1.1", "7d8b16167bb587ecbcddd3f8792beb9ec3e7b65c1f8ebd86b8dd25318d535752", [:mix], [], "hexpm", "9374f4302045321874cccdc57eb975893643bd69c3b22bf1312dab5f06e5788e"}, - "oban": {:hex, :oban, "2.7.2", "93ea2a767bded05d37924f26fa4ac0600f8e256689e0b76c91315e5be45e2d58", [:mix], [{:ecto_sql, ">= 3.4.3", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "254bbcd6526b355ce79a93831d1375cd58725ff282f2fa0d362ef194f1218ef1"}, + "oban": {:hex, :oban, "2.8.0", "e44b19a30e30bb983099f55d59749316ff0eaf5dfef4214e1190738176653e50", [:mix], [{:ecto_sql, ">= 3.4.3", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2954a2ac418f7cc4217c0772a3dd3a70e2966240583b97f4126a489e1300a573"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "phoenix": {:hex, :phoenix, "1.5.9", "a6368d36cfd59d917b37c44386e01315bc89f7609a10a45a22f47c007edf2597", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7e4bce20a67c012f1fbb0af90e5da49fa7bf0d34e3a067795703b74aef75427d"}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.3.0", "2c69a452c2e0ee8c93345ae1cdc1696ef4877ff9cbb15c305def41960c3c4ebf", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "0ac491924217550c8f42c81c1f390b5d81517d12ceaf9abf3e701156760a848e"}, @@ -70,18 +66,16 @@ "plug_cowboy": {:hex, :plug_cowboy, "2.5.1", "7cc96ff645158a94cf3ec9744464414f02287f832d6847079adfe0b58761cbd0", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "107d0a5865fa92bcb48e631cc0729ae9ccfa0a9f9a1bd8f01acb513abf1c2d64"}, "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"}, - "postgrex": {:hex, :postgrex, "0.15.9", "46f8fe6f25711aeb861c4d0ae09780facfdf3adbd2fb5594ead61504dd489bda", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "610719103e4cb2223d4ab78f9f0f3e720320eeca6011415ab4137ddef730adee"}, + "postgrex": {:hex, :postgrex, "0.15.10", "2809dee1b1d76f7cbabe570b2a9285c2e7b41be60cf792f5f2804a54b838a067", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "1560ca427542f6b213f8e281633ae1a3b31cdbcd84ebd7f50628765b8f6132be"}, "prom_ex": {:hex, :prom_ex, "1.3.0", "7949d45af8a8da7258ed0575997473f6937473819a382be1582a89146c206745", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.7.0", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.10.0", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.1", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.1", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "9ca9e796b0d766d9a7f216faeb78cbfd5943c70c8c94b4b88436c2fde0900af3"}, "quantum": {:hex, :quantum, "3.3.0", "e8f6b9479728774288c5f426b11a6e3e8f619f3c226163a7e18bccfe543b714d", [:mix], [{:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}, {:gen_stage, "~> 0.14 or ~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3b83ef137ab3887e783b013418b5ce3e847d66b71c4ef0f233b0321c84b72f67"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, - "rollbax": {:hex, :rollbax, "0.8.2", "204a47a83fe32745def19ea0307b297395c00315fb85f30dfda04d5009b5ecb9", [:mix], [{:hackney, "~> 1.1", [hex: :hackney, repo: "hexpm", optional: false]}, {:poison, "~> 1.4 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "6575d11dab446ef8010d7b3815bd476965825964d77cf45632b7f28ba88b9de9"}, "scrivener": {:hex, :scrivener, "2.7.0", "fa94cdea21fad0649921d8066b1833d18d296217bfdf4a5389a2f45ee857b773", [:mix], [], "hexpm", "30da36a427f2519cf75993271fb7c5aad1759682a70f90d880a85c3d743d2c57"}, "scrivener_ecto": {:hex, :scrivener_ecto, "2.7.0", "cf64b8cb8a96cd131cdbcecf64e7fd395e21aaa1cb0236c42a7c2e34b0dca580", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:scrivener, "~> 2.4", [hex: :scrivener, repo: "hexpm", optional: false]}], "hexpm", "e809f171687806b0031129034352f5ae44849720c48dd839200adeaf0ac3e260"}, "scrivener_html": {:git, "https://github.com/jerodsanto/scrivener_html.git", "a05c95efd6a1d5aeba626be15259460163cdb55c", [branch: "phx-1-5-9"]}, "sentry": {:hex, :sentry, "8.0.5", "5ca922b9238a50c7258b52f47364b2d545beda5e436c7a43965b34577f1ef61f", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.3", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "4972839fdbf52e886d7b3e694c8adf421f764f2fa79036b88fb4742049bd4b7c"}, "shopify": {:hex, :shopify, "0.4.0", "bb53bba95d56a047c5cd4b1a794ef1b6408fb71268d1782bb42e7af15f25a546", [:mix], [{:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:poison, "~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "7c98d571a58c031e8540416140f3d5ad5b161ee93f45b82ecefa13e27065ae6d"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "swarm": {:hex, :swarm, "3.4.0", "64f8b30055d74640d2186c66354b33b999438692a91be275bb89cdc7e401f448", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm", "94884f84783fc1ba027aba8fe8a7dae4aad78c98e9f9c76667ec3471585c08c6"}, "sweet_xml": {:hex, :sweet_xml, "0.7.0", "39ca6a53c526a1759672690656d5a787bee1016bfff467310170f9b428a238cb", [:mix], [], "hexpm", "2f18cb07f22b5a0d3e99d8b7e4176020f0051f90e449968821e4fde930edd945"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, diff --git a/test/changelog/stats/stats_test.exs b/test/changelog/oban_workers/stats_processor_test.exs similarity index 84% rename from test/changelog/stats/stats_test.exs rename to test/changelog/oban_workers/stats_processor_test.exs index f303c4ba92..c2f1d0ecb5 100644 --- a/test/changelog/stats/stats_test.exs +++ b/test/changelog/oban_workers/stats_processor_test.exs @@ -1,9 +1,11 @@ -defmodule ChangelogStatsTest do +defmodule Changelog.ObanWorkers.StatsProcessorTest do use Changelog.SchemaCase + use Oban.Testing, repo: Changelog.Repo import Mock alias Changelog.{Stats, Episode, Podcast, Repo} + alias Changelog.ObanWorkers.StatsProcessor defp log_fixtures(date) do file_dir = "#{fixtures_path()}/logs/#{date}" @@ -11,14 +13,30 @@ defmodule ChangelogStatsTest do Enum.map(files, fn file -> File.read!("#{file_dir}/#{file}") end) end - describe "process" do + describe "perform/1" do + test "inserting jobs for each public podcast and date" do + podcast1 = insert(:podcast) + podcast2 = insert(:podcast) + + {:ok, jobs} = perform_job(StatsProcessor, %{}) + + assert length(jobs) == 4 + + args = Enum.map(jobs, & &1.args) + + assert %{"date" => iso_ago(1), "podcast_id" => podcast1.id} in args + assert %{"date" => iso_ago(2), "podcast_id" => podcast1.id} in args + assert %{"date" => iso_ago(1), "podcast_id" => podcast2.id} in args + assert %{"date" => iso_ago(2), "podcast_id" => podcast2.id} in args + end + test_with_mock "it processes known logs from 2016-10-10", Stats.S3, get_logs: fn date, _slug -> log_fixtures(date) end do podcast = insert(:podcast) e1 = insert(:episode, podcast: podcast, slug: "223", audio_bytes: 80_743_303) - stats = Stats.process(~D[2016-10-10], podcast) + {:ok, stats} = perform_job(StatsProcessor, %{date: ~D[2016-10-10], podcast_id: podcast.id}) assert length(stats) == 1 stat = get_stat(stats, e1) @@ -51,7 +69,8 @@ defmodule ChangelogStatsTest do e15 = insert(:episode, podcast: podcast, slug: "222", audio_bytes: 81_563_934) e16 = insert(:episode, podcast: podcast, slug: "223", audio_bytes: 80_743_303) - stats = Stats.process(~D[2016-10-11], podcast) + {:ok, stats} = perform_job(StatsProcessor, %{date: ~D[2016-10-11], podcast_id: podcast.id}) + assert length(stats) == 16 stat = get_stat(stats, e1) @@ -138,6 +157,12 @@ defmodule ChangelogStatsTest do assert refreshed_reach_count(podcast) == 45 end + defp iso_ago(days) do + Date.utc_today() + |> Date.add(-days) + |> Date.to_iso8601() + end + defp get_stat(stats, episode) do Enum.find(stats, fn x -> x.episode_id == episode.id end) end diff --git a/test/changelog_web/controllers/slack_controller_test.exs b/test/changelog_web/controllers/slack_controller_test.exs index 4540036921..fd8eebacde 100644 --- a/test/changelog_web/controllers/slack_controller_test.exs +++ b/test/changelog_web/controllers/slack_controller_test.exs @@ -3,7 +3,8 @@ defmodule ChangelogWeb.SlackControllerTest do import Mock - alias Changelog.Slack.{Client, Messages, Tasks} + alias Changelog.Slack.{Client, Messages} + alias Changelog.ObanWorkers.SlackImporter describe "the countdown endpoint" do setup do @@ -71,7 +72,7 @@ defmodule ChangelogWeb.SlackControllerTest do } do with_mocks([ {Client, [], [im: fn _, _ -> nil end]}, - {Tasks, [], [import_member_id: fn _, _ -> nil end]} + {SlackImporter, [], [import_member_id: fn _, _ -> nil end]} ]) do conn = post(conn, Routes.slack_path(conn, :event), %{ @@ -89,7 +90,7 @@ defmodule ChangelogWeb.SlackControllerTest do }) assert called(Client.im("U2XU53R", Messages.welcome())) - assert called(Tasks.import_member_id("U2XU53R", "grace@hopper.com")) + assert called(SlackImporter.import_member_id("U2XU53R", "grace@hopper.com")) assert conn.status == 200 end end