/
server.go
73 lines (67 loc) · 1.66 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main
import "time"
import "sync"
import "log"
import "net"
import "net/http"
import "fmt"
import gf "github.com/bnclabs/gofast"
import _ "github.com/bnclabs/gofast/http"
var (
	// mu serializes access to transs; runserver holds it while appending.
	mu sync.Mutex

	// transs keeps a reference to every transport created by runserver.
	// It starts empty with room for 100 entries before it has to grow.
	transs = make([]*gf.Transport, 0, 100)
)
// server is the entry point for server mode. It launches the HTTP endpoint
// on ":8080" in a background goroutine, binds a TCP listener to
// options.addr and passes that listener to runserver. A failure to bind the
// listener panics.
func server() {
	// A nil handler makes net/http serve http.DefaultServeMux.
	// ListenAndServe blocks until it fails, so the value it returns is only
	// ever logged.
	go func() {
		httpErr := http.ListenAndServe(":8080", nil)
		log.Println(httpErr)
	}()

	listener, listenErr := net.Listen("tcp", options.addr)
	if listenErr != nil {
		panic(fmt.Errorf("listen failed %v", listenErr))
	}
	fmt.Printf("listening on %v\n", options.addr)

	// Hand the bound listener over to the accept loop.
	runserver(listener)
}
// runserver accepts connections on lis and wraps each one in a gofast
// transport.
//
// All transports share one version value (testVersion(1)) and one
// configuration built by newconfig(1000, 1000+options.routines), with the
// "tags" and "batchsize" entries taken from options. Each transport is named
// "server-<n>" (n counts accepted connections from 0), appended to the
// package-level transs slice while holding mu, and then configured and
// handshaken on its own goroutine.
//
// Accept errors are no longer dropped silently: a temporary error is logged
// and retried after a short pause, while any other error (for example a
// closed listener) is logged and makes runserver return instead of spinning
// in a tight loop. Failure to create a transport or to complete the
// handshake panics.
func runserver(lis net.Listener) {
	// Pause between retries after a temporary Accept failure, so the loop
	// does not burn CPU while the condition persists.
	const acceptRetryDelay = 100 * time.Millisecond

	ver := testVersion(1)
	opqend := 1000 + options.routines
	config := newconfig(1000, opqend)
	config["tags"] = options.tags
	config["batchsize"] = options.batchsize

	conncount := 0
	for {
		conn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				log.Printf("accept failed, retrying: %v", err)
				time.Sleep(acceptRetryDelay)
				continue
			}
			// Permanent failure: further Accept calls would keep failing.
			log.Printf("accept failed, stopping server: %v", err)
			return
		}

		name := fmt.Sprintf("server-%v", conncount)
		conncount++
		fmt.Println("new transport", conn.RemoteAddr(), conn.LocalAddr())

		trans, err := gf.NewTransport(name, conn, &ver, config)
		if err != nil {
			// Keep the underlying cause in the panic value.
			panic(fmt.Errorf("NewTransport server failed: %v", err))
		}

		mu.Lock()
		transs = append(transs, trans)
		mu.Unlock()

		// Per-transport setup runs concurrently so that one handshake does
		// not delay accepting the next connection.
		go func(trans *gf.Transport) {
			trans.FlushPeriod(options.flushtick * time.Millisecond)
			trans.SendHeartbeat(1 * time.Second)
			// The handler is subscribed before Handshake is called.
			trans.SubscribeMessage(
				&msgPost{},
				func(s *gf.Stream, msg gf.BinMessage) gf.StreamCallback {
					// Fill up your handler code here.
					return nil
				})
			if err := trans.Handshake(); err != nil {
				panic(err)
			}
			// Periodic stat printing is currently disabled:
			//tick := time.Tick(1 * time.Second)
			//for {
			//	<-tick
			//	if options.log == "debug" {
			//		printCounts(trans.Stat())
			//	}
			//}
		}(trans)
	}
}