Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Branch: master
Fetching contributors…

Cannot retrieve contributors at this time

309 lines (269 sloc) 6.163 kB
package oakmole
import (
"bufio"
"bytes"
"compress/flate"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"io/ioutil"
"log"
"net"
)
const (
CompressionLevel = 5
MinimalBodyLengthForCompression = 200
)
const MagicNumber uint32 = 0x5501aaff
const (
FlagBodyDeflate = 1 << iota
FlagLocalIPV6
FlagRemoteIPV6
)
var ErrBufferIncomplete error = errors.New("Input buffer is incomplete")
var ErrInvalidLength error = errors.New("Invalid length")
var ErrMagicMismatch error = errors.New("Magic number mismatch")
var ErrChecksumMismatch error = errors.New("Checksum mismatch")
type Record struct {
Timestamp uint64
LocalIP net.IP
RemoteIP net.IP
HttpHost []byte
Body []byte
}
// Format:
// 4 magic
// 4 length of all record including magic
// 1 flags
// 7 timestamp
// 4/16 local ip
// 4/16 remote ip
// 1 length of HTTP host
// ? HTTP host
// 4 length of body
// ? body
// 4 checksum
func (r *Record) Marshal() ([]byte, error) {
// Includes timestamp, flags, ..., checksum.
// Does not include magic and size of length itself.
length := 33 + len(r.HttpHost)
flags := uint8(0)
localIP := r.LocalIP.To4()
if localIP == nil {
localIP = r.LocalIP
flags |= FlagLocalIPV6
length += 12
}
remoteIP := r.RemoteIP.To4()
if remoteIP == nil {
remoteIP = r.RemoteIP
flags |= FlagRemoteIPV6
length += 12
}
body := r.Body
if len(r.Body) > MinimalBodyLengthForCompression {
flags |= FlagBodyDeflate
buffer := new(bytes.Buffer)
deflate, err := flate.NewWriter(buffer, CompressionLevel)
if err != nil {
return nil, err
}
_, err = deflate.Write(r.Body)
if err != nil {
return nil, err
}
if err = deflate.Close(); err != nil {
return nil, err
}
body = buffer.Bytes()
}
length += len(body)
result := make([]byte, length)
// Reslicing is slower but more convenient.
b := result
binary.BigEndian.PutUint32(b, MagicNumber)
b = b[4:]
binary.BigEndian.PutUint32(b, uint32(length))
b = b[4:]
timestampAndFlags := uint64(r.Timestamp&0x00ffffffffffffff) | uint64(flags)<<56
binary.BigEndian.PutUint64(b, timestampAndFlags)
b = b[8:]
copy(b, localIP)
b = b[len(localIP):]
copy(b, remoteIP)
b = b[len(remoteIP):]
b[0] = uint8(len(r.HttpHost))
b = b[1:]
copy(b, r.HttpHost)
b = b[len(r.HttpHost):]
binary.BigEndian.PutUint32(b, uint32(len(body)))
b = b[4:]
copy(b, body)
b = b[len(body):]
// Should only have place for checksum.
if len(b) != 4 {
panic(len(b))
}
crc := crc32.NewIEEE()
crc.Write(result[:len(result)-4])
checksum := crc.Sum32()
binary.BigEndian.PutUint32(b, checksum)
return result, nil
}
func (r *Record) WriteTo(w io.Writer) (int64, error) {
b, err := r.Marshal()
if err != nil {
return 0, err
}
n, err := w.Write(b)
return int64(n), err
}
func parseHeader(b []byte) (uint32, error) {
magic := binary.BigEndian.Uint32(b)
if magic != MagicNumber {
return 0, ErrMagicMismatch
}
length := binary.BigEndian.Uint32(b[4:])
return length, nil
}
func readIP(b []byte, v6 bool) (ip net.IP) {
if v6 {
ip = make([]byte, 16)
} else {
ip = make([]byte, 4)
}
copy(ip, b)
return ip
}
func (r *Record) unmarshal(input []byte) error {
parsedChecksum := binary.BigEndian.Uint32(input[len(input)-4:])
crc := crc32.NewIEEE()
crc.Write(input[:len(input)-4])
realChecksum := crc.Sum32()
if realChecksum != parsedChecksum {
return ErrChecksumMismatch
}
// Reslicing is slower but more convenient.
b := input[8:]
timestampAndFlags := binary.BigEndian.Uint64(b)
b = b[8:]
flags := uint8(timestampAndFlags >> 56)
r.Timestamp = timestampAndFlags & 0x00ffffffffffffff
r.LocalIP = readIP(b, flags&FlagLocalIPV6 != 0)
b = b[len(r.LocalIP):]
r.RemoteIP = readIP(b, flags&FlagRemoteIPV6 != 0)
b = b[len(r.RemoteIP):]
hostLength := b[0]
b = b[1:]
r.HttpHost = make([]byte, hostLength)
copy(r.HttpHost, b)
b = b[hostLength:]
bodyLength := binary.BigEndian.Uint32(b)
b = b[4:]
if flags&FlagBodyDeflate != 0 {
compressedReader := bytes.NewReader(b[:bodyLength])
deflate := flate.NewReader(compressedReader)
var err error
r.Body, err = ioutil.ReadAll(deflate)
if err != nil {
return err
}
} else {
r.Body = make([]byte, bodyLength)
copy(r.Body, b)
}
return nil
}
// Returns rest. On errors pretends nothing was consumed.
func (r *Record) Unmarshal(input []byte) ([]byte, error) {
if len(input) < 33 {
return input, ErrInvalidLength
}
length, err := parseHeader(input)
if err != nil {
return input, err
}
if int(length) > len(input) {
return input, ErrBufferIncomplete
}
if err = r.unmarshal(input); err != nil {
return input, err
}
return input[length:], nil
}
// Will not consume any bytes on magic number mismatch.
func (r *Record) ReadFrom(reader *bufio.Reader) (n int64, err error) {
var bHeader []byte
bHeader, err = reader.Peek(8)
if err != nil {
return 0, err
}
var length uint32
length, err = parseHeader(bHeader)
if err != nil {
return 0, err
}
buffer := make([]byte, length)
var n2 int
n2, err = reader.Read(buffer)
n += int64(n2)
if err != nil {
return n, err
}
err = r.unmarshal(buffer)
return n, err
}
// Returns number of bytes scanned and error, if any.
func ReadAll(ch chan *Record, reader *bufio.Reader, stop chan bool) (pos int64, err error) {
var n int64
var b []byte
for {
select {
case <-stop:
return
default:
}
record := &Record{}
n, err = record.ReadFrom(reader)
if err == nil {
pos += n
ch <- record
} else if err == io.EOF {
return pos, nil
} else {
log.Println("ReadAll: position:", pos, "error:", err)
// Enter recovery mode.
// Try to skip 1 byte until we find magic number.
for {
select {
case <-stop:
return
default:
}
_, err = reader.ReadByte()
if err != nil {
if err == io.EOF {
return pos, nil
}
return pos, err
}
b, err = reader.Peek(4)
if err != nil {
if err == io.EOF {
return pos, nil
}
return pos, err
}
number := binary.BigEndian.Uint32(b)
if number == MagicNumber {
// Exit recovery mode.
break
}
}
}
}
return pos, nil
}
func initStorage() {
}
Jump to Line
Something went wrong with that request. Please try again.