Skip to content

Commit

Permalink
apacheGH-37789: [Integration][Go] Go C Data Interface integration tes…
Browse files Browse the repository at this point in the history
…ting (apache#37788)

### Rationale for this change

We want to enable integration testing of the Arrow Go implementation of the C Data Interface, so as to ensure interoperability.

### What changes are included in this PR?

1. Enable C Data Interface integration testing for the Arrow Go implementation
2. Fix compatibility issues found by the integration tests

### Are these changes tested?

Yes, by construction.

### Are there any user-facing changes?

Bugfixes in the Arrow Go C Data Interface implementation.

* Closes: apache#37789

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou authored and loicalleyne committed Nov 13, 2023
1 parent 6c31a10 commit 197a169
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ jobs:
name: AMD64 Windows 2019 Go ${{ matrix.go }}
runs-on: windows-2019
if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
timeout-minutes: 15
timeout-minutes: 25
strategy:
fail-fast: false
matrix:
Expand Down
19 changes: 19 additions & 0 deletions ci/scripts/go_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,22 @@ pushd ${source_dir}/parquet
go install -v ./...

popd

if [[ -n "${ARROW_GO_INTEGRATION}" ]]; then
pushd ${source_dir}/arrow/internal/cdata_integration

case "$(uname)" in
Linux)
go_lib="arrow_go_integration.so"
;;
Darwin)
go_lib="arrow_go_integration.so"
;;
MINGW*)
go_lib="arrow_go_integration.dll"
;;
esac
go build -tags cdata_integration,assert -buildmode=c-shared -o ${go_lib} .

popd
fi
7 changes: 6 additions & 1 deletion dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, json_files,
self.serial = serial
self.gold_dirs = gold_dirs
self.failures: List[Outcome] = []
self.skips: List[Outcome] = []
self.match = match

if self.match is not None:
Expand Down Expand Up @@ -207,6 +208,8 @@ def case_wrapper(test_case):
self.failures.append(outcome.failure)
if self.stop_on_error:
break
elif outcome.skipped:
self.skips.append(outcome)

else:
with ThreadPoolExecutor() as executor:
Expand All @@ -215,6 +218,8 @@ def case_wrapper(test_case):
self.failures.append(outcome.failure)
if self.stop_on_error:
break
elif outcome.skipped:
self.skips.append(outcome)

def _compare_ipc_implementations(
self,
Expand Down Expand Up @@ -638,7 +643,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
log(f'{exc_type}: {exc_value}')
log()

log(fail_count, "failures")
log(f"{fail_count} failures, {len(runner.skips)} skips")
if fail_count > 0:
sys.exit(1)

Expand Down
134 changes: 133 additions & 1 deletion dev/archery/archery/integration/tester_go.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
# under the License.

import contextlib
import functools
import os
import subprocess

from .tester import Tester
from . import cdata
from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
from ..utils.source import ARROW_ROOT_DEFAULT


# FIXME(sbinet): revisit for Go modules
Expand All @@ -39,12 +42,21 @@
"localhost",
]

_dll_suffix = ".dll" if os.name == "nt" else ".so"

_DLL_PATH = os.path.join(
ARROW_ROOT_DEFAULT,
"go/arrow/internal/cdata_integration")
_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + _dll_suffix)


class GoTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
C_DATA_EXPORTER = True
C_DATA_IMPORTER = True

name = 'Go'

Expand Down Expand Up @@ -119,3 +131,123 @@ def flight_request(self, port, json_path=None, scenario_name=None):
if self.debug:
log(' '.join(cmd))
run_cmd(cmd)

def make_c_data_exporter(self):
return GoCDataExporter(self.debug, self.args)

def make_c_data_importer(self):
return GoCDataImporter(self.debug, self.args)


