Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send pulse triggers should respect report timezone #42502

Merged
merged 7 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/metabase/driver.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
timezone-id
(str (t/zone-id))))

(defn- update-send-pulse-triggers-timezone!
[]
(classloader/require 'metabase.task.send-pulses)
((resolve 'metabase.task.send-pulses/update-send-pulse-triggers-timezone!)))
Comment on lines +56 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll be nice to replace this sneak hidden cross-module dependency with an event system - maybe in scope for the notifications refactor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's one of my annoyance as well!


(defsetting report-timezone
(deferred-tru "Connection timezone to use when executing queries. Defaults to system timezone.")
:visibility :settings-manager
Expand All @@ -59,7 +64,8 @@
:setter
(fn [new-value]
(setting/set-value-of-type! :string :report-timezone new-value)
(notify-all-databases-updated)))
(notify-all-databases-updated)
(update-send-pulse-triggers-timezone!)))

(defsetting report-timezone-short
"Current report timezone abbreviation"
Expand Down
5 changes: 4 additions & 1 deletion src/metabase/task.clj
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@
:priority (.getPriority trigger)
:start-time (.getStartTime trigger)
:may-fire-again? (.mayFireAgain trigger)
:data (.getJobDataMap trigger)})
:data (into {} (.getJobDataMap trigger))})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you need to do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need this in tests because we can't compare JobDataMap with clojure map


(defmethod trigger->info CronTrigger
[^CronTrigger trigger]
Expand All @@ -274,6 +274,9 @@
:schedule
(.getCronExpression trigger)

:timezone
(.getID (.getTimeZone trigger))

:misfire-instruction
;; not 100% sure why `case` doesn't work here...
(condp = (.getMisfireInstruction trigger)
Expand Down
53 changes: 42 additions & 11 deletions src/metabase/task/send_pulses.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.schedule.cron :as cron]
[clojurewerkz.quartzite.triggers :as triggers]
[metabase.driver :as driver]
[metabase.models.pulse :as pulse]
[metabase.models.task-history :as task-history]
[metabase.pulse]
[metabase.query-processor.timezone :as qp.timezone]
[metabase.task :as task]
[metabase.util.cron :as u.cron]
[metabase.util.log :as log]
[metabase.util.malli :as mu]
[toucan2.core :as t2])
(:import
(java.util TimeZone)
(org.quartz CronTrigger TriggerKey)))

(set! *warn-on-reflection* true)
Expand All @@ -35,12 +38,13 @@
u.cron/schedule-map->cron-string
(str/replace " " "_")))))

