Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,36 @@ will now also serialize ``float64`` (double-precision) columns as binary.
You might see a performance uplift if this is a dominant data type in your
ingestion workload.

## Decimal columns

QuestDB server version 9.2.0 and newer supports decimal columns with arbitrary precision and scale.
The Go client converts supported decimal values to QuestDB's text/binary wire format automatically:

- `DecimalColumnScaled`: `questdb.ScaledDecimal`, including helpers like `questdb.NewDecimalFromInt64` and `questdb.NewDecimal`.
- `DecimalColumnShopspring`: `github.com/shopspring/decimal.Decimal` values or pointers.
- `DecimalColumnString`: `string` literals representing decimal values (validated at runtime).

```go
price := qdb.NewDecimalFromInt64(12345, 2) // 123.45 with scale 2
commission := qdb.NewDecimal(big.NewInt(-750), 4) // -0.0750 with scale 4

err = sender.
Table("trades").
Symbol("symbol", "ETH-USD").
DecimalColumnScaled("price", price).
DecimalColumnScaled("commission", commission).
AtNow(ctx)
```

To emit textual decimals, pass a validated string literal:

```go
err = sender.
Table("quotes").
DecimalColumnString("mid", "1.23456").
AtNow(ctx)
```

## Pooled Line Senders

**Warning: Experimental feature designed for use with HTTP senders ONLY**
Expand Down
71 changes: 71 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,77 @@ func (b *buffer) Float64Column(name string, val float64) *buffer {
return b
}

func (b *buffer) DecimalColumnScaled(name string, val ScaledDecimal) *buffer {
if val.isNull() {
// Don't write null decimals
return b
}
if !b.prepareForField() {
return b
}
return b.decimalColumnScaled(name, val)
}

func (b *buffer) decimalColumnScaled(name string, val ScaledDecimal) *buffer {
if err := val.ensureValidScale(); err != nil {
b.lastErr = err
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}
b.WriteByte('=')
b.WriteByte('=')
b.WriteByte(decimalBinaryTypeCode)
b.WriteByte((uint8)(val.scale))
b.WriteByte(32 - val.offset)
b.Write(val.unscaled[val.offset:])
b.hasFields = true
return b
}

func (b *buffer) DecimalColumnString(name string, val string) *buffer {
if !b.prepareForField() {
return b
}
if err := validateDecimalText(val); err != nil {
b.lastErr = err
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}
b.WriteByte('=')
b.WriteString(val)
b.WriteByte('d')
b.hasFields = true
return b
}

func (b *buffer) DecimalColumnShopspring(name string, val ShopspringDecimal) *buffer {
if val == nil {
return b
}
if b.lastErr != nil {
return b
}
dec, err := convertShopspringDecimal(val)
if err != nil {
b.lastErr = err
return b
}
if dec.isNull() {
// Don't write null decimals
return b
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, we already wrote column name to the buffer. ILP requires the value to be included. Can't we interpret zero length as null on the server side?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we don't even need to send the value.
In the fix, we no longer send the field if the value is null.

}
if !b.prepareForField() {
return b
}
return b.decimalColumnScaled(name, dec)
}

