Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
pechorin committed Apr 17, 2020
1 parent a90ea43 commit 8e542c8
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 70 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gorilla/websocket v1.4.0
github.com/jmoiron/sqlx v1.2.0
github.com/mailru/go-clickhouse v1.3.0
github.com/mitchellh/mapstructure v1.1.2
github.com/namsral/flag v1.7.4-pre
github.com/satori/go.uuid v1.2.0 // indirect
)
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -99,6 +99,7 @@ github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
83 changes: 16 additions & 67 deletions main.go
Expand Up @@ -39,27 +39,6 @@ type App struct {
ClientsIdSerialMux *sync.Mutex
}

// ClientStream represents connected with websocket client
// this struct hold all connected information, like current query, interval
// last ping time, configs etc.
type ClientSession struct {
Id int64

Query ClientQuery
QueryRunners []*chan int

Active bool
FetchInterval int16

CreatedAt time.Time
ClosedAt time.Time
LastKeepaliveAt time.Time
}

type ClientQuery struct {
RawQuery string
}

func (app *App) Log(message string) {
if app.Debug {
fmt.Println(message)
Expand Down Expand Up @@ -97,6 +76,22 @@ func New() *App {
return app
}

// move to monitor log ?
func (app *App) ClientSessionsMonitor() error {
timer := time.Tick(30 * time.Second)

go func() {
for {
select {
case <-timer:
log.Printf("MONITOR - sessions: %d, fetching: %d", len(app.Clients), 0)
}
}
}()

return nil
}

func (app *App) CreateClientSession() (cs *ClientSession, err error) {
cs = new(ClientSession)

Expand All @@ -117,52 +112,6 @@ func (app *App) CreateClientSession() (cs *ClientSession, err error) {
return cs, err
}

func (cs *ClientSession) Close() {
cs.Active = false
cs.ClosedAt = time.Now()

for _, r := range cs.QueryRunners {
close(*r)
}
}

func (cs *ClientSession) StartQueryRunner(app *App, results chan struct{}, ctrl chan int) error {
log.Printf("StartQuery runned %+v", cs)

go func() {
for {
select {
case _, ok := <-ctrl:
if !ok {
log.Printf("StartQuery closed -> %+v", cs)
return
}
default:
log.Printf("StartQuery FETCH %+v", cs)
time.Sleep((time.Duration)(FetchIntervals[0]) * time.Second)
}
}
}()

return nil
}

// move to monitor log ?
func (app *App) ClientSessionsMonitor() error {
timer := time.Tick(30 * time.Second)

go func() {
for {
select {
case <-timer:
log.Printf("MONITOR - sessions: %d, fetching: %d", len(app.Clients), 0)
}
}
}()

return nil
}

func main() {
app := New()
defer app.Clickhouse.Close()
Expand Down
85 changes: 82 additions & 3 deletions web.go
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"log"
"text/template"
"time"

"github.com/gin-gonic/gin"
mapstruct "github.com/mitchellh/mapstructure"
)

var (
Expand All @@ -32,6 +34,30 @@ type SocketActionResponse struct {
Payload map[string]interface{} `json:"payload"`
}

// ClientStream represents connected with websocket client
// this struct hold all connected information, like current query, interval
// last ping time, configs etc.
type ClientSession struct {
Id int64
QueryRunners []*chan int

Active bool
FetchInterval int

CreatedAt time.Time
ClosedAt time.Time
LastKeepaliveAt time.Time
}

type ClientQuery struct {
Query string
FetchInterval int16 `mapstructure:"fetch_interval"`
}

type ClientQueries struct {
Queries []ClientQuery
}

func (app *App) renderError(c *gin.Context, err error) {
c.String(500, err.Error())
}
Expand Down Expand Up @@ -118,12 +144,27 @@ func (app *App) websocketController(c *gin.Context) {
// assign to session
client.QueryRunners = runners

// start new query runners
if err := client.StartQueryRunner(app, resultsCh, ctrlCh); err != nil {
log.Printf("error -> %s", err.Error())
queries := new(ClientQueries)
if err := mapstruct.Decode(act.Payload, queries); err != nil {
log.Printf("decode error -> %s", err.Error())
break
}

log.Printf("----- decoded queires -> %+v", queries)

for _, query := range queries.Queries {
if valid := query.IsValid(); valid == false {
log.Printf("invalid query -> %s :: %+v", err.Error(), query)
break
}

// start query runner
if err := client.StartQueryRunner(app, resultsCh, ctrlCh, query); err != nil {
log.Printf("error -> %s", err.Error())
break
}
}

default:
log.Printf("Unknown SocketAction -> %+v", act)
}
Expand Down Expand Up @@ -168,3 +209,41 @@ func (app *App) faviconController(c *gin.Context) {
c.Writer.WriteHeader(200)
c.Writer.WriteString(html)
}

func (cs *ClientSession) Close() {
cs.Active = false
cs.ClosedAt = time.Now()

for _, r := range cs.QueryRunners {
close(*r)
}
}

func (cs *ClientSession) StartQueryRunner(app *App, results chan struct{}, ctrl chan int, query ClientQuery) error {
log.Printf("StartQuery runned %+v", query)

go func() {
for {
select {
case _, ok := <-ctrl:
if !ok {
log.Printf("StartQuery closed -> %+v", query)
return
}
default:
log.Printf("StartQuery FETCH %+v", query)
time.Sleep((time.Duration)(query.FetchInterval) * time.Second)
}
}
}()

return nil
}

func (q *ClientQuery) IsValid() (result bool) {
if q.FetchInterval > 0 {
return true
} else {
return false
}
}

0 comments on commit 8e542c8

Please sign in to comment.