Skip to content

Commit

Permalink
Merge pull request #1 from kklipsch/race
Browse files Browse the repository at this point in the history
eliminate data race in codec around primitive codecs
  • Loading branch information
kklipsch committed Aug 4, 2016
2 parents b9868e4 + 53ae019 commit d99039f
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 47 deletions.
84 changes: 50 additions & 34 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,42 @@ func (c codec) String() string {
return fmt.Sprintf("nm: %v, df: %v, ef: %v", c.nm, c.df, c.ef)
}

type symtab map[string]*codec // map full name to codec
// NOTE: use Go type names because for runtime resolution of
// union member, it gets the Go type name of the datum sent to
// the union encoder, and uses that string as a key into the
// encoders map
func newSymbolTable() *symtab {
return &symtab{
name: make(map[string]*codec),
nullCodec: &codec{nm: &name{n: "null"}, df: nullDecoder, ef: nullEncoder},
booleanCodec: &codec{nm: &name{n: "bool"}, df: booleanDecoder, ef: booleanEncoder},
intCodec: &codec{nm: &name{n: "int32"}, df: intDecoder, ef: intEncoder},
longCodec: longCodec(),
floatCodec: &codec{nm: &name{n: "float32"}, df: floatDecoder, ef: floatEncoder},
doubleCodec: &codec{nm: &name{n: "float64"}, df: doubleDecoder, ef: doubleEncoder},
bytesCodec: &codec{nm: &name{n: "[]uint8"}, df: bytesDecoder, ef: bytesEncoder},
stringCodec: &codec{nm: &name{n: "string"}, df: stringDecoder, ef: stringEncoder},
}

}

func longCodec() *codec {
return &codec{nm: &name{n: "int64"}, df: longDecoder, ef: longEncoder}
}

type symtab struct {
name map[string]*codec // map full name to codec

//cache primitive codecs
nullCodec *codec
booleanCodec *codec
intCodec *codec
longCodec *codec
floatCodec *codec
doubleCodec *codec
bytesCodec *codec
stringCodec *codec
}

