Skip to content

Commit

Permalink
Refactor Messenger interface.
Browse files Browse the repository at this point in the history
Remove `messenger.go` and move the interface definition to `manager`
and the `Message` struct to the `models` package, removing superflous
and redundant message structs used in multiple places.
  • Loading branch information
knadh committed May 8, 2023
1 parent 9ffc912 commit 917696a
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 108 deletions.
7 changes: 3 additions & 4 deletions cmd/init.go
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/knadh/listmonk/internal/media"
"github.com/knadh/listmonk/internal/media/providers/filesystem"
"github.com/knadh/listmonk/internal/media/providers/s3"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/internal/messenger/email"
"github.com/knadh/listmonk/internal/messenger/postback"
"github.com/knadh/listmonk/internal/subimporter"
Expand Down Expand Up @@ -484,7 +483,7 @@ func initImporter(q *models.Queries, db *sqlx.DB, app *App) *subimporter.Importe
}

// initSMTPMessenger initializes the SMTP messenger.
func initSMTPMessenger(m *manager.Manager) messenger.Messenger {
func initSMTPMessenger(m *manager.Manager) manager.Messenger {
var (
mapKeys = ko.MapKeys("smtp")
servers = make([]email.Server, 0, len(mapKeys))
Expand Down Expand Up @@ -526,13 +525,13 @@ func initSMTPMessenger(m *manager.Manager) messenger.Messenger {

// initPostbackMessengers initializes and returns all the enabled
// HTTP postback messenger backends.
func initPostbackMessengers(m *manager.Manager) []messenger.Messenger {
func initPostbackMessengers(m *manager.Manager) []manager.Messenger {
items := ko.Slices("messengers")
if len(items) == 0 {
return nil
}

var out []messenger.Messenger
var out []manager.Messenger
for _, item := range items {
if !item.Bool("enabled") {
continue
Expand Down
5 changes: 2 additions & 3 deletions cmd/main.go
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/internal/media"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/internal/subimporter"
"github.com/knadh/listmonk/models"
"github.com/knadh/paginator"
Expand All @@ -43,7 +42,7 @@ type App struct {
constants *constants
manager *manager.Manager
importer *subimporter.Importer
messengers map[string]messenger.Messenger
messengers map[string]manager.Messenger
media media.Store
i18n *i18n.I18n
bounce *bounce.Manager
Expand Down Expand Up @@ -167,7 +166,7 @@ func main() {
db: db,
constants: initConstants(),
media: initMediaStore(),
messengers: make(map[string]messenger.Messenger),
messengers: make(map[string]manager.Messenger),
log: lo,
bufLog: bufLog,
captcha: initCaptcha(),
Expand Down
4 changes: 2 additions & 2 deletions cmd/notifications.go
Expand Up @@ -3,7 +3,7 @@ package main
import (
"bytes"

"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/models"
)

const (
Expand Down Expand Up @@ -32,7 +32,7 @@ func (app *App) sendNotification(toEmails []string, subject, tplName string, dat
return err
}

m := manager.Message{}
m := models.Message{}
m.ContentType = app.notifTpls.contentType
m.From = app.constants.FromEmail
m.To = toEmails
Expand Down
8 changes: 4 additions & 4 deletions cmd/public.go
Expand Up @@ -13,7 +13,7 @@ import (
"strings"

"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo/v4"
"github.com/lib/pq"
Expand Down Expand Up @@ -566,17 +566,17 @@ func handleSelfExportSubscriberData(c echo.Context) error {

// Send the data as a JSON attachment to the subscriber.
const fname = "data.json"
if err := app.messengers[emailMsgr].Push(messenger.Message{
if err := app.messengers[emailMsgr].Push(models.Message{
ContentType: app.notifTpls.contentType,
From: app.constants.FromEmail,
To: []string{data.Email},
Subject: app.i18n.Ts("email.data.title"),
Body: msg.Bytes(),
Attachments: []messenger.Attachment{
Attachments: []models.Attachment{
{
Name: fname,
Content: b,
Header: messenger.MakeAttachmentHeader(fname, "base64"),
Header: manager.MakeAttachmentHeader(fname, "base64"),
},
},
}); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions cmd/settings.go
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/internal/messenger/email"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -250,7 +249,7 @@ func handleTestSMTPSettings(c echo.Context) error {
return err
}

m := messenger.Message{}
m := models.Message{}
m.ContentType = app.notifTpls.contentType
m.From = app.constants.FromEmail
m.To = []string{to}
Expand Down
9 changes: 4 additions & 5 deletions cmd/tx.go
Expand Up @@ -9,7 +9,6 @@ import (
"strings"

"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo/v4"
)
Expand Down Expand Up @@ -56,9 +55,9 @@ func handleSendTxMessage(c echo.Context) error {
app.i18n.Ts("globals.messages.invalidFields", "name", fmt.Sprintf("file: %s", err.Error())))
}

m.Attachments = append(m.Attachments, models.TxAttachment{
m.Attachments = append(m.Attachments, models.Attachment{
Name: f.Filename,
Header: messenger.MakeAttachmentHeader(f.Filename, "base64"),
Header: manager.MakeAttachmentHeader(f.Filename, "base64"),
Content: b,
})
}
Expand Down Expand Up @@ -121,7 +120,7 @@ func handleSendTxMessage(c echo.Context) error {
}

// Prepare the final message.
msg := manager.Message{}
msg := models.Message{}
msg.Subscriber = sub
msg.To = []string{sub.Email}
msg.From = m.FromEmail
Expand All @@ -130,7 +129,7 @@ func handleSendTxMessage(c echo.Context) error {
msg.Messenger = m.Messenger
msg.Body = m.Body
for _, a := range m.Attachments {
msg.Attachments = append(msg.Attachments, messenger.Attachment{
msg.Attachments = append(msg.Attachments, models.Attachment{
Name: a.Name,
Header: a.Header,
Content: a.Content,
Expand Down
48 changes: 31 additions & 17 deletions internal/manager/manager.go
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/Masterminds/sprig/v3"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/models"
"github.com/paulbellamy/ratecounter"
)
Expand All @@ -40,6 +39,15 @@ type Store interface {
DeleteSubscriber(id int64) error
}

// Messenger is an interface for a generic messaging backend,
// for instance, e-mail, SMS etc.
type Messenger interface {
Name() string
Push(models.Message) error
Flush() error
Close() error
}

// CampStats contains campaign stats like per minute send rate.
type CampStats struct {
SendRate int
Expand All @@ -51,7 +59,7 @@ type Manager struct {
cfg Config
store Store
i18n *i18n.I18n
messengers map[string]messenger.Messenger
messengers map[string]Messenger
notifCB models.AdminNotifCallback
logger *log.Logger

Expand All @@ -73,7 +81,7 @@ type Manager struct {
campMsgQueue chan CampaignMessage
campMsgErrorQueue chan msgError
campMsgErrorCounts map[int]int
msgQueue chan Message
msgQueue chan models.Message

// Sliding window keeps track of the total number of messages sent in a period
// and on reaching the specified limit, waits until the window is over before
Expand All @@ -98,14 +106,6 @@ type CampaignMessage struct {
unsubURL string
}

// Message represents a generic message to be pushed to a messenger.
type Message struct {
messenger.Message

// Messenger is the messenger backend to use: email|postback.
Messenger string
}

// Config has parameters for configuring the manager.
type Config struct {
// Number of subscribers to pull from the DB in a single iteration.
Expand Down Expand Up @@ -163,14 +163,14 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
i18n: i,
notifCB: notifCB,
logger: l,
messengers: make(map[string]messenger.Messenger),
messengers: make(map[string]Messenger),
camps: make(map[int]*models.Campaign),
campRates: make(map[int]*ratecounter.RateCounter),
tpls: make(map[int]*models.Template),
links: make(map[string]string),
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
msgQueue: make(chan Message, cfg.Concurrency),
msgQueue: make(chan models.Message, cfg.Concurrency),
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
campMsgErrorCounts: make(map[int]int),
slidingWindowStart: time.Now(),
Expand Down Expand Up @@ -202,7 +202,7 @@ func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) (C
}

// AddMessenger adds a Messenger messaging backend to the manager.
func (m *Manager) AddMessenger(msg messenger.Messenger) error {
func (m *Manager) AddMessenger(msg Messenger) error {
id := msg.Name()
if _, ok := m.messengers[id]; ok {
return fmt.Errorf("messenger '%s' is already loaded", id)
Expand All @@ -213,7 +213,7 @@ func (m *Manager) AddMessenger(msg messenger.Messenger) error {

// PushMessage pushes an arbitrary non-campaign Message to be sent out by the workers.
// It times out if the queue is busy.
func (m *Manager) PushMessage(msg Message) error {
func (m *Manager) PushMessage(msg models.Message) error {
t := time.NewTicker(pushTimeout)
defer t.Stop()

Expand Down Expand Up @@ -355,7 +355,7 @@ func (m *Manager) worker() {
numMsg++

// Outgoing message.
out := messenger.Message{
out := models.Message{
From: msg.from,
To: []string{msg.to},
Subject: msg.subject,
Expand Down Expand Up @@ -410,7 +410,7 @@ func (m *Manager) worker() {
return
}

err := m.messengers[msg.Messenger].Push(msg.Message)
err := m.messengers[msg.Messenger].Push(msg)
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
}
Expand Down Expand Up @@ -801,3 +801,17 @@ func (m *Manager) makeGnericFuncMap() template.FuncMap {

return f
}

// MakeAttachmentHeader is a helper function that returns a
// textproto.MIMEHeader tailored for attachments, primarily
// email. If no encoding is given, base64 is assumed.
func MakeAttachmentHeader(filename, encoding string) textproto.MIMEHeader {
if encoding == "" {
encoding = "base64"
}
h := textproto.MIMEHeader{}
h.Set("Content-Disposition", "attachment; filename="+filename)
h.Set("Content-Type", "application/json; name=\""+filename+"\"")
h.Set("Content-Transfer-Encoding", encoding)
return h
}
4 changes: 2 additions & 2 deletions internal/messenger/email/email.go
Expand Up @@ -7,7 +7,7 @@ import (
"net/smtp"
"net/textproto"

"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/models"
"github.com/knadh/smtppool"
)

Expand Down Expand Up @@ -92,7 +92,7 @@ func (e *Emailer) Name() string {
}

// Push pushes a message to the server.
func (e *Emailer) Push(m messenger.Message) error {
func (e *Emailer) Push(m models.Message) error {
// If there are more than one SMTP servers, send to a random
// one from the list.
var (
Expand Down
55 changes: 0 additions & 55 deletions internal/messenger/messenger.go

This file was deleted.

0 comments on commit 917696a

Please sign in to comment.