-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
178 lines (154 loc) · 3.36 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// This program extends part 8.
//
// It connects to the peer specified by -peer.
// It accepts connections from peers and receives messages from them.
// When it sees a peer with an address it hasn't seen before, it makes a
// connection to that peer.
// It adds an ID field containing a random string to each outgoing message.
// When it receives a message with an ID it hasn't seen before, it broadcasts
// that message to all connected peers.
//
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"os"
"sync"
"github.com/sbinet/whispering-gophers/util"
)
var (
	// peerAddr holds the value of the -peer command-line flag: the host:port
	// of the first peer to connect to. It defaults to the empty string.
	peerAddr = flag.String("peer", "", "peer host:port")

	// self is the address this process is listening on. main assigns it once
	// the listener is up, and dial compares against it so that the node never
	// connects to itself.
	self string
)
// Message is the unit of data exchanged between peers. It is written to and
// read from connections as a JSON value.
type Message struct {
	// ID identifies the message. Per the program description it is meant to
	// hold a random string, so that a node can recognise a message it has
	// already handled instead of rebroadcasting it.
	ID string
	// Addr is the listen address of the node that created the message.
	// Receivers dial this address to connect back to the sender.
	Addr string
	// Body is the text of the message.
	Body string
}
// main parses the command-line flags, opens the listener, and records its
// address in self. It then starts two background goroutines, one dialing the
// peer named by -peer and one reading standard input, and finally accepts
// connections forever, handing each one to serve on its own goroutine.
// Any listen or accept failure terminates the program.
func main() {
	flag.Parse()

	ln, err := util.Listen()
	if err != nil {
		log.Fatal(err)
	}
	self = ln.Addr().String()
	log.Println("Listening on", self)

	go dial(*peerAddr)
	go readInput()

	for {
		conn, acceptErr := ln.Accept()
		if acceptErr != nil {
			log.Fatal(acceptErr)
		}
		go serve(conn)
	}
}
// peers is the registry of peers this process currently knows about.
var peers = &Peers{m: make(map[string]chan<- Message)}

// Peers maps a peer's address to the channel on which messages destined for
// that peer are sent. Its methods take mu, so a single Peers value can be
// shared between goroutines.
type Peers struct {
	mu sync.RWMutex              // guards m
	m  map[string]chan<- Message // peer address -> outgoing message channel
}
// Add registers addr and hands back the receiving end of a freshly created,
// unbuffered channel for that peer. When addr is already registered nothing
// is changed and nil is returned.
func (p *Peers) Add(addr string) <-chan Message {
	p.mu.Lock()
	defer p.mu.Unlock()

	_, known := p.m[addr]
	if known {
		return nil
	}

	peerCh := make(chan Message)
	p.m[addr] = peerCh
	return peerCh
}
// Remove drops addr from the registry. Removing an address that is not
// registered is a no-op.
func (p *Peers) Remove(addr string) {
	p.mu.Lock()
	delete(p.m, addr)
	p.mu.Unlock()
}
// List returns the send channels of every registered peer as a new slice.
// The slice is a snapshot taken under the read lock; its order is
// unspecified because it follows map iteration order.
func (p *Peers) List() []chan<- Message {
	p.mu.RLock()
	defer p.mu.RUnlock()

	chans := make([]chan<- Message, len(p.m))
	next := 0
	for _, peerCh := range p.m {
		chans[next] = peerCh
		next++
	}
	return chans
}
// broadcast offers m to every registered peer without blocking. A peer whose
// channel is not ready to receive at that moment simply misses the message.
func broadcast(m Message) {
	targets := peers.List()
	for i := range targets {
		select {
		case targets[i] <- m:
			// Delivered to this peer.
		default:
			// Peer is busy; dropping the message is acceptable.
		}
	}
}
// serve handles one incoming connection. It decodes a stream of JSON
// messages from c and, for each one, prints it, rebroadcasts it to all
// registered peers, and starts a dial to the address the message carries.
// The first decode error is logged and ends the loop; c is closed on return.
func serve(c net.Conn) {
	defer c.Close()

	dec := json.NewDecoder(c)
	for {
		// A fresh value per iteration keeps fields from a previous message
		// from leaking into the next one.
		var msg Message
		if err := dec.Decode(&msg); err != nil {
			log.Println(err)
			return
		}

		// TODO: If this message has been seen before, ignore it.
		fmt.Printf("%#v\n", msg)
		broadcast(msg)
		go dial(msg.Addr)
	}
}
func readInput() {
s := bufio.NewScanner(os.Stdin)
for s.Scan() {
m := Message{
// TODO: use util.RandomID to populate the ID field.
Addr: self,
Body: s.Text(),
}
// TODO: Mark the message ID as seen.
broadcast(m)
}
if err := s.Err(); err != nil {
log.Fatal(err)
}
}
// dial connects to the peer at addr and forwards to it, as a stream of JSON
// values, every message received on the channel registered for that peer.
//
// It returns immediately when addr is empty (for example when the -peer flag
// was not given, or a received message carried no address), when addr is
// this node's own address, or when the peer is already registered. Otherwise
// the peer stays registered for as long as dial runs and is removed when it
// returns. A failed connection attempt or a failed write is logged together
// with addr and ends the call.
func dial(addr string) {
	if addr == "" {
		return // No address to dial.
	}
	if addr == self {
		return // Don't try to dial self.
	}

	ch := peers.Add(addr)
	if ch == nil {
		return // Peer already connected.
	}
	defer peers.Remove(addr)

	c, err := net.Dial("tcp", addr)
	if err != nil {
		log.Println(addr, err)
		return
	}
	defer c.Close()

	e := json.NewEncoder(c)
	for m := range ch {
		if err := e.Encode(m); err != nil {
			log.Println(addr, err)
			return
		}
	}
}
var (
	// seenMu guards seenIDs.
	seenMu sync.Mutex
	// seenIDs is the set of message IDs that have been passed to Seen.
	seenIDs = make(map[string]struct{})
)

// Seen returns true if the specified id has been seen before.
// If not, it returns false and marks the given id as "seen".
// The check and the mark happen under one lock, so when several goroutines
// call Seen with the same new id, exactly one of them gets false.
func Seen(id string) bool {
	seenMu.Lock()
	defer seenMu.Unlock()

	if _, ok := seenIDs[id]; ok {
		return true
	}
	seenIDs[id] = struct{}{}
	return false
}