-
Notifications
You must be signed in to change notification settings - Fork 1
/
room.go
139 lines (121 loc) · 3.72 KB
/
room.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package models
import (
"encoding/json"
"net/http"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
cache "github.com/patrickmn/go-cache"
"github.com/phuonglvh/golang-first-pet/config"
logger "github.com/phuonglvh/golang-first-pet/utils/logger"
)
// Buffer sizes shared by the websocket upgrader and the per-client send queues.
const (
// socketBufferSize is used as both the read and the write buffer size of
// the websocket upgrader.
socketBufferSize = 1024
// messageBufferSize is the capacity of each client's Send channel.
messageBufferSize = 256
)
// upgrader turns incoming HTTP requests into websocket connections.
var upgrader = &websocket.Upgrader{ReadBufferSize: socketBufferSize, WriteBufferSize: socketBufferSize}
// Room models a single chat room: it tracks the connected clients, relays
// forwarded messages between them and keeps messages in an expiring cache.
type Room struct {
// ID identifies the room; it is included in the room's log output.
ID string
// Forward is a channel that holds incoming messages
// that should be forwarded to the other clients.
Forward chan *RawClientMessage
// Join is a channel for clients wishing to join the room.
Join chan *Client
// Leave is a channel for clients wishing to leave the room.
Leave chan *Client
// clients holds all current clients in this room; it is used as a set.
clients map[*Client]bool
// storage caches the room's messages, keyed by message ID.
storage *cache.Cache
}
// NewRoom builds a Room identified by ID.
//
// The returned room has unbuffered Forward, Join and Leave channels, an empty
// client set, and a message cache whose default expiration and cleanup
// interval are both the configured message lifetime, taken as minutes.
func NewRoom(ID string) *Room {
	minutes := config.Env.Chat.Message.Lifetime
	logger.Info.Printf("Set message lifetime to: %d minutes", minutes)

	ttl := time.Duration(minutes) * time.Minute

	room := new(Room)
	room.ID = ID
	room.Forward = make(chan *RawClientMessage)
	room.Join = make(chan *Client)
	room.Leave = make(chan *Client)
	room.clients = make(map[*Client]bool)
	room.storage = cache.New(ttl, ttl)
	return room
}
// Run is the room's event loop. It blocks forever, handling one event at a
// time from the Join, Leave and Forward channels:
//
//   - Join registers the client in the room.
//   - Leave unregisters the client and closes its Send channel.
//   - Forward wraps the raw message in a Message with a fresh ID and
//     timestamp, stores it in the room's cache and relays it to every client
//     except the sender.
//
// Because the client set is only touched from this loop, Run is expected to
// be started exactly once per room, typically in its own goroutine.
func (room *Room) Run() {
	for {
		select {
		case client := <-room.Join:
			// joining
			room.clients[client] = true
			logger.Info.Printf("Client %s has joined the room %s", client.ID, room.ID)
			// room.sendPastMessages(client)
		case client := <-room.Leave:
			// leaving
			// Closing an already-closed channel panics, so only clients that
			// are still registered are removed and have Send closed. This
			// makes a duplicate or unexpected Leave a harmless no-op.
			if _, joined := room.clients[client]; !joined {
				continue
			}
			delete(room.clients, client)
			close(client.Send)
			logger.Info.Printf("Client %s has left the room %s", client.ID, room.ID)
		case fwdMsg := <-room.Forward:
			message := &Message{
				ID:      uuid.New().String(),
				Content: fwdMsg.Content,
				Sender:  fwdMsg.Sender,
				// Milliseconds since the Unix epoch, at one-second resolution.
				Timestamp: time.Now().Unix() * 1000,
			}
			// A storage failure must not stop delivery; record it and carry on.
			if err := room.storage.Add(message.ID, message, cache.DefaultExpiration); err != nil {
				logger.Error.Printf("Room %s could not store message %s: %s", room.ID, message.ID, err)
			}
			logger.Trace.Printf("Client has sent message to others in room %s: %s", room.ID, fwdMsg.Content)
			// forward message to all clients
			room.sendMessageToAllExcept(message, fwdMsg.Sender)
		}
	}
}
// GetMessages returns the messages currently held in the room's cache.
//
// The result is always a non-nil slice (empty when the cache has no items).
// Every cached object is expected to be a *Message; anything else makes the
// type assertion panic.
func (room *Room) GetMessages() []*Message {
	stored := room.storage.Items()
	result := make([]*Message, 0, len(stored))
	for key := range stored {
		result = append(result, stored[key].Object.(*Message))
	}
	return result
}
// sendMessageToAll delivers message to every client currently registered in
// the room.
func (room *Room) sendMessageToAll(message *Message) {
	for recipient := range room.clients {
		room.sendMessageToClient(recipient, message)
	}
}
// sendMessageToAllExcept delivers message to every registered client whose ID
// differs from clientID, which is normally the ID of the message's sender.
func (room *Room) sendMessageToAllExcept(message *Message, clientID string) {
	for recipient := range room.clients {
		if recipient.ID != clientID {
			room.sendMessageToClient(recipient, message)
		}
	}
}
// sendMessageToClient JSON-encodes message and queues the encoded bytes on
// the client's Send channel.
//
// If encoding fails, the error is logged and nothing is queued, so the client
// never receives an empty payload. The channel send blocks while the client's
// Send buffer is full.
func (room *Room) sendMessageToClient(client *Client, message *Message) {
	payload, err := json.Marshal(message)
	if err != nil {
		logger.Error.Printf("Room %s could not encode message for client %s: %s", room.ID, client.ID, err)
		return
	}
	client.Send <- payload
	logger.Trace.Printf("Sent a message to client %s: %s", client.ID, string(payload))
}
// sendPastMessages replays every message held in the room's cache to client,
// after logging the cache's item count.
func (room *Room) sendPastMessages(client *Client) {
	logger.Trace.Printf("Sending past %d messages to client %s", room.storage.ItemCount(), client.ID)
	for _, entry := range room.storage.Items() {
		past := entry.Object.(*Message)
		room.sendMessageToClient(client, past)
	}
}
// ServeHTTP upgrades the request to a websocket connection, registers a new
// client with the room and then runs the client's read loop until it returns.
//
// A failed upgrade is logged and the request is abandoned; it must not take
// the whole server down, since a failed handshake can be triggered by any
// single misbehaving peer.
func (room *Room) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	socket, err := upgrader.Upgrade(w, req, nil)
	if err != nil {
		// Printf rather than Fatalf: a fatal log call would end the process
		// and affect every room, not just this request.
		logger.Error.Printf("Room %s has encountered error while serving http: %s", room.ID, err)
		return
	}
	logger.Info.Printf("Room %s is waiting for clients", room.ID)
	client := &Client{
		Socket: socket,
		Send:   make(chan []byte, messageBufferSize),
		Room:   room,
	}
	room.Join <- client
	client.Read()
}