From 8e542c823382e7ba9f5bd1e27eaf55d51d09991d Mon Sep 17 00:00:00 2001 From: pechorin Date: Fri, 17 Apr 2020 21:43:48 +0300 Subject: [PATCH] updates --- go.mod | 1 + go.sum | 1 + main.go | 83 +++++++++++-------------------------------------------- web.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 100 insertions(+), 70 deletions(-) diff --git a/go.mod b/go.mod index efe9cbb..7b3ff8b 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gorilla/websocket v1.4.0 github.com/jmoiron/sqlx v1.2.0 github.com/mailru/go-clickhouse v1.3.0 + github.com/mitchellh/mapstructure v1.1.2 github.com/namsral/flag v1.7.4-pre github.com/satori/go.uuid v1.2.0 // indirect ) diff --git a/go.sum b/go.sum index b0c4aec..2feabde 100644 --- a/go.sum +++ b/go.sum @@ -99,6 +99,7 @@ github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/ github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/main.go b/main.go index 592d83c..2f83497 100644 --- a/main.go +++ b/main.go @@ -39,27 +39,6 @@ type App struct { ClientsIdSerialMux *sync.Mutex } -// ClientStream represents connected with websocket client -// this struct hold all connected information, like current query, interval -// last ping time, configs etc. -type ClientSession struct { - Id int64 - - Query ClientQuery - QueryRunners []*chan int - - Active bool - FetchInterval int16 - - CreatedAt time.Time - ClosedAt time.Time - LastKeepaliveAt time.Time -} - -type ClientQuery struct { - RawQuery string -} - func (app *App) Log(message string) { if app.Debug { fmt.Println(message) @@ -97,6 +76,22 @@ func New() *App { return app } +// move to monitor log ? +func (app *App) ClientSessionsMonitor() error { + timer := time.Tick(30 * time.Second) + + go func() { + for { + select { + case <-timer: + log.Printf("MONITOR - sessions: %d, fetching: %d", len(app.Clients), 0) + } + } + }() + + return nil +} + func (app *App) CreateClientSession() (cs *ClientSession, err error) { cs = new(ClientSession) @@ -117,52 +112,6 @@ func (app *App) CreateClientSession() (cs *ClientSession, err error) { return cs, err } -func (cs *ClientSession) Close() { - cs.Active = false - cs.ClosedAt = time.Now() - - for _, r := range cs.QueryRunners { - close(*r) - } -} - -func (cs *ClientSession) StartQueryRunner(app *App, results chan struct{}, ctrl chan int) error { - log.Printf("StartQuery runned %+v", cs) - - go func() { - for { - select { - case _, ok := <-ctrl: - if !ok { - log.Printf("StartQuery closed -> %+v", cs) - return - } - default: - log.Printf("StartQuery FETCH %+v", cs) - time.Sleep((time.Duration)(FetchIntervals[0]) * time.Second) - } - } - }() - - return nil -} - -// move to monitor log ? -func (app *App) ClientSessionsMonitor() error { - timer := time.Tick(30 * time.Second) - - go func() { - for { - select { - case <-timer: - log.Printf("MONITOR - sessions: %d, fetching: %d", len(app.Clients), 0) - } - } - }() - - return nil -} - func main() { app := New() defer app.Clickhouse.Close() diff --git a/web.go b/web.go index 41da39f..5b984bb 100644 --- a/web.go +++ b/web.go @@ -5,8 +5,10 @@ import ( "encoding/json" "log" "text/template" + "time" "github.com/gin-gonic/gin" + mapstruct "github.com/mitchellh/mapstructure" ) var ( @@ -32,6 +34,30 @@ type SocketActionResponse struct { Payload map[string]interface{} `json:"payload"` } +// ClientStream represents connected with websocket client +// this struct hold all connected information, like current query, interval +// last ping time, configs etc. +type ClientSession struct { + Id int64 + QueryRunners []*chan int + + Active bool + FetchInterval int + + CreatedAt time.Time + ClosedAt time.Time + LastKeepaliveAt time.Time +} + +type ClientQuery struct { + Query string + FetchInterval int16 `mapstructure:"fetch_interval"` +} + +type ClientQueries struct { + Queries []ClientQuery +} + func (app *App) renderError(c *gin.Context, err error) { c.String(500, err.Error()) } @@ -118,12 +144,27 @@ func (app *App) websocketController(c *gin.Context) { // assign to session client.QueryRunners = runners - // start new query runners - if err := client.StartQueryRunner(app, resultsCh, ctrlCh); err != nil { - log.Printf("error -> %s", err.Error()) + queries := new(ClientQueries) + if err := mapstruct.Decode(act.Payload, queries); err != nil { + log.Printf("decode error -> %s", err.Error()) break } + log.Printf("----- decoded queires -> %+v", queries) + + for _, query := range queries.Queries { + if valid := query.IsValid(); valid == false { + log.Printf("invalid query -> %s :: %+v", err.Error(), query) + break + } + + // start query runner + if err := client.StartQueryRunner(app, resultsCh, ctrlCh, query); err != nil { + log.Printf("error -> %s", err.Error()) + break + } + } + default: log.Printf("Unknown SocketAction -> %+v", act) } @@ -168,3 +209,41 @@ func (app *App) faviconController(c *gin.Context) { c.Writer.WriteHeader(200) c.Writer.WriteString(html) } + +func (cs *ClientSession) Close() { + cs.Active = false + cs.ClosedAt = time.Now() + + for _, r := range cs.QueryRunners { + close(*r) + } +} + +func (cs *ClientSession) StartQueryRunner(app *App, results chan struct{}, ctrl chan int, query ClientQuery) error { + log.Printf("StartQuery runned %+v", query) + + go func() { + for { + select { + case _, ok := <-ctrl: + if !ok { + log.Printf("StartQuery closed -> %+v", query) + return + } + default: + log.Printf("StartQuery FETCH %+v", query) + time.Sleep((time.Duration)(query.FetchInterval) * time.Second) + } + } + }() + + return nil +} + +func (q *ClientQuery) IsValid() (result bool) { + if q.FetchInterval > 0 { + return true + } else { + return false + } +}