forked from rapidpro/mailroom
/
session_triggered.go
136 lines (108 loc) · 3.8 KB
/
session_triggered.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package hooks
import (
"context"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// init registers handleSessionTriggered as the event hook for session triggered events
func init() {
models.RegisterEventHook(events.TypeSessionTriggered, handleSessionTriggered)
}
// StartStartHook is the hook whose Apply queues the flow starts created by session triggers
type StartStartHook struct{}

// startStartHook is the shared instance registered against sessions
var startStartHook = new(StartStartHook)
// InsertStartHook is the hook whose Apply inserts the flow starts created by session triggers
type InsertStartHook struct{}

// insertStartHook is the shared instance registered against sessions
var insertStartHook = new(InsertStartHook)
// Apply adds a start flow task to redis for every flow start attached to the passed in sessions,
// returning the first queuing error encountered
func (h *StartStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
	conn := rp.Get()
	defer conn.Close()

	// walk the items registered against every session
	for _, items := range sessions {
		for _, item := range items {
			flowStart := item.(*models.FlowStart)

			// starts default to the handler queue, but anything that targets groups
			// goes to the batch queue at high priority instead
			target, prio := queue.HandlerQueue, queue.DefaultPriority
			if len(flowStart.GroupIDs()) != 0 {
				target, prio = queue.BatchQueue, queue.HighPriority
			}

			if err := queue.AddTask(conn, target, queue.StartFlow, int(org.OrgID()), flowStart, prio); err != nil {
				return errors.Wrapf(err, "error queuing flow start")
			}
		}
	}

	return nil
}
// Apply builds a flow start for every session triggered event attached to the passed in sessions
// and inserts them all using the passed in transaction.
//
// For each event the flow is looked up by UUID on org, the referenced groups are resolved to ids
// (groups that org doesn't know about are skipped) and the referenced contacts are resolved via
// models.ContactIDsFromReferences. Each start created is also registered on its session as a post
// commit event for startStartHook.
//
// When more than one session is passed in, events which reference groups are logged and ignored.
//
// An error is returned if a flow can't be loaded, contacts can't be resolved or the insert fails.
// The redis pool isn't needed here, so no connection is taken from it.
func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
	starts := make([]*models.FlowStart, 0, len(sessions))

	// for each of our sessions
	for s, es := range sessions {
		for _, e := range es {
			event := e.(*events.SessionTriggeredEvent)

			// we skip over any session starts that involve groups if we are in a batch start
			if len(sessions) > 1 && len(event.Groups) > 0 {
				// ID is a method, it has to be called so that the id itself ends up on the log entry
				logrus.WithField("session_id", s.ID()).Error("ignoring session trigger on group in batch")
				continue
			}

			// look up our flow
			f, err := org.Flow(event.Flow.UUID)
			if err != nil {
				return errors.Wrapf(err, "unable to load flow with UUID: %s", event.Flow.UUID)
			}
			flow := f.(*models.Flow)

			// load our groups by uuid, ignoring any we can't find
			groupIDs := make([]models.GroupID, 0, len(event.Groups))
			for i := range event.Groups {
				group := org.GroupByUUID(event.Groups[i].UUID)
				if group != nil {
					groupIDs = append(groupIDs, group.ID())
				}
			}

			// load our contacts by uuid
			contactIDs, err := models.ContactIDsFromReferences(ctx, tx, org, event.Contacts)
			if err != nil {
				return errors.Wrapf(err, "error loading contacts by reference")
			}

			// create our start
			start := models.NewFlowStart(
				org.OrgID(), flow.FlowType(), flow.ID(),
				groupIDs, contactIDs, event.URNs, event.CreateContact,
				true, true,
				event.RunSummary, nil,
			)
			starts = append(starts, start)

			// this will add our task for our start after we commit
			s.AddPostCommitEvent(startStartHook, start)
		}
	}

	// insert all our starts
	if err := models.InsertFlowStarts(ctx, tx, starts); err != nil {
		return errors.Wrapf(err, "error inserting flow starts for session triggers")
	}

	return nil
}
// handleSessionTriggered logs the trigger and registers the event on the session as a pre commit
// event for insertStartHook. It always returns nil.
func handleSessionTriggered(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
	triggered := e.(*events.SessionTriggeredEvent)

	fields := logrus.Fields{
		"contact_uuid": session.ContactUUID(),
		"session_id":   session.ID(),
		"flow":         triggered.Flow.Name,
		"flow_uuid":    triggered.Flow.UUID,
	}
	logrus.WithFields(fields).Debug("session triggered")

	session.AddPreCommitEvent(insertStartHook, triggered)
	return nil
}