func (b *buffer) Float64ColumnBinary(name string, val float64) *buffer {
if !b.prepareForField() {
return b
Expand Down
247 changes: 247 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ import (

type bufWriterFn func(b *qdb.Buffer) error

type fakeShopspringDecimal struct {
coeff *big.Int
exp int32
}

func (f fakeShopspringDecimal) Coefficient() *big.Int {
return f.coeff
}

func (f fakeShopspringDecimal) Exponent() int32 {
return f.exp
}

func newTestBuffer() qdb.Buffer {
return qdb.NewBuffer(128*1024, 1024*1024, 127)
}
Expand Down Expand Up @@ -481,6 +494,240 @@ func TestFloat64ColumnBinary(t *testing.T) {
}
}

func TestDecimalColumnScaled(t *testing.T) {
negative, err := qdb.NewDecimal(big.NewInt(-12345), 3)
assert.NoError(t, err)

prefix := []byte(testTable + " price==")
testCases := []struct {
name string
value qdb.ScaledDecimal
expected []byte
}{
{
name: "positive",
value: qdb.NewDecimalFromInt64(12345, 2),
expected: append(prefix, 0x17, 0x02, 0x02, 0x30, 0x39, 0x0A),
},
{
name: "negative",
value: negative,
expected: append(prefix, 0x17, 0x03, 0x02, 0xCF, 0xC7, 0x0A),
},
{
name: "zero with scale",
value: qdb.NewDecimalFromInt64(0, 4),
expected: append(prefix, 0x17, 0x04, 0x01, 0x0, 0x0A),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnScaled("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)
assert.Equal(t, tc.expected, buf.Messages())
})
}
}

func TestDecimalColumnScaledTrimmingAndPadding(t *testing.T) {
prefix := []byte(testTable + " price==")

testCases := []struct {
name string
value qdb.ScaledDecimal
expectedBytes []byte
}{
{
name: "127 boundary",
value: qdb.NewDecimalFromInt64(127, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0x7F},
},
{
name: "128 sign extension",
value: qdb.NewDecimalFromInt64(128, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0x00, 0x80},
},
{
name: "255 sign extension",
value: qdb.NewDecimalFromInt64(255, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0x00, 0xFF},
},
{
name: "32768 sign extension",
value: qdb.NewDecimalFromInt64(32768, 0),
expectedBytes: []byte{0x17, 0x00, 0x03, 0x00, 0x80, 0x00},
},
{
name: "-1",
value: qdb.NewDecimalFromInt64(-1, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0xFF},
},
{
name: "-2",
value: qdb.NewDecimalFromInt64(-2, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0xFE},
},
{
name: "-127",
value: qdb.NewDecimalFromInt64(-127, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0x81},
},
{
name: "-128",
value: qdb.NewDecimalFromInt64(-128, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0x80},
},
{
name: "-129",
value: qdb.NewDecimalFromInt64(-129, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0xFF, 0x7F},
},
{
name: "-256 sign extension",
value: qdb.NewDecimalFromInt64(-256, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0xFF, 0x00},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()

err := buf.Table(testTable).DecimalColumnScaled("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)

expected := append(append([]byte{}, prefix...), tc.expectedBytes...)
expected = append(expected, '\n')
assert.Equal(t, expected, buf.Messages())
})
}
}

func TestDecimalColumnShopspring(t *testing.T) {
prefix := []byte(testTable + " price==")

testCases := []struct {
name string
value fakeShopspringDecimal
expectedBytes []byte
}{
{
name: "negative exponent scales value",
value: fakeShopspringDecimal{coeff: big.NewInt(12345), exp: -2},
expectedBytes: []byte{0x17, 0x02, 0x02, 0x30, 0x39},
},
{
name: "zero",
value: fakeShopspringDecimal{coeff: big.NewInt(0), exp: 0},
expectedBytes: []byte{0x17, 0x00, 0x01, 0x00},
},
{
name: "positive exponent multiplies coefficient",
value: fakeShopspringDecimal{coeff: big.NewInt(123), exp: 2},
expectedBytes: []byte{0x17, 0x00, 0x02, 0x30, 0x0C},
},
{
name: "positive value sign extension",
value: fakeShopspringDecimal{coeff: big.NewInt(128), exp: 0},
expectedBytes: []byte{0x17, 0x00, 0x02, 0x00, 0x80},
},
{
name: "negative value sign extension",
value: fakeShopspringDecimal{coeff: big.NewInt(-12345), exp: -3},
expectedBytes: []byte{0x17, 0x03, 0x02, 0xCF, 0xC7},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()

err := buf.Table(testTable).DecimalColumnShopspring("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)

expected := append(append([]byte{}, prefix...), tc.expectedBytes...)
expected = append(expected, '\n')
assert.Equal(t, expected, buf.Messages())
})
}
}

func TestDecimalColumnStringValidation(t *testing.T) {
t.Run("valid strings", func(t *testing.T) {
testCases := []struct {
name string
value string
expected string
}{
{"integer", "123", "123d"},
{"decimal", "123.450", "123.450d"},
{"negative", "-0.001", "-0.001d"},
{"exponent positive", "1.2e3", "1.2e3d"},
{"exponent negative", "-4.5E-2", "-4.5E-2d"},
{"nan token", "NaN", "NaNd"},
{"infinity token", "Infinity", "Infinityd"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnString("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)
expected := []byte(testTable + " price=" + tc.expected + "\n")
assert.Equal(t, expected, buf.Messages())
})
}
})

t.Run("invalid strings", func(t *testing.T) {
testCases := []struct {
name string
value string
}{
{"empty", ""},
{"sign only", "+"},
{"double dot", "12.3.4"},
{"invalid char", "12a3"},
{"exponent missing mantissa", "e10"},
{"exponent no digits", "1.2e"},
{"exponent sign no digits", "1.2e+"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnString("price", tc.value).At(time.Time{}, false)
assert.Error(t, err)
assert.Contains(t, err.Error(), "decimal")
assert.Empty(t, buf.Messages())
})
}
})
}

func TestDecimalColumnErrors(t *testing.T) {
t.Run("invalid scale", func(t *testing.T) {
buf := newTestBuffer()
dec := qdb.NewDecimalFromInt64(1, 100)
err := buf.Table(testTable).DecimalColumnScaled("price", dec).At(time.Time{}, false)
assert.ErrorContains(t, err, "decimal scale")
assert.Empty(t, buf.Messages())
})

t.Run("overflow", func(t *testing.T) {
bigVal := new(big.Int).Lsh(big.NewInt(1), 2100)
_, err := qdb.NewDecimal(bigVal, 0)
assert.ErrorContains(t, err, "exceeds 32 bytes")
})

t.Run("no column", func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnShopspring("price", nil).At(time.Time{}, false)
assert.ErrorContains(t, err, "no symbols or columns were provided: invalid message")
assert.Empty(t, buf.Messages())
})
}

func TestFloat64Array1DColumn(t *testing.T) {
testCases := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions conf_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
}
pVersion := protocolVersion(version)
if pVersion < ProtocolVersion1 || pVersion > ProtocolVersion2 {
return nil, NewInvalidConfigStrError("current client only supports protocol version 1 (text format for all datatypes), 2 (binary format for part datatypes) or explicitly unset")
if pVersion < ProtocolVersion1 || pVersion > ProtocolVersion3 {
return nil, NewInvalidConfigStrError("current client only supports protocol version 1 (text format for all datatypes), 2 (binary format for part datatypes), 3 (decimals) or explicitly unset")
}
senderConf.protocolVersion = pVersion
}
Expand Down
Loading
Loading