-
Notifications
You must be signed in to change notification settings - Fork 1
/
cached_schema_registry.go
48 lines (42 loc) · 1.86 KB
/
cached_schema_registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package pulsavro
import (
"github.com/linkedin/goavro/v2"
"sync"
)
// CachedSchemaRegistryClient wraps a SchemaRegistryClient and keeps in-memory
// maps so that repeated lookups can be answered without calling the wrapped
// client again. It holds mutexes by value, so it should be shared through a
// pointer and never copied.
type CachedSchemaRegistryClient struct {
// SchemaRegistryClient is the wrapped client that cache misses are delegated to.
SchemaRegistryClient *SchemaRegistryClient
// schemaCache stores codecs keyed by topic; access is guarded by schemaCacheLock.
schemaCache map[string]*goavro.Codec
schemaCacheLock sync.RWMutex
// schemaIdCache maps a string key to an int. No method visible in this file
// reads or writes it.
// NOTE(review): presumably a schema-ID cache guarded by schemaIdCacheLock; confirm against its users.
schemaIdCache map[string]int
schemaIdCacheLock sync.RWMutex
}
// NewCachedSchemaRegistryClient returns a caching client that wraps the
// SchemaRegistryClient created by NewSchemaRegistryClient(connect). Both
// internal caches start out empty.
func NewCachedSchemaRegistryClient(connect []string) *CachedSchemaRegistryClient {
	inner := NewSchemaRegistryClient(connect)

	cached := new(CachedSchemaRegistryClient)
	cached.SchemaRegistryClient = inner
	cached.schemaCache = map[string]*goavro.Codec{}
	cached.schemaIdCache = map[string]int{}
	return cached
}
// NewCachedSchemaRegistryClientWithRetries returns a caching client that wraps
// the SchemaRegistryClient created by
// NewSchemaRegistryClientWithRetries(connect, retries). Both internal caches
// start out empty.
func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int) *CachedSchemaRegistryClient {
	inner := NewSchemaRegistryClientWithRetries(connect, retries)

	cached := new(CachedSchemaRegistryClient)
	cached.SchemaRegistryClient = inner
	cached.schemaCache = map[string]*goavro.Codec{}
	cached.schemaIdCache = map[string]int{}
	return cached
}
// GetSchemaCodecByTopic returns the codec for topic. A non-nil codec already
// stored for the topic is returned directly; otherwise the wrapped
// SchemaRegistryClient is asked for it and a successful result is stored for
// later calls. Errors from the wrapped client are returned as-is and nothing
// is cached for that topic.
func (client *CachedSchemaRegistryClient) GetSchemaCodecByTopic(topic string) (*goavro.Codec, error) {
	// Reading from a nil map is safe in Go, so no initialization is needed here.
	client.schemaCacheLock.RLock()
	cached := client.schemaCache[topic]
	client.schemaCacheLock.RUnlock()
	if cached != nil {
		return cached, nil
	}

	// The lookup runs without holding the lock, so concurrent misses for the
	// same topic may each call the wrapped client; the last store wins.
	codec, err := client.SchemaRegistryClient.GetSchemaCodecByTopic(topic)
	if err != nil {
		return nil, err
	}

	client.schemaCacheLock.Lock()
	if client.schemaCache == nil {
		// A client built as a struct literal instead of through one of the
		// constructors has a nil map; storing into it would panic.
		client.schemaCache = make(map[string]*goavro.Codec)
	}
	client.schemaCache[topic] = codec
	client.schemaCacheLock.Unlock()
	return codec, nil
}
// CreateSchemaByTopic forwards topic and schema to the wrapped
// SchemaRegistryClient's CreateSchemaByTopic and returns its error unchanged.
// NOTE(review): the codec cache is not touched here; confirm whether an entry
// already cached for topic should be invalidated after a schema is created.
func (client *CachedSchemaRegistryClient) CreateSchemaByTopic(topic string, schema string) error {
	err := client.SchemaRegistryClient.CreateSchemaByTopic(topic, schema)
	return err
}