Skip to content

Commit

Permalink
feat: automatically remove queued messages with stale locks (#2872)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcooke committed Mar 12, 2024
1 parent 5d8213a commit d84152e
Show file tree
Hide file tree
Showing 14 changed files with 323 additions and 29 deletions.
2 changes: 1 addition & 1 deletion app/controllers/servers_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def destroy
end

def queue
@messages = @server.queued_messages.order(id: :desc).page(params[:page])
@messages = @server.queued_messages.order(id: :desc).page(params[:page]).includes(:ip_address)
@messages_with_message = @messages.include_message
end

Expand Down
4 changes: 2 additions & 2 deletions app/lib/message_dequeuer/initial_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def process
private

def check_message_exists
@queued_message.message
rescue Postal::MessageDB::Message::NotFound
return if @queued_message.message

log "unqueue because backend message has been removed."
remove_from_queue
stop_processing
Expand Down
4 changes: 2 additions & 2 deletions app/lib/message_dequeuer/single_message_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def process
private

def check_message_exists
queued_message.message
rescue Postal::MessageDB::Message::NotFound
return if queued_message.message

log "unqueueing because backend message has been removed"
remove_from_queue
stop_processing
Expand Down
3 changes: 2 additions & 1 deletion app/lib/worker/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class Process
ProcessMessageRetentionScheduledTask,
PruneSuppressionListsScheduledTask,
PruneWebhookRequestsScheduledTask,
SendNotificationsScheduledTask
SendNotificationsScheduledTask,
TidyQueuedMessagesTask
].freeze

# @param [Integer] thread_count The number of worker threads to run in this process
Expand Down
6 changes: 5 additions & 1 deletion app/models/concerns/has_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ def self.included(base)
end

def message
@message ||= server.message_db.message(message_id)
return @message if instance_variable_defined?("@message")

@message = server.message_db.message(message_id)
rescue Postal::MessageDB::Message::NotFound
@message = nil
end

def message=(message)
Expand Down
10 changes: 7 additions & 3 deletions app/models/queued_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class QueuedMessage < ApplicationRecord

belongs_to :server
belongs_to :ip_address, optional: true
belongs_to :user, optional: true

before_create :allocate_ip_address

scope :ready_with_delayed_retry, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) }
scope :with_stale_lock, -> { where("locked_at IS NOT NULL AND locked_at < ?", Postal::Config.postal.queued_message_lock_stale_days.days.ago) }

def retry_now
update(retry_after: nil)
update!(retry_after: nil)
end

def send_bounce
Expand All @@ -50,7 +50,11 @@ def send_bounce
end

def allocate_ip_address
return unless Postal.ip_pools? && message && pool = server.ip_pool_for_message(message)
return unless Postal.ip_pools?
return if message.nil?

pool = server.ip_pool_for_message(message)
return if pool.nil?

self.ip_address = pool.ip_addresses.select_by_priority
end
Expand Down
18 changes: 18 additions & 0 deletions app/scheduled_tasks/tidy_queued_messages_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

class TidyQueuedMessagesTask < ApplicationScheduledTask

def call
QueuedMessage.with_stale_lock.in_batches do |messages|
messages.each do |message|
logger.info "removing queued message #{message.id} (locked at #{message.locked_at} by #{message.locked_by})"
message.destroy
end
end
end

def self.next_run_after
quarter_to_each_hour
end

end
64 changes: 45 additions & 19 deletions app/views/messages/_list.html.haml
Original file line number Diff line number Diff line change
@@ -1,24 +1,50 @@
%ul.messageList
- for message in messages

- if message.is_a?(QueuedMessage)
- queued_message = message
- message = message.message
%li.messageList__message
= link_to organization_server_message_path(organization, @server, message.id), :class => 'messageList__link' do
.messageList__details{:class => 'messageList__details--' + message.scope}
%p.messageList__subject= message.subject || "No subject"
%dl.messageList__addresses
%dt To
%dd
- if message.rcpt_to_return_path?
%span.returnPathTag Return Path
- else
= message.rcpt_to || "none"
%dt From
%dd= message.mail_from || "none"


