This repository has been archived by the owner on Dec 8, 2023. It is now read-only.
forked from eiiches/go-hadoop-io
/
codec.go
123 lines (104 loc) · 2.53 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package hadoop
// #cgo CFLAGS: -O3
import "C"
import (
"bytes"
"compress/bzip2"
"compress/zlib"
"io"
)
var (
Codecs map[string]Codec = map[string]Codec{
"org.apache.hadoop.io.compress.DefaultCodec": &ZlibCodec{},
"org.apache.hadoop.io.compress.Lz4Codec": &Lz4Codec{},
"org.apache.hadoop.io.compress.BZip2Codec": &Bzip2Codec{},
}
)
type Codec interface {
Uncompress(dst, src []byte) ([]byte, error)
Compress(dst, src []byte) ([]byte, error)
}
type ZlibCodec struct{}
func (c *ZlibCodec) Uncompress(dst, src []byte) ([]byte, error) {
reader, err := zlib.NewReader(bytes.NewReader(src))
if err != nil {
return nil, err
}
var buf [512]byte
for {
n, err := reader.Read(buf[:])
if err != nil && err != io.EOF {
return nil, err
}
dst = append(dst, buf[:n]...)
if err == io.EOF {
break
}
}
return dst, nil
}
func (c *ZlibCodec) Compress(dst, src []byte) ([]byte, error) {
var buf bytes.Buffer
writer := zlib.NewWriter(&buf)
_, err := writer.Write(src)
if err != nil {
return nil, err
}
err = writer.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
type Bzip2Codec struct {
}
func (c *Bzip2Codec) Compress(dst, src []byte) ([]byte, error) {
panic("Bzip2Codec Compress not implemented")
}
func (c *Bzip2Codec) Uncompress(dst, src []byte) ([]byte, error) {
reader := bzip2.NewReader(bytes.NewReader(src))
var buf [512]byte
for {
n, err := reader.Read(buf[:])
if err != nil && err != io.EOF {
return nil, err
}
dst = append(dst, buf[:n]...)
if err == io.EOF {
break
}
}
return dst, nil
}
type Lz4Codec struct {
}
func (c *Lz4Codec) Compress(dst, src []byte) ([]byte, error) {
panic("Lz4Codec Compress not implemented")
}
// func lz4DecompressSafe(in, out []byte) (int, error) {
// n := int(C.LZ4_decompress_safe((*C.char)(unsafe.Pointer(&in[0])), (*C.char)(unsafe.Pointer(&out[0])), C.int(len(in)), C.int(len(out))))
// if n < 0 {
// return 0, errors.New("corrupt input")
// }
// return n, nil
// }
func (c *Lz4Codec) Uncompress(dst, src []byte) ([]byte, error) {
panic("Lz4Codec Uncompress has been disabled / commented out")
// var iptr, optr uint
// osize := uint(binary.BigEndian.Uint32(src[0:]))
// iptr += 4
// out := make([]byte, osize)
// for optr < osize {
// iblocksize := uint(binary.BigEndian.Uint32(src[iptr:]))
// iptr += 4
// n, err := lz4DecompressSafe(src[iptr:iptr+iblocksize], out[optr:])
// if err != nil {
// return nil, err
// }
// optr += uint(n)
// iptr += iblocksize
// }
// return out, nil
}
type SnappyCodec struct {
}