_go_c_data_entrypoints = """
const char* ArrowGo_ExportSchemaFromJson(const char* json_path,
uintptr_t out);
const char* ArrowGo_ImportSchemaAndCompareToJson(
const char* json_path, uintptr_t c_schema);
const char* ArrowGo_ExportBatchFromJson(const char* json_path,
int num_batch,
uintptr_t out);
const char* ArrowGo_ImportBatchAndCompareToJson(
const char* json_path, int num_batch, uintptr_t c_array);
int64_t ArrowGo_BytesAllocated();
void ArrowGo_RunGC();
void ArrowGo_FreeError(const char*);
"""


@functools.lru_cache
def _load_ffi(ffi, lib_path=_INTEGRATION_DLL):
ffi.cdef(_go_c_data_entrypoints)
dll = ffi.dlopen(lib_path)
return dll


class _CDataBase:

def __init__(self, debug, args):
self.debug = debug
self.args = args
self.ffi = cdata.ffi()
self.dll = _load_ffi(self.ffi)

def _pointer_to_int(self, c_ptr):
return self.ffi.cast('uintptr_t', c_ptr)

def _check_go_error(self, go_error):
"""
Check a `const char*` error return from an integration entrypoint.
A null means success, a non-empty string is an error message.
The string is dynamically allocated on the Go side.
"""
assert self.ffi.typeof(go_error) is self.ffi.typeof("const char*")
if go_error != self.ffi.NULL:
try:
error = self.ffi.string(go_error).decode('utf8',
errors='replace')
raise RuntimeError(
f"Go C Data Integration call failed: {error}")
finally:
self.dll.ArrowGo_FreeError(go_error)

def _run_gc(self):
self.dll.ArrowGo_RunGC()


class GoCDataExporter(CDataExporter, _CDataBase):
# Note: the Arrow Go C Data export functions expect their output
# ArrowStream or ArrowArray argument to be zero-initialized.
# This is currently ensured through the use of `ffi.new`.