- if message.nil? && queued_message
%li.messageList__message
.messageList__link
.messageList__details
%p.messageList__subject Deleted message ##{queued_message.message_id}
%dl.messageList__addresses
%dt Domain
%dd= queued_message.domain
%dt Locked
%dd= queued_message.locked? ? "Yes" : "No"
.messageList__meta
%p.messageList__timestamp= queued_message.created_at.in_time_zone.to_fs(:long)
%p.messageList__status
%span.label{:class => "label--messageStatus-deleted"} Deleted

.messageList__meta
%p.messageList__timestamp= message.timestamp.in_time_zone.to_fs(:long)
%p.messageList__status
- if message.read?
%span.label.label--purple Opened
%span.label{:class => "label--messageStatus-#{message.status.underscore}"}= message.status.underscore.humanize

- else
%li.messageList__message
= link_to organization_server_message_path(organization, @server, message.id), :class => 'messageList__link' do
.messageList__details{:class => 'messageList__details--' + message.scope}
%p.messageList__subject= message.subject || "No subject"
%dl.messageList__addresses
%dt To
%dd
- if message.rcpt_to_return_path?
%span.returnPathTag Return Path
- else
= message.rcpt_to || "none"
%dt From
%dd= message.mail_from || "none"
- if queued_message
%dt Attempts
%dd= queued_message.attempts
%dt Retry after
%dd= queued_message.retry_after&.to_fs(:short) || "ASAP"

.messageList__meta
%p.messageList__timestamp= message.timestamp.in_time_zone.to_fs(:long)
%p.messageList__status
- if message.read?
%span.label.label--purple Opened
%span.label{:class => "label--messageStatus-#{message.status.underscore}"}= message.status.underscore.humanize
1 change: 1 addition & 0 deletions doc/config/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This document contains all the environment variables which are available for thi
| `POSTAL_SIGNING_KEY_PATH` | String | Path to the private key used for signing | $config-file-root/signing.key |
| `POSTAL_SMTP_RELAYS` | Array of strings | An array of SMTP relays in the format of smtp://host:port | [] |
| `POSTAL_TRUSTED_PROXIES` | Array of strings | An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) | [] |
| `POSTAL_QUEUED_MESSAGE_LOCK_STALE_DAYS` | Integer | The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried. | 1 |
| `WEB_SERVER_DEFAULT_PORT` | Integer | The default port the web server should listen on unless overriden by the PORT environment variable | 5000 |
| `WEB_SERVER_DEFAULT_BIND_ADDRESS` | String | The default bind address the web server should listen on unless overriden by the BIND_ADDRESS environment variable | 127.0.0.1 |
| `WEB_SERVER_MAX_THREADS` | Integer | The maximum number of threads which can be used by the web server | 5 |
Expand Down
2 changes: 2 additions & 0 deletions doc/config/yaml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ postal:
smtp_relays: []
# An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)
trusted_proxies: []
# The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried.
queued_message_lock_stale_days: 1

web_server:
# The default port the web server should listen on unless overriden by the PORT environment variable
Expand Down
5 changes: 5 additions & 0 deletions lib/postal/config_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ module Postal
description "An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)"
transform { |ip| IPAddr.new(ip) }
end

integer :queued_message_lock_stale_days do
description "The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried."
default 1
end
end

group :web_server do
Expand Down
6 changes: 6 additions & 0 deletions spec/factories/ip_pool_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,11 @@
factory :ip_pool do
name { "Default Pool" }
default { true }

trait :with_ip_address do
after(:create) do |ip_pool|
ip_pool.ip_addresses << create(:ip_address, ip_pool: ip_pool)
end
end
end
end

0 comments on commit d84152e

Please sign in to comment.