// NewCodec creates a new object that supports both the Decode and
// Encode methods. It requires an Avro schema, expressed as a JSON
Expand Down Expand Up @@ -179,7 +214,7 @@ func NewCodec(someJSONSchema string, setters ...CodecSetter) (Codec, error) {

// each codec gets a unified namespace of symbols to
// respective codecs
st := make(symtab)
st := newSymbolTable()

newCodec, err := st.buildCodec(nullNamespace, schema)
if err != nil {
Expand Down Expand Up @@ -236,25 +271,6 @@ func (c codec) NewWriter(setters ...WriterSetter) (*Writer, error) {
return NewWriter(setters...)
}

var (
nullCodec, booleanCodec, intCodec, longCodec, floatCodec, doubleCodec, bytesCodec, stringCodec *codec
)

func init() {
// NOTE: use Go type names because for runtime resolution of
// union member, it gets the Go type name of the datum sent to
// the union encoder, and uses that string as a key into the
// encoders map
nullCodec = &codec{nm: &name{n: "null"}, df: nullDecoder, ef: nullEncoder}
booleanCodec = &codec{nm: &name{n: "bool"}, df: booleanDecoder, ef: booleanEncoder}
intCodec = &codec{nm: &name{n: "int32"}, df: intDecoder, ef: intEncoder}
longCodec = &codec{nm: &name{n: "int64"}, df: longDecoder, ef: longEncoder}
floatCodec = &codec{nm: &name{n: "float32"}, df: floatDecoder, ef: floatEncoder}
doubleCodec = &codec{nm: &name{n: "float64"}, df: doubleDecoder, ef: doubleEncoder}
bytesCodec = &codec{nm: &name{n: "[]uint8"}, df: bytesDecoder, ef: bytesEncoder}
stringCodec = &codec{nm: &name{n: "string"}, df: stringDecoder, ef: stringEncoder}
}

func (st symtab) buildCodec(enclosingNamespace string, schema interface{}) (*codec, error) {
switch schemaType := schema.(type) {
case string:
Expand Down Expand Up @@ -290,21 +306,21 @@ func (st symtab) buildMap(enclosingNamespace string, schema map[string]interface
func (st symtab) buildString(enclosingNamespace, typeName string, schema interface{}) (*codec, error) {
switch typeName {
case "null":
return nullCodec, nil
return st.nullCodec, nil
case "boolean":
return booleanCodec, nil
return st.booleanCodec, nil
case "int":
return intCodec, nil
return st.intCodec, nil
case "long":
return longCodec, nil
return st.longCodec, nil
case "float":
return floatCodec, nil
return st.floatCodec, nil
case "double":
return doubleCodec, nil
return st.doubleCodec, nil
case "bytes":
return bytesCodec, nil
return st.bytesCodec, nil
case "string":
return stringCodec, nil
return st.stringCodec, nil
case "record":
return st.makeRecordCodec(enclosingNamespace, schema)
case "enum":
Expand All @@ -320,7 +336,7 @@ func (st symtab) buildString(enclosingNamespace, typeName string, schema interfa
if err != nil {
return nil, newCodecBuildError(typeName, "could not normalize name: %q: %q: %s", enclosingNamespace, typeName, err)
}
c, ok := st[t.n]
c, ok := st.name[t.n]
if !ok {
return nil, newCodecBuildError("unknown", "unknown type name: %s", t.n)
}
Expand Down Expand Up @@ -494,7 +510,7 @@ func (st symtab) makeEnumCodec(enclosingNamespace string, schema interface{}) (*
return newEncoderError(friendlyName, "symbol not defined: %s", someString)
},
}
st[nm.n] = c
st.name[nm.n] = c
return c, nil
}

Expand Down Expand Up @@ -555,7 +571,7 @@ func (st symtab) makeFixedCodec(enclosingNamespace string, schema interface{}) (
return nil
},
}
st[nm.n] = c
st.name[nm.n] = c
return c, nil
}

Expand Down Expand Up @@ -626,7 +642,7 @@ func (st symtab) makeRecordCodec(enclosingNamespace string, schema interface{})
return nil
},
}
st[recordTemplate.Name] = c
st.name[recordTemplate.Name] = c
return c, nil
}

Expand Down Expand Up @@ -751,7 +767,7 @@ func (st symtab) makeArrayCodec(enclosingNamespace string, schema interface{}) (
return &codec{
nm: nm,
df: func(r io.Reader) (interface{}, error) {
data := make([]interface{}, 0)
var data []interface{}

someValue, err := longDecoder(r)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions ocf_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func NewReader(setters ...ReaderSetter) (*Reader, error) {
toDecompress := make(chan *readerBlock)
toDecode := make(chan *readerBlock)
fr.deblocked = make(chan Datum)
go read(fr, toDecompress)
go read(fr, longCodec(), toDecompress)
go decompress(fr, toDecompress, toDecode)
go decode(fr, toDecode)
return fr, nil
Expand Down Expand Up @@ -261,13 +261,13 @@ func newReaderError(a ...interface{}) *ErrReader {
return &ErrReader{message, err}
}

func read(fr *Reader, toDecompress chan<- *readerBlock) {
func read(fr *Reader, lCodec *codec, toDecompress chan<- *readerBlock) {
// NOTE: these variables created outside loop to reduce churn
var lr io.Reader
var bits []byte
sync := make([]byte, syncLength)

blockCount, blockSize, err := readBlockCountAndSize(fr.r)
blockCount, blockSize, err := readBlockCountAndSize(fr.r, lCodec)
if err != nil {
fr.err = err
blockCount = 0
Expand All @@ -287,22 +287,22 @@ func read(fr *Reader, toDecompress chan<- *readerBlock) {
fr.err = newReaderError(fmt.Sprintf("sync marker mismatch: %#v != %#v", sync, fr.Sync))
break
}
if blockCount, blockSize, fr.err = readBlockCountAndSize(fr.r); fr.err != nil {
if blockCount, blockSize, fr.err = readBlockCountAndSize(fr.r, lCodec); fr.err != nil {
break
}
}
close(toDecompress)
}

func readBlockCountAndSize(r io.Reader) (int, int, error) {
bc, err := longCodec.Decode(r)
func readBlockCountAndSize(r io.Reader, lcodec *codec) (int, int, error) {
bc, err := lcodec.Decode(r)
if err != nil {
if ed, ok := err.(*ErrDecoder); ok && ed.Err == io.EOF {
return 0, 0, nil // we're done
}
return 0, 0, &ErrReaderBlockCount{err}
}
bs, err := longCodec.Decode(r)
bs, err := lcodec.Decode(r)
if err != nil {
return 0, 0, &ErrReaderBlockCount{err}
}
Expand Down
2 changes: 1 addition & 1 deletion ocf_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestReaderScanShouldNotBlock(t *testing.T) {

func TestReadBlockCountAndSizeWithNothing(t *testing.T) {
bits := []byte("")
bc, bs, err := readBlockCountAndSize(bytes.NewReader(bits))
bc, bs, err := readBlockCountAndSize(bytes.NewReader(bits), longCodec())
if bc != 0 {
t.Errorf("Actual: %#v; Expected: %#v", bc, 0)
}
Expand Down
10 changes: 5 additions & 5 deletions ocf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/golang/snappy"
)

// DefaultWriterBlockSizeo specifies the default number of datum items
// DefaultWriterBlockSize specifies the default number of datum items
// in a block when writing.
const DefaultWriterBlockSize = 10

Expand Down Expand Up @@ -267,7 +267,7 @@ func NewWriter(setters ...WriterSetter) (*Writer, error) {
go blocker(fw, fw.toBlock, toEncode)
go encoder(fw, toEncode, toCompress)
go compressor(fw, toCompress, toWrite)
go writer(fw, toWrite)
go writer(fw, longCodec(), toWrite)
return fw, nil
}

Expand Down Expand Up @@ -423,13 +423,13 @@ func compressor(fw *Writer, toCompress <-chan *writerBlock, toWrite chan<- *writ
close(toWrite)
}

func writer(fw *Writer, toWrite <-chan *writerBlock) {
func writer(fw *Writer, lcodec *codec, toWrite <-chan *writerBlock) {
for block := range toWrite {
if block.err == nil {
block.err = longCodec.Encode(fw.w, int64(len(block.items)))
block.err = lcodec.Encode(fw.w, int64(len(block.items)))
}
if block.err == nil {
block.err = longCodec.Encode(fw.w, int64(len(block.compressed)))
block.err = lcodec.Encode(fw.w, int64(len(block.compressed)))
}
if block.err == nil {
_, block.err = fw.w.Write(block.compressed)
Expand Down
115 changes: 115 additions & 0 deletions race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package goavro

import (
"bytes"
"fmt"
"testing"
)

func TestRaceCodecConstructionDecode(t *testing.T) {

recordSchemaJSON := `{"type": "long"}`
codec, _ := NewCodec(recordSchemaJSON)
comms := make(chan []byte, 1000)
done := make(chan error, 10)

go func() {

for i := 0; i < 10000; i++ {

//Completely unrelated stateful objects were causing races
if i%100 == 0 {
recordSchemaJSON := `{"type": "long"}`
NewCodec(recordSchemaJSON)
}

bb := new(bytes.Buffer)
if err := codec.Encode(bb, int64(i)); err != nil {
done <- err
return
}

comms <- bb.Bytes()
}
close(comms)
}()

go func() {
i := 0
for encoded := range comms {
bb := bytes.NewBuffer(encoded)
decoded, err := codec.Decode(bb)
if err != nil {
done <- err
return
}
result := decoded.(int64)
if result != int64(i) {
done <- fmt.Errorf("didnt match %v %v", i, result)
return
}

i++
}

close(done)
}()

err := <-done
if err != nil {
t.Fatal(err)
}

}

func TestRaceCodecConstruction(t *testing.T) {

comms := make(chan []byte, 1000)
done := make(chan error, 10)

go func() {
recordSchemaJSON := `{"type": "long"}`
codec, _ := NewCodec(recordSchemaJSON)

for i := 0; i < 10000; i++ {

bb := new(bytes.Buffer)
if err := codec.Encode(bb, int64(i)); err != nil {
done <- err
return
}

comms <- bb.Bytes()
}
close(comms)
}()

go func() {
recordSchemaJSON := `{"type": "long"}`
codec, _ := NewCodec(recordSchemaJSON)
i := 0
for encoded := range comms {
bb := bytes.NewBuffer(encoded)
decoded, err := codec.Decode(bb)
if err != nil {
done <- err
return
}
result := decoded.(int64)
if result != int64(i) {
done <- fmt.Errorf("didnt match %v %v", i, result)
return
}

i++
}

close(done)
}()

err := <-done
if err != nil {
t.Fatal(err)
}

}

0 comments on commit d99039f

Please sign in to comment.