Skip to content

Commit

Permalink
pull in pubsub library
Browse files Browse the repository at this point in the history
  • Loading branch information
jdrews committed Jun 4, 2022
1 parent ecc48a5 commit e114a3a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/jdrews/logstation
go 1.16

require (
github.com/cskr/pubsub v1.0.2 // indirect
github.com/fstab/grok_exporter v0.2.8
github.com/gorilla/websocket v1.4.2
github.com/gorilla/websocket v1.5.0
github.com/labstack/echo/v4 v4.3.0
github.com/prometheus/common v0.4.1 // indirect
github.com/rakyll/statik v0.1.7
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -16,8 +18,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down
50 changes: 30 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,40 @@
package main

import (
"errors"
"github.com/cskr/pubsub"
"github.com/fstab/grok_exporter/tailer/fswatcher"
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/gorilla/websocket"
"github.com/jdrews/logstation/internal"
_ "github.com/jdrews/logstation/statik"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/rakyll/statik/fs"
"github.com/sirupsen/logrus"
"net/http"
"os"
"syscall"
)

var (
upgrader = websocket.Upgrader{}
)

func main() {
pubSub := internal.NewPubsub()
pubSub := pubsub.New(1)

//begin watching the file
go follow("test/logfile.log", pubSub)

e := echo.New()

e.Use(middleware.Logger())

statikFS, err := fs.New()
if err != nil {
e.Logger.Fatal(err)
}

e.Use(middleware.Logger())
e.Use(middleware.Recover())

h := http.FileServer(statikFS)

e.GET("/*", echo.WrapHandler(http.StripPrefix("/", h)))
Expand All @@ -48,35 +49,44 @@ func main() {
e.Logger.Fatal(e.Start(":8081"))
}

func wshandler(c echo.Context, pubSub *internal.Pubsub) error {
func wshandler(c echo.Context, pubSub *pubsub.PubSub) error {
logger := logrus.New()
logger.SetOutput(os.Stdout)
// Disable the following line in production. Using in development so I can `npm start` and dev the frontend. It bypasses CORS
upgrader.CheckOrigin = func(r *http.Request) bool { return true }

var err error

linesChannel := pubSub.Subscribe("lines")

ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return err
return nil
}
defer ws.Close()

linesChannel := pubSub.Sub("lines")
defer pubSub.Unsub(linesChannel, "lines")

for line := range linesChannel {
// Write
err := ws.WriteMessage(websocket.TextMessage, []byte(line))
err := ws.WriteMessage(websocket.TextMessage, []byte(line.(string)))
if err != nil {
//TODO handle wsasend "An established connection was aborted by the software in your host machine"
// {"time":"2021-08-08T23:56:03.7797377-04:00","level":"ERROR","prefix":"echo","file":"main.go","line":"69","message":"write tcp [::1]:8081->[::1]:27058: wsasend: An established connection was aborted by the software in your host machine."}
c.Logger().Error(err)
if errors.Is(err, syscall.WSAECONNABORTED) {
logger.Warn("Lost connection to websocket client! Maybe they're gone? Closing this connection. More info: ")
logger.Warn(err)
break
} else if errors.Is(err, syscall.WSAECONNRESET) {
logger.Warn("Lost connection to websocket client! Maybe they're gone? Closing this connection. More info: ")
logger.Warn(err)
break
} else {
logger.Error(err)
break
}
}
}
return err
return nil
}

func follow(path string, pubSub *internal.Pubsub) error {
func follow(path string, pubSub *pubsub.PubSub) error {
logger := logrus.New()
logger.Level = logrus.DebugLevel
//logger.Level = logrus.DebugLevel
logger.SetOutput(os.Stdout)

parsedGlob, err := glob.Parse(path)
Expand All @@ -90,7 +100,7 @@ func follow(path string, pubSub *internal.Pubsub) error {
select {
case line := <-tailer.Lines():
logger.Debug(line.Line)
pubSub.Publish("lines", line.Line)
pubSub.Pub(line.Line, "lines")
default:
continue
}
Expand Down

0 comments on commit e114a3a

Please sign in to comment.