forked from rapidpro/mailroom
-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.go
125 lines (109 loc) · 3.81 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package ivr
import (
"context"
"encoding/json"
"time"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// init registers handleFlowStartTask with mailroom as the task function for
// tasks of type queue.StartIVRFlowBatch.
func init() {
mailroom.AddTaskFunction(queue.StartIVRFlowBatch, handleFlowStartTask)
}
// handleFlowStartTask is the task function for queue.StartIVRFlowBatch tasks.
// It rejects a task of any other type, decodes the task body into a
// models.FlowStartBatch and passes that batch to HandleFlowStartBatch together
// with the config, database and redis pool held by mr.
//
// It returns an error if the task type is unexpected, if the body cannot be
// unmarshalled, or whatever error HandleFlowStartBatch returns.
func handleFlowStartTask(ctx context.Context, mr *mailroom.Mailroom, task *queue.Task) error {
	// this worker only handles IVR flow start batches
	if task.Type != queue.StartIVRFlowBatch {
		return errors.Errorf("unknown event type passed to ivr worker: %s", task.Type)
	}

	// decode our task body
	var batch models.FlowStartBatch
	if err := json.Unmarshal(task.Task, &batch); err != nil {
		return errors.Wrapf(err, "error unmarshalling flow start batch: %s", string(task.Task))
	}

	return HandleFlowStartBatch(ctx, mr.Config, mr.DB, mr.RP, &batch)
}
// HandleFlowStartBatch starts a batch of contacts in an IVR flow.
//
// The batch's contacts are filtered before any call is requested:
//   - unless batch.RestartParticipants() is set, contacts returned by
//     models.FindFlowStartedOverlap for this flow are excluded
//   - unless batch.IncludeActive() is set, contacts returned by
//     models.FindActiveSessionOverlap for models.IVRFlow are excluded
//
// A call start is then requested for each remaining contact. A failure for a
// single contact is logged and does not stop the batch. If the batch is the
// last one, the start is marked complete.
//
// bg is the parent context: the lookups share a five minute timeout derived
// from it and every call request gets its own one minute timeout. rp is
// accepted but not used by this function.
//
// An error is returned if any lookup or load fails, if bg is cancelled or
// expires before every call has been requested, or if marking the start
// complete fails.
func HandleFlowStartBatch(bg context.Context, config *config.Config, db *sqlx.DB, rp *redis.Pool, batch *models.FlowStartBatch) error {
	// the lookups and loads below share a single five minute budget
	ctx, cancel := context.WithTimeout(bg, time.Minute*5)
	defer cancel()

	// contacts we will exclude either because they are in a flow or have already been in this one
	exclude := make(map[models.ContactID]bool, 5)

	// filter out anybody who has had a flow run in this flow, if appropriate
	if !batch.RestartParticipants() {
		// find all participants that have been in this flow
		started, err := models.FindFlowStartedOverlap(ctx, db, batch.FlowID(), batch.ContactIDs())
		if err != nil {
			return errors.Wrapf(err, "error finding others started flow: %d", batch.FlowID())
		}
		for _, c := range started {
			exclude[c] = true
		}
	}

	// filter out anybody who is active in another session, if appropriate
	if !batch.IncludeActive() {
		// find all participants active in other sessions
		active, err := models.FindActiveSessionOverlap(ctx, db, models.IVRFlow, batch.ContactIDs())
		if err != nil {
			return errors.Wrapf(err, "error finding other active sessions: %d", batch.FlowID())
		}
		for _, c := range active {
			exclude[c] = true
		}
	}

	// filter into our final list of contacts, only including those that should be started
	contactIDs := make([]models.ContactID, 0, len(batch.ContactIDs()))
	for _, c := range batch.ContactIDs() {
		if !exclude[c] {
			contactIDs = append(contactIDs, c)
		}
	}

	// load our org assets
	org, err := models.GetOrgAssets(ctx, db, batch.OrgID())
	if err != nil {
		return errors.Wrapf(err, "error loading org assets for org: %d", batch.OrgID())
	}

	// ok, we can initiate calls for the remaining contacts
	contacts, err := models.LoadContacts(ctx, db, org, contactIDs)
	if err != nil {
		return errors.Wrapf(err, "error loading contacts")
	}

	// for each contact, request a call start
	for _, contact := range contacts {
		// once our parent context is done every further request is doomed, so stop
		// here and report it instead of logging a failure for each remaining contact
		if err := bg.Err(); err != nil {
			return errors.Wrapf(err, "context done before all calls were requested for flow: %d", batch.FlowID())
		}

		start := time.Now()

		// each call request gets its own one minute budget, derived from the parent
		// context rather than from the lookup context above
		callCtx, callCancel := context.WithTimeout(bg, time.Minute)
		session, err := RequestCallStart(callCtx, config, db, org, batch, contact)
		callCancel()

		if err != nil {
			// a failure for one contact shouldn't prevent the others from being called
			logrus.WithError(err).Errorf("error starting ivr flow for contact: %d and flow: %d", contact.ID(), batch.FlowID())
			continue
		}

		if session == nil {
			logrus.WithFields(logrus.Fields{
				"elapsed":    time.Since(start),
				"contact_id": contact.ID(),
				"start_id":   batch.StartID(),
			}).Info("call start skipped, no suitable channel")
			continue
		}

		logrus.WithFields(logrus.Fields{
			"elapsed":     time.Since(start),
			"contact_id":  contact.ID(),
			"status":      session.Status(),
			"start_id":    batch.StartID(),
			"external_id": session.ExternalID(),
		}).Info("requested call for contact")
	}

	// if this is the last batch, mark our start as complete; this uses the parent
	// context because the five minute lookup budget may already be spent by now
	if batch.IsLast() {
		if err := models.MarkStartComplete(bg, db, batch.StartID()); err != nil {
			return errors.Wrapf(err, "error trying to set batch as complete")
		}
	}

	return nil
}