forked from Cloud-Foundations/Dominator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
getImageUpdates.go
118 lines (113 loc) · 3.36 KB
/
getImageUpdates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package rpcd
import (
"github.com/Cloud-Foundations/Dominator/lib/image"
"github.com/Cloud-Foundations/Dominator/lib/srpc"
"github.com/Cloud-Foundations/Dominator/proto/imageserver"
)
// GetImageUpdates streams image database changes to a replication client over
// conn.
//
// The client is first held until a receive on t.finishedReplication succeeds.
// Add, delete and make-directory notifiers are then registered (and
// unregistered again on return), after which the currently listed directories
// and images are sent, terminated by an empty ImageUpdate. From then on every
// notification is forwarded and flushed until the connection's close notifier
// fires or an encode/flush fails.
//
// Every error is logged and returned. A close notification carrying a nil
// error is logged as a client disconnect and results in a nil return.
func (t *srpcType) GetImageUpdates(conn *srpc.Conn) error {
	defer conn.Flush()
	// fail logs err and hands it back so error paths stay one-liners.
	fail := func(err error) error {
		t.logger.Println(err)
		return err
	}
	t.logger.Printf("New image replication client connected from: %s\n",
		conn.RemoteAddr())
	// Only log about blocking when the receive would actually block.
	select {
	case <-t.finishedReplication:
	default:
		t.logger.Println(
			"Blocking replication client until I've finished replicating")
		<-t.finishedReplication
		t.logger.Printf(
			"Replication finished, unblocking replication client: %s\n",
			conn.RemoteAddr())
	}
	t.incrementNumReplicationClients(true)
	defer t.incrementNumReplicationClients(false)
	// Register for notifications before listing the current contents, so the
	// listing below happens with the notifiers already in place.
	added := t.imageDataBase.RegisterAddNotifier()
	deleted := t.imageDataBase.RegisterDeleteNotifier()
	madeDirs := t.imageDataBase.RegisterMakeDirectoryNotifier()
	defer t.imageDataBase.UnregisterAddNotifier(added)
	defer t.imageDataBase.UnregisterDeleteNotifier(deleted)
	defer t.imageDataBase.UnregisterMakeDirectoryNotifier(madeDirs)
	// Initial snapshot: directories (sorted) first, then the images.
	dirs := t.imageDataBase.ListDirectories()
	image.SortDirectories(dirs)
	for _, dir := range dirs {
		if err := sendMakeDirectory(conn, dir); err != nil {
			return fail(err)
		}
	}
	for _, name := range t.imageDataBase.ListImages() {
		err := conn.Encode(imageserver.ImageUpdate{Name: name})
		if err != nil {
			return fail(err)
		}
	}
	// Signal end of initial image list.
	if err := conn.Encode(imageserver.ImageUpdate{}); err != nil {
		return fail(err)
	}
	if err := conn.Flush(); err != nil {
		return fail(err)
	}
	t.logger.Println(
		"Finished sending initial image list to replication client")
	closed := conn.GetCloseNotifier()
	for {
		var err error
		select {
		case name := <-added:
			err = sendUpdate(conn, name, imageserver.OperationAddImage)
		case name := <-deleted:
			err = sendUpdate(conn, name, imageserver.OperationDeleteImage)
		case dir := <-madeDirs:
			err = sendMakeDirectory(conn, dir)
		case closeErr := <-closed:
			// The connection is done: nothing more is sent or flushed here.
			if closeErr != nil {
				return fail(closeErr)
			}
			t.logger.Printf("Image replication client disconnected: %s\n",
				conn.RemoteAddr())
			return nil
		}
		// Flush after each successfully encoded update.
		if err == nil {
			err = conn.Flush()
		}
		if err != nil {
			return fail(err)
		}
	}
}
// incrementNumReplicationClients adjusts t.numReplicationClients while
// holding t.numReplicationClientsLock: the counter goes up by one when
// increment is true and down by one when it is false.
func (t *srpcType) incrementNumReplicationClients(increment bool) {
	t.numReplicationClientsLock.Lock()
	defer t.numReplicationClientsLock.Unlock()
	if !increment {
		t.numReplicationClients--
		return
	}
	t.numReplicationClients++
}
// sendUpdate encodes an ImageUpdate carrying the given image name and
// operation on encoder and returns the result of that Encode call. It does
// not flush.
func sendUpdate(encoder srpc.Encoder, name string, operation uint) error {
	return encoder.Encode(imageserver.ImageUpdate{
		Name:      name,
		Operation: operation,
	})
}
// sendMakeDirectory encodes an ImageUpdate with the OperationMakeDirectory
// operation that points at the given directory, and returns the result of the
// Encode call. It does not flush.
func sendMakeDirectory(encoder srpc.Encoder, directory image.Directory) error {
	var update imageserver.ImageUpdate
	// directory is this function's own copy, so taking its address is safe.
	update.Directory = &directory
	update.Operation = imageserver.OperationMakeDirectory
	return encoder.Encode(update)
}