(defn- send-pulse-trigger-key->schedule-map
(defn- send-pulse-trigger-key->info
[trigger-key]
(let [[_ _pulse-id schedule-str] (re-matches #"metabase\.task\.send-pulse\.trigger\.(\d+)\.(.*)" trigger-key)]
(-> schedule-str
(str/replace "_" " ")
u.cron/cron-string->schedule-map)))
(let [[_ pulse-id schedule-str] (re-matches #"metabase\.task\.send-pulse\.trigger\.(\d+)\.(.*)" trigger-key)]
{:pulse-id (parse-long pulse-id)
:schedule-map (-> schedule-str
(str/replace "_" " ")
u.cron/cron-string->schedule-map)}))

(defn- send-pulse!
[pulse-id channel-ids]
Expand All @@ -56,16 +60,24 @@
(catch Throwable e
(log/errorf e "Error sending Pulse %d to channel ids: %s" pulse-id (str/join ", " channel-ids)))))

(defn- send-trigger-timezone
[]
(or (driver/report-timezone)
(qp.timezone/system-timezone-id)
"UTC"))
Comment on lines +63 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it could be DRY'd up with other parts of the codebase outside of pulses. It doesn't feel right to have a notion of the trigger time zone specific to send pulses task.

Also, before we were just using (driver/report-timezone) without (qp.timezone/system-timezone-id), can you explain the change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the fallback timezone id we used in QP so I think we should apply it for pulse too:

(if use-report-timezone-id-if-unsupported?

It's also used in other tasks like persist refresh : https://github.com/metabase/metabase/blob/send-pulse-trigger-respect-timezone/src/metabase/task/persist_refresh.clj#L286

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I'm suggesting to DRY this up then. Otherwise they can get out of sync. Instead of duplicating this logic again, update-send-pulse-triggers-timezone!, and the cron-schedule function should use the same function that contains the logic for getting the reporting time zone ID. Why not use results-timezone-id from the QP code?

Copy link
Contributor Author

@qnkhuat qnkhuat May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

results-timezone-id requires a DB. In case of pulse a dashboard can contain questions from more than 1 DB.

Wdyt if we move this to metabase.task or a new metabase.task.util?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind where it goes too much. But I'm seeing a couple more places this could be DRYed up too:

(defn- schedule-timezone
[]
(or (driver/report-timezone) "UTC"))

(cron/in-time-zone (TimeZone/getTimeZone (or (driver/report-timezone)
(qp.timezone/system-timezone-id)
"UTC")))


(mu/defn ^:private send-pulse-trigger
"Build a Quartz trigger to send a pulse to a list of channel-ids."
^CronTrigger
([pulse-id :- pos-int?
schedule-map :- u.cron/ScheduleMap
pc-ids :- [:set pos-int?]]
(send-pulse-trigger pulse-id schedule-map pc-ids 6))
pc-ids :- [:set pos-int?]
timezone :- :string]
(send-pulse-trigger pulse-id schedule-map pc-ids timezone 6))
([pulse-id :- pos-int?
schedule-map :- u.cron/ScheduleMap
pc-ids :- [:set pos-int?]
timezone :- :string
priority :- pos-int?]
(triggers/build
(triggers/with-identity (send-pulse-trigger-key pulse-id schedule-map))
Expand All @@ -75,6 +87,7 @@
(triggers/with-schedule
(cron/schedule
(cron/cron-schedule (u.cron/schedule-map->cron-string schedule-map))
(cron/in-time-zone (TimeZone/getTimeZone ^String timezone))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the fix

;; if we miss a sync for one reason or another (such as system being down) do not try to run the sync again.
;; Just wait until the next sync cycle.
;;
Expand Down Expand Up @@ -129,21 +142,39 @@
priority (ms-duration->priority (- end start))]
(when (= :done result)
(log/infof "Updating priority of trigger %s to %d" (.getName ^TriggerKey (send-pulse-trigger-key pulse-id schedule-map)) priority)
(task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids priority))))
(task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids (send-trigger-timezone) priority))))
(log/infof "Skip sending pulse %d because all channels have no recipients" pulse-id))))

;; called in [driver/report-timezone] setter
(defn update-send-pulse-triggers-timezone!
"Update the timezone of all SendPulse triggers if the report timezone changes."
[]
(let [triggers (-> send-pulse-job-key task/job-info :triggers)
new-timezone (send-trigger-timezone)]
(doseq [trigger triggers
:when (not= new-timezone (:timezone trigger))] ; skip if timezone is the same
(let [trigger-key (:key trigger)
channel-ids (get-in trigger [:data "channel-ids"])
{:keys [pulse-id
schedule-map]} (send-pulse-trigger-key->info trigger-key)]
(log/infof "Updating timezone of trigger %s to %s. Was: %s" trigger-key new-timezone (:timezone trigger))
(task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids new-timezone (:priority trigger)))))))

(jobs/defjob ^{:doc "Triggers that send a pulse to a list of channels at a specific time"}
SendPulse
[context]
(let [{:strs [pulse-id channel-ids]} (qc/from-job-data context)
trigger-key (.. context getTrigger getKey getName)]
(send-pulse!* (send-pulse-trigger-key->schedule-map trigger-key) pulse-id channel-ids)))
(send-pulse!* (:schedule-map (send-pulse-trigger-key->info trigger-key)) pulse-id channel-ids)))

;;; ------------------------------------------------ Job: InitSendPulseTriggers ----------------------------------------------------

(declare update-send-pulse-trigger-if-needed!)

(defn- init-send-pulse-triggers!
(defn init-send-pulse-triggers!
"Update send pulse triggers for all active pulses.
Called once when Metabase starts up to create triggers for all existing PulseChannels
and whenever the report timezone changes."
[]
(let [trigger-slot->pc-ids (as-> (t2/select :model/PulseChannel :enabled true) results
(group-by #(select-keys % [:pulse_id :schedule_type :schedule_day :schedule_hour :schedule_frame]) results)
Expand Down Expand Up @@ -215,7 +246,7 @@
(do
(log/infof "Updating Send Pulse trigger %s for pulse %d with new pc-ids: %s, was: %s " trigger-key pulse-id new-pc-ids existing-pc-ids)
(task/delete-trigger! trigger-key)
(task/add-trigger! (send-pulse-trigger pulse-id schedule-map new-pc-ids))))))
(task/add-trigger! (send-pulse-trigger pulse-id schedule-map new-pc-ids (send-trigger-timezone)))))))

