Skip to content

Commit

Permalink
Fix handling of NaN and Inf-valued float
Browse files Browse the repository at this point in the history
Prior to this commit, a NaN or Inf-valued float would cause NaN or Inf
to leak into statistics. We will probably want to track NaN/Inf counts
in stats eventually, but they should definitely not corrupt
min/sum/max/etc.
  • Loading branch information
wkalt committed Apr 1, 2024
1 parent 70905e5 commit 1883231
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 4 deletions.
8 changes: 8 additions & 0 deletions nodestore/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ func (s *Statistics) Ranges(start, end uint64, schemaHash string) []StatRange {
}

func (s *Statistics) observeNumeric(idx int, v float64) {
// if we get a NaN or an inf, skip it but still create a summary.
if math.IsNaN(v) || math.IsInf(v, 0) {
if _, ok := s.NumStats[idx]; !ok {
s.NumStats[idx] = &NumericalSummary{}
}
return
}

summary, ok := s.NumStats[idx]
if !ok {
summary = &NumericalSummary{Min: v, Max: v, Mean: v, Sum: v}
Expand Down
241 changes: 241 additions & 0 deletions nodestore/statistics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package nodestore_test

import (
"math"
"testing"

fmcap "github.com/foxglove/mcap/go/mcap"
"github.com/stretchr/testify/require"
"github.com/wkalt/dp3/nodestore"
"github.com/wkalt/dp3/util"
"github.com/wkalt/dp3/util/schema"
"github.com/wkalt/dp3/util/testutils"
)

func TestObserve(t *testing.T) {
t.Run("edge case floats do not disrupt stats", func(t *testing.T) {
cases := []struct {
assertion string
value float64
}{
{"NaN", math.NaN()},
{"+Inf", math.Inf(1)},
{"-Inf", math.Inf(-1)},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
s := nodestore.NewStatistics([]util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.FLOAT64)})
msg := &fmcap.Message{
Data: testutils.F64b(c.value),
}
require.NoError(t, s.ObserveMessage(msg, []any{c.value}))
require.Equal(t, 1, int(s.MessageCount))
require.Zero(t, s.NumStats[0].Mean)
require.Zero(t, s.NumStats[0].Sum)
require.Zero(t, s.NumStats[0].Min)
require.Zero(t, s.NumStats[0].Max)
})
}
})

t.Run("text updates text stats", func(t *testing.T) {
s := nodestore.NewStatistics([]util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.STRING)})
data := testutils.PrefixedString("hello")
msg := &fmcap.Message{
Data: data,
}
require.NoError(t, s.ObserveMessage(msg, []any{"hello"}))
require.Equal(t, 1, int(s.MessageCount))
require.Equal(t, "hello", s.TextStats[0].Min)
require.Equal(t, "hello", s.TextStats[0].Max)
})

t.Run("numeric types update numeric stats", func(t *testing.T) {
cases := []struct {
name string
value any
}{
{"int8", int8(1)},
{"int16", int16(1)},
{"int32", int32(1)},
{"int64", int64(1)},
{"float32", float32(1)},
{"float64", float64(1)},
{"uint8", uint8(1)},
{"uint16", uint16(1)},
{"uint32", uint32(1)},
{"uint64", uint64(1)},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := nodestore.NewStatistics([]util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.FLOAT64)})
var data []byte
switch value := c.value.(type) {
case int8:
data = testutils.U8b(uint8(value))
case uint8:
data = testutils.U8b(value)
case int16:
data = testutils.U16b(uint16(value))
case uint16:
data = testutils.U16b(value)
case int32:
data = testutils.U32b(uint32(value))
case uint32:
data = testutils.U32b(value)
case int64:
data = testutils.U64b(uint64(value))
case uint64:
data = testutils.U64b(value)
case float32:
data = testutils.F32b(value)
case float64:
data = testutils.F64b(value)
}
msg := &fmcap.Message{
Data: data,
}
require.NoError(t, s.ObserveMessage(msg, []any{c.value}))
require.Equal(t, 1, int(s.MessageCount))
require.InEpsilon(t, float64(1), s.NumStats[0].Mean, 0.01)
require.InEpsilon(t, float64(1), s.NumStats[0].Sum, 0.01)
require.InEpsilon(t, float64(1), s.NumStats[0].Min, 0.01)
require.InEpsilon(t, float64(1), s.NumStats[0].Max, 0.01)
})
}
})
}

