Skip to content

Commit

Permalink
log file support, data race fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Derek Collison committed Aug 2, 2013
1 parent 377ade5 commit cf0f302
Show file tree
Hide file tree
Showing 25 changed files with 227 additions and 103 deletions.
35 changes: 20 additions & 15 deletions gnatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ package main

import (
"flag"
"log"
"os"
"strings"

"github.com/apcera/gnatsd/server"
Expand All @@ -15,6 +13,7 @@ func main() {
// logging setup
server.LogSetup()

// Server Options
opts := server.Options{}

var showVersion bool
Expand All @@ -24,44 +23,52 @@ func main() {
// Parse flags
flag.IntVar(&opts.Port, "port", server.DEFAULT_PORT, "Port to listen on.")
flag.IntVar(&opts.Port, "p", server.DEFAULT_PORT, "Port to listen on.")
flag.StringVar(&opts.Host, "host", server.DEFAULT_HOST, "Network host to listen on.")
flag.StringVar(&opts.Host, "h", server.DEFAULT_HOST, "Network host to listen on.")
flag.StringVar(&opts.Host, "addr", server.DEFAULT_HOST, "Network host to listen on.")
flag.StringVar(&opts.Host, "a", server.DEFAULT_HOST, "Network host to listen on.")
flag.StringVar(&opts.Host, "net", server.DEFAULT_HOST, "Network host to listen on.")
flag.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.")
flag.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.")
flag.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.")
flag.BoolVar(&opts.Trace, "trace", false, "Enable Trace logging.")
flag.BoolVar(&debugAndTrace, "DV", false, "Enable Debug and Trace logging.")
flag.BoolVar(&opts.Logtime, "T", true, "Timestamp log entries.")
flag.BoolVar(&opts.Logtime, "logtime", true, "Timestamp log entries.")
flag.StringVar(&opts.Username, "user", "", "Username required for connection.")
flag.StringVar(&opts.Password, "pass", "", "Password required for connection.")
flag.StringVar(&opts.Authorization, "auth", "", "Authorization token required for connection.")
flag.IntVar(&opts.HttpPort, "m", 0, "HTTP Port for /varz, /connz endpoints.")
flag.IntVar(&opts.HttpPort, "http_port", 0, "HTTP Port for /varz, /connz endpoints.")
flag.StringVar(&configFile, "c", "", "Configuration file.")
flag.StringVar(&configFile, "config", "", "Configuration file.")
flag.StringVar(&configFile, "P", "", "File to store process pid.")
flag.StringVar(&configFile, "pid", "", "File to store process pid.")
flag.StringVar(&configFile, "l", "", "File to store logging output.")
flag.StringVar(&configFile, "log", "", "File to store logging output.")
flag.StringVar(&opts.PidFile, "P", "", "File to store process pid.")
flag.StringVar(&opts.PidFile, "pid", "", "File to store process pid.")
flag.StringVar(&opts.LogFile, "l", "", "File to store logging output.")
flag.StringVar(&opts.LogFile, "log", "", "File to store logging output.")
flag.BoolVar(&showVersion, "version", false, "Print version information.")
flag.BoolVar(&showVersion, "v", false, "Print version information.")

flag.Usage = server.Usage

flag.Parse()

// Show version and exit
if showVersion {
server.PrintServerAndExit()
}

// One flag can set multiple options.
if debugAndTrace {
opts.Trace, opts.Debug = true, true
}

// Process args, version only for now
// Process args looking for non-flaf options,
// 'version' and 'help' only for now
for _, arg := range flag.Args() {
arg = strings.ToLower(arg)
if arg == "version" {
switch strings.ToLower(arg) {
case "version":
server.PrintServerAndExit()
case "help":
server.Usage()
}
}

Expand All @@ -72,14 +79,12 @@ func main() {
if configFile != "" {
fileOpts, err = server.ProcessConfigFile(configFile)
if err != nil {
log.Printf("%v\n", err)
os.Exit(1)
server.PrintAndDie(err.Error())
}
}

// Create the server with appropriate options.
mOpts := server.MergeOptions(fileOpts, &opts)
s := server.New(mOpts)
s := server.New(server.MergeOptions(fileOpts, &opts))

// Start things up. Block here til done.
s.Start()
Expand Down
6 changes: 3 additions & 3 deletions hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func Jesteress(data []byte) uint32 {
// Cases: 0,1,2,3,4,5,6,7
if (dlen & _DWSZ) > 0 {
k1 := *(*uint64)(unsafe.Pointer(&data[i]))
h32 = uint32(uint64(h32) ^ k1) * _YP32
h32 = uint32(uint64(h32)^k1) * _YP32
i += _DWSZ
}
if (dlen & _WSZ) > 0 {
Expand Down Expand Up @@ -111,10 +111,10 @@ func Yorikke(data []byte) uint32 {
for ; dlen >= _DDDWSZ; dlen -= _DDDWSZ {
k1 := *(*uint64)(unsafe.Pointer(&data[i]))
k2 := *(*uint64)(unsafe.Pointer(&data[i+4]))
h32 = uint32((uint64(h32) ^ (((k1<<5 | k1>>27)) ^ k2)) * _YP32)
h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32)
k1 = *(*uint64)(unsafe.Pointer(&data[i+8]))
k2 = *(*uint64)(unsafe.Pointer(&data[i+12]))
h32b = uint32((uint64(h32b) ^ (((k1<<5 | k1>>27)) ^ k2)) * _YP32)
h32b = uint32((uint64(h32b) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32)
i += _DDDWSZ
}
if (dlen & _DDWSZ) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions hashmap/rand_evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ func (h *HashMap) RemoveRandom() {
if h.used == 0 {
return
}
index := (rand.Int())&int(h.msk)
index := (rand.Int()) & int(h.msk)
// Walk forward til we find an entry
for i := index ; i < len(h.bkts) ; i++ {
for i := index; i < len(h.bkts); i++ {
e := &h.bkts[i]
if *e != nil {
*e = (*e).next
Expand All @@ -31,7 +31,7 @@ func (h *HashMap) RemoveRandom() {
}
// If we are here we hit end and did not remove anything,
// use the index and walk backwards.
for i := index ; i >= 0 ; i-- {
for i := index; i >= 0; i-- {
e := &h.bkts[i]
if *e != nil {
*e = (*e).next
Expand Down
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (c *client) processMsg(msg []byte) {
}
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, len(srv.routes))
rmap = make(map[string]struct{}, srv.numRoutes())
}

if sub.client == nil || sub.client.route == nil || sub.client.route.remoteId == "" {
Expand Down
16 changes: 8 additions & 8 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestClientConnect(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true}) {
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -116,7 +116,7 @@ func TestClientConnect(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Username:"derek", Password:"foo"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -131,7 +131,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}

if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Username:"derek", Password:"foo", Name:"router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -146,7 +146,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}

if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Authorization:"YZZ222", Name:"router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
}
Expand Down Expand Up @@ -289,12 +289,12 @@ func TestClientPubWithQueueSub(t *testing.T) {

var n1, n2, received int
for ; ; received += 1 {
time.Sleep(10*time.Millisecond)
time.Sleep(10 * time.Millisecond)
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l,-1)[0]
matches := msgPat.FindAllStringSubmatch(l, -1)[0]

// Count which sub
switch matches[SID_INDEX] {
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestClientUnSub(t *testing.T) {

var received int
for ; ; received += 1 {
time.Sleep(10*time.Millisecond)
time.Sleep(10 * time.Millisecond)
l, err := cr.ReadString('\n')
if err != nil {
break
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestClientUnSubMax(t *testing.T) {

var received int
for ; ; received += 1 {
time.Sleep(10*time.Millisecond)
time.Sleep(10 * time.Millisecond)
l, err := cr.ReadString('\n')
if err != nil {
break
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

const (
VERSION = "go-0.3.1"
VERSION = "go-0.3.2"

DEFAULT_PORT = 4222
DEFAULT_HOST = "0.0.0.0"
Expand Down
18 changes: 16 additions & 2 deletions server/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package server
import (
"fmt"
"log"
"os"
"strings"
"sync/atomic"
)
Expand All @@ -17,15 +18,29 @@ var nolog int32

func LogSetup() {
log.SetFlags(0)
atomic.StoreInt32(&nolog, 0)
atomic.StoreInt32(&debug, 0)
atomic.StoreInt32(&trace, 0)
}

func (s *Server) LogInit() {
// Reset
LogSetup()

if s.opts.Logtime {
log.SetFlags(log.LstdFlags)
}
if s.opts.NoLog {
atomic.StoreInt32(&nolog, 1)
}
if s.opts.LogFile != "" {
flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
file, err := os.OpenFile(s.opts.LogFile, flags, 0660)
if err != nil {
PrintAndDie(fmt.Sprintf("Error opening logfile: %q", s.opts.LogFile))
}
log.SetOutput(file)
}
if s.opts.Debug {
Log(s.opts)
atomic.StoreInt32(&debug, 1)
Expand Down Expand Up @@ -56,7 +71,7 @@ func logStr(v []interface{}) string {
args = append(args, fmt.Sprintf("%+v", vt))
}
}
return fmt.Sprintf("[%s]", strings.Join(args,", "))
return fmt.Sprintf("[%s]", strings.Join(args, ", "))
}

func Log(v ...interface{}) {
Expand Down Expand Up @@ -100,4 +115,3 @@ func Tracef(format string, v ...interface{}) {
Trace(fmt.Sprintf(format, v...))
}
}

2 changes: 1 addition & 1 deletion server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func ProcessConfigFile(configFile string) (*Options, error) {

data, err := ioutil.ReadFile(configFile)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error opening config file: %v", err)
}

m, err := conf.Parse(string(data))
Expand Down
24 changes: 14 additions & 10 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@ import (

func TestDefaultOptions(t *testing.T) {
golden := &Options{
Host: DEFAULT_HOST,
Port: DEFAULT_PORT,
MaxConn: DEFAULT_MAX_CONNECTIONS,
PingInterval: DEFAULT_PING_INTERVAL,
MaxPingsOut: DEFAULT_PING_MAX_OUT,
SslTimeout: float64(SSL_TIMEOUT) / float64(time.Second),
AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
MaxControlLine: MAX_CONTROL_LINE_SIZE,
MaxPayload: MAX_PAYLOAD_SIZE,
Host: DEFAULT_HOST,
Port: DEFAULT_PORT,
MaxConn: DEFAULT_MAX_CONNECTIONS,
PingInterval: DEFAULT_PING_INTERVAL,
MaxPingsOut: DEFAULT_PING_MAX_OUT,
SslTimeout: float64(SSL_TIMEOUT) / float64(time.Second),
AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
MaxControlLine: MAX_CONTROL_LINE_SIZE,
MaxPayload: MAX_PAYLOAD_SIZE,
ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
}

opts := &Options{}
processOptions(opts)

if !reflect.DeepEqual(golden, opts) {
t.Fatal("Default options are incorrect")
t.Fatalf("Default Options are incorrect.\nexpected: %+v\ngot: %+v",
golden, opts)
}
}

Expand Down Expand Up @@ -66,6 +68,8 @@ func TestMergeOverrides(t *testing.T) {
Trace: true,
Logtime: false,
HttpPort: DEFAULT_HTTP_PORT,
LogFile: "/tmp/gnatsd.log",
PidFile: "/tmp/gnatsd.pid",
}
fopts, err := ProcessConfigFile("./configs/test.conf")
if err != nil {
Expand Down

0 comments on commit cf0f302

Please sign in to comment.