Skip to content

Commit

Permalink
refactor(gateway): update nop and influx ingester
Browse files Browse the repository at this point in the history
  • Loading branch information
pyadav committed Feb 8, 2024
1 parent a3493ae commit 1e8b321
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 36 deletions.
2 changes: 1 addition & 1 deletion gateway/internal/api/v1/chatcompletions.go
Expand Up @@ -58,6 +58,6 @@ func (s *V1Handler) ChatCompletions(
"completion_tokens": data.Usage.CompletionTokens,
}

go s.ingester.Ingest(ingesterdata, "logs")
go s.ingester.Ingest(ingesterdata, "analytics")
return connect.NewResponse(data), nil
}
1 change: 1 addition & 0 deletions gateway/internal/ingester/config.go
Expand Up @@ -10,4 +10,5 @@ type InfluxConfig struct {
Host string `yaml:"host" mapstructure:"host" default:"none" json:"host,omitempty"`
Token string `yaml:"token" mapstructure:"token" default:"json" json:"token,omitempty"`
Organization string `yaml:"organization" mapstructure:"organization" default:"json" json:"organization,omitempty"`
Database string `yaml:"database" mapstructure:"database" default:"json" json:"database,omitempty"`
}
29 changes: 13 additions & 16 deletions gateway/internal/ingester/influx/influx.go
Expand Up @@ -11,28 +11,25 @@ import (

type InfluxDBIngester struct {
client *influxdb3.Client
bucket string
Organization string
database string
organization string
logger *slog.Logger
}

func NewInfluxDBIngester(host, token, organization, bucket string, logger *slog.Logger) (*InfluxDBIngester, error) {
client, err := influxdb3.New(influxdb3.ClientConfig{
Host: host,
Token: token,
Organization: organization,
Database: bucket,
})
if err != nil {
return nil, err
// NewOptions creates a new Options instance with provided functional options
func NewInfluxIngester(opts ...Option) *InfluxDBIngester {
options := &Options{}

for _, opt := range opts {
opt(options)
}

return &InfluxDBIngester{
client: client,
Organization: organization,
bucket: bucket,
logger: logger,
}, nil
client: options.client,
database: options.database,
organization: options.organization,
logger: options.logger,
}
}

func (in *InfluxDBIngester) Ingest(data map[string]interface{}, measurement string) {
Expand Down
44 changes: 44 additions & 0 deletions gateway/internal/ingester/influx/options.go
@@ -0,0 +1,44 @@
package influx

import (
"log/slog"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

type Options struct {
client *influxdb3.Client
database string
organization string
logger *slog.Logger
}

type Option func(*Options)

// WithClient sets the InfluxDB client in Options
func WithClient(client *influxdb3.Client) Option {
return func(o *Options) {
o.client = client
}
}

// WithDatabase sets the database in Options
func WithDatabase(database string) Option {
return func(o *Options) {
o.database = database
}
}

// WithOrganization sets the organization in Options
func WithOrganization(organization string) Option {
return func(o *Options) {
o.organization = organization
}
}

// WithLogger sets the logger in Options
func WithLogger(logger *slog.Logger) Option {
return func(o *Options) {
o.logger = logger
}
}
31 changes: 26 additions & 5 deletions gateway/internal/ingester/ingester.go
Expand Up @@ -3,8 +3,8 @@ package ingester
import (
"context"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/missingstudio/studio/backend/internal/ingester/influx"
"github.com/missingstudio/studio/backend/internal/ingester/noop"
"github.com/sagikazarmark/slog-shim"
)

Expand All @@ -16,18 +16,39 @@ type Ingester interface {

func GetIngester(ctx context.Context, cfg Config, logger *slog.Logger) Ingester {
if !cfg.Enabled {
return &noop.NoOpIngester{}
return nil
}

switch cfg.Provider {
case "influx":
ingester, err := influx.NewInfluxDBIngester(cfg.Influx.Host, cfg.Influx.Token, cfg.Influx.Organization, "logs", logger)
// Create a new client using an InfluxDB server base URL and an authentication token
client, err := influxdb3.New(influxdb3.ClientConfig{
Host: cfg.Influx.Host,
Token: cfg.Influx.Token,
Organization: cfg.Influx.Organization,
Database: cfg.Influx.Database,
})
if err != nil {
logger.Error("error starting influx server", "error", err)
return nil
}
return ingester

return influx.NewInfluxIngester(
influx.WithClient(client),
influx.WithLogger(logger),
influx.WithDatabase(cfg.Influx.Database),
influx.WithOrganization(cfg.Influx.Organization),
)
default:
return &noop.NoOpIngester{}
return nil
}
}

func GetIngesterWithDefault(ctx context.Context, cfg Config, logger *slog.Logger) Ingester {
ingester := GetIngester(ctx, cfg, logger)
if ingester == nil {
ingester = &NopIngester{}
}

return ingester
}
13 changes: 0 additions & 13 deletions gateway/internal/ingester/noop/noop.go

This file was deleted.

11 changes: 11 additions & 0 deletions gateway/internal/ingester/nop.go
@@ -0,0 +1,11 @@
package ingester

type NopIngester struct{}

func (n *NopIngester) Get(key string) ([]map[string]interface{}, error) {
return nil, nil
}
func (n *NopIngester) Ingest(data map[string]interface{}, key string) {}
func (n *NopIngester) Close() error {
return nil
}
2 changes: 1 addition & 1 deletion gateway/internal/server/server.go
Expand Up @@ -14,7 +14,7 @@ import (
)

func Serve(ctx context.Context, logger *slog.Logger, cfg *config.Config) error {
ingester := ingester.GetIngester(ctx, cfg.Ingester, logger)
ingester := ingester.GetIngesterWithDefault(ctx, cfg.Ingester, logger)
connectMux, err := connectrpc.NewConnectMux(v1.NewDeps(ingester))
if err != nil {
logger.Error("connect rpc mux not created", err)
Expand Down

0 comments on commit 1e8b321

Please sign in to comment.