Skip to content

Commit

Permalink
[extension/storage/redis_storage] Add a new Redis extension to store …
Browse files Browse the repository at this point in the history
…data in transit
  • Loading branch information
atoulme committed May 24, 2024
1 parent 4fad287 commit 1c2e4aa
Show file tree
Hide file tree
Showing 23 changed files with 867 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/redis-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: redis_storage

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a new storage extension using Redis to store data in transit

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ extension/solarwindsapmsettingsextension/ @open-telemetry/collect
extension/storage/ @open-telemetry/collector-contrib-approvers @dmitryax @atoulme @djaglowski
extension/storage/dbstorage/ @open-telemetry/collector-contrib-approvers @dmitryax @atoulme
extension/storage/filestorage/ @open-telemetry/collector-contrib-approvers @djaglowski
extension/storage/redisstorage/ @open-telemetry/collector-contrib-approvers @atoulme
extension/sumologicextension/ @open-telemetry/collector-contrib-approvers @aboguszewski-sumo @kkujawa-sumo @mat-rumian @rnishtala-sumo @sumo-drosiek @swiatekm-sumo

internal/aws/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @mxiamxia
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ body:
- extension/storage
- extension/storage/dbstorage
- extension/storage/filestorage
- extension/storage/redisstorage
- extension/sumologic
- internal/aws
- internal/collectd
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ body:
- extension/storage
- extension/storage/dbstorage
- extension/storage/filestorage
- extension/storage/redisstorage
- extension/sumologic
- internal/aws
- internal/collectd
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ body:
- extension/storage
- extension/storage/dbstorage
- extension/storage/filestorage
- extension/storage/redisstorage
- extension/sumologic
- internal/aws
- internal/collectd
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ body:
- extension/storage
- extension/storage/dbstorage
- extension/storage/filestorage
- extension/storage/redisstorage
- extension/sumologic
- internal/aws
- internal/collectd
Expand Down
1 change: 1 addition & 0 deletions extension/storage/redisstorage/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
46 changes: 46 additions & 0 deletions extension/storage/redisstorage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# File Storage

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development] |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Fredisstorage%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Fredisstorage) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Fredisstorage%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Fredisstorage) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme) \| Seeking more code owners! |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

The Redis Storage extension can persist state to a Redis cluster.

The extension requires read and write access to a Redis cluster.


## Example

```
extensions:
redis_storage:
redis_storage/all_settings:
endpoint: localhost:6379
password: ""
db: 0
expiration: 5m
service:
extensions: [redis_storage, redis_storage/all_settings]
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [nop]
# Data pipeline is required to load the config.
receivers:
nop:
processors:
nop:
exporters:
nop:
```
18 changes: 18 additions & 0 deletions extension/storage/redisstorage/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package redisstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorage"

import (
"time"

"go.opentelemetry.io/collector/config/configopaque"
)

// Config defines configuration for file storage extension.
type Config struct {
Endpoint string `mapstructure:"endpoint"`
Password configopaque.String `mapstructure:"password"`
DB int `mapstructure:"db"`
Expiration time.Duration `mapstructure:"expiration"`
}
58 changes: 58 additions & 0 deletions extension/storage/redisstorage/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package redisstorage

import (
"path/filepath"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorage/internal/metadata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewID(metadata.Type),
expected: func() component.Config {
ret := NewFactory().CreateDefaultConfig()
ret.(*Config).Endpoint = "localhost:1234"
return ret
}(),
},
{
id: component.NewIDWithName(metadata.Type, "all_settings"),
expected: &Config{
Endpoint: "localhost:1234",
Password: "passwd",
DB: 1,
Expiration: 3 * time.Hour,
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}
5 changes: 5 additions & 0 deletions extension/storage/redisstorage/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml
package redisstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorage"
129 changes: 129 additions & 0 deletions extension/storage/redisstorage/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package redisstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorage"

import (
"context"
"errors"
"fmt"
"time"

"github.com/redis/go-redis/v9"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"
)

type redisStorage struct {
cfg *Config
logger *zap.Logger
client *redis.Client
}

// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*redisStorage)(nil)

func newRedisStorage(logger *zap.Logger, config *Config) (extension.Extension, error) {
return &redisStorage{
cfg: config,
logger: logger,
}, nil
}

// Start runs cleanup if configured
func (rs *redisStorage) Start(context.Context, component.Host) error {
c := redis.NewClient(&redis.Options{
Addr: rs.cfg.Endpoint,
Password: string(rs.cfg.Password),
DB: rs.cfg.DB,
})
rs.client = c
return nil
}

// Shutdown will close any open databases
func (rs *redisStorage) Shutdown(context.Context) error {
if rs.client == nil {
return nil
}
return rs.client.Close()
}

type redisClient struct {
client *redis.Client
prefix string
expiration time.Duration
}

func (rc redisClient) Get(ctx context.Context, key string) ([]byte, error) {
b, err := rc.client.Get(ctx, rc.prefix+key).Bytes()
if errors.Is(err, redis.Nil) {
return nil, nil
}
return b, err
}

func (rc redisClient) Set(ctx context.Context, key string, value []byte) error {
_, err := rc.client.Set(ctx, rc.prefix+key, value, rc.expiration).Result()
return err
}

func (rc redisClient) Delete(ctx context.Context, key string) error {
_, err := rc.client.Del(ctx, rc.prefix+key).Result()
return err
}

func (rc redisClient) Batch(ctx context.Context, ops ...storage.Operation) error {
p := rc.client.Pipeline()
for _, op := range ops {
switch op.Type {
case storage.Delete:
p.Del(ctx, op.Key)
case storage.Get:
p.Get(ctx, op.Key)
case storage.Set:
p.Set(ctx, op.Key, op.Value, rc.expiration)
}
}
_, err := p.Exec(ctx)
return err
}

func (rc redisClient) Close(_ context.Context) error {
return nil
}

// GetClient returns a storage client for an individual component
func (rs *redisStorage) GetClient(_ context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
var rawName string
if name == "" {
rawName = fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
} else {
rawName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name)
}

return redisClient{
client: rs.client,
prefix: rawName,
expiration: rs.cfg.Expiration,
}, nil
}

func kindString(k component.Kind) string {
switch k {
case component.KindReceiver:
return "receiver"
case component.KindProcessor:
return "processor"
case component.KindExporter:
return "exporter"
case component.KindExtension:
return "extension"
case component.KindConnector:
return "connector"
default:
return "other" // not expected
}
}
Loading

0 comments on commit 1c2e4aa

Please sign in to comment.