Skip to content

Commit

Permalink
Sink feedback (#243)
Browse files Browse the repository at this point in the history
* improve sink feedback so smart stuff can be built on it

* but not that

* cleanup logging, etc

* Rethink manifold

* add retry after headers

* but actually set the header

* make relay retry faster

* make manifold distribution errors a little more obvious

* clean up topic <-> stream config

* Scrap manifold conifg

* bump quickstart and devel version
  • Loading branch information
jakthom committed Apr 22, 2022
1 parent aeb1907 commit ec5a47c
Show file tree
Hide file tree
Showing 37 changed files with 505 additions and 385 deletions.
2 changes: 1 addition & 1 deletion .VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.5.1
v0.5.2
10 changes: 4 additions & 6 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type App struct {
config *config.Config
engine *gin.Engine
schemaCache *cache.SchemaCache
manifold *manifold.Manifold
manifold *manifold.SimpleManifold
sinks []sink.Sink
meta *tele.Meta
}
Expand Down Expand Up @@ -92,11 +92,12 @@ func (a *App) initializeSinks() {

func (a *App) initializeManifold() {
log.Info().Msg("initializing manifold")
m, err := manifold.BuildManifold(a.config.Manifold, &a.sinks)
manifold := manifold.SimpleManifold{}
err := manifold.Initialize(&a.sinks)
if err != nil {
log.Fatal().Stack().Err(err).Msg("could not build manifold")
}
a.manifold = m
a.manifold = &manifold
}

func (a *App) initializeRouter() {
Expand Down Expand Up @@ -287,15 +288,12 @@ func (a *App) Run() {
// Safe shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
shutDownManifold := make(chan bool, 1)
a.manifold.Run(a.meta, &shutDownManifold)
<-quit
log.Info().Msg("shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal().Stack().Err(err).Msg("server forced to shutdown")
}
shutDownManifold <- true
tele.Sis(a.meta)
}
2 changes: 1 addition & 1 deletion examples/devel/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.5.1
image: ghcr.io/silverton-io/honeypot:v0.5.2
volumes:
- type: bind
source: ./honeypot/devel.conf.yml
Expand Down
16 changes: 9 additions & 7 deletions examples/devel/honeypot/devel.conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,10 @@ schemaCache:
schemaDirectory:
enabled: true

manifold:
bufferRecordThreshold: 1
bufferByteThreshold: 1024
bufferTimeThreshold: 60

sinks:
- name: mysql
type: mysql
deliveryRequired: true
mysqlHost: mysql
mysqlPort: 3306
mysqlDbName: honeypot
Expand All @@ -104,6 +100,7 @@ sinks:
invalidTable: honeypot_invalid
- name: postgres
type: postgres
deliveryRequired: true
pgHost: postgres
pgPort: 5432
pgDbName: honeypot
Expand All @@ -113,6 +110,7 @@ sinks:
invalidTable: honeypot_invalid
- name: materialize
type: materialize
deliveryRequired: true
mzHost: materialized
mzPort: 6875
mzDbName: materialize
Expand All @@ -122,6 +120,7 @@ sinks:
invalidTable: honeypot_invalid
- name: clickhouse
type: clickhouse
deliveryRequired: true
clickhouseHost: clickhouse
clickhousePort: 9000
clickhouseDbName: honeypot
Expand All @@ -131,6 +130,7 @@ sinks:
invalidTable: honeypot_invalid
- name: mango
type: mongodb
deliveryRequired: true
mongoHosts:
- mongo
mongoDbPort: 27017
Expand All @@ -141,12 +141,14 @@ sinks:
invalidCollection: honeypotInvalid
- name: thepanda
type: kafka
deliveryRequired: true
kafkaBrokers:
- redpanda-1:29092
validEventTopic: honeypot-valid
invalidEventTopic: honeypot-invalid
validTopic: honeypot-valid
invalidTopic: honeypot-invalid
- name: elk
type: elasticsearch
deliveryRequired: true
elasticsearchHosts:
- http://elasticsearch:9200
elasticsearchUsername: elastic
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.5.1
image: ghcr.io/silverton-io/honeypot:v0.5.2
volumes:
- type: bind
source: ./honeypot/quickstart.conf.yml
Expand Down
10 changes: 3 additions & 7 deletions examples/quickstart/honeypot/quickstart.conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,16 @@ schemaCache:
schemaDirectory:
enabled: true

manifold:
bufferRecordThreshold: 1
bufferByteThreshold: 1024
bufferTimeThreshold: 60

sinks:
- name: primary
type: kafka
deliveryRequired: true
kafkaBrokers:
- redpanda-1:29092 # internally advertised
- redpanda-2:29093 # internally advertised
- redpanda-3:29094 # internally advertised
invalidEventTopic: hpt-invalid
validEventTopic: hpt-valid
invalidTopic: hpt-invalid
validTopic: hpt-valid

squawkBox:
enabled: true
Expand Down
3 changes: 0 additions & 3 deletions pkg/config/manifold.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
package config

type Manifold struct {
BufferRecordThreshold int `json:"bufferRecordThreshold"`
BufferByteThreshold int `json:"bufferByteThreshold"`
BufferTimeThreshold int `json:"bufferTimeThreshold"`
}
16 changes: 10 additions & 6 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package config

type Sink struct {
Name string `json:"name"`
Type string `json:"type"`
Project string `json:"project,omitempty"`
KafkaBrokers []string `json:"kakfaBrokers,omitempty"`
Name string `json:"name"`
Type string `json:"type"`
DeliveryRequired bool `json:"deliveryRequired"`
Project string `json:"project,omitempty"`
KafkaBrokers []string `json:"kakfaBrokers,omitempty"`
// Kafka, Pubsub
ValidEventTopic string `json:"validEventTopic,omitempty"`
InvalidEventTopic string `json:"invalidEventTopic,omitempty"`
ValidTopic string `json:"validTopic,omitempty"`
InvalidTopic string `json:"invalidTopic,omitempty"`
// Kinesis
ValidStream string `json:"validStream,omitempty"`
InvalidStream string `json:"invalidStream,omitempty"`
// Relay, HTTP/S, etc
ValidUrl string `json:"validUrl,omitempty"`
InvalidUrl string `json:"invalidUrl,omitempty"`
Expand Down
9 changes: 7 additions & 2 deletions pkg/handler/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ func CloudeventsHandler(h EventHandlerParams) gin.HandlerFunc {
if c.ContentType() == "application/cloudevents+json" || c.ContentType() == "application/cloudevents-batch+json" {
envelopes := envelope.BuildCloudeventEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
c.JSON(http.StatusOK, response.Ok)
err := h.Manifold.Distribute(annotatedEnvelopes)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
} else {
c.JSON(http.StatusOK, response.Ok)
}
} else {
c.JSON(http.StatusBadRequest, response.InvalidContentType)
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/handler/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ func GenericHandler(h EventHandlerParams) gin.HandlerFunc {
if c.ContentType() == "application/json" {
envelopes := envelope.BuildGenericEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
c.JSON(200, response.Ok)
err := h.Manifold.Distribute(annotatedEnvelopes)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
} else {
c.JSON(200, response.Ok)
}

} else {
c.JSON(http.StatusBadRequest, response.InvalidContentType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
type EventHandlerParams struct {
Config *config.Config
Cache *cache.SchemaCache
Manifold *manifold.Manifold
Manifold *manifold.SimpleManifold
}
11 changes: 10 additions & 1 deletion pkg/handler/relay.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package handler

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/response"
)

// RelayHandler processes incoming envelopes, splits them in half,
// and sends them to the configured sink. It relies on upstream validation.
func RelayHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
envelopes := envelope.BuildRelayEnvelopesFromRequest(c)
h.Manifold.Enqueue(envelopes)
err := h.Manifold.Distribute(envelopes)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_3)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
} else {
c.JSON(200, response.Ok)
}
}
return gin.HandlerFunc(fn)
}
9 changes: 7 additions & 2 deletions pkg/handler/snowplow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@ func SnowplowHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
envelopes := envelope.BuildSnowplowEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
err := h.Manifold.Distribute(annotatedEnvelopes)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
} else {
c.JSON(http.StatusOK, response.Ok)
}
if c.Request.Method == http.MethodGet {
redirectUrl, _ := c.GetQuery("u")
if redirectUrl != "" && h.Config.Snowplow.OpenRedirectsEnabled {
log.Info().Msg("redirecting to " + redirectUrl)
c.Redirect(http.StatusFound, redirectUrl)
}
}
c.JSON(http.StatusOK, response.Ok)
}
return gin.HandlerFunc(fn)
}
9 changes: 7 additions & 2 deletions pkg/handler/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ func WebhookHandler(h EventHandlerParams) gin.HandlerFunc {
if c.ContentType() == "application/json" {
envelopes := envelope.BuildWebhookEnvelopesFromRequest(c)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
c.JSON(http.StatusOK, response.Ok)
err := h.Manifold.Distribute(annotatedEnvelopes)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
} else {
c.JSON(http.StatusOK, response.Ok)
}
} else {
c.JSON(http.StatusBadRequest, response.InvalidContentType)
}
Expand Down
Loading

0 comments on commit ec5a47c

Please sign in to comment.