6
6
"io"
7
7
"log"
8
8
"os"
9
+ "strconv"
9
10
"sync"
10
11
"time"
11
12
@@ -22,11 +23,13 @@ type MsgList []message.MessagePack
22
23
type monitor struct {
23
24
connector string
24
25
spoolDir string
26
+ monitorT time.Duration
25
27
}
26
28
27
29
type picker struct {
28
30
connector string
29
31
msgcount map [string ]int
32
+ pickerT time.Duration
30
33
}
31
34
32
35
type sender struct {
@@ -35,21 +38,34 @@ type sender struct {
35
38
conn connectors.Connector
36
39
}
37
40
38
- func NewPicker (c string ) (* picker , error ) {
39
- var p picker
41
+ func NewPicker (c string , t string ) (* picker , error ) {
42
+ var (
43
+ p picker
44
+ err error
45
+ )
40
46
41
47
p .connector = c
42
48
p .msgcount = map [string ]int {}
49
+ T , err := strconv .ParseUint (t , 10 , 64 )
50
+ if err != nil {
51
+ return nil , err
52
+ }
53
+ p .pickerT = time .Duration (T )
43
54
44
55
return & p , nil
45
56
}
46
57
47
- func NewMonitor (c string , s string ) (* monitor , error ) {
58
+ func NewMonitor (c string , s string , t string ) (* monitor , error ) {
48
59
var m monitor
49
60
50
61
if s != "" {
51
62
m .connector = c
52
63
m .spoolDir = s
64
+ T , err := strconv .ParseUint (t , 10 , 64 )
65
+ if err != nil {
66
+ return nil , err
67
+ }
68
+ m .monitorT = time .Duration (T )
53
69
} else {
54
70
return nil , errors .New ("no spooldir, aborting" )
55
71
}
@@ -109,6 +125,23 @@ func (s *sender) SenderWorker(psCh <-chan *spool.FileGob, psfCh chan<- *spool.Fi
109
125
return nil
110
126
}
111
127
128
+ func (p * picker ) PickNext (allGobs * spool.SpooledGobs ) (* spool.FileGob , error ) {
129
+
130
+ var nextgob spool.FileGob
131
+
132
+ if len (* allGobs ) == 0 {
133
+ return nil , errors .New ("no gobs in spool" )
134
+ }
135
+
136
+ // here implement something meaningful
137
+ for _ , v := range * allGobs {
138
+ nextgob = v
139
+ break
140
+ }
141
+
142
+ return & nextgob , nil
143
+ }
144
+
112
145
func (p * picker ) PickerWorker (mpCh <- chan * spool.SpooledGobs , psCh chan <- * spool.FileGob , psfCh <- chan * spool.FileGob , wg * sync.WaitGroup , l * log.Logger ) error {
113
146
114
147
var newgobs * spool.SpooledGobs
@@ -118,8 +151,9 @@ func (p *picker) PickerWorker(mpCh <-chan *spool.SpooledGobs, psCh chan<- *spool
118
151
119
152
l .Println ("======================= Picker start ===========================================" )
120
153
// configurable picker/sender frequency
121
- ticker := time .Tick (1 * time .Second )
154
+ ticker := time .Tick (p . pickerT * time .Second )
122
155
for {
156
+ l .Printf ("PICKER %s: Users msg count %v\n " , p .connector , p .msgcount )
123
157
select {
124
158
case newgobs = <- mpCh :
125
159
l .Printf ("PICKER %s: Received gobs %#v\n " , p .connector , newgobs )
@@ -133,15 +167,16 @@ func (p *picker) PickerWorker(mpCh <-chan *spool.SpooledGobs, psCh chan<- *spool
133
167
l .Printf ("PICKER %s: Received FAILED gob %#v\n " , p .connector , failedGob )
134
168
// return to allGobs
135
169
allGobs [failedGob .Filename ] = * failedGob
170
+ p .msgcount [failedGob .User ]++
136
171
default :
137
172
l .Printf ("PICKER %s: allGobs: %#v\n " , p .connector , allGobs )
138
173
// HERE, call the Pick() and Send()
139
- for k , v := range allGobs {
140
- // pick first, send and delete
141
- l .Printf ("PICK %s: SEND to Sender: %#v\n " , p .connector , v )
142
- psCh <- & v
143
- delete ( allGobs , k )
144
- break
174
+ nextGob , err := p . PickNext ( & allGobs )
175
+ if err == nil {
176
+ l .Printf ("PICKER %s: SEND to Sender: %#v\n " , p .connector , nextGob )
177
+ p . msgcount [ nextGob . User ] --
178
+ psCh <- nextGob
179
+ delete ( allGobs , nextGob . Filename )
145
180
}
146
181
}
147
182
<- ticker
@@ -159,10 +194,10 @@ func (m *monitor) MonitorWorker(ch chan<- *spool.SpooledGobs, wg *sync.WaitGroup
159
194
160
195
defer wg .Done ()
161
196
// configurable monitor timer
162
- ticker := time .Tick (10 * time .Second )
197
+ ticker := time .Tick (m . monitorT * time .Second )
163
198
164
199
l .Println ("======================= Monitor start ==========================================" )
165
- l .Printf ("MON %s Starting\n " , m .connector )
200
+ l .Printf ("MONITOR %s Starting\n " , m .connector )
166
201
sp , err := spool .NewSpool (m .spoolDir )
167
202
if err != nil {
168
203
return err
@@ -173,7 +208,7 @@ func (m *monitor) MonitorWorker(ch chan<- *spool.SpooledGobs, wg *sync.WaitGroup
173
208
newList , err = sp .GetSpooledGobsList ()
174
209
lock .Unlock ()
175
210
if err != nil {
176
- l .Printf ("MON %s: Failed on Getspooledgobslist(), error %s\n " , m .connector , err )
211
+ l .Printf ("MONITOR %s: Failed on Getspooledgobslist(), error %s\n " , m .connector , err )
177
212
return err
178
213
}
179
214
// iterate over newlist and each file that doesn't exist in old, put into newfiles to be sent to the Picker
@@ -185,15 +220,15 @@ func (m *monitor) MonitorWorker(ch chan<- *spool.SpooledGobs, wg *sync.WaitGroup
185
220
// exists in old, do nothing
186
221
}
187
222
}
188
- l .Printf ("MON %s: Sending newFiles list: %#v\n " , m .connector , newFiles )
223
+ l .Printf ("MONITOR %s: Sending newFiles list: %#v\n " , m .connector , newFiles )
189
224
// send new-found files
190
225
ch <- newFiles
191
226
// oldlist=newlist
192
227
oldList = newList
193
228
// empty newfiles for the next iteration
194
229
newFiles = & spool.SpooledGobs {}
195
230
196
- l .Printf ("MON %s: Sleeping.\n " , m .connector )
231
+ l .Printf ("MONITOR %s: Sleeping.\n " , m .connector )
197
232
//time.Sleep(5 * time.Second)
198
233
//l.Printf("Time: %s\n", <-ticker)
199
234
<- ticker
@@ -211,12 +246,11 @@ func main() {
211
246
wg sync.WaitGroup
212
247
)
213
248
214
- // read configuration
215
- // how to handle hardcoding config file?
249
+ // read gobler configuration
216
250
cfg := config .NewConfigContainer ()
217
- err := cfg .GetConfig ("/etc/slurm/goslmailer .conf" )
251
+ err := cfg .GetConfig ("/etc/slurm/gobler .conf" )
218
252
if err != nil {
219
- fmt .Printf ("getConfig failed: %s" , err )
253
+ fmt .Printf ("getConfig(gobconfig) failed: %s" , err )
220
254
os .Exit (1 )
221
255
}
222
256
@@ -247,29 +281,27 @@ func main() {
247
281
// configurable buffer size
248
282
psChan := make (chan * spool.FileGob , 1 )
249
283
psChanFailed := make (chan * spool.FileGob , 1 )
250
- mon , err := NewMonitor (con , spd )
284
+ mon , err := NewMonitor (con , spd , cfg . Connectors [ con ][ "monitorT" ] )
251
285
if err != nil {
252
- log .Println ("Monitor inst failed" )
286
+ log .Printf ("Monitor %s inst FAILED \n " , con )
253
287
} else {
254
- log .Println ("Monitor startup..." )
288
+ log .Printf ("Monitor %s startup...\n " , con )
255
289
wg .Add (1 )
256
290
go mon .MonitorWorker (mpChan , & wg , log )
257
- log .Println ("Monitor exit..." )
258
291
}
259
- pickr , err := NewPicker (con )
292
+ pickr , err := NewPicker (con , cfg . Connectors [ con ][ "pickerT" ] )
260
293
if err != nil {
261
- log .Println ("Picker inst failed" )
294
+ log .Printf ("Picker %s inst FAILED \n " , con )
262
295
} else {
263
- log .Println ("Picker startup..." )
296
+ log .Printf ("Picker %s startup...\n " , con )
264
297
wg .Add (1 )
265
298
go pickr .PickerWorker (mpChan , psChan , psChanFailed , & wg , log )
266
- log .Println ("Monitor exit..." )
267
299
}
268
300
sendr , err := NewSender (con , cfg .Connectors [con ]["spoolDir" ], & conns )
269
301
if err != nil {
270
- log .Println ("Sender inst failed" )
302
+ log .Printf ("Sender %s inst failed\n " , con )
271
303
} else {
272
- log .Println ("Sender startup..." )
304
+ log .Printf ("Sender %s startup...\n " , con )
273
305
wg .Add (1 )
274
306
go sendr .SenderWorker (psChan , psChanFailed , & wg , log )
275
307
log .Println ("Sender exit..." )
0 commit comments