diff --git a/src/metabase/driver.clj b/src/metabase/driver.clj index d1a69c7faed35..a9b440dd50c5b 100644 --- a/src/metabase/driver.clj +++ b/src/metabase/driver.clj @@ -51,6 +51,11 @@ timezone-id (str (t/zone-id)))) +(defn- update-send-pulse-triggers-timezone! + [] + (classloader/require 'metabase.task.send-pulses) + ((resolve 'metabase.task.send-pulses/update-send-pulse-triggers-timezone!))) + (defsetting report-timezone (deferred-tru "Connection timezone to use when executing queries. Defaults to system timezone.") :visibility :settings-manager @@ -59,7 +64,8 @@ :setter (fn [new-value] (setting/set-value-of-type! :string :report-timezone new-value) - (notify-all-databases-updated))) + (notify-all-databases-updated) + (update-send-pulse-triggers-timezone!))) (defsetting report-timezone-short "Current report timezone abbreviation" diff --git a/src/metabase/task.clj b/src/metabase/task.clj index 839fbe7b72d85..1a3d12a5f5c94 100644 --- a/src/metabase/task.clj +++ b/src/metabase/task.clj @@ -265,7 +265,7 @@ :priority (.getPriority trigger) :start-time (.getStartTime trigger) :may-fire-again? (.mayFireAgain trigger) - :data (.getJobDataMap trigger)}) + :data (into {} (.getJobDataMap trigger))}) (defmethod trigger->info CronTrigger [^CronTrigger trigger] @@ -274,6 +274,9 @@ :schedule (.getCronExpression trigger) + :timezone + (.getID (.getTimeZone trigger)) + :misfire-instruction ;; not 100% sure why `case` doesn't work here... (condp = (.getMisfireInstruction trigger) diff --git a/src/metabase/task/persist_refresh.clj b/src/metabase/task/persist_refresh.clj index e85f41dfab6be..bb0466c69800f 100644 --- a/src/metabase/task/persist_refresh.clj +++ b/src/metabase/task/persist_refresh.clj @@ -375,7 +375,7 @@ (some->> refresh-job-key task/job-info :triggers - (m/index-by (comp #(get % "db-id") qc/from-job-data :data)))) + (m/index-by (comp #(get % "db-id") :data)))) (defn unschedule-persistence-for-database! "Stop refreshing tables for a given database. Should only be called when marking the database as not diff --git a/src/metabase/task/send_pulses.clj b/src/metabase/task/send_pulses.clj index cd508eadd21d2..63fbb7200ab6e 100644 --- a/src/metabase/task/send_pulses.clj +++ b/src/metabase/task/send_pulses.clj @@ -11,15 +11,18 @@ [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.cron :as cron] [clojurewerkz.quartzite.triggers :as triggers] + [metabase.driver :as driver] [metabase.models.pulse :as pulse] [metabase.models.task-history :as task-history] [metabase.pulse] + [metabase.query-processor.timezone :as qp.timezone] [metabase.task :as task] [metabase.util.cron :as u.cron] [metabase.util.log :as log] [metabase.util.malli :as mu] [toucan2.core :as t2]) (:import + (java.util TimeZone) (org.quartz CronTrigger TriggerKey))) (set! *warn-on-reflection* true) @@ -35,12 +38,13 @@ u.cron/schedule-map->cron-string (str/replace " " "_"))))) -(defn- send-pulse-trigger-key->schedule-map +(defn- send-pulse-trigger-key->info [trigger-key] - (let [[_ _pulse-id schedule-str] (re-matches #"metabase\.task\.send-pulse\.trigger\.(\d+)\.(.*)" trigger-key)] - (-> schedule-str - (str/replace "_" " ") - u.cron/cron-string->schedule-map))) + (let [[_ pulse-id schedule-str] (re-matches #"metabase\.task\.send-pulse\.trigger\.(\d+)\.(.*)" trigger-key)] + {:pulse-id (parse-long pulse-id) + :schedule-map (-> schedule-str + (str/replace "_" " ") + u.cron/cron-string->schedule-map)})) (defn- send-pulse! [pulse-id channel-ids] @@ -56,16 +60,24 @@ (catch Throwable e (log/errorf e "Error sending Pulse %d to channel ids: %s" pulse-id (str/join ", " channel-ids))))) +(defn- send-trigger-timezone + [] + (or (driver/report-timezone) + (qp.timezone/system-timezone-id) + "UTC")) + (mu/defn ^:private send-pulse-trigger "Build a Quartz trigger to send a pulse to a list of channel-ids." ^CronTrigger ([pulse-id :- pos-int? schedule-map :- u.cron/ScheduleMap - pc-ids :- [:set pos-int?]] - (send-pulse-trigger pulse-id schedule-map pc-ids 6)) + pc-ids :- [:set pos-int?] + timezone :- :string] + (send-pulse-trigger pulse-id schedule-map pc-ids timezone 6)) ([pulse-id :- pos-int? schedule-map :- u.cron/ScheduleMap pc-ids :- [:set pos-int?] + timezone :- :string priority :- pos-int?] (triggers/build (triggers/with-identity (send-pulse-trigger-key pulse-id schedule-map)) @@ -75,6 +87,7 @@ (triggers/with-schedule (cron/schedule (cron/cron-schedule (u.cron/schedule-map->cron-string schedule-map)) + (cron/in-time-zone (TimeZone/getTimeZone ^String timezone)) ;; if we miss a sync for one reason or another (such as system being down) do not try to run the sync again. ;; Just wait until the next sync cycle. ;; @@ -129,21 +142,39 @@ priority (ms-duration->priority (- end start))] (when (= :done result) (log/infof "Updating priority of trigger %s to %d" (.getName ^TriggerKey (send-pulse-trigger-key pulse-id schedule-map)) priority) - (task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids priority)))) + (task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids (send-trigger-timezone) priority)))) (log/infof "Skip sending pulse %d because all channels have no recipients" pulse-id)))) +;; called in [driver/report-timezone] setter +(defn update-send-pulse-triggers-timezone! + "Update the timezone of all SendPulse triggers if the report timezone changes." + [] + (let [triggers (-> send-pulse-job-key task/job-info :triggers) + new-timezone (send-trigger-timezone)] + (doseq [trigger triggers + :when (not= new-timezone (:timezone trigger))] ; skip if timezone is the same + (let [trigger-key (:key trigger) + channel-ids (get-in trigger [:data "channel-ids"]) + {:keys [pulse-id + schedule-map]} (send-pulse-trigger-key->info trigger-key)] + (log/infof "Updating timezone of trigger %s to %s. Was: %s" trigger-key new-timezone (:timezone trigger)) + (task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids new-timezone (:priority trigger))))))) + (jobs/defjob ^{:doc "Triggers that send a pulse to a list of channels at a specific time"} SendPulse [context] (let [{:strs [pulse-id channel-ids]} (qc/from-job-data context) trigger-key (.. context getTrigger getKey getName)] - (send-pulse!* (send-pulse-trigger-key->schedule-map trigger-key) pulse-id channel-ids))) + (send-pulse!* (:schedule-map (send-pulse-trigger-key->info trigger-key)) pulse-id channel-ids))) ;;; ------------------------------------------------ Job: InitSendPulseTriggers ---------------------------------------------------- (declare update-send-pulse-trigger-if-needed!) -(defn- init-send-pulse-triggers! +(defn init-send-pulse-triggers! + "Update send pulse triggers for all active pulses. + Called once when Metabase starts up to create triggers for all existing PulseChannels + and whenever the report timezone changes." [] (let [trigger-slot->pc-ids (as-> (t2/select :model/PulseChannel {:select [:pc.*] @@ -227,7 +258,7 @@ (do (log/infof "Updating Send Pulse trigger %s for pulse %d with new pc-ids: %s, was: %s " trigger-key pulse-id new-pc-ids existing-pc-ids) (task/delete-trigger! trigger-key) - (task/add-trigger! (send-pulse-trigger pulse-id schedule-map new-pc-ids)))))) + (task/add-trigger! (send-pulse-trigger pulse-id schedule-map new-pc-ids (send-trigger-timezone))))))) ;;; -------------------------------------------------- Task init ------------------------------------------------ diff --git a/test/metabase/api/card_test.clj b/test/metabase/api/card_test.clj index d8b9d4afcbf89..1b1f80e016c1b 100644 --- a/test/metabase/api/card_test.clj +++ b/test/metabase/api/card_test.clj @@ -7,7 +7,6 @@ [clojure.string :as str] [clojure.test :refer :all] [clojure.tools.macro :as tools.macro] - [clojurewerkz.quartzite.conversion :as qc] [clojurewerkz.quartzite.scheduler :as qs] [dk.ative.docjure.spreadsheet :as spreadsheet] [medley.core :as m] @@ -2945,7 +2944,7 @@ (some->> (deref #'task.persist-refresh/refresh-job-key) task/job-info :triggers - (map (comp qc/from-job-data :data)) + (map :data) (filter (comp #{"individual"} #(get % "type"))) (map #(get % "persisted-id")) set)) diff --git a/test/metabase/models/model_index_test.clj b/test/metabase/models/model_index_test.clj index 9e7a1a417032d..fdd9131e6d642 100644 --- a/test/metabase/models/model_index_test.clj +++ b/test/metabase/models/model_index_test.clj @@ -2,7 +2,6 @@ (:require [clojure.set :as set] [clojure.test :refer :all] - [clojurewerkz.quartzite.conversion :as qc] [clojurewerkz.quartzite.scheduler :as qs] [malli.core :as mc] [malli.error :as me] @@ -105,7 +104,7 @@ (is (some? trigger) "Index trigger not found") (is (= (:schedule model-index) (:schedule trigger))) (is (= {"model-index-id" (:id model-index)} - (qc/from-job-data (:data trigger)))))) + (:data trigger))))) (testing "Deleting the model index removes the indexing task" (t2/delete! ModelIndex :id (:id model-index)) (is (nil? (index-trigger!)) "Index trigger not removed"))))))))) diff --git a/test/metabase/models/pulse_channel_test.clj b/test/metabase/models/pulse_channel_test.clj index 6a2ccb2a653f9..994007cce8bf3 100644 --- a/test/metabase/models/pulse_channel_test.clj +++ b/test/metabase/models/pulse_channel_test.clj @@ -422,11 +422,10 @@ :schedule_frame nil}) (defn send-pulse-triggers - [pulse-id] + [pulse-id & {:keys [additional-keys]}] (->> (task/job-info @#'task.send-pulses/send-pulse-job-key) :triggers - (map #(select-keys % [:key :schedule :data :priority])) - (map #(update % :data (fn [data] (into {} data)))) + (map #(select-keys % (concat [:key :schedule :data :priority] additional-keys))) (filter #(or (nil? pulse-id) (= pulse-id (get-in % [:data "pulse-id"])))) set)) diff --git a/test/metabase/task/send_pulses_test.clj b/test/metabase/task/send_pulses_test.clj index 3ff3d804def0f..c405bb7e09b9a 100644 --- a/test/metabase/task/send_pulses_test.clj +++ b/test/metabase/task/send_pulses_test.clj @@ -3,6 +3,8 @@ [clojure.set :as set] [clojure.test :refer :all] [clojurewerkz.quartzite.triggers :as triggers] + [java-time.api :as t] + [metabase.driver :as driver] [metabase.models.pulse :refer [Pulse]] [metabase.models.pulse-channel :refer [PulseChannel]] [metabase.models.pulse-channel-recipient :refer [PulseChannelRecipient]] @@ -213,3 +215,73 @@ (Thread/sleep 3000) (testing "make sure that all channels will be sent even though number of jobs exceed the thread pool" (is (= (set pc-ids) @sent-channel-ids)))))))))) + +(def ^:private daily-at-8am + {:schedule_type "daily" + :schedule_hour 8 + :schedule_day nil + :schedule_frame nil}) + +(defn- send-pusle-triggers-next-fire-time + [pulse-id] + (first (map :next-fire-time (pulse-channel-test/send-pulse-triggers pulse-id :additional-keys [:next-fire-time])))) + +(defn next-fire-hour + [expected-hour] + (let [now (t/offset-date-time (t/zone-offset 0)) + next-day? (>= (.getHour now) expected-hour)] + (cond-> (t/offset-date-time (.getYear now) + (.. now getMonth getValue) + (.getDayOfMonth now) + expected-hour + 0 0 0 (t/zone-offset 0)) + ;; if the current hour is greater than the expected hour, then it should be fired tomorrow + next-day? (t/plus (t/days 1)) + true t/java-date))) + +(deftest send-pulse-trigger-respect-report-timezone-test + (pulse-channel-test/with-send-pulse-setup! + (mt/with-temporary-setting-values [report-timezone "Asia/Ho_Chi_Minh" #_utc+7] + (mt/with-temp + [:model/Pulse {pulse :id} {} + :model/PulseChannel {_pc :id} (merge + {:pulse_id pulse + :channel_type :slack + :details {:channel "#random"}} + daily-at-8am)] + ;; if its want to be fired at 8 am utc+7, then it should be fired at 1am utc + (is (= (next-fire-hour 1) + (send-pusle-triggers-next-fire-time pulse))))) + + (mt/with-temporary-setting-values [report-timezone "UTC"] + (mt/with-temp + [:model/Pulse {pulse :id} {} + :model/PulseChannel {_pc :id} (merge + {:pulse_id pulse + :channel_type :slack + :details {:channel "#random"}} + daily-at-8am)] + (is (= (next-fire-hour 8) + (send-pusle-triggers-next-fire-time pulse))))))) + +(deftest change-report-timezone-will-update-triggers-timezone-test + (pulse-channel-test/with-send-pulse-setup! + (mt/discard-setting-changes [report-timezone] + (mt/with-temp + [:model/Pulse {pulse :id} {} + :model/PulseChannel {pc :id} (merge + {:pulse_id pulse + :channel_type :slack + :details {:channel "#random"}} + daily-at-8am)] + (testing "Sanity check" + (is (= #{(assoc (pulse-channel-test/pulse->trigger-info pulse daily-at-8am #{pc}) + :timezone "UTC" + :next-fire-time (next-fire-hour 8))} + (pulse-channel-test/send-pulse-triggers pulse :additional-keys [:next-fire-time :timezone])))) + (driver/report-timezone! "Asia/Ho_Chi_Minh") + (testing "changing report timezone will change the timezone and fire time of the trigger" + (is (= #{(assoc (pulse-channel-test/pulse->trigger-info pulse daily-at-8am #{pc}) + :timezone "Asia/Ho_Chi_Minh" + :next-fire-time (next-fire-hour 1))} + (pulse-channel-test/send-pulse-triggers pulse :additional-keys [:next-fire-time :timezone]))))))))