def export_schema_from_json(self, json_path, c_schema_ptr):
go_error = self.dll.ArrowGo_ExportSchemaFromJson(
str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
self._check_go_error(go_error)

def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
go_error = self.dll.ArrowGo_ExportBatchFromJson(
str(json_path).encode(), num_batch,
self._pointer_to_int(c_array_ptr))
self._check_go_error(go_error)

@property
def supports_releasing_memory(self):
return True

def record_allocation_state(self):
self._run_gc()
return self.dll.ArrowGo_BytesAllocated()

def compare_allocation_state(self, recorded, gc_until):
def pred():
return self.record_allocation_state() == recorded

return gc_until(pred)


class GoCDataImporter(CDataImporter, _CDataBase):

def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
go_error = self.dll.ArrowGo_ImportSchemaAndCompareToJson(
str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
self._check_go_error(go_error)

def import_batch_and_compare_to_json(self, json_path, num_batch,
c_array_ptr):
go_error = self.dll.ArrowGo_ImportBatchAndCompareToJson(
str(json_path).encode(), num_batch,
self._pointer_to_int(c_array_ptr))
self._check_go_error(go_error)

@property
def supports_releasing_memory(self):
return True

def gc_until(self, predicate):
for i in range(10):
if predicate():
return True
self._run_gc()
return False
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,7 @@ services:
<<: [*common, *ccache]
# tell archery where the arrow binaries are located
ARROW_CPP_EXE_PATH: /build/cpp/debug
ARROW_GO_INTEGRATION: 1
ARCHERY_INTEGRATION_WITH_RUST: 0
command:
["/arrow/ci/scripts/rust_build.sh /arrow /build &&
Expand Down
2 changes: 1 addition & 1 deletion go/arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {

// handle types with params via colon
typs := strings.Split(f, ":")
defaulttz := "UTC"
defaulttz := ""
switch typs[0] {
case "tss":
tz := typs[1]
Expand Down
38 changes: 20 additions & 18 deletions go/arrow/cdata/cdata_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,34 +368,36 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
exportField(arrow.Field{Type: arr.DataType()}, outSchema)
}

nbuffers := len(arr.Data().Buffers())
buf_offset := 0
// Some types don't have validity bitmaps, but we keep them shifted
// to make processing easier in other contexts. This means that
// we have to adjust when exporting.
has_validity_bitmap := internal.DefaultHasValidityBitmap(arr.DataType().ID())
if nbuffers > 0 && !has_validity_bitmap {
nbuffers--
buf_offset++
}

out.dictionary = nil
out.null_count = C.int64_t(arr.NullN())
out.length = C.int64_t(arr.Len())
out.offset = C.int64_t(arr.Data().Offset())
out.n_buffers = C.int64_t(len(arr.Data().Buffers()))

if out.n_buffers > 0 {
var (
nbuffers = len(arr.Data().Buffers())
bufs = arr.Data().Buffers()
)
// unions don't have validity bitmaps, but we keep them shifted
// to make processing easier in other contexts. This means that
// we have to adjust for union arrays
if !internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
out.n_buffers--
nbuffers--
bufs = bufs[1:]
}
out.n_buffers = C.int64_t(nbuffers)
out.buffers = nil

if nbuffers > 0 {
bufs := arr.Data().Buffers()
buffers := allocateBufferPtrArr(nbuffers)
for i := range bufs {
buf := bufs[i]
for i, buf := range bufs[buf_offset:] {
if buf == nil || buf.Len() == 0 {
if i > 0 || !internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
if i > 0 || !has_validity_bitmap {
// apache/arrow#33936: export a dummy buffer to be friendly to
// implementations that don't import NULL properly
buffers[i] = (*C.void)(unsafe.Pointer(&C.kGoCdataZeroRegion))
} else {
// null pointer permitted for the validity bitmap
// (assuming null count is 0)
buffers[i] = nil
}
continue
Expand Down
12 changes: 8 additions & 4 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,17 @@ func TestImportTemporalSchema(t *testing.T) {
{arrow.FixedWidthTypes.MonthInterval, "tiM"},
{arrow.FixedWidthTypes.DayTimeInterval, "tiD"},
{arrow.FixedWidthTypes.MonthDayNanoInterval, "tin"},
{arrow.FixedWidthTypes.Timestamp_s, "tss:"},
{arrow.FixedWidthTypes.Timestamp_s, "tss:UTC"},
{&arrow.TimestampType{Unit: arrow.Second}, "tss:"},
{&arrow.TimestampType{Unit: arrow.Second, TimeZone: "Europe/Paris"}, "tss:Europe/Paris"},
{arrow.FixedWidthTypes.Timestamp_ms, "tsm:"},
{arrow.FixedWidthTypes.Timestamp_ms, "tsm:UTC"},
{&arrow.TimestampType{Unit: arrow.Millisecond}, "tsm:"},
{&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "Europe/Paris"}, "tsm:Europe/Paris"},
{arrow.FixedWidthTypes.Timestamp_us, "tsu:"},
{arrow.FixedWidthTypes.Timestamp_us, "tsu:UTC"},
{&arrow.TimestampType{Unit: arrow.Microsecond}, "tsu:"},
{&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "Europe/Paris"}, "tsu:Europe/Paris"},
{arrow.FixedWidthTypes.Timestamp_ns, "tsn:"},
{arrow.FixedWidthTypes.Timestamp_ns, "tsn:UTC"},
{&arrow.TimestampType{Unit: arrow.Nanosecond}, "tsn:"},
{&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "Europe/Paris"}, "tsn:Europe/Paris"},
}

Expand Down
10 changes: 10 additions & 0 deletions go/arrow/internal/arrjson/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (r *Reader) Release() {
r.recs[i] = nil
}
}
r.memo.Clear()
r.memo = nil
}
}
func (r *Reader) Schema() *arrow.Schema { return r.schema }
Expand All @@ -96,6 +98,14 @@ func (r *Reader) Read() (arrow.Record, error) {
return rec, nil
}

func (r *Reader) ReadAt(index int) (arrow.Record, error) {
if index >= r.NumRecords() {
return nil, io.EOF
}
rec := r.recs[index]
return rec, nil
}

var (
_ arrio.Reader = (*Reader)(nil)
)

0 comments on commit 197a169

Please sign in to comment.