Permalink
Browse files

stream can follow files now

  • Loading branch information...
justone committed Jun 12, 2016
1 parent f8373e4 commit 090b63391ff86b5a25fd886d05900cb65064248e
Showing with 56 additions and 10 deletions.
  1. +4 −3 sink.go
  2. +52 −7 stream.go
@@ -1,12 +1,13 @@
package main
import (
"github.com/Sirupsen/logrus"
"fmt"
"github.com/justone/pmb/api"
)
type SinkCommand struct {
Name string `short:"n" long:"name" description:"Sink name."`
Name string `short:"n" long:"name" description:"Stream name." default:"default"`
}
var sinkCommand SinkCommand
@@ -40,7 +41,7 @@ func runSink(bus *pmb.PMB, conn *pmb.Connection, id string) error {
for {
message := <-subConn.In
logrus.Infof("Message received: %s", message.Contents)
fmt.Printf("%s: %s\n", message.Contents["identifier"], message.Contents["data"])
}
return nil
@@ -1,21 +1,29 @@
package main
import (
"time"
"fmt"
"net"
"os"
"github.com/Sirupsen/logrus"
"github.com/hpcloud/tail"
"github.com/justone/pmb/api"
)
type StreamCommand struct {
Name string `short:"n" long:"name" description:"Stream name."`
Name string `short:"n" long:"name" description:"Stream name." default:"default"`
Identifier string `short:"i" long:"identifier" description:"Unique identifier, defaults to local IP."`
File string `short:"f" long:"file" description:"File to stream, follows like 'tail -f'."`
}
var streamCommand StreamCommand
func (x *StreamCommand) Execute(args []string) error {
bus := pmb.GetPMB(globalOptions.Primary)
if _, err := os.Stat(streamCommand.File); os.IsNotExist(err) {
return fmt.Errorf("File %s not found.", streamCommand.File)
}
id := pmb.GenerateRandomID("stream")
conn, err := bus.ConnectClient(id, !globalOptions.TrustKey)
@@ -40,14 +48,51 @@ func runStream(bus *pmb.PMB, conn *pmb.Connection, id string) error {
return err
}
for {
time.Sleep(2 * time.Second)
logrus.Infof("Sending message")
var ident string
if len(streamCommand.Identifier) > 0 {
ident = streamCommand.Identifier
} else {
ident, err = localIP()
if err != nil {
return err
}
}
tailConfig := tail.Config{
Follow: true,
Location: &tail.SeekInfo{
Offset: 0,
Whence: os.SEEK_END,
},
}
fileTail, err := tail.TailFile(streamCommand.File, tailConfig)
if err != nil {
return err
}
for line := range fileTail.Lines {
subConn.Out <- pmb.Message{Contents: map[string]interface{}{
"type": "Stream",
"type": "Stream",
"identifier": ident,
"data": line.Text,
}}
}
return nil
}
func localIP() (string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", err
}
addrs, err := net.LookupHost(hostname)
if err != nil {
return "", err
}
return addrs[0], nil
}

0 comments on commit 090b633

Please sign in to comment.