Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…y-collector-contrib into balance-metrics-by-resources
  • Loading branch information
SHaaD94 committed Apr 29, 2024
2 parents e56a50e + dd361de commit c74f6c7
Show file tree
Hide file tree
Showing 27 changed files with 593 additions and 134 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feature_win-perf-receiver-partial-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: windowsperfcountersreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Returns partial errors for failures during scraping to prevent throwing out all successfully retrieved metrics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [16712]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/fix-cwlogs-xray.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsxrayreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Retain CloudWatch Log Group when translating X-Ray segments

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31784]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/fix-stanza-matcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fix issue when `exclude_older_than` is enabled without `ordering_criteria` configured"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32681]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/fix_compressor-kinesis-exporter-thread-safe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awskinesisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: the compressor was crashing under high load due it not being thread safe.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32589]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: removed compressor abstraction and each execution has its own buffer (so it's thread safe)

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/zipkin_alpha.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: zipkinencodingextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move zipkinencodingextension to alpha

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32702]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion .github/workflows/scripts/ping-codeowners-on-new-issue.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ if [[ -n "${PING_LINES}" ]]; then
# to get the newlines to render correctly, using string formatting
# causes the newlines to be interpreted literally.
echo -e "Pinging code owners:\n${PING_LINES}"
echo -e "Pinging code owners:\n${PING_LINES}\n%s" "${LABELS_COMMENT}" \
echo -e "Pinging code owners:\n${PING_LINES}\n" "${LABELS_COMMENT}" \
| gh issue comment "${ISSUE}" -F -
else
echo "No code owners were found to ping"
Expand Down
8 changes: 1 addition & 7 deletions exporter/awskinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/producer"
)

Expand Down Expand Up @@ -90,16 +89,11 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op
return nil, err
}

compressor, err := compress.NewCompressor(conf.Encoding.Compression)
if err != nil {
return nil, err
}

encoder, err := batch.NewEncoder(
conf.Encoding.Name,
batch.WithMaxRecordSize(conf.MaxRecordSize),
batch.WithMaxRecordsPerBatch(conf.MaxRecordsPerBatch),
batch.WithCompression(compressor),
batch.WithCompressionType(conf.Compression),
)

if err != nil {
Expand Down
24 changes: 14 additions & 10 deletions exporter/awskinesisexporter/internal/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Batch struct {
maxBatchSize int
maxRecordSize int

compression compress.Compressor
compressionType string

records []types.PutRecordsRequestEntry
}
Expand All @@ -54,20 +54,18 @@ func WithMaxRecordSize(size int) Option {
}
}

func WithCompression(compressor compress.Compressor) Option {
func WithCompressionType(compressionType string) Option {
return func(bt *Batch) {
if compressor != nil {
bt.compression = compressor
}
bt.compressionType = compressionType
}
}

func New(opts ...Option) *Batch {
bt := &Batch{
maxBatchSize: MaxBatchedRecords,
maxRecordSize: MaxRecordSize,
compression: compress.NewNoopCompressor(),
records: make([]types.PutRecordsRequestEntry, 0, MaxBatchedRecords),
maxBatchSize: MaxBatchedRecords,
maxRecordSize: MaxRecordSize,
compressionType: "none",
records: make([]types.PutRecordsRequestEntry, 0, MaxBatchedRecords),
}

for _, op := range opts {
Expand All @@ -78,7 +76,13 @@ func New(opts ...Option) *Batch {
}

func (b *Batch) AddRecord(raw []byte, key string) error {
record, err := b.compression.Do(raw)

compressor, err := compress.NewCompressor(b.compressionType)
if err != nil {
return err
}

record, err := compressor(raw)
if err != nil {
return err
}
Expand Down
103 changes: 54 additions & 49 deletions exporter/awskinesisexporter/internal/compress/compresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,77 +9,82 @@ import (
"compress/gzip"
"compress/zlib"
"fmt"
"io"
)

type bufferedResetWriter interface {
Write(p []byte) (int, error)
Flush() error
Reset(newWriter io.Writer)
Close() error
}

type Compressor interface {
Do(in []byte) (out []byte, err error)
}

var _ Compressor = (*compressor)(nil)

type compressor struct {
compression bufferedResetWriter
}
type Compressor func(in []byte) ([]byte, error)

func NewCompressor(format string) (Compressor, error) {
c := &compressor{
compression: &noop{},
}
switch format {
case "flate":
w, err := flate.NewWriter(nil, flate.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w
return flateCompressor, nil
case "gzip":
w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w

return gzipCompressor, nil
case "zlib":
w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w
return zlibCompressor, nil
case "noop", "none":
// Already the default case
default:
return nil, fmt.Errorf("unknown compression format: %s", format)
return noopCompressor, nil
}

return nil, fmt.Errorf("unknown compression format: %s", format)
}

func flateCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := flate.NewWriter(&buf, flate.BestSpeed)
defer w.Close()

_, err := w.Write(in)

if err != nil {
return nil, err
}

return c, nil
err = w.Flush()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func (c *compressor) Do(in []byte) ([]byte, error) {
buf := new(bytes.Buffer)
func gzipCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
defer w.Close()

c.compression.Reset(buf)
_, err := w.Write(in)

if _, err := c.compression.Write(in); err != nil {
if err != nil {
return nil, err
}

if err := c.compression.Flush(); err != nil {
err = w.Flush()
if err != nil {
return nil, err
}

if closer, ok := c.compression.(io.Closer); ok {
if err := closer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func zlibCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed)
defer w.Close()

_, err := w.Write(in)

if err != nil {
return nil, err
}

err = w.Flush()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func noopCompressor(in []byte) ([]byte, error) {
return in, nil
}
Loading

0 comments on commit c74f6c7

Please sign in to comment.