Skip to content

Commit

Permalink
apacheGH-36669: [Go] Guard against garbage in C Data structures
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Jul 13, 2023
1 parent 994c73b commit d7882b5
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 7 deletions.
23 changes: 23 additions & 0 deletions go/arrow/cdata/cdata_fulltest.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,26 @@ void test_stream_schema_fallible(struct ArrowArrayStream* stream) {
stream->private_data = &kFallibleStream;
stream->release = FallibleRelease;
}

int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed) {
struct ArrowSchema schema;
// Try to confuse the Go GC by putting what looks like a Go pointer here.
schema.name = (char*)(0xc000000000L + (rand_r(&seed) % 0x2000));
schema.format = (char*)(0xc000000000L + (rand_r(&seed) % 0x2000));
int rc = stream->get_schema(stream, &schema);
if (rc != 0) return rc;
schema.release(&schema);

while (1) {
struct ArrowArray array;
array.release = (void*)0xDEADBEEF;
int rc = stream->get_next(stream, &array);
if (rc != 0) return rc;

if (array.release == NULL) {
stream->release(stream);
break;
}
}
return 0;
}
26 changes: 26 additions & 0 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"io"
"runtime"
"runtime/cgo"
"sync"
"testing"
"time"
"unsafe"
Expand Down Expand Up @@ -940,3 +941,28 @@ func TestRecordReaderImportError(t *testing.T) {
}
assert.Contains(t, err.Error(), "Expected error message")
}

func TestConfuseGoGc(t *testing.T) {
// Regression test for https://github.com/apache/arrow-adbc/issues/729
reclist := arrdata.Records["primitives"]

var wg sync.WaitGroup
concurrency := 32
wg.Add(concurrency)

// XXX: this test is a bit expensive
for i := 0; i < concurrency; i++ {
go func() {
for i := 0; i < 128; i++ {
rdr, err := array.NewRecordReader(reclist[0].Schema(), reclist)
assert.NoError(t, err)
runtime.GC()
confuseGoGc(rdr)
runtime.GC()
}
wg.Done()
}()
}

wg.Wait()
}
13 changes: 13 additions & 0 deletions go/arrow/cdata/cdata_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ package cdata
// struct ArrowSchema** test_union(const char** fmts, const char** names, int64_t* flags, const int n);
// int test_exported_stream(struct ArrowArrayStream* stream);
// void test_stream_schema_fallible(struct ArrowArrayStream* stream);
// int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed);
import "C"
import (
"errors"
"fmt"
"io"
"math/rand"
"unsafe"

"github.com/apache/arrow/go/v13/arrow"
Expand Down Expand Up @@ -350,3 +352,14 @@ func fallibleSchemaTest() error {
}
return nil
}

func confuseGoGc(reader array.RecordReader) error {
out := C.get_test_stream()
ExportRecordReader(reader, out)
rc := C.confuse_go_gc(out, C.uint(rand.Int()))
C.free(unsafe.Pointer(out))
if rc == 0 {
return nil
}
return fmt.Errorf("Exported stream test failed with return code %d", int(rc))
}
17 changes: 10 additions & 7 deletions go/arrow/cdata/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import (
// #include <stdlib.h>
// #include "arrow/c/helpers.h"
//
// typedef const char cchar_t;
// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
// extern const char* streamGetError(struct ArrowArrayStream*);
// extern void streamRelease(struct ArrowArrayStream*);
// typedef const char cchar_t;
// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
// extern const char* streamGetError(struct ArrowArrayStream*);
// extern void streamRelease(struct ArrowArrayStream*);
// // XXX(https://github.com/apache/arrow-adbc/issues/729)
// int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct ArrowSchema* out);
// int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct ArrowArray* out);
//
import "C"

Expand Down Expand Up @@ -154,8 +157,8 @@ func streamRelease(handle *CArrowArrayStream) {
}

func exportStream(rdr array.RecordReader, out *CArrowArrayStream) {
out.get_schema = (*[0]byte)(C.streamGetSchema)
out.get_next = (*[0]byte)(C.streamGetNext)
out.get_schema = (*[0]byte)(C.streamGetSchemaTrampoline)
out.get_next = (*[0]byte)(C.streamGetNextTrampoline)
out.get_last_error = (*[0]byte)(C.streamGetError)
out.release = (*[0]byte)(C.streamRelease)
h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
Expand Down
20 changes: 20 additions & 0 deletions go/arrow/cdata/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema) (arrio
// the populating of the struct. Any memory allocated will be allocated using malloc
// which means that it is invisible to the Go Garbage Collector and must be freed manually
// using the callback on the CArrowSchema object.
//
// WARNING: the output ArrowSchema MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
dummy := arrow.Field{Type: arrow.StructOf(schema.Fields()...), Metadata: schema.Metadata()}
exportField(dummy, out)
Expand All @@ -220,6 +225,11 @@ func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
// The release function on the populated CArrowArray will properly decrease the reference counts,
// and release the memory if the record has already been released. But since this must be explicitly
// done, make sure it is released so that you do not create a memory leak.
//
// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema *CArrowSchema) {
children := make([]arrow.ArrayData, rb.NumCols())
for i := range rb.Columns() {
Expand All @@ -243,6 +253,11 @@ func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema *CArrow
// being used by the arrow.Array passed in, in order to share with zero-copy across the C
// Data Interface. See the documentation for ExportArrowRecordBatch for details on how to ensure
// you do not leak memory and prevent unwanted, undefined or strange behaviors.
//
// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
exportArray(arr, out, outSchema)
}
Expand All @@ -252,6 +267,11 @@ func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema
// CArrowArrayStream takes ownership of the RecordReader until the consumer calls the release
// callback, as such it is unnecesary to call Release on the passed in reader unless it has
// previously been retained.
//
// WARNING: the output ArrowArrayStream MUST BE ZERO INITIALIZED, or the Go garbage
// collector may error at runtime, due to CGO rules ("the current implementation may
// sometimes cause a runtime error if the contents of the C memory appear to be a Go
// pointer"). You have been warned!
func ExportRecordReader(reader array.RecordReader, out *CArrowArrayStream) {
exportStream(reader, out)
}
Expand Down
34 changes: 34 additions & 0 deletions go/arrow/cdata/trampoline.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string.h>

#include "arrow/c/abi.h"

int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);

int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
// XXX(https://github.com/apache/arrow-adbc/issues/729)
memset(out, 0, sizeof(*out));
return streamGetSchema(stream, out);
}

int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct ArrowArray* out) {
// XXX(https://github.com/apache/arrow-adbc/issues/729)
memset(out, 0, sizeof(*out));
return streamGetNext(stream, out);
}

0 comments on commit d7882b5

Please sign in to comment.