func TestAdd(t *testing.T) {
cases := []struct {
assertion string
inputs []*nodestore.Statistics
output *nodestore.Statistics
}{
{
assertion: "adding to empty copies the input",
inputs: []*nodestore.Statistics{
{},
{
MessageCount: 1,
ByteCount: 10,
MaxObservedTime: 10,
MinObservedTime: 1,
}},
output: &nodestore.Statistics{
MessageCount: 1,
ByteCount: 10,
MaxObservedTime: 10,
MinObservedTime: 1,
},
},
{
"adding to a populated statistic",
[]*nodestore.Statistics{
{
MessageCount: 1,
ByteCount: 10,
MaxObservedTime: 10,
MinObservedTime: 1,
},
{
MessageCount: 1,
ByteCount: 10,
MaxObservedTime: 20,
MinObservedTime: 1,
},
},
&nodestore.Statistics{
MessageCount: 2,
ByteCount: 20,
MaxObservedTime: 20,
MinObservedTime: 1,
},
},
{
"adding text fields",
[]*nodestore.Statistics{
{
Fields: []util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.STRING)},
TextStats: map[int]*nodestore.TextSummary{
0: {
Min: "a",
Max: "b",
},
},
MessageCount: 1,
},
{
Fields: []util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.STRING)},
TextStats: map[int]*nodestore.TextSummary{
0: {
Min: "a",
Max: "c",
},
},
MessageCount: 1,
},
},
&nodestore.Statistics{
Fields: []util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.STRING)},
TextStats: map[int]*nodestore.TextSummary{
0: {
Min: "a",
Max: "c",
},
},
MessageCount: 2,
},
},
{
"adding numeric fields",
[]*nodestore.Statistics{
{
Fields: []util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.FLOAT64)},
NumStats: map[int]*nodestore.NumericalSummary{
0: {
Min: 1,
Max: 10,
Sum: 10,
Mean: 5,
},
},
MessageCount: 2,
},
{
Fields: []util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.FLOAT64)},
NumStats: map[int]*nodestore.NumericalSummary{
0: {
Min: 10,
Max: 20,
Sum: 30,
Mean: 15,
},
},
MessageCount: 2,
},
},
&nodestore.Statistics{
Fields: []util.Named[schema.PrimitiveType]{util.NewNamed("test", schema.FLOAT64)},
NumStats: map[int]*nodestore.NumericalSummary{
0: {
Min: 1,
Max: 20,
Mean: 10,
Sum: 40,
},
},
MessageCount: 4,
},
},
}

for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
first := c.inputs[0]
second := c.inputs[1]
require.NoError(t, first.Add(second))
require.Equal(t, c.output, first)
})
}
}
6 changes: 3 additions & 3 deletions routes/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ func newImportHandler(tmgr *treemgr.TreeManager) http.HandlerFunc {
return
}
defer r.Body.Close()

ctx = log.AddTags(ctx, "producer", req.ProducerID, "path", req.Path)
f, err := os.Open(req.Path) // todo - get from storage provider
if err != nil {
httputil.BadRequest(ctx, w, "error opening file: %s", err)
return
}
defer f.Close()
log.Infof(ctx, "Importing file %s for producer %s", req.Path, req.ProducerID)
log.Infof(ctx, "Importing file")
if err := tmgr.Receive(ctx, req.ProducerID, f); err != nil {
httputil.InternalServerError(ctx, w, "error receiving file: %s", err)
return
}
log.Infow(ctx, "Imported", "location", req.Path, "producer_id", req.ProducerID)
log.Infof(ctx, "Imported")
}
}
1 change: 0 additions & 1 deletion tree/memtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (m *MemTree) ToBytes(ctx context.Context, oid uint64) ([]byte, error) { //
if err != nil {
return nil, fmt.Errorf("failed to write node: %w", err)
}

nodeID := nodestore.NewNodeID(oid, uint64(offset), uint64(n))
offset += n
processed[id] = nodeID
Expand Down

0 comments on commit 1883231

Please sign in to comment.