Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1282 from 4r9h/issue_105_clean
Browse files Browse the repository at this point in the history
Redirecting logs to stdout and errors to stderr. Fixes #105
  • Loading branch information
rafrombrc committed Jan 22, 2015
2 parents 242896c + dd79762 commit a5f57a7
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 97 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ Backwards Incompatibilities
decoding might need to happen in separate goroutines, through `Deliverer`
objects available from the `NewDeliverer` method.

* Hekad and all clients started using stdio for informational messages and
stderr for error messages.

Bug Handling
------------

Expand Down
7 changes: 7 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ package client

import (
"github.com/mozilla-services/heka/message"
"log"
"os"
)

var (
LogInfo = log.New(os.Stdout, "", log.LstdFlags)
LogError = log.New(os.Stderr, "", log.LstdFlags)
)

type Client struct {
Expand Down
23 changes: 11 additions & 12 deletions cmd/heka-flood/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/plugins/tcp"
"io"
"log"
"math"
"math/rand"
"os"
Expand Down Expand Up @@ -99,7 +98,7 @@ func timerLoop(count, bytes *uint64, ticker *time.Ticker) {
} else {
zeroes = 0
}
log.Printf("Sent %d messages. %0.2f msg/sec %0.2f Mbit/sec\n", newCount, msgRate, bitRate)
client.LogInfo.Printf("Sent %d messages. %0.2f msg/sec %0.2f Mbit/sec\n", newCount, msgRate, bitRate)
}
}

Expand Down Expand Up @@ -159,7 +158,7 @@ func makeVariableMessage(encoder client.StreamEncoder, items int,

var stream []byte
if err := encoder.EncodeMessageStream(msg, &stream); err != nil {
log.Println(err)
client.LogError.Println(err)
}
ma[x] = stream
}
Expand Down Expand Up @@ -203,7 +202,7 @@ func makePayload(size uint64, rdm *randomDataMaker) (payload string) {
if _, err := io.CopyN(payloadSuffix, rdm, int64(size)); err == nil {
payload = fmt.Sprintf("%s - %s", payload, payloadSuffix.String())
} else {
log.Println("Error getting random string: ", err)
client.LogError.Println("Error getting random string: ", err)
}
return
}
Expand Down Expand Up @@ -263,7 +262,7 @@ func makeFixedMessage(encoder client.StreamEncoder, size uint64,
msg.SetPayload(makePayload(size, rdm))
var stream []byte
if err := encoder.EncodeMessageStream(msg, &stream); err != nil {
log.Println(err)
client.LogError.Println(err)
}
ma[0] = stream
return ma
Expand Down Expand Up @@ -298,20 +297,20 @@ func main() {

var config FloodConfig
if _, err := toml.DecodeFile(*configFile, &config); err != nil {
log.Printf("Error decoding config file: %s", err)
client.LogError.Printf("Error decoding config file: %s", err)
return
}
var test FloodTest
var ok bool
if test, ok = config[*configTest]; !ok {
log.Printf("Configuration test: '%s' was not found", *configTest)
client.LogError.Printf("Configuration test: '%s' was not found", *configTest)
return
}

if test.PprofFile != "" {
profFile, err := os.Create(test.PprofFile)
if err != nil {
log.Fatalln(err)
client.LogError.Fatalln(err)
}
pprof.StartCPUProfile(profFile)
defer pprof.StopCPUProfile()
Expand All @@ -323,14 +322,14 @@ func main() {
var goTlsConfig *tls.Config
goTlsConfig, err = tcp.CreateGoTlsConfig(&test.Tls)
if err != nil {
log.Fatalf("Error creating TLS config: %s\n", err)
client.LogError.Fatalf("Error creating TLS config: %s\n", err)
}
sender, err = client.NewTlsSender(test.Sender, test.IpAddress, goTlsConfig)
} else {
sender, err = client.NewNetworkSender(test.Sender, test.IpAddress)
}
if err != nil {
log.Fatalf("Error creating sender: %s\n", err)
client.LogError.Fatalf("Error creating sender: %s\n", err)
}

unsignedEncoder := client.NewProtobufEncoder(nil)
Expand Down Expand Up @@ -408,7 +407,7 @@ func main() {
}
bytesSent += uint64(len(buf))
if err = sendMessage(sender, buf, corrupt); err != nil {
log.Printf("Error sending message: %s\n", err.Error())
client.LogError.Printf("Error sending message: %s\n", err.Error())
} else {
msgsDelivered++
}
Expand All @@ -418,6 +417,6 @@ func main() {
}
}
sender.Close()
log.Println("Clean shutdown: ", msgsSent, " messages sent; ",
client.LogInfo.Println("Clean shutdown: ", msgsSent, " messages sent; ",
msgsDelivered, " messages delivered.")
}
7 changes: 3 additions & 4 deletions cmd/heka-inject/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"flag"
"github.com/mozilla-services/heka/client"
"github.com/mozilla-services/heka/message"
"log"
"os"
"time"
)
Expand Down Expand Up @@ -74,11 +73,11 @@ func (hc *HekaClient) injectMessage(m *InjectData) (err error) {

err = hc.encoder.EncodeMessageStream(msg, &stream)
if err != nil {
log.Printf("Inject: [error] encode message: %s\n", err)
client.LogError.Printf("Inject: [error] encode message: %s\n", err)
}
err = hc.sender.SendMessage(stream)
if err != nil {
log.Printf("Inject: [error] send message: %s\n", err)
client.LogError.Printf("Inject: [error] send message: %s\n", err)
}
return nil
}
Expand Down Expand Up @@ -122,7 +121,7 @@ func main() {
if err == nil {
err := hc.injectMessage(data)
if err != nil {
log.Printf("Inject: [error] %s\n", err)
client.LogError.Printf("Inject: [error] %s\n", err)
}
}
}
16 changes: 8 additions & 8 deletions cmd/heka-logstreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"flag"
"fmt"
"github.com/bbangert/toml"
"github.com/mozilla-services/heka/client"
"github.com/mozilla-services/heka/logstreamer"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -62,11 +62,11 @@ func main() {

p, err := os.Open(*configFile)
if err != nil {
log.Fatalf("Error opening config file: %s", err)
client.LogError.Fatalf("Error opening config file: %s", err)
}
fi, err := p.Stat()
if err != nil {
log.Fatalf("Error fetching config file info: %s", err)
client.LogError.Fatalf("Error fetching config file info: %s", err)
}

fconfig := make(FileConfig)
Expand All @@ -81,12 +81,12 @@ func main() {
}
fPath := filepath.Join(*configFile, fName)
if _, err = toml.DecodeFile(fPath, &fconfig); err != nil {
log.Fatalf("Error decoding config file: %s", err)
client.LogError.Fatalf("Error decoding config file: %s", err)
}
}
} else {
if _, err := toml.DecodeFile(*configFile, &fconfig); err != nil {
log.Fatalf("Error decoding config file: %s", err)
client.LogError.Fatalf("Error decoding config file: %s", err)
}
}