;;; -------------------------------------------------- Task init ------------------------------------------------

Expand Down
5 changes: 2 additions & 3 deletions test/metabase/models/pulse_channel_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,10 @@
:schedule_frame nil})

(defn send-pulse-triggers
[pulse-id]
[pulse-id & {:keys [additional-keys]}]
(->> (task/job-info @#'task.send-pulses/send-pulse-job-key)
:triggers
(map #(select-keys % [:key :schedule :data :priority]))
(map #(update % :data (fn [data] (into {} data))))
(map #(select-keys % (concat [:key :schedule :data :priority] additional-keys)))
(filter #(or (nil? pulse-id) (= pulse-id (get-in % [:data "pulse-id"]))))
set))

Expand Down
72 changes: 72 additions & 0 deletions test/metabase/task/send_pulses_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
(:require
[clojure.set :as set]
[clojure.test :refer :all]
[java-time.api :as t]
[metabase.driver :as driver]
[metabase.models.pulse :refer [Pulse]]
[metabase.models.pulse-channel :refer [PulseChannel]]
[metabase.models.pulse-channel-recipient :refer [PulseChannelRecipient]]
Expand Down Expand Up @@ -184,3 +186,73 @@
(Thread/sleep 3000)
(testing "make sure that all channels will be sent even though number of jobs exceed the thread pool"
(is (= (set pc-ids) @sent-channel-ids))))))))))

(def ^:private daily-at-8am
{:schedule_type "daily"
:schedule_hour 8
:schedule_day nil
:schedule_frame nil})

(defn- send-pusle-triggers-next-fire-time
[pulse-id]
(first (map :next-fire-time (pulse-channel-test/send-pulse-triggers pulse-id :additional-keys [:next-fire-time]))))

(defn next-fire-hour
[expected-hour]
(let [now (t/offset-date-time (t/zone-offset 0))
next-day? (>= (.getHour now) expected-hour)]
(cond-> (t/offset-date-time (.getYear now)
(.. now getMonth getValue)
(.getDayOfMonth now)
expected-hour
0 0 0 (t/zone-offset 0))
;; if the current hour is greater than the expected hour, then it should be fired tomorrow
next-day? (t/plus (t/days 1))
true t/java-date)))

(deftest send-pulse-trigger-respect-report-timezone-test
(pulse-channel-test/with-send-pulse-setup!
(mt/with-temporary-setting-values [report-timezone "Asia/Ho_Chi_Minh" #_utc+7]
(mt/with-temp
[:model/Pulse {pulse :id} {}
:model/PulseChannel {_pc :id} (merge
{:pulse_id pulse
:channel_type :slack
:details {:channel "#random"}}
daily-at-8am)]
;; if its want to be fired at 8 am utc+7, then it should be fired at 1am utc
(is (= (next-fire-hour 1)
(send-pusle-triggers-next-fire-time pulse))))))

(mt/with-temporary-setting-values [report-timezone "UTC"]
(mt/with-temp
[:model/Pulse {pulse :id} {}
:model/PulseChannel {_pc :id} (merge
{:pulse_id pulse
:channel_type :slack
:details {:channel "#random"}}
daily-at-8am)]
(is (= (next-fire-hour 8)
(send-pusle-triggers-next-fire-time pulse))))))

(deftest change-report-timezone-will-update-triggers-timezone-test
(pulse-channel-test/with-send-pulse-setup!
(mt/discard-setting-changes [report-timezone]
(mt/with-temp
[:model/Pulse {pulse :id} {}
:model/PulseChannel {pc :id} (merge
{:pulse_id pulse
:channel_type :slack
:details {:channel "#random"}}
daily-at-8am)]
(testing "Sanity check"
(is (= #{(assoc (pulse-channel-test/pulse->trigger-info pulse daily-at-8am #{pc})
:timezone "UTC"
:next-fire-time (next-fire-hour 8))}
(pulse-channel-test/send-pulse-triggers pulse :additional-keys [:next-fire-time :timezone]))))
(driver/report-timezone! "Asia/Ho_Chi_Minh")
(testing "changing report timezone will change the timezone and fire time of the trigger"
(is (= #{(assoc (pulse-channel-test/pulse->trigger-info pulse daily-at-8am #{pc})
:timezone "Asia/Ho_Chi_Minh"
:next-fire-time (next-fire-hour 1))}
(pulse-channel-test/send-pulse-triggers pulse :additional-keys [:next-fire-time :timezone]))))))))