/
sync.go
108 lines (82 loc) · 1.87 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package flow
import (
"encoding/json"
"fmt"
"github.com/gomodule/redigo/redis"
"go.uber.org/zap"
)
const CancelActionMessage = "cancelAction"
// SyncSubscribeTo subscribes to direktiv interna postgres pub/sub
func SyncSubscribeTo(log *zap.Logger, addr, topic string, fn func(interface{})) error {
pool := &redis.Pool{
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", addr)
},
}
conn := pool.Get()
_, err := conn.Do("PING")
if err != nil {
return fmt.Errorf("can't connect to redis, got error:\n%v", err)
}
go func() {
rc := pool.Get()
psc := redis.PubSubConn{Conn: rc}
if err := psc.PSubscribe(topic); err != nil {
log.Error(err.Error())
}
for {
switch v := psc.Receive().(type) {
default:
data, _ := json.Marshal(v)
log.Debug(string(data))
case redis.Message:
req := new(PubsubUpdate)
err = json.Unmarshal(v.Data, req)
if err != nil {
log.Error(fmt.Sprintf("Unexpected notification on database listener: %v", err))
} else {
fn(req)
}
}
}
}()
// reportProblem := func(ev pq.ListenerEventType, err error) {
// if err != nil {
// log.Error(err.Error())
// }
// }
/*
listener := pq.NewListener(dbConnString, 10*time.Second,
time.Minute, reportProblem)
err := listener.Listen(flowSync)
if err != nil {
return err
}
go func(l *pq.Listener) {
defer func() {
l.UnlistenAll()
l.Close()
}()
for {
notification, more := <-l.Notify
if !more {
log.Info("Database listener closed.")
return
}
if notification == nil {
continue
}
req := new(PubsubUpdate)
err = json.Unmarshal([]byte(notification.Extra), req)
if err != nil {
log.Error(fmt.Sprintf("Unexpected notification on database listener: %v", err))
continue
}
if req.Handler == topic {
fn(req)
}
}
}(listener)
*/
return nil
}