Expand Down Expand Up @@ -116,7 +116,7 @@ func parseConfig(name string, prim toml.Primitive) {
LogDirectory: "/var/log",
}
if err := toml.PrimitiveDecode(prim, &config); err != nil {
log.Printf("Error decoding config file: %s", err)
client.LogError.Printf("Error decoding config file: %s", err)
return
}

Expand All @@ -133,11 +133,11 @@ func parseConfig(name string, prim toml.Primitive) {
oldest, _ := time.ParseDuration(config.OldestDuration)
ls, err := logstreamer.NewLogstreamSet(sp, oldest, config.LogDirectory, "")
if err != nil {
log.Fatalf("Error initializing LogstreamSet: %s\n", err.Error())
client.LogError.Fatalf("Error initializing LogstreamSet: %s\n", err.Error())
}
streams, errs := ls.ScanForLogstreams()
if errs.IsError() {
log.Fatalf("Error scanning: %s\n", errs)
client.LogError.Fatalf("Error scanning: %s\n", errs)
}

fmt.Printf("Found %d Logstream(s) for section [%s].\n", len(streams), name)
Expand Down
15 changes: 7 additions & 8 deletions cmd/heka-sbmgr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/mozilla-services/heka/pipeline"
"github.com/mozilla-services/heka/plugins/tcp"
"io/ioutil"
"log"
"os"
"time"
)
Expand All @@ -51,7 +50,7 @@ func main() {

var config SbmgrConfig
if _, err := toml.DecodeFile(*configFile, &config); err != nil {
log.Printf("Error decoding config file: %s", err)
client.LogError.Printf("Error decoding config file: %s", err)
return
}
var sender *client.NetworkSender
Expand All @@ -60,14 +59,14 @@ func main() {
var goTlsConfig *tls.Config
goTlsConfig, err = tcp.CreateGoTlsConfig(&config.Tls)
if err != nil {
log.Fatalf("Error creating TLS config: %s\n", err)
client.LogError.Fatalf("Error creating TLS config: %s\n", err)
}
sender, err = client.NewTlsSender("tcp", config.IpAddress, goTlsConfig)
} else {
sender, err = client.NewNetworkSender("tcp", config.IpAddress)
}
if err != nil {
log.Fatalf("Error creating sender: %s\n", err.Error())
client.LogError.Fatalf("Error creating sender: %s\n", err.Error())
}
encoder := client.NewProtobufEncoder(&config.Signer)
manager := client.NewClient(sender, encoder)
Expand All @@ -84,13 +83,13 @@ func main() {
case "load":
code, err := ioutil.ReadFile(*scriptFile)
if err != nil {
log.Printf("Error reading scriptFile: %s\n", err.Error())
client.LogError.Printf("Error reading scriptFile: %s\n", err.Error())
return
}
msg.SetPayload(string(code))
conf, err := ioutil.ReadFile(*scriptConfig)
if err != nil {
log.Printf("Error reading scriptConfig: %s\n", err.Error())
client.LogError.Printf("Error reading scriptConfig: %s\n", err.Error())
return
}
f, _ := message.NewField("config", string(conf), "toml")
Expand All @@ -99,13 +98,13 @@ func main() {
f, _ := message.NewField("name", *filterName, "")
msg.AddField(f)
default:
log.Printf("Invalid action: %s", *action)
client.LogError.Printf("Invalid action: %s", *action)
}

f1, _ := message.NewField("action", *action, "")
msg.AddField(f1)
err = manager.SendMessage(msg)
if err != nil {
log.Printf("Error sending message: %s\n", err.Error())
client.LogError.Printf("Error sending message: %s\n", err.Error())
}
}
13 changes: 6 additions & 7 deletions cmd/heka-sbmgrload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/pipeline"
"github.com/mozilla-services/heka/plugins/tcp"
"log"
"os"
"time"
)
Expand Down Expand Up @@ -101,7 +100,7 @@ preserve_data = true

