Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Kafka avro #152

Merged
merged 3 commits into from
Nov 6, 2021
Merged

Kafka avro #152

merged 3 commits into from
Nov 6, 2021

Conversation

xmcqueen
Copy link
Contributor

@xmcqueen xmcqueen commented Oct 7, 2021

This PR provides avro support for the kafka sink, whereby standard, non-strict json can be ingested into kafka with full avro encoding.

#147

Example

Here's an example config file showing the syntax including an example of a complete avro schema for kubernetes events:

logLevel: info
logFormat: json
route:
  routes:
    - match:
        - receiver: "dump"
receivers:
  - name: dump
    kafka:
      topic: kubeEvents
      brokers:
        - kafka.broker.some.com:13013
      avro:
        schemaID: aa6b1ca0e1ee2d885bfbc747f4a4011b
        schema: |-
          {"type":"record","name":"kubeEvents","fields":[{"name":"action","type":"string","default":""},{"name":"annotations","type":{"type":"map","values":"string"},"default":{}},{"name":"apiVersion","type":"string","default":""},{"name":"clusterName","type":"string","default":""},{"name":"count","type":"long","default":0},{"name":"deletionGracePeriodSeconds","type":"long","default":0},{"name":"deletionTimestamp","type":"string","default":""},{"name":"eventTime","type":["string","null"],"default":""},{"name":"finalizers","type":{"type":"array","items":"string"},"default":[]},{"name":"firstTimestamp","type":["string","null"],"default":""},{"name":"generateName","type":"string","default":""},{"name":"generation","type":"long","default":0},{"name":"involvedObject","type":{"type":"record","name":"io2","fields":[{"name":"apiVersion","type":"string","default":""},{"name":"fieldPath","type":"string","default":""},{"name":"kind","type":"string","default":""},{"name":"name","type":"string","default":""},{"name":"namespace","type":"string","default":""},{"name":"resourceVersion","type":"string","default":""},{"name":"uid","type":"string","default":""}]}},{"name":"kind","type":"string","default":""},{"name":"labels","type":{"type":"map","values":"string"},"default":{}},{"name":"lastTimestamp","type":["string","null"],"default":""},{"name":"managedFields","type":{"type":"array","items":"string"},"default":[]},{"name":"message","type":"string","default":""},{"name":"metadata","type":{"type":"record","name":"md","fields":[{"name":"annotations","type":{"type":"map","values":"string"},"default":{}},{"name":"clusterName","type":"string","default":""},{"name":"creationTimestamp","type":"string","default":""},{"name":"deletionGracePeriodSeconds","type":"long","default":0},{"name":"deletionTimestamp","type":"string","default":""},{"name":"generateName","type":"string","default":""},{"name":"generation","type":"long","default":0},{"name":"labels","type":{"type":"map","values":"string"},"default":{}},{"name":"name","type":"string","default":""},{"name":"namespace","type":"string","default":""},{"name":"resourceVersion","type":"string","default":""},{"name":"selfLink","type":"string","default":""},{"name":"uid","type":"string","default":""}]}},{"name":"name","type":"string","default":""},{"name":"namespace","type":"string","default":""},{"name":"ownerReferences","type":{"type":"array","items":"string"},"default":[]},{"name":"reason","type":"string","default":""},{"name":"reportingComponent","type":"string","default":""},{"name":"reportingInstance","type":"string","default":""},{"name":"resourceVersion","type":"string","default":""},{"name":"selfLink","type":"string","default":""},{"name":"source","type":{"type":"record","name":"so","fields":[{"name":"component","type":"string","default":""},{"name":"host","type":"string","default":""}]}},{"name":"type","type":"string","default":""},{"name":"uid","type":"string","default":""}]}
      tls:
        enable: true
        certFile: ./identity.cert
        keyFile: ./identity.key
        caFile: /etc/some-ca-bundle.crt

Dependency

The avro support is provided by goavro.

There's one file avro.go that provides the interface to goavro, and a small change in kafka.go to call the avro encoder and to handle the configuration data.

Fully Backwards Compatible

The schemaID, and the schema default to null, and if they are unspecified, the default behaviour is the expected plain-kafka sink.

@mustafaakin
Copy link
Contributor

Looks nice, will merge soon, out of curiosity, what changes did you do to upstream avro library?

@xmcqueen
Copy link
Contributor Author

xmcqueen commented Oct 7, 2021

I added support for standard json, so we could use schemas with unions. Avro expects json for union types to be a map indicating the type, not just a value of some type.

For a union taking a string, the avro libs expect the json to be like this:

{"string": "value1"}
{"int": 2}

All of the regular json in the wild is full of union types where the values are supplied are like (as you already know):

"value1"
2

all of which were being rejected.

@mustafaakin-atl mustafaakin-atl merged commit 7eef4f6 into opsgenie:master Nov 6, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants