Skip to content
This repository was archived by the owner on May 16, 2022. It is now read-only.

Commit 7ec4f79

Browse files
committed
fix: rapid-picker-sender-spam, add: allowlist@mailto, cleanup continuation
1 parent c679548 commit 7ec4f79

File tree

11 files changed

+138
-64
lines changed

11 files changed

+138
-64
lines changed

cmd/gobler/conmon.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ type conMon struct {
1919
pickerT time.Duration
2020
pickSendBufLen int
2121
numSenders int
22+
maxMsgPU int
2223
}
2324

2425
const (
2526
monitorTdefault = 10
2627
pickerTdefault = 2
2728
psBufLenDefault = 1
2829
numSendersDefault = 1
30+
maxMsgPUDefault = 10
2931
)
3032

3133
// getConfTime converts config string to time.Duration value.
@@ -50,7 +52,7 @@ func getConfTime(e string) (time.Duration, error) {
5052
}
5153
}
5254

53-
func NewConMon(con string, conCfg map[string]string) (*conMon, error) {
55+
func NewConMon(con string, conCfg map[string]string, l *log.Logger) (*conMon, error) {
5456
var (
5557
cm conMon
5658
err error
@@ -63,16 +65,26 @@ func NewConMon(con string, conCfg map[string]string) (*conMon, error) {
6365
if err != nil {
6466
// return nil, errors.New("psBufLen is not integer")
6567
// todo: no need to be so agressive, let's do default... or should we abort so the user knows he made a mistake?
66-
psbl = psBufLenDefault
68+
cm.pickSendBufLen = psBufLenDefault
69+
} else {
70+
cm.pickSendBufLen = psbl
6771
}
68-
cm.pickSendBufLen = psbl
6972

7073
ns, err := strconv.Atoi(conCfg["numSenders"])
7174
if err != nil {
7275
//return nil, errors.New("numSenders is not integer")
73-
cm.numSenders = psBufLenDefault
76+
cm.numSenders = numSendersDefault
77+
} else {
78+
cm.numSenders = ns
79+
}
80+
81+
mpu, err := strconv.Atoi(conCfg["maxMsgPU"])
82+
if err != nil {
83+
//return nil, errors.New("maxNewMsgPU is not integer")
84+
cm.maxMsgPU = maxMsgPUDefault
85+
} else {
86+
cm.maxMsgPU = mpu
7487
}
75-
cm.numSenders = ns
7688

7789
// if monitorT is specified...
7890
if e, ok := conCfg["monitorT"]; ok {
@@ -95,7 +107,7 @@ func NewConMon(con string, conCfg map[string]string) (*conMon, error) {
95107
// nothing specified in config, use default seconds
96108
cm.pickerT = time.Duration(pickerTdefault) * time.Second
97109
}
98-
110+
l.Printf("CM setup: %#v\n", cm)
99111
return &cm, nil
100112
}
101113

@@ -106,7 +118,7 @@ func (cm *conMon) SpinUp(conns connectors.Connectors, wg *sync.WaitGroup, log *l
106118
psChan := make(chan *spool.FileGob, cm.pickSendBufLen)
107119
psChanFailed := make(chan *spool.FileGob, cm.pickSendBufLen)
108120

109-
mon, err := NewMonitor(cm.conn, cm.spoolDir, cm.monitorT)
121+
mon, err := NewMonitor(cm.conn, cm.spoolDir, cm.monitorT, cm.maxMsgPU)
110122
if err != nil {
111123
log.Printf("Monitor %s inst FAILED\n", cm.conn)
112124
} else {
@@ -115,7 +127,7 @@ func (cm *conMon) SpinUp(conns connectors.Connectors, wg *sync.WaitGroup, log *l
115127
go mon.MonitorWorker(mpChan, wg, log)
116128
}
117129

118-
pickr, err := NewPicker(cm.conn, cm.pickerT)
130+
pickr, err := NewPicker(cm.conn, cm.spoolDir, cm.pickerT, cm.maxMsgPU)
119131
if err != nil {
120132
log.Printf("Picker %s inst FAILED\n", cm.conn)
121133
} else {

cmd/gobler/gobler.conf

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
"adaptiveCardTemplate": "/etc/slurm/adaptive_card_template.json",
1010
"url": "http://localhost:9999/",
1111
"useLookup": "GECOS",
12-
"monitorT": "10000ms",
13-
"pickerT": "300ms",
12+
"monitorT": "20000ms",
13+
"pickerT": "5000ms",
1414
"psBufLen": "3",
15-
"numSenders": "1"
15+
"numSenders": "1",
16+
"maxMsgPU": "5"
1617
},
1718
"mailto": {
1819
"name": "original slurm mail functionality, extended.",
@@ -22,8 +23,8 @@
2223
"allowList": "pja",
2324
"blockList": "",
2425
"spoolDir": "/tmp/mailspool",
25-
"monitorT": "20",
26-
"pickerT": "5"
26+
"monitorT": "200",
27+
"pickerT": "60"
2728
},
2829
"textfile": {
2930
"path": "/tmp"

cmd/gobler/gobler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func main() {
5959
if ok {
6060
log.Printf("MAIN: %s spoolDir exists: %s - %s\n", con, cfg.Connectors[con]["spoolDir"], spd)
6161

62-
cm, err := NewConMon(con, cfg.Connectors[con])
62+
cm, err := NewConMon(con, cfg.Connectors[con], log)
6363
if err != nil {
6464
log.Printf("MAIN: NewConMon(%s) failed with: %s\n", con, err)
6565
log.Printf("MAIN: skipping %s...\n", con)

cmd/gobler/monitor.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,49 @@ type monitor struct {
1313
connector string
1414
spoolDir string
1515
monitorT time.Duration
16+
maxMsgPU int
1617
}
1718

1819
// NewMonitor creates and initializes a new monitor object with:
19-
// c connector name, s spooldir location (from config file) and t polling time period.
20-
func NewMonitor(c string, s string, t time.Duration) (*monitor, error) {
20+
// c connector name, s spooldir location (from config file), t polling time period and mpu is MaximumMessagesPerUser (from conf.)
21+
func NewMonitor(c string, s string, t time.Duration, mpu int) (*monitor, error) {
2122
var m monitor
2223

2324
if s != "" {
2425
m.connector = c
2526
m.spoolDir = s
2627
m.monitorT = t
28+
m.maxMsgPU = mpu
2729
} else {
2830
return nil, errors.New("no spooldir, aborting")
2931
}
3032

3133
return &m, nil
3234
}
3335

36+
// if trimming logic works ok from picker, remove this and also remove maxMsgPU from monitor struct...
37+
//
38+
//func (m *monitor) trimExcessiveMsgs(newFiles *spool.SpooledGobs, mpu int, l *log.Logger) error {
39+
// uc := make(map[string]int)
40+
//
41+
// for f, fg := range *newFiles {
42+
// uc[fg.User]++
43+
// if uc[fg.User] > mpu {
44+
// lock.Lock()
45+
// err := os.Remove(m.spoolDir + "/" + fg.Filename)
46+
// if err != nil {
47+
// l.Printf("MONITOR %s: error removing file %s\n", m.connector, err)
48+
// }
49+
// lock.Unlock()
50+
// l.Printf("MONITOR %s: Gob %s deleted\n", m.connector, f)
51+
// uc[fg.User]--
52+
// delete(*newFiles, f)
53+
// }
54+
// }
55+
// l.Printf("UserCount map == %#v\n", uc)
56+
// return nil
57+
//}
58+
3459
func (m *monitor) MonitorWorker(ch chan<- *spool.SpooledGobs, wg *sync.WaitGroup, l *log.Logger) error {
3560

3661
var oldList, newList, newFiles *spool.SpooledGobs
@@ -49,7 +74,7 @@ func (m *monitor) MonitorWorker(ch chan<- *spool.SpooledGobs, wg *sync.WaitGroup
4974
for {
5075
lock.Lock()
5176
// get new list of files
52-
newList, err = sp.GetSpooledGobsList()
77+
newList, err = sp.GetSpooledGobsList(l)
5378
lock.Unlock()
5479
if err != nil {
5580
l.Printf("MONITOR %s: Failed on Getspooledgobslist(), error %s\n", m.connector, err)
@@ -64,12 +89,14 @@ func (m *monitor) MonitorWorker(ch chan<- *spool.SpooledGobs, wg *sync.WaitGroup
6489
// exists in old, do nothing
6590
}
6691
}
67-
l.Printf("MONITOR %s: Sending newFiles list: %#v\n", m.connector, newFiles)
92+
93+
// todo: decide if here we do the purge of newFiles for messages above maxMsgPU, or in picker?
94+
//m.trimExcessiveMsgs(newFiles, m.maxMsgPU, l)
95+
6896
// send new-found files
97+
l.Printf("MONITOR %s: Sent %d gobs\n", m.connector, len(*newFiles))
6998
ch <- newFiles
70-
// oldlist=newlist
7199
oldList = newList
72-
// empty newfiles for the next iteration
73100
newFiles = &spool.SpooledGobs{}
74101

75102
<-ticker

cmd/gobler/picker.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"errors"
55
"log"
6+
"os"
67
"sync"
78
"time"
89

@@ -12,18 +13,22 @@ import (
1213
// picker holds connector name string, msgcount map with {username:SpooledGobsCount} and pickerT polling period
1314
type picker struct {
1415
connector string
16+
spoolDir string
1517
msgcount map[string]int // map holding {username:SpooledGobsCount}
1618
pickerT time.Duration
19+
maxMsgPU int
1720
}
1821

1922
// NewPicker creates and initializes a new picker object with:
2023
// c connector name and t polling time period parameters.
21-
func NewPicker(c string, t time.Duration) (*picker, error) {
24+
func NewPicker(c string, s string, t time.Duration, mpu int) (*picker, error) {
2225
var p picker
2326

2427
p.connector = c
28+
p.spoolDir = s
2529
p.msgcount = map[string]int{}
2630
p.pickerT = t
31+
p.maxMsgPU = mpu
2732

2833
return &p, nil
2934
}
@@ -45,6 +50,24 @@ func (p *picker) PickNext(allGobs *spool.SpooledGobs) (*spool.FileGob, error) {
4550
return &nextgob, nil
4651
}
4752

53+
func (p *picker) trimExcessiveMsgs(allGobs *spool.SpooledGobs, mpu int, l *log.Logger) error {
54+
for f, fg := range *allGobs {
55+
if p.msgcount[fg.User] > mpu {
56+
lock.Lock()
57+
err := os.Remove(p.spoolDir + "/" + fg.Filename)
58+
if err != nil {
59+
l.Printf("PICKER %s: error removing file %s\n", p.connector, err)
60+
} else {
61+
l.Printf("PICKER %s: Gob %s deleted\n", p.connector, f)
62+
p.msgcount[fg.User]--
63+
delete(*allGobs, f)
64+
}
65+
lock.Unlock()
66+
}
67+
}
68+
return nil
69+
}
70+
4871
func (p *picker) PickerWorker(mpCh <-chan *spool.SpooledGobs, psCh chan<- *spool.FileGob, psfCh <-chan *spool.FileGob, wg *sync.WaitGroup, l *log.Logger) error {
4972

5073
var newgobs *spool.SpooledGobs
@@ -58,33 +81,34 @@ func (p *picker) PickerWorker(mpCh <-chan *spool.SpooledGobs, psCh chan<- *spool
5881
l.Printf("PICKER %s: Users msg count %v\n", p.connector, p.msgcount)
5982
select {
6083
case newgobs = <-mpCh:
61-
l.Printf("PICKER %s: Received gobs %#v\n", p.connector, newgobs)
84+
l.Printf("PICKER %s: Received %d gobs.\n", p.connector, len(*newgobs))
6285
// iterate and increase the counter
6386
for k, v := range *newgobs {
6487
p.msgcount[v.User]++
6588
// append newgobs to allgobs
6689
allGobs[k] = v
6790
}
91+
// todo: or call trimExcessiveMsgs here?
92+
p.trimExcessiveMsgs(&allGobs, p.maxMsgPU, l)
6893
case failedGob := <-psfCh:
6994
l.Printf("PICKER %s: Received FAILED gob %#v\n", p.connector, failedGob)
7095
// return to allGobs
7196
allGobs[failedGob.Filename] = *failedGob
7297
p.msgcount[failedGob.User]++
7398
case <-ticker:
74-
l.Printf("PICKER %s: Chan status: mpCh = %d msgs psCh = %d / %d msgs, psfCh = %d / %d msgs \n", p.connector, len(mpCh), len(psCh), cap(psCh), len(psfCh), cap(psfCh))
75-
}
76-
if len(psCh) < cap(psCh) {
77-
//l.Printf("PICKER %s: allGobs: %#v\n", p.connector, allGobs)
78-
// HERE, call the Pick() and Send()
79-
nextGob, err := p.PickNext(&allGobs)
80-
if err == nil {
81-
l.Printf("PICKER %s: SEND to Sender: %#v\n", p.connector, nextGob)
82-
p.msgcount[nextGob.User]--
83-
psCh <- nextGob
84-
delete(allGobs, nextGob.Filename)
99+
l.Printf("PICKER %s on TICK: Chan status: mpCh = %d msgs psCh = %d / %d msgs, psfCh = %d / %d msgs \n", p.connector, len(mpCh), len(psCh), cap(psCh), len(psfCh), cap(psfCh))
100+
if len(psCh) < cap(psCh) {
101+
// HERE, call the Pick() and Send()
102+
nextGob, err := p.PickNext(&allGobs)
103+
if err == nil {
104+
l.Printf("PICKER %s: SEND to Sender: %#v\n", p.connector, nextGob)
105+
p.msgcount[nextGob.User]--
106+
psCh <- nextGob
107+
delete(allGobs, nextGob.Filename)
108+
}
109+
} else {
110+
l.Printf("psCh FULL!\n")
85111
}
86-
} else {
87-
l.Printf("psCh FULL!\n")
88112
}
89113
}
90114

cmd/gobler/sender.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (s *sender) SenderWorker(psCh <-chan *spool.FileGob, psfCh chan<- *spool.Fi
4343
l.Printf("SENDER %s#%d: newspool returned error %s\n", s.connector, s.num, err)
4444
continue
4545
}
46-
mp, err := sd.FetchGob(msg.Filename)
46+
mp, err := sd.FetchGob(msg.Filename, l)
4747
if err != nil {
4848
l.Printf("SENDER %s#%d: fetchgob returned error %s\n", s.connector, s.num, err)
4949
continue
@@ -60,9 +60,11 @@ func (s *sender) SenderWorker(psCh <-chan *spool.FileGob, psfCh chan<- *spool.Fi
6060
err = os.Remove(s.spoolDir + "/" + msg.Filename)
6161
if err != nil {
6262
l.Printf("SENDER %s#%d: error removing file %s\n", s.connector, s.num, err)
63+
// todo: unlock and return error? or leave this logged and proceed?
64+
} else {
65+
l.Printf("SENDER %s#%d: Gob deleted\n", s.connector, s.num)
6366
}
6467
lock.Unlock()
65-
l.Printf("SENDER %s#: Gob deleted\n", s.connector)
6668
}
6769
}
6870
l.Println("======================= Sender end =============================================")

cmd/goslmailer/goslmailer.conf.annotated_example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"mailCmd": "/bin/mail", # This would be the place to put the content of original slurm MailProg parameter, e.g. '/bin/mail'
1616
"mailCmdParams": "-s \"Job {{ .SlurmEnvironment.SLURM_JOB_ID }} ({{ .SlurmEnvironment.SLURM_JOB_NAME }}) {{ .SlurmEnvironment.SLURM_JOB_MAIL_TYPE }}\"", # templateable subject line with data from JobContext structure
1717
"mailTemplate": "/etc/slurm/mailTemplate.tmpl", # unused, to be used as mail body
18-
"allowList": "pja", # unused, allow certain e-mail targets to use this connector
18+
"allowList": ".+@(x|y).ac.at", # re2 expression https://github.com/google/re2/wiki/Syntax , describing e-mails that are allowed to send
1919
"blockList": "" # unused, block certain e-mail targets to use this connector
2020
},
2121
"textfile": { # fictitious "textfile" connector, package code for it doesn't exist, implementation left as the exercise for the reader

connectors/mailto/send.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package mailto
22

33
import (
44
"bytes"
5+
"errors"
56
"log"
67
"os/exec"
8+
"regexp"
79
"text/template"
810

911
"github.com/pja237/goslmailer/internal/message"
@@ -38,6 +40,19 @@ func (c *Connector) SendMessage(mp *message.MessagePack, useSpool bool, l *log.L
3840
// todo:
3941
// - call lookup on targetUserId
4042
// - test if in allowList/blockList
43+
// - implement useSpool mechanics for gobler
44+
45+
// allowList
46+
re, err := regexp.Compile(c.allowList)
47+
if err != nil {
48+
return err
49+
}
50+
if !re.Match([]byte(mp.TargetUser)) {
51+
// not in allowList
52+
return errors.New("not allowed to send mail to user")
53+
}
54+
55+
// send:
4156
cmd := exec.Command(c.mailCmd, cmdparams.String(), mp.TargetUser)
4257
//cmd.Stdin = bytes.NewBuffer([]byte{0x04})
4358
out, e := cmd.Output()

0 commit comments

Comments
 (0)