Skip to content

Commit

Permalink
Schema metadata (#220)
Browse files Browse the repository at this point in the history
* Create annotator package
* Add schema metadata to jsonschemas
* Write schema metadata as headers/attributes to kafka/pubsub respectively
  • Loading branch information
jakthom committed Apr 1, 2022
1 parent d7dbaa7 commit c4874c8
Show file tree
Hide file tree
Showing 37 changed files with 250 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.1.43
v0.1.44
2 changes: 1 addition & 1 deletion cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (a *App) Run() {
a.manifold.Run(a.meta, &shutDownManifold)
<-quit
log.Info().Msg("shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal().Stack().Err(err).Msg("server forced to shutdown")
Expand Down
2 changes: 1 addition & 1 deletion examples/elasticsearch-kibana/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.1.43
image: ghcr.io/silverton-io/honeypot:v0.1.44
volumes:
- type: bind
source: ./honeypot/elastic-kibana.conf.yml
Expand Down
4 changes: 2 additions & 2 deletions examples/honeypot-honeypot-relay/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.1.43
image: ghcr.io/silverton-io/honeypot:v0.1.44
volumes:
- type: bind
source: ./honeypot/honeypot1.conf.yml
Expand All @@ -30,7 +30,7 @@ services:

relay:
container_name: relay
image: ghcr.io/silverton-io/honeypot:v0.1.43
image: ghcr.io/silverton-io/honeypot:v0.1.44
volumes:
- type: bind
source: ./honeypot/honeypot2.conf.yml
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.1.43
image: ghcr.io/silverton-io/honeypot:v0.1.44
volumes:
- type: bind
source: ./honeypot/quickstart.conf.yml
Expand Down
56 changes: 56 additions & 0 deletions pkg/annotator/annotator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package annotator

import (
"github.com/rs/zerolog/log"
"github.com/silverton-io/honeypot/pkg/cache"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/protocol"
"github.com/silverton-io/honeypot/pkg/validator"
"github.com/tidwall/gjson"
)

// getMetadataFromSchema pulls the descriptive fields out of a JSON schema
// document and returns them as an envelope.EventMetadata.
//
// Vendor, the three namespace levels, Name, Version and Format are read from
// the schema's "self" object; Path is read from the top-level "title" key.
// Each value is taken through gjson's String() conversion, so the result
// never carries an error — callers get whatever gjson yields for that path.
func getMetadataFromSchema(schema []byte) envelope.EventMetadata {
	parsed := gjson.ParseBytes(schema)
	// str resolves a gjson path against the parsed schema as a string.
	str := func(path string) string {
		return parsed.Get(path).String()
	}
	return envelope.EventMetadata{
		Vendor:             str("self.vendor"),
		PrimaryNamespace:   str("self.primaryNamespace"),
		SecondaryNamespace: str("self.secondaryNamespace"),
		TertiaryNamespace:  str("self.tertiaryNamespace"),
		Name:               str("self.name"),
		Version:            str("self.version"),
		Format:             str("self.format"),
		Path:               str("title"),
	}
}

// Annotate walks envelopes and returns a new slice holding an annotated copy
// of each one, in the same order. What "annotated" means depends on the
// envelope's EventProtocol:
//
//   - protocol.RELAY: the envelope is passed through untouched.
//   - protocol.WEBHOOK: EventMetadata is set from an empty (nil) schema; no
//     validation is performed.
//   - anything else: the payload is run through validator.ValidateEvent and
//     IsValid, ValidationError and EventMetadata are set from its results.
//
// The loop works on per-iteration copies, so the elements of the input slice
// are not modified. When envelopes is empty the returned slice is nil.
func Annotate(envelopes []envelope.Envelope, cache *cache.SchemaCache) []envelope.Envelope {
	var annotated []envelope.Envelope
	for _, env := range envelopes {
		log.Debug().Msg("annotating event")
		switch env.EventProtocol {
		case protocol.RELAY:
			// Don't annotate
		case protocol.WEBHOOK:
			meta := getMetadataFromSchema(nil)
			env.EventMetadata = &meta
		default:
			ok, validationErr, schemaBytes := validator.ValidateEvent(env.Payload, cache)
			meta := getMetadataFromSchema(schemaBytes)
			env.IsValid = &ok
			env.ValidationError = &validationErr
			env.EventMetadata = &meta
		}
		// Every branch keeps the envelope; only the annotations differ.
		annotated = append(annotated, env)
	}
	return annotated
}
34 changes: 30 additions & 4 deletions pkg/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import (
"github.com/silverton-io/honeypot/pkg/event"
)

// Key names under which individual event-metadata fields are published
// alongside an envelope. The Kafka sink uses them as record header keys and
// the Pub/Sub sink uses them as message attribute keys. The values match the
// JSON field names of EventMetadata.
const (
// EVENT_VENDOR is the key for EventMetadata.Vendor.
EVENT_VENDOR string = "vendor"
// EVENT_PRIMARY_NAMESPACE is the key for EventMetadata.PrimaryNamespace.
EVENT_PRIMARY_NAMESPACE string = "primaryNamespace"
// EVENT_SECONDARY_NAMESPACE is the key for EventMetadata.SecondaryNamespace.
EVENT_SECONDARY_NAMESPACE string = "secondaryNamespace"
// EVENT_TERTIARY_NAMESPACE is the key for EventMetadata.TertiaryNamespace.
EVENT_TERTIARY_NAMESPACE string = "tertiaryNamespace"
// EVENT_NAME is the key for EventMetadata.Name.
EVENT_NAME string = "name"
// EVENT_VERSION is the key for EventMetadata.Version.
EVENT_VERSION string = "version"
// EVENT_FORMAT is the key for EventMetadata.Format.
// NOTE(review): not referenced by the sink code visible in this change — confirm usage.
EVENT_FORMAT string = "format"
// EVENT_PATH is the key for EventMetadata.Path.
// NOTE(review): not referenced by the sink code visible in this change — confirm usage.
EVENT_PATH string = "path"
)

type PayloadValidationError struct {
Field string `json:"field"`
Description string `json:"description"`
Expand All @@ -20,10 +31,10 @@ type ValidationError struct {
}

type Envelope struct {
Id uuid.UUID `json:"id"`
EventProtocol string `json:"eventProtocol"`
EventSchema string `json:"eventSchema"`
Source string `json:"source"`
Id uuid.UUID `json:"id"`
EventProtocol string `json:"eventProtocol"`
EventMetadata *EventMetadata `json:"eventMetadata"`
SourceMetadata `json:"sourceMetadata"`
Tstamp time.Time `json:"tstamp"`
Ip string `json:"ip"`
IsValid *bool `json:"isValid"`
Expand All @@ -32,3 +43,18 @@ type Envelope struct {
ValidationError *ValidationError `json:"validationErrors"`
Payload event.Event `json:"payload"`
}

// EventMetadata describes the schema an event was annotated with. It is
// attached to an Envelope as a pointer and serialized under "eventMetadata".
// All fields are omitted from JSON when empty.
type EventMetadata struct {
// Vendor is read from the schema's "self.vendor".
Vendor string `json:"vendor,omitempty"`
// PrimaryNamespace is read from the schema's "self.primaryNamespace".
PrimaryNamespace string `json:"primaryNamespace,omitempty"`
// SecondaryNamespace is read from the schema's "self.secondaryNamespace".
SecondaryNamespace string `json:"secondaryNamespace,omitempty"`
// TertiaryNamespace is read from the schema's "self.tertiaryNamespace".
TertiaryNamespace string `json:"tertiaryNamespace,omitempty"`
// Name is read from the schema's "self.name".
Name string `json:"name,omitempty"`
// Version is read from the schema's "self.version".
Version string `json:"version,omitempty"`
// Format is read from the schema's "self.format".
Format string `json:"format,omitempty"`
// Path is read from the schema's top-level "title"; the Kafka sink also
// uses it as the record key.
Path string `json:"path,omitempty"`
}

// SourceMetadata carries information about where an event came from. It is
// embedded in Envelope and serialized under "sourceMetadata".
type SourceMetadata struct {
// Name identifies the source; omitted from JSON when empty.
// NOTE(review): nothing in the visible code populates this field — confirm
// which builder is expected to set it.
Name string `json:"name,omitempty"`
}
9 changes: 0 additions & 9 deletions pkg/envelope/envelopeBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,10 @@ import (

func buildSnowplowEnvelope(spEvent snowplow.SnowplowEvent) Envelope {
isRelayed := false
schema := spEvent.Schema()
uid := uuid.New()
if schema == nil {
schema = spEvent.EventName // FIXME? Is this the right approach?
}
envelope := Envelope{
Id: uid,
EventProtocol: protocol.SNOWPLOW,
EventSchema: *schema,
Tstamp: time.Now().UTC(),
Ip: *spEvent.UserIpAddress,
Payload: spEvent,
Expand Down Expand Up @@ -74,7 +69,6 @@ func BuildGenericEnvelopesFromRequest(c *gin.Context, conf config.Config) []Enve
envelope := Envelope{
Id: uid,
EventProtocol: protocol.GENERIC,
EventSchema: genEvent.Payload.Schema,
Tstamp: time.Now().UTC(),
Ip: c.ClientIP(),
Payload: genEvent,
Expand All @@ -99,9 +93,7 @@ func BuildCloudeventEnvelopesFromRequest(c *gin.Context, conf config.Config) []E
envelope := Envelope{
Id: uid,
EventProtocol: protocol.CLOUDEVENTS,
EventSchema: cEvent.DataSchema,
Tstamp: time.Now().UTC(),
Source: cEvent.Source,
Ip: c.ClientIP(),
Payload: cEvent,
IsRelayed: &isRelayed,
Expand Down Expand Up @@ -175,7 +167,6 @@ func BuildWebhookEnvelopesFromRequest(c *gin.Context) []Envelope {
envelope := Envelope{
Id: uid,
EventProtocol: protocol.WEBHOOK,
EventSchema: *whEvent.Schema(),
Tstamp: time.Now().UTC(),
Ip: c.ClientIP(),
Payload: whEvent,
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/silverton-io/honeypot/pkg/annotator"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/response"
"github.com/silverton-io/honeypot/pkg/validator"
)

func CloudeventsHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
if c.ContentType() == "application/cloudevents+json" || c.ContentType() == "application/cloudevents-batch+json" {
envelopes := envelope.BuildCloudeventEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := validator.Annotate(envelopes, h.Cache)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
c.JSON(http.StatusOK, response.Ok)
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package handler

import (
"github.com/gin-gonic/gin"
"github.com/silverton-io/honeypot/pkg/annotator"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/response"
"github.com/silverton-io/honeypot/pkg/validator"
)

func GenericHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
envelopes := envelope.BuildGenericEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := validator.Annotate(envelopes, h.Cache)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
c.JSON(200, response.Ok)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/snowplow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (

"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/silverton-io/honeypot/pkg/annotator"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/response"
"github.com/silverton-io/honeypot/pkg/validator"
)

func SnowplowHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
envelopes := envelope.BuildSnowplowEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := validator.Annotate(envelopes, h.Cache)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
if c.Request.Method == http.MethodGet {
redirectUrl, _ := c.GetQuery("u")
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/squawkbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/silverton-io/honeypot/pkg/annotator"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/protocol"
"github.com/silverton-io/honeypot/pkg/validator"
)

func SquawkboxHandler(h EventHandlerParams, eventProtocol string) gin.HandlerFunc {
Expand All @@ -20,7 +20,7 @@ func SquawkboxHandler(h EventHandlerParams, eventProtocol string) gin.HandlerFun
case protocol.GENERIC:
envelopes = envelope.BuildGenericEnvelopesFromRequest(c, *h.Config)
}
annotatedEnvelopes := validator.Annotate(envelopes, h.Cache)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
c.JSON(http.StatusOK, annotatedEnvelopes)
}
return gin.HandlerFunc(fn)
Expand Down
6 changes: 4 additions & 2 deletions pkg/handler/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/silverton-io/honeypot/pkg/annotator"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/response"
)

func WebhookHandler(handlerParams EventHandlerParams) gin.HandlerFunc {
func WebhookHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
if c.ContentType() == "application/json" {
envelopes := envelope.BuildWebhookEnvelopesFromRequest(c)
handlerParams.Manifold.Enqueue(envelopes)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
h.Manifold.Enqueue(annotatedEnvelopes)
c.JSON(http.StatusOK, response.Ok)
} else {
c.JSON(http.StatusBadRequest, response.InvalidContentType)
Expand Down
4 changes: 2 additions & 2 deletions pkg/manifold/manifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func (m *Manifold) Run(meta *tele.Meta, shutdown *chan bool) {
if *e.IsValid {
log.Debug().Msg("appending valid envelope to buffer...")
m.AppendValidEnvelope(e)
meta.ProtocolStats.IncrementValid(e.EventProtocol, e.EventSchema, 1)
meta.ProtocolStats.IncrementValid(e.EventProtocol, e.EventMetadata, 1)
} else {
log.Debug().Msg("appending invalid envelope to buffer...")
m.AppendInvalidEnvelope(e)
meta.ProtocolStats.IncrementInvalid(e.EventProtocol, e.EventSchema, 1)
meta.ProtocolStats.IncrementInvalid(e.EventProtocol, e.EventMetadata, 1)
}
m.PurgeBuffersToSinksIfFull(ctx, meta)
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/sink/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,20 @@ func (s *KafkaSink) batchPublish(ctx context.Context, topic string, envelopes []
var wg sync.WaitGroup
for _, event := range envelopes {
payload, _ := json.Marshal(event)
record := &kgo.Record{Topic: topic, Value: payload}
headers := []kgo.RecordHeader{
{Key: envelope.EVENT_VENDOR, Value: []byte(event.EventMetadata.Vendor)},
{Key: envelope.EVENT_PRIMARY_NAMESPACE, Value: []byte(event.EventMetadata.PrimaryNamespace)},
{Key: envelope.EVENT_SECONDARY_NAMESPACE, Value: []byte(event.EventMetadata.SecondaryNamespace)},
{Key: envelope.EVENT_TERTIARY_NAMESPACE, Value: []byte(event.EventMetadata.TertiaryNamespace)},
{Key: envelope.EVENT_NAME, Value: []byte(event.EventMetadata.Name)},
{Key: envelope.EVENT_VERSION, Value: []byte(event.EventMetadata.Version)},
}
record := &kgo.Record{
Key: []byte(event.EventMetadata.Path), // FIXME! Add configurable partition assignment
Topic: topic,
Value: payload,
Headers: headers,
}
wg.Add(1)
s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
defer wg.Done()
Expand Down
6 changes: 3 additions & 3 deletions pkg/sink/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (s *PostgresSink) batchPublish(ctx context.Context, tableName string, envel
row := []interface{}{
envelope.Id,
envelope.EventProtocol,
envelope.EventSchema,
envelope.Source,
envelope.EventMetadata,
envelope.SourceMetadata,
envelope.Tstamp,
envelope.Ip,
envelope.IsValid,
Expand All @@ -78,7 +78,7 @@ func (s *PostgresSink) batchPublish(ctx context.Context, tableName string, envel
}
copyCount, err := s.conn.CopyFrom(
pgx.Identifier{tableName},
[]string{"id", "eventProtocol", "eventSchema", "source", "tstamp", "ip", "isValid", "isRelayed", "validationError", "payload"},
[]string{"id", "eventProtocol", "eventMetadata", "sourceMetadata", "tstamp", "ip", "isValid", "isRelayed", "validationError", "payload"}, // FIXME - this is ugly.
pgx.CopyFromRows(rows),
)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/sink/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ func (s *PubsubSink) batchPublish(ctx context.Context, topic *pubsub.Topic, enve
payload, _ := json.Marshal(event)
msg := &pubsub.Message{
Data: payload,
Attributes: map[string]string{
envelope.EVENT_VENDOR: event.EventMetadata.Vendor,
envelope.EVENT_PRIMARY_NAMESPACE: event.EventMetadata.PrimaryNamespace,
envelope.EVENT_SECONDARY_NAMESPACE: event.EventMetadata.SecondaryNamespace,
envelope.EVENT_TERTIARY_NAMESPACE: event.EventMetadata.TertiaryNamespace,
envelope.EVENT_NAME: event.EventMetadata.Name,
envelope.EVENT_VERSION: event.EventMetadata.Version,
},
}
result := topic.Publish(ctx, msg)
wg.Add(1)
Expand Down
Loading

0 comments on commit c4874c8

Please sign in to comment.