var config SbmgrConfig
if _, err := toml.DecodeFile(*configFile, &config); err != nil {
log.Printf("Error decoding config file: %s", err)
client.LogError.Printf("Error decoding config file: %s", err)
return
}
var sender *client.NetworkSender
Expand All @@ -110,14 +109,14 @@ preserve_data = true
var goTlsConfig *tls.Config
goTlsConfig, err = tcp.CreateGoTlsConfig(&config.Tls)
if err != nil {
log.Fatalf("Error creating TLS config: %s\n", err)
client.LogError.Fatalf("Error creating TLS config: %s\n", err)
}
sender, err = client.NewTlsSender("tcp", config.IpAddress, goTlsConfig)
} else {
sender, err = client.NewNetworkSender("tcp", config.IpAddress)
}
if err != nil {
log.Fatalf("Error creating sender: %s\n", err.Error())
client.LogError.Fatalf("Error creating sender: %s\n", err.Error())
}
encoder := client.NewProtobufEncoder(&config.Signer)
manager := client.NewClient(sender, encoder)
Expand All @@ -140,7 +139,7 @@ preserve_data = true
msg.AddField(f1)
err = manager.SendMessage(msg)
if err != nil {
log.Printf("Error sending message: %s\n", err.Error())
client.LogError.Printf("Error sending message: %s\n", err.Error())
}
}
case "unload":
Expand All @@ -157,12 +156,12 @@ preserve_data = true
msg.AddField(f1)
err = manager.SendMessage(msg)
if err != nil {
log.Printf("Error sending message: %s\n", err.Error())
client.LogError.Printf("Error sending message: %s\n", err.Error())
}
}

default:
log.Printf("Invalid action: %s\n", *action)
client.LogError.Printf("Invalid action: %s\n", *action)
}

}

0 comments on commit a5f57a7

Please sign in to comment.