Skip to content

Commit

Permalink
Support snappy and deflate codecs
Browse files Browse the repository at this point in the history
  • Loading branch information
xentripetal committed Feb 8, 2022
1 parent 66ca1f1 commit 31c4c0d
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 136 deletions.
64 changes: 55 additions & 9 deletions format/avro/ocf.go
@@ -1,20 +1,26 @@
package avro

import (
"bytes"
"compress/flate"
"encoding/binary"
"github.com/golang/snappy"
"github.com/wader/fq/format"
"github.com/wader/fq/format/avro/decoders"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/format/registry"
"github.com/wader/fq/pkg/bitio"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
"hash/crc32"
)

// init registers the Avro object container file (OCF) format with the
// format registry so it is available under the name format.AVRO_OCF.
func init() {
registry.MustRegister(decode.Format{
Name: format.AVRO_OCF,
Description: "Avro object container file",
Groups: []string{format.PROBE},
// NOTE(review): the two DecodeFn lines below are the removed and added
// sides of this diff hunk (the decoder entry point is renamed from
// avroDecodeOCF to decodeAvroOCF); only the second one belongs in the
// resulting file.
DecodeFn: avroDecodeOCF,
DecodeFn: decodeAvroOCF,
})
}

Expand Down Expand Up @@ -62,13 +68,13 @@ func decodeHeader(d *decode.D) HeaderData {
if err != nil {
d.Fatalf("failed to parse schema: %v", err)
}
if codec, ok := meta["avro.codec"]; ok && codec != "null" {
if codec, ok := meta["avro.codec"]; ok {
headerData.Codec, ok = codec.(string)
if !ok {
d.Fatalf("avro.codec is not a string")
}
} else {
headerData.Codec = ""
headerData.Codec = "null"
}

headerData.Sync, ok = headerRecord["sync"].([]byte)
Expand All @@ -78,7 +84,7 @@ func decodeHeader(d *decode.D) HeaderData {
return headerData
}

func avroDecodeOCF(d *decode.D, in interface{}) interface{} {
func decodeAvroOCF(d *decode.D, in interface{}) interface{} {
header := decodeHeader(d)

decodeFn, err := decoders.DecodeFnForSchema(header.Schema)
Expand All @@ -92,18 +98,58 @@ func avroDecodeOCF(d *decode.D, in interface{}) interface{} {
return
}
size := d.FieldSFn("size", decoders.VarZigZag)
// Currently not supporting encodings.
if header.Codec != "" {
d.FieldRawLen("data", size*8, scalar.Description(header.Codec+" encoded"))
} else {
i := int64(0)
i := int64(0)

if header.Codec == "deflate" {
br := d.FieldRawLen("compressed", size*8)
bb := &bytes.Buffer{}
d.MustCopy(bb, flate.NewReader(bitio.NewIOReader(br)))
d.FieldArrayRootBitBufFn("data", bitio.NewBitReader(bb.Bytes(), -1), func(d *decode.D) {
for ; i < count; i++ {
decodeFn("data", d)
}
})
} else if header.Codec == "snappy" {
// Everything but last 4 bytes which are the checksum
n := (size - 4) * 8
br := d.FieldRawLen("compressed", n)
data := make([]byte, size-4)
if _, err := br.ReadBits(data, n); err != nil {
d.Fatalf("failed to read snappy compressed data: %v", err)
}

This comment has been minimized.

Copy link
@wader

wader Feb 8, 2022

Owner

Using ReadBits directly is usually a bad idea, as it works the same way as io.Reader's Read: it can return short even if there is more data to be read, for example if the underlying bit reader is a bitio.MultiReader (decoding from an "io-list" etc.) or a bitio.IOBitReadSeeker (reading from an io.Reader).

But there are some helpers to hide this, for example bitio.Copy, bitio.ReadFull, d.*CopyBit, or you can use a bitio.IOReader in combination with io.Copy etc. It's a bit messy to go between bit and byte land, but I want fq to internally use bits as much as possible.

BTW, the bitio package has a README.md explaining how it works. Actually, now that I think about it, that text should probably be moved into the package documentation.

This comment has been minimized.

Copy link
@xentripetal

xentripetal Feb 10, 2022

Author Contributor

Ah yeah that makes sense. Swapped to bitio.ReadFull

Missed all those functions since I normally just go off methods for feature discovery.

decoded, err := snappy.Decode(nil, data)

This comment has been minimized.

Copy link
@wader

wader Feb 8, 2022

Owner

Snappy has no reader interface?

This comment has been minimized.

Copy link
@xentripetal

xentripetal Feb 10, 2022

Author Contributor

It does, but I'm fairly sure it requires the compressed data to be in a different framing than the one Avro uses: https://github.com/google/snappy/blob/main/framing_format.txt

I don't know this for certain, but every time I tried to use snappy.NewReader(...) it errored out with invalid data.

Edit

Yeah the docs for Reader say

// Reader handles the Snappy stream format, not the Snappy block format.

and docs for Decode say

// Decode handles the Snappy block format, not the Snappy stream format.
if err != nil {
d.Fatalf("failed to decode snappy compressed data: %v", err)
}

crc := crc32.ChecksumIEEE(decoded)
crcB := make([]byte, 4)
if d.Endian == decode.BigEndian {
binary.BigEndian.PutUint32(crcB, crc)
} else {
binary.LittleEndian.PutUint32(crcB, crc)
}
d.FieldRawLen("crc32", 4*8, d.AssertBitBuf(crcB))

This comment has been minimized.

Copy link
@wader

wader Feb 8, 2022

Owner

I wonder if we can use d.ValidateUBytes here? It takes care of some of the endian stuff.

This comment has been minimized.

Copy link
@xentripetal

xentripetal Feb 10, 2022

Author Contributor

good idea, swapped to that

d.FieldArrayRootBitBufFn("data", bitio.NewBitReader(decoded, -1), func(d *decode.D) {
for ; i < count; i++ {
decodeFn("data", d)
}
})
} else if header.Codec == "null" {
d.FieldArrayLoop("data", func() bool { return i < count }, func(d *decode.D) {
decodeFn("datum", d)
i++
})
} else {
// Unknown codec, just dump the compressed data.
d.FieldRawLen("data", size*8, scalar.Description(header.Codec+" encoded"))
}
d.FieldRawLen("sync", 16*8, d.AssertBitBuf(header.Sync))
})

return nil
}

// decodeBlock is an empty stub: it accepts the current decoder and a
// per-datum decode function but does nothing with them yet.
// NOTE(review): presumably intended to hold the per-block decoding logic that
// currently lives inline in decodeAvroOCF — confirm, or remove if unused.
func decodeBlock(d *decode.D, decodeFn decoders.DecodeFn) {

}
138 changes: 11 additions & 127 deletions format/avro/testdata/quickstop-deflate.fqtest
@@ -1,128 +1,12 @@
# Testcase taken from linkedin/goavro https://github.com/linkedin/goavro
$ fq dv quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: quickstop-deflate.avro (avro_ocf) 0x0-0x5835.7 (22582)
0x0000|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | header{}: 0x4-0x129.7 (294)
| | | meta[0:2]: 0x4-0x119.7 (278)
| | | [0]{}: block 0x4-0x118.7 (277)
0x0000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | data[0:2]: 0x5-0x118.7 (276)
| | | [0]{}: entry 0x5-0x17.7 (19)
| | | key{}: 0x5-0xf.7 (11)
0x0000| 14 | . | length: 10 0x5-0x5.7 (1)
0x0000| 61 76 72 6f 2e 63 6f 64 65 63| avro.codec| data: "avro.codec" 0x6-0xf.7 (10)
| | | value{}: 0x10-0x17.7 (8)
0x0010|0e |. | length: 7 0x10-0x10.7 (1)
0x0010| 64 65 66 6c 61 74 65 | deflate | data: "deflate" 0x11-0x17.7 (7)
| | | [1]{}: entry 0x18-0x118.7 (257)
| | | key{}: 0x18-0x23.7 (12)
0x0010| 16 | . | length: 11 0x18-0x18.7 (1)
0x0010| 61 76 72 6f 2e 73 63| avro.sc| data: "avro.schema" 0x19-0x23.7 (11)
0x0020|68 65 6d 61 |hema |
| | | value{}: 0x24-0x118.7 (245)
0x0020| e6 03 | .. | length: 243 0x24-0x25.7 (2)
0x0020| 7b 22 74 79 70 65 22 3a 22 72| {"type":"r| data: "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\""... 0x26-0x118.7 (243)
0x0030|65 63 6f 72 64 22 2c 22 6e 61 6d 65 22 3a 22 50|ecord","name":"P|
* |until 0x118.7 (243) | |
| | | [1]{}: block 0x119-0x119.7 (1)
0x0110| 00 | . | count: 0 0x119-0x119.7 (1)
| | | data[0:0]: 0x11a-NA (0)
0x0110| 93 e7 87 9e 02 95| ......| sync: raw bits 0x11a-0x129.7 (16)
0x0120|d5 9e 4f 58 37 ad b2 a2 ce cd |..OX7..... |
| | | blocks[0:12]: 0x12a-0x5835.7 (22284)
| | | [0]{}: block 0x12a-0x9dc.7 (2227)
0x0120| b4 09 | .. | count: 602 0x12a-0x12b.7 (2)
0x0120| be 22 | ." | size: 2207 0x12c-0x12d.7 (2)
0x0120| 8d db| ..| data: raw bits (deflate encoded) 0x12e-0x9cc.7 (2207)
0x0130|6f 64 ac f9 19 c6 71 f9 37 49 8e 63 1d 55 55 b5|od....q.7I.c.UU.|
* |until 0x9cc.7 (2207) | |
0x09c0| 93 e7 87| ...| sync: raw bits (valid) 0x9cd-0x9dc.7 (16)
0x09d0|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |
| | | [1]{}: block 0x9dd-0x1257.7 (2171)
0x09d0| a0 09 | .. | count: 592 0x9dd-0x9de.7 (2)
0x09d0| ce| .| size: 2151 0x9df-0x9e0.7 (2)
0x09e0|21 |! |
0x09e0| 8d d8 5f 44 fc 7b 1e c7 f1 bb 9f d3 ff 3f d7| .._D.{.......?.| data: raw bits (deflate encoded) 0x9e1-0x1247.7 (2151)
0x09f0|7b b1 97 e7 7b 71 f8 55 df fe fc d6 f2 1b e7 2c|{...{q.U.......,|
* |until 0x1247.7 (2151) | |
0x1240| 93 e7 87 9e 02 95 d5 9e| ........| sync: raw bits (valid) 0x1248-0x1257.7 (16)
0x1250|4f 58 37 ad b2 a2 ce cd |OX7..... |
| | | [2]{}: block 0x1258-0x1ad5.7 (2174)
0x1250| 9e 09 | .. | count: 591 0x1258-0x1259.7 (2)
0x1250| d4 21 | .! | size: 2154 0x125a-0x125b.7 (2)
0x1250| 8d d8 5f 44| .._D| data: raw bits (deflate encoded) 0x125c-0x1ac5.7 (2154)
0x1260|ec 0d 1e c7 f1 bb 63 1d c7 7a f4 ff cf 5e ec e5|......c..z...^..|
* |until 0x1ac5.7 (2154) | |
0x1ac0| 93 e7 87 9e 02 95 d5 9e 4f 58| ........OX| sync: raw bits (valid) 0x1ac6-0x1ad5.7 (16)
0x1ad0|37 ad b2 a2 ce cd |7..... |
| | | [3]{}: block 0x1ad6-0x235f.7 (2186)
0x1ad0| a0 09 | .. | count: 592 0x1ad6-0x1ad7.7 (2)
0x1ad0| ec 21 | .! | size: 2166 0x1ad8-0x1ad9.7 (2)
0x1ad0| 8d d8 7f 44 ec 7b| ...D.{| data: raw bits (deflate encoded) 0x1ada-0x234f.7 (2166)
0x1ae0|1e c7 f1 ff 8e 75 1c d7 fd bf df bf b8 df 3f 96|.....u........?.|
* |until 0x234f.7 (2166) | |
0x2350|93 e7 87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd|........OX7.....| sync: raw bits (valid) 0x2350-0x235f.7 (16)
| | | [4]{}: block 0x2360-0x2bda.7 (2171)
0x2360|9e 09 |.. | count: 591 0x2360-0x2361.7 (2)
0x2360| ce 21 | .! | size: 2151 0x2362-0x2363.7 (2)
0x2360| 8d d8 df 47 ec fb 1e c7 f1 bb 65 5b| ...G......e[| data: raw bits (deflate encoded) 0x2364-0x2bca.7 (2151)
0x2370|96 6d ff 01 e7 e2 dc f4 e3 db 2f 6b d5 b7 5f c7|.m......../k.._.|
* |until 0x2bca.7 (2151) | |
0x2bc0| 93 e7 87 9e 02| .....| sync: raw bits (valid) 0x2bcb-0x2bda.7 (16)
0x2bd0|95 d5 9e 4f 58 37 ad b2 a2 ce cd |...OX7..... |
| | | [5]{}: block 0x2bdb-0x3453.7 (2169)
0x2bd0| a0 09 | .. | count: 592 0x2bdb-0x2bdc.7 (2)
0x2bd0| ca 21 | .! | size: 2149 0x2bdd-0x2bde.7 (2)
0x2bd0| 8d| .| data: raw bits (deflate encoded) 0x2bdf-0x3443.7 (2149)
0x2be0|d8 d1 47 fc 7b 1e c7 f1 bb 9f f5 f3 b3 ce 1f b0|..G.{...........|
* |until 0x3443.7 (2149) | |
0x3440| 93 e7 87 9e 02 95 d5 9e 4f 58 37 ad| ........OX7.| sync: raw bits (valid) 0x3444-0x3453.7 (16)
0x3450|b2 a2 ce cd |.... |
| | | [6]{}: block 0x3454-0x3ccd.7 (2170)
0x3450| a0 09 | .. | count: 592 0x3454-0x3455.7 (2)
0x3450| cc 21 | .! | size: 2150 0x3456-0x3457.7 (2)
0x3450| 8d d8 df 47 f4 7d 1e c7| ...G.}..| data: raw bits (deflate encoded) 0x3458-0x3cbd.7 (2150)
0x3460|f1 b3 cb ba 5c d6 fd 07 ec c1 1e de df 83 e5 ba|....\...........|
* |until 0x3cbd.7 (2150) | |
0x3cb0| 93 e7| ..| sync: raw bits (valid) 0x3cbe-0x3ccd.7 (16)
0x3cc0|87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |......OX7..... |
| | | [7]{}: block 0x3cce-0x454c.7 (2175)
0x3cc0| 9e 09| ..| count: 591 0x3cce-0x3ccf.7 (2)
0x3cd0|d6 21 |.! | size: 2155 0x3cd0-0x3cd1.7 (2)
0x3cd0| 8d d8 df 47 ec 0f 1e c7 f1 bb 63 1d c7 fa| ...G......c...| data: raw bits (deflate encoded) 0x3cd2-0x453c.7 (2155)
0x3ce0|fe 01 7b b1 97 df cf c5 72 4e 7d fa b5 96 33 fd|..{.....rN}...3.|
* |until 0x453c.7 (2155) | |
0x4530| 93 e7 87| ...| sync: raw bits (valid) 0x453d-0x454c.7 (16)
0x4540|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |
| | | [8]{}: block 0x454d-0x4dcd.7 (2177)
0x4540| a0 09 | .. | count: 592 0x454d-0x454e.7 (2)
0x4540| da| .| size: 2157 0x454f-0x4550.7 (2)
0x4550|21 |! |
0x4550| 8d d8 ef 47 ec 7d 1e c7 f1 7b c7 3a 8e 75 fd| ...G.}...{.:.u.| data: raw bits (deflate encoded) 0x4551-0x4dbd.7 (2157)
0x4560|01 7b e3 ba 79 7d 6f 2c e7 d4 b7 1f 67 5d 9c 71|.{..y}o,....g].q|
* |until 0x4dbd.7 (2157) | |
0x4db0| 93 e7| ..| sync: raw bits (valid) 0x4dbe-0x4dcd.7 (16)
0x4dc0|87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |......OX7..... |
| | | [9]{}: block 0x4dce-0x564a.7 (2173)
0x4dc0| 9e 09| ..| count: 591 0x4dce-0x4dcf.7 (2)
0x4dd0|d2 21 |.! | size: 2153 0x4dd0-0x4dd1.7 (2)
0x4dd0| 8d d8 df 47 ec fb 1e c7 f1 bb 65 5b 96 63| ...G......e[.c| data: raw bits (deflate encoded) 0x4dd2-0x563a.7 (2153)
0x4de0|ff 01 e7 e2 5c ee ef c5 61 ad fa f6 6b db ac b1|....\...a...k...|
* |until 0x563a.7 (2153) | |
0x5630| 93 e7 87 9e 02| .....| sync: raw bits (valid) 0x563b-0x564a.7 (16)
0x5640|95 d5 9e 4f 58 37 ad b2 a2 ce cd |...OX7..... |
| | | [10]{}: block 0x564b-0x580d.7 (451)
0x5640| 94 01 | .. | count: 74 0x564b-0x564c.7 (2)
0x5640| de 06 | .. | size: 431 0x564d-0x564e.7 (2)
0x5640| 8d| .| data: raw bits (deflate encoded) 0x564f-0x57fd.7 (431)
0x5650|d5 c1 2b 04 71 14 c0 f1 db a6 6d 93 3f c0 c1 d1|..+.q.....m.?...|
* |until 0x57fd.7 (431) | |
0x57f0| 93 e7| ..| sync: raw bits (valid) 0x57fe-0x580d.7 (16)
0x5800|87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |......OX7..... |
| | | [11]{}: block 0x580e-0x5835.7 (40)
0x5800| 02 | . | count: 1 0x580e-0x580e.7 (1)
0x5800| 2c| ,| size: 22 0x580f-0x580f.7 (1)
0x5810|7b 14 cb 15 5c 5a 90 5a c4 e6 9b 98 c7 63 68 64|{...\Z.Z.....chd| data: raw bits (deflate encoded) 0x5810-0x5825.7 (22)
0x5820|6c 62 6a 66 07 00 |lbjf.. |
0x5820| 93 e7 87 9e 02 95 d5 9e 4f 58| ........OX| sync: raw bits (valid) 0x5826-0x5835.7 (16)
0x5830|37 ad b2 a2 ce cd| |7.....| |
$ fq '.blocks[0]' quickstop-deflate.avro

This comment has been minimized.

Copy link
@wader

wader Feb 8, 2022

Owner

Hmm maybe use dv but limit somehow if 602 is a lot? dv({array_truncate: 50}) etc?

This comment has been minimized.

Copy link
@xentripetal

xentripetal Feb 10, 2022

Author Contributor

Nice! wasn't aware of that feature. Yeah the full dump is over 8k lines. Swapped to truncate

|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.blocks[0]{}:
0x0000|02 0a 44 61 6e 74 65 0a 48 69 63 6b 73 06 28 30|..Dante.Hicks.(0| data[0:602]:
* |until 0x3ff1.7 (end) (16370) | |
0x00120| b4 09 | .. | count: 602
0x00120| be 22 | ." | size: 2207
0x00120| 8d db| ..| compressed: raw bits
0x00130|6f 64 ac f9 19 c6 71 f9 37 49 8e 63 1d 55 55 b5|od....q.7I.c.UU.|
* |until 0x9cc.7 (2207) | |
0x009c0| 93 e7 87| ...| sync: raw bits (valid)
0x009d0|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |
Binary file added format/avro/testdata/snappy.avro
Binary file not shown.
13 changes: 13 additions & 0 deletions format/avro/testdata/snappy.fqtest
@@ -0,0 +1,13 @@
# Generated using https://gist.github.com/xentripetal/c0f1645ee1abd4d25f71896c8d650543
$ fq '.blocks[0]' snappy.avro

This comment has been minimized.

Copy link
@wader

wader Feb 8, 2022

Owner

Use dv?

Btw, are the data blocks supposed to be concatenated by a reader? cbor and asn1_ber do something like that.

|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.blocks[0]{}:
0x000|01 0e 42 00 00 00 00 92 24 49 92 24 49 f2 3f 02|..B.....$I.$I.?.| data[0:10]:
* |until 0x307.7 (end) (776) | |
0x0420|14 |. | count: 10
0x0420| 8c 08 | .. | size: 518
0x0420| 88 06 f0 52 01 0e 42 00 00 00 00 92 24| ...R..B.....$| compressed: raw bits
0x0430|49 92 24 49 f2 3f 02 39 04 31 30 00 00 06 02 61|I.$I.?.9.10....a|
* |until 0x624.7 (514) | |
0x0620| 87 b8 fe b6 | .... | crc32: raw bits (valid)
0x0620| cc cc 61 31 fd 14 d0| ..a1...| sync: raw bits (valid)
0x0630|61 16 b6 0f 9d 30 f4 1b f0| |a....0...| |
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -28,6 +28,8 @@ require (
github.com/wader/readline v0.0.0-20220117233529-692d84ca36e2
)

require github.com/golang/snappy v0.0.4

This comment has been minimized.

Copy link
@wader

wader Feb 8, 2022

Owner

I'll add bump config and things once this gets merged


require (
github.com/itchyny/timefmt-go v0.1.3 // indirect
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
@@ -1,3 +1,5 @@
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
Expand Down

0 comments on commit 31c4c0d

Please sign in to comment.