Skip to content

Commit

Permalink
Merge pull request #1 from ww24/manager
Browse files Browse the repository at this point in the history
Manager
  • Loading branch information
ww24 committed Oct 18, 2015
2 parents 1c6ecfc + 41fc93e commit bf36d90
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 115 deletions.
31 changes: 31 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
language: go
go:
- tip
env:
- "PATH=/home/travis/gopath/bin:$PATH"
before_install:
- go get github.com/mitchellh/gox
- go get github.com/tcnksm/ghr
- go get github.com/axw/gocov/gocov
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
script:
- go test -v -covermode=count -coverprofile=coverage.out
- goveralls -coverprofile=coverage.out -service travis-ci -repotoken $COVERALLS_TOKEN
before_deploy:
- gox -output dist/{{.OS}}_{{.Arch}}/{{.Dir}}
- find dist -name "*_*" -type d -exec cp -R static {} \;
- find dist -name "*_*" -type d -exec zip -r {}".zip" {}"/" \;
- zip -r all-in-one.zip dist/*
- tar czf all-in-one.tar.gz dist/*
deploy:
provider: releases
api_key: $GITHUB_TOKEN
file:
- all-in-one.zip
- all-in-one.tar.gz
skip_cleanup: true
on:
tags: true
after_deploy:
- ghr --username $(echo $TRAVIS_REPO_SLUG | awk 'BEGIN{FS="/"}{print$1}') --token $GITHUB_TOKEN --replace $TRAVIS_TAG dist/
34 changes: 2 additions & 32 deletions gopham.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package main

import (
"encoding/json"
"flag"
"log"
"net"
Expand All @@ -22,7 +21,7 @@ import (
)

var (
connAdd, connDel, connSafe = pham.ConnectionManager()
manager = pham.NewConnectionManager()
)

func main() {
Expand Down Expand Up @@ -114,40 +113,11 @@ func NewHandler() http.Handler {
}

// encode json
bytes, err := json.Marshal(data)
connectionLen, err := manager.Broadcast(data)
if err != nil {
panic(err)
}

connectionLen := 0
// broadcast message
connSafe(func(connections []pham.Connection) {
defer func() {
cause := recover()
if cause != nil {
err = cause.(error)
}
}()

connectionLen = len(connections)
for _, connection := range connections {
err := connection.Send(bytes)
if err != nil {
panic(err)
}
}
})

if err != nil {
ctx.JSON(500, gin.H{
"status": "ng",
"error": err.Error(),
"connections": connectionLen,
"message": data,
})
return
}

ctx.JSON(200, gin.H{
"status": "ok",
"connections": connectionLen,
Expand Down
103 changes: 91 additions & 12 deletions gopham_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/ww24/gopham/pham"
Expand All @@ -15,6 +16,28 @@ import (
"golang.org/x/net/websocket"
)

func post(url string, data pham.JSON) (str string, err error) {
jsonb, err := json.Marshal(data)
if err != nil {
return
}

buf := bytes.NewBuffer(jsonb)
res, err := http.Post(url, "application/json", buf)
if err != nil {
return
}
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return
}
str = string(body)

return
}

func TestWebSocket(t *testing.T) {
ts := httptest.NewServer(NewHandler())
defer ts.Close()
Expand Down Expand Up @@ -85,24 +108,80 @@ func TestServerSentEvents(t *testing.T) {
<-ch
}

func post(url string, data pham.JSON) (str string, err error) {
jsonb, err := json.Marshal(data)
if err != nil {
return
func BenchmarkWebSocket(b *testing.B) {
ts := httptest.NewServer(NewHandler())
defer ts.Close()

wg := new(sync.WaitGroup)
for i := 0; i < b.N; i++ {
ws, err := websocket.Dial("ws://"+ts.Listener.Addr().String()+"/subscribe", "", "http://localhost/")
if err != nil {
panic(err)
}
defer ws.Close()

wg.Add(1)

go func() {
msg := make([]byte, 512)
size, err := ws.Read(msg)
if err != nil {
log.Println("err:", err)
}
log.Println("TestWebSocket:received:", size, string(msg[:size]))

wg.Done()
}()
}

buf := bytes.NewBuffer(jsonb)
res, err := http.Post(url, "application/json", buf)
// send realtime message
res, err := post(ts.URL, pham.JSON{
"channel": "test",
"data": pham.JSON{
"type": "ping",
},
})
if err != nil {
return
panic(err)
}
defer res.Body.Close()
log.Println("TestWebSocket:sent:", res)

body, err := ioutil.ReadAll(res.Body)
wg.Wait()
}

func BenchmarkServerSentEvents(b *testing.B) {
ts := httptest.NewServer(NewHandler())
defer ts.Close()

wg := new(sync.WaitGroup)
for i := 0; i < b.N; i++ {
cli, err := client.NewSSEClient(ts.URL + "/sse")
if err != nil {
panic(err)
}
defer cli.Close()

wg.Add(1)

go func() {
data := <-cli.Listener
log.Println("TestServerSentEvents:received:", data)

wg.Done()
}()
}

// send realtime message
res, err := post(ts.URL, pham.JSON{
"channel": `test`,
"data": pham.JSON{
"type": "ping",
},
})
if err != nil {
return
panic(err)
}
str = string(body)
log.Println("TestServerSentEvents:sent:", res)

return
wg.Wait()
}
58 changes: 0 additions & 58 deletions pham/connections.go

This file was deleted.

95 changes: 95 additions & 0 deletions pham/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package pham

import (
"encoding/json"
"log"
"sync"
)

var (
manager = &ConnectionManager{
connections: make([]Connection, 0, 100),
connMutex: new(sync.Mutex),
connAdd: make(chan Connection, 1),
connDel: make(chan Connection, 1),
}
)

// NewConnectionManager is ConnectionManager constructor
func NewConnectionManager() (connectionManager *ConnectionManager) {
connectionManager = manager

// watch add & delete event
go func() {
for {
func() {
select {
case conn := <-manager.connAdd:
log.Println("server: new connection")
manager.connMutex.Lock()
defer manager.connMutex.Unlock()

manager.connections = append(manager.connections, conn)
log.Println("connections:", len(manager.connections))

case conn := <-manager.connDel:
log.Println("server: connection closed")
manager.connMutex.Lock()
defer manager.connMutex.Unlock()

for i, ws := range manager.connections {
if ws == conn {
manager.connections = append(manager.connections[:i], manager.connections[i+1:]...)
log.Println("connections:", len(manager.connections))
break
}
}
}
}()
}
}()

return
}

// AddConnection add connection to connections
func (manager *ConnectionManager) AddConnection(conn Connection) {
manager.connAdd <- conn
}

// DelConnection delete connection from connections
func (manager *ConnectionManager) DelConnection(conn Connection) {
manager.connDel <- conn
}

// Broadcast message
func (manager *ConnectionManager) Broadcast(data JSON) (length int, err error) {
defer manager.connMutex.Unlock()
manager.connMutex.Lock()

// get connections length
length = len(manager.connections)

// encode json
bytes, err := json.Marshal(data)
if err != nil {
return
}

// broadcast
for _, connection := range manager.connections {
err = connection.Send(bytes)
if err != nil {
return
}
}

return
}

// ConnEach get connections safety
func (manager *ConnectionManager) ConnEach(f func([]Connection)) {
defer manager.connMutex.Unlock()
manager.connMutex.Lock()
f(manager.connections)
}
Loading

0 comments on commit bf36d90

Please sign in to comment.