Skip to content

Commit

Permalink
Merge pull request #13 from usedatabrew/feature/log-processor
Browse files Browse the repository at this point in the history
Feature/log processor
  • Loading branch information
le-vlad committed Jan 2, 2024
2 parents 526084b + 800caba commit d4504be
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 12 deletions.
7 changes: 6 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@
.idea/dataSources
.idea/dataSources.local.xml
fly.toml
build/
build/
.fleet/
examples/
Formula/
dist/
.github/
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*.so
*.dylib

examples/websocket_to_pg_stream/postgres_storage

# Test binary, built with `go test -c`
*.test

Expand Down
12 changes: 9 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21-bookworm
FROM golang:1.21-alpine as build

WORKDIR /app

Expand All @@ -8,8 +8,14 @@ RUN go mod tidy

RUN cd cmd/blink && go build

FROM busybox

WORKDIR /app

COPY --from=build /app/cmd/blink/blink /app/

ENV GOMAXPROCS=2

ENTRYPOINT ["./cmd/blink/blink"]
ENTRYPOINT ["/app/blink"]

CMD ["./cmd/blink/blink"]
CMD ["/app/blink"]
29 changes: 29 additions & 0 deletions examples/websocket_to_pg_stream/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: "blink_ws"

services:
blink_instance:
container_name: "blink"
image: usedatabrew/blink:1.5.2
ports:
- 3333:3333
volumes:
- ./blink.yaml:/blink.yaml
command: [ "start", "--config", "/blink.yaml" ]
networks:
- postgres
postgres:
image: postgres:14-alpine
ports:
- 5432:5432
volumes:
- ./postgres_storage:/var/lib/postgresql/data
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: lorem123
POSTGRES_DB: db
networks:
- postgres

networks:
postgres:
driver: bridge
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
github.com/jackc/pgx/v5 v5.4.3
github.com/prometheus/client_golang v1.11.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/reiver/go-cast v0.0.0-20220716221226-5360a56d42f4
github.com/sashabaranov/go-openai v1.17.9
github.com/spf13/cobra v1.6.1
github.com/twmb/franz-go v1.15.2
Expand Down Expand Up @@ -72,7 +71,6 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/reiver/go-fck v0.0.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
Expand Down
232 changes: 228 additions & 4 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions internal/processors/log/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package log

type Config struct {
StreamName string `json:"stream" yaml:"stream"`
}
34 changes: 34 additions & 0 deletions internal/processors/log/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package log

import (
"context"
"github.com/charmbracelet/log"
"github.com/usedatabrew/blink/internal/message"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/stream_context"
)

type Plugin struct {
config Config
ctx *stream_context.Context
log *log.Logger
}

func NewLogPlugin(appctx *stream_context.Context, config Config) (*Plugin, error) {
return &Plugin{config: config, ctx: appctx, log: log.WithPrefix("log-processor")}, nil
}

func (p *Plugin) Process(context context.Context, msg *message.Message) (*message.Message, error) {
if p.config.StreamName == "*" || msg.GetStream() == p.config.StreamName {
jsonMarshaledMessage, _ := msg.Data.MarshalJSON()
p.log.Info("message received", "message", string(jsonMarshaledMessage))
}

return msg, nil
}

// EvolveSchema will not be executed for
func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error {

return nil
}
1 change: 1 addition & 0 deletions internal/processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
LambdaProcessor ProcessorDriver = "lambda"
HttpProcessor ProcessorDriver = "http"
SQLEnrichProcessor ProcessorDriver = "sql_enrich"
LogProcessor ProcessorDriver = "log"
)

type DataProcessor interface {
Expand Down
3 changes: 1 addition & 2 deletions internal/sinks/stdout/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stdout

import (
"context"
"fmt"
"github.com/charmbracelet/log"
"github.com/usedatabrew/blink/internal/message"
"github.com/usedatabrew/blink/internal/schema"
Expand Down Expand Up @@ -40,8 +39,8 @@ func (s *SinkPlugin) GetType() sinks.SinkDriver {
return sinks.StdOutSinkType
}

// SetExpectedSchema for Stdout component does nothing, since this component is used mostly for debugging
func (s *SinkPlugin) SetExpectedSchema(schema []schema.StreamSchema) {
fmt.Println("Expected sink schema", schema)
}

func (s *SinkPlugin) Stop() {
Expand Down
7 changes: 7 additions & 0 deletions public/stream/processor_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/usedatabrew/blink/internal/metrics"
"github.com/usedatabrew/blink/internal/processors"
"github.com/usedatabrew/blink/internal/processors/http"
logProc "github.com/usedatabrew/blink/internal/processors/log"
"github.com/usedatabrew/blink/internal/processors/openai"
sqlproc "github.com/usedatabrew/blink/internal/processors/sql"
"github.com/usedatabrew/blink/internal/schema"
Expand Down Expand Up @@ -77,6 +78,12 @@ func (p *ProcessorWrapper) LoadDriver(driver processors.ProcessorDriver, cfg int
panic("can read driver config")
}
return http.NewHttpPlugin(p.ctx, driverConfig)
case processors.LogProcessor:
driverConfig, err := ReadDriverConfig[logProc.Config](cfg, logProc.Config{})
if err != nil {
panic("can read driver config")
}
return logProc.NewLogPlugin(p.ctx, driverConfig)
default:
return nil, errors.New("unregistered driver provided")
}
Expand Down

0 comments on commit d4504be

Please sign in to comment.