forked from Cloudxtreme/lavabot
-
Notifications
You must be signed in to change notification settings - Fork 2
/
hub_churner.go
119 lines (105 loc) · 3.08 KB
/
hub_churner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"encoding/json"
"errors"
"sort"
"time"
"github.com/bitly/go-nsq"
r "github.com/dancannon/gorethink"
"github.com/dchest/uniuri"
)
type HubEvent struct {
Type string `json:"type"`
Email string `json:"email"`
FirstName string `json:"first_name"`
}
func initChurner(change chan struct{}) {
cons, err := nsq.NewConsumer("hub", "hub", nsq.NewConfig())
if err != nil {
log.WithField("error", err.Error()).Fatal("Unable to consume the hub topic")
}
cons.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
log.Print("Handling hub event")
var ev *HubEvent
if err := json.Unmarshal(m.Body, &ev); err != nil {
log.Print("Erroring - invalid body")
return err
}
log.Printf("Type of the hub event is %s", ev.Type)
switch ev.Type {
case "onboarding":
log.Print("Diving into the lock")
stateLock.Lock()
log.Print("I'm inside!")
// Four emails in total
timers := []*Timer{
// 1. Welcome to Lavaboom
&Timer{
ID: uniuri.NewLen(uniuri.UUIDLen),
Time: time.Now().Add(time.Second * 15),
Name: *welcomeName,
Version: *welcomeVersion,
Sender: "hello",
From: "Felix from Lavaboom <hello@lavaboom.com>",
To: []string{ev.Email},
Input: map[string]interface{}{
"first_name": ev.FirstName,
},
},
// 2. Getting started
&Timer{
ID: uniuri.NewLen(uniuri.UUIDLen),
Time: time.Now().Add(time.Second * 30),
Name: *gettingStartedName,
Version: *gettingStartedVersion,
Sender: "hello",
From: "Tine from Lavaboom <hello@lavaboom.com>",
To: []string{ev.Email},
Input: map[string]interface{}{
"first_name": ev.FirstName,
},
},
// 3. Security information
&Timer{
ID: uniuri.NewLen(uniuri.UUIDLen),
Time: time.Now().Add(time.Minute * 2),
Name: *securityName,
Version: *securityVersion,
Sender: "hello",
From: "Andrei from Lavaboom <hello@lavaboom.com>",
To: []string{ev.Email},
Input: map[string]interface{}{
"first_name": ev.FirstName,
},
},
// 4. How's it going?
&Timer{
ID: uniuri.NewLen(uniuri.UUIDLen),
Time: time.Now().Add(time.Minute * 15),
Name: *whatsUpName,
Version: *whatsUpVersion,
Sender: "hello",
From: "Lavabot from Lavaboom <hello@lavaboom.com>",
To: []string{ev.Email},
Input: map[string]interface{}{
"first_name": ev.FirstName,
},
},
}
state = append(state, timers...)
if err := r.Db(*rethinkdbDatabase).Table("hub_state").Insert(timers).Exec(session); err != nil {
log.WithField("error", err.Error()).Error("Unable to insert events into database")
stateLock.Unlock()
return err
}
// Sort it and ping the worker
sort.Sort(state)
stateLock.Unlock()
change <- struct{}{}
default:
return errors.New("Not implemented")
}
return nil
}))
cons.ConnectToNSQLookupd(*lookupdAddress)
}