Skip to content

Commit

Permalink
Syncronize topic operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jupp0r committed Apr 21, 2016
1 parent ad913f4 commit 3cc21e8
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 14 deletions.
3 changes: 2 additions & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ func (h *pubsubHub) Dump(topic string) []string {
return []string{}
}

return t.Dump()
res := t.Dump()
return res
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dump(w http.ResponseWriter, r *http.Request) {
messages := hub.Dump(topic)

for _, message := range messages {
c.WriteMessage(websocket.TextMessage, []byte(message))
c.WriteJSON([]byte(message))
}
}

Expand Down
2 changes: 1 addition & 1 deletion persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p *inMemoryPersister) run() {
log.Info("persisting", "message", message)
p.messages = append(p.messages, message)
case read := <-p.readChannel:
log.Info("writing persisted messages", "num", len(p.messages))
log.Info("writing persisted messages", "messages", p.messages)
read <- p.messages
case <-p.closeChannel:
return
Expand Down
28 changes: 27 additions & 1 deletion persister_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package main

import "testing"
import (
"strconv"
"testing"
)

func TestInMemoryPersister(t *testing.T) {
p := NewInMemoryPersister()
Expand All @@ -23,3 +26,26 @@ func TestInMemoryPersister(t *testing.T) {
}
}
}

func TestDumping(t *testing.T) {
p := NewInMemoryPersister()

for k := 0; k < 100; k++ {
testData := []string{}
for i := 0; i < 10; i++ {
testData = append(testData, strconv.Itoa(i))
}

for _, s := range testData {
p.Persist(s)
}

result := p.Read()

for i, _ := range testData {
if testData[i] != result[i] {
t.Fatalf("Read wrong data. Expected %s, got %s", testData[i], result[i])
}
}
}
}
41 changes: 31 additions & 10 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,25 @@ type Topic interface {
type addMessage struct {
Conn connection
Channel chan []byte
ready chan struct{}
}

type removeMessage struct {
Conn connection
ready chan struct{}
}

type publishMessage struct {
Message string
ready chan struct{}
}

type pubsubTopic struct {
subscribers map[connection]chan []byte
publishChannel chan publishMessage
addChannel chan addMessage
removeChannel chan connection
removeChannel chan removeMessage
readChannel chan chan []string
Persister
}

Expand All @@ -33,7 +41,8 @@ func NewTopic(p Persister, log log.Logger) Topic {
subscribers: make(map[connection]chan []byte),
publishChannel: make(chan publishMessage),
addChannel: make(chan addMessage),
removeChannel: make(chan connection),
removeChannel: make(chan removeMessage),
readChannel: make(chan chan []string),
Persister: p,
}

Expand All @@ -45,20 +54,24 @@ func NewTopic(p Persister, log log.Logger) Topic {
func (t *pubsubTopic) AddSubscriber(c connection, log log.Logger) chan []byte {
log.Info("Add subscriber")
subscriberChannel := make(chan []byte, 10000)

t.addChannel <- addMessage{c, subscriberChannel}

ready := make(chan struct{})
t.addChannel <- addMessage{c, subscriberChannel, ready}
<-ready
return subscriberChannel
}

func (t *pubsubTopic) RemoveSubscriber(c connection, log log.Logger) {
log.Info("Remove subscriber")
t.removeChannel <- c
ready := make(chan struct{})
t.removeChannel <- removeMessage{c, ready}
<-ready
}

func (t *pubsubTopic) Publish(message string, log log.Logger) {
log.Info("Publish", "message", message)
t.publishChannel <- publishMessage{message}
ready := make(chan struct{})
t.publishChannel <- publishMessage{message, ready}
<-ready
}

func (t *pubsubTopic) run() {
Expand All @@ -67,10 +80,15 @@ func (t *pubsubTopic) run() {
case m := <-t.publishChannel:
t.publishToSubscribers(m.Message)
t.Persist(m.Message)
m.ready <- struct{}{}
case add := <-t.addChannel:
t.addSubscriber(add)
case conn := <-t.removeChannel:
t.removeSubscriber(conn)
add.ready <- struct{}{}
case remove := <-t.removeChannel:
t.removeSubscriber(remove.Conn)
remove.ready <- struct{}{}
case read := <-t.readChannel:
read <- t.Read()
}
}
}
Expand Down Expand Up @@ -98,5 +116,8 @@ func publishToSubscriber(message string, c chan []byte) {
}

func (t *pubsubTopic) Dump() []string {
return t.Read()
result := make(chan []string)
t.readChannel <- result
res := <-result
return res
}

0 comments on commit 3cc21e8

Please sign in to comment.