Skip to content

Commit

Permalink
Parallel message processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
kak-tus committed Nov 7, 2023
1 parent 8a98c8a commit c11f33e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
2023-11-07 v1.6.0
- Parallel message processing.

2023-08-31 v1.5.6
- Ban old users with first message (bigger newbie store period).

Expand Down
11 changes: 6 additions & 5 deletions config/cnf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ type Cnf struct {
}

type Tg struct {
BotName string
Path string
Proxy string
Token string
URL string
BotName string
Path string
Processors int `default:"10"`
Proxy string
Token string
URL string
}

type DB struct {
Expand Down
37 changes: 28 additions & 9 deletions telegram/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type InstanceObj struct {
router *chi.Mux
stop chan bool
stor *storage.InstanceObj
upd tgbotapi.UpdatesChannel
}

type Options struct {
Expand Down Expand Up @@ -80,7 +81,7 @@ func (hdl *InstanceObj) Start() error {

hdl.oldLog.Info(resp.Description)

upd := hdl.bot.ListenForWebhook("/" + hdl.cnf.Path)
hdl.upd = hdl.bot.ListenForWebhook("/" + hdl.cnf.Path)

// HACK TODO
// We must register our handler again in internal router
Expand All @@ -90,6 +91,8 @@ func (hdl *InstanceObj) Start() error {
r.Handle("/"+hdl.cnf.Path, http.DefaultServeMux)
})

hdl.processors()

hdl.lock.Add(1)
defer hdl.lock.Done()

Expand All @@ -106,16 +109,32 @@ func (hdl *InstanceObj) Start() error {
case <-hdl.stop:
tick.Stop()
return nil
case msg := <-upd:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
}
}
}

err := hdl.process(ctx, msg)
if err != nil {
hdl.oldLog.Error(err)
func (hdl *InstanceObj) processors() {
hdl.lock.Add(hdl.cnf.Processors)

for i := 0; i < hdl.cnf.Processors; i++ {
go func() {
for {
select {
case <-hdl.stop:
hdl.lock.Done()
return
case msg := <-hdl.upd:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)

err := hdl.process(ctx, msg)
if err != nil {
hdl.oldLog.Error(err)
}

cancel()
}
}

cancel()
}
}()
}
}

Expand Down

0 comments on commit c11f33e

Please sign in to comment.