Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 27, 2023
1 parent e038498 commit b33c4dd
Show file tree
Hide file tree
Showing 17 changed files with 942 additions and 654 deletions.
25 changes: 16 additions & 9 deletions ci/scripts/integration_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@ arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration

pip install -e $arrow_dir/dev/archery[integration]
# For C# C Data Interface testing
pip install pythonnet

# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1

# --run-flight \

# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
time archery integration \
--run-c-data \
--run-ipc \
--run-flight \
--with-cpp=1 \
--with-csharp=1 \
--with-java=1 \
--with-js=1 \
--with-java=0 \
--with-js=0 \
--with-go=1 \
--gold-dirs=$gold_dir/0.14.1 \
--gold-dirs=$gold_dir/0.17.1 \
--gold-dirs=$gold_dir/1.0.0-bigendian \
--gold-dirs=$gold_dir/1.0.0-littleendian \
--gold-dirs=$gold_dir/2.0.0-compression \
--gold-dirs=$gold_dir/4.0.0-shareddict \

# --gold-dirs=$gold_dir/0.14.1 \
# --gold-dirs=$gold_dir/0.17.1 \
# --gold-dirs=$gold_dir/1.0.0-bigendian \
# --gold-dirs=$gold_dir/1.0.0-littleendian \
# --gold-dirs=$gold_dir/2.0.0-compression \
# --gold-dirs=$gold_dir/4.0.0-shareddict \
6 changes: 3 additions & 3 deletions csharp/src/Apache.Arrow/ArrowBuffer.BitmapBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ public BitmapBuilder Append(bool value)
/// <param name="validBits">Number of valid bits in the source span.</param>
/// <returns>Returns the builder (for fluent-style composition).</returns>
public BitmapBuilder Append(ReadOnlySpan<byte> source, int validBits)
{
{
if (!source.IsEmpty && validBits > source.Length * 8)
throw new ArgumentException($"Number of valid bits ({validBits}) cannot be greater than the the source span length ({source.Length * 8} bits).", nameof(validBits));

// Check if memory copy can be used from the source array (performance optimization for byte-aligned coping)
if (!source.IsEmpty && Length % 8 == 0)
{
EnsureAdditionalCapacity(validBits);
source.Slice(0, BitUtility.ByteCount(validBits)).CopyTo(Span.Slice(Length / 8));

Length += validBits;
SetBitCount += BitUtility.CountBits(source, 0, validBits);
}
Expand Down
3 changes: 2 additions & 1 deletion csharp/src/Apache.Arrow/ArrowBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public void Dispose()

internal bool TryExport(ExportedAllocationOwner newOwner, out IntPtr ptr)
{
if (_memoryOwner == null && IsEmpty)
if (IsEmpty)
{
// _memoryOwner could be anything (for example null or a NullMemoryOwner), but it doesn't matter here
ptr = IntPtr.Zero;
return true;
}
Expand Down
39 changes: 25 additions & 14 deletions csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Apache.Arrow.Memory;
Expand Down Expand Up @@ -59,8 +60,6 @@ public static unsafe void ExportArray(IArrowArray array, CArrowArray* cArray)
try
{
ConvertArray(allocationOwner, array.Data, cArray);
cArray->release = ReleaseArrayPtr;
cArray->private_data = FromDisposable(allocationOwner);
allocationOwner = null;
}
finally
Expand Down Expand Up @@ -102,8 +101,6 @@ public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowArray* cArr
try
{
ConvertRecordBatch(allocationOwner, batch, cArray);
cArray->release = ReleaseArrayPtr;
cArray->private_data = FromDisposable(allocationOwner);
allocationOwner = null;
}
finally
Expand All @@ -118,7 +115,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr
cArray->offset = array.Offset;
cArray->null_count = array.NullCount;
cArray->release = ReleaseArrayPtr;
cArray->private_data = null;
cArray->private_data = MakePrivateData(sharedOwner);

cArray->n_buffers = array.Buffers?.Length ?? 0;
cArray->buffers = null;
Expand All @@ -131,7 +128,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr
IntPtr ptr;
if (!buffer.TryExport(sharedOwner, out ptr))
{
throw new NotSupportedException($"An ArrowArray of type {array.DataType.TypeId} could not be exported");
throw new NotSupportedException($"An ArrowArray of type {array.DataType.TypeId} could not be exported: failed on buffer #{i}");
}
cArray->buffers[i] = (byte*)ptr;
}
Expand Down Expand Up @@ -163,13 +160,17 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
cArray->offset = 0;
cArray->null_count = 0;
cArray->release = ReleaseArrayPtr;
cArray->private_data = null;
cArray->private_data = MakePrivateData(sharedOwner);

cArray->n_buffers = 1;
cArray->buffers = (byte**)sharedOwner.Allocate(IntPtr.Size);

cArray->n_children = batch.ColumnCount;
cArray->children = null;
// XXX sharing the same ExportedAllocationOwner for all columns
// and child arrays makes memory tracking inflexible.
// If the consumer keeps only a single record batch column,
// the entire record batch memory is nevertheless kept alive.
if (cArray->n_children > 0)
{
cArray->children = (CArrowArray**)sharedOwner.Allocate(IntPtr.Size * batch.ColumnCount);
Expand All @@ -190,26 +191,36 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
#endif
private unsafe static void ReleaseArray(CArrowArray* cArray)
{
Dispose(&cArray->private_data);
for (long i = 0; i < cArray->n_children; i++)
{
CArrowArray.Free(cArray->children[i]);
}
if (cArray->dictionary != null) {
CArrowArray.Free(cArray->dictionary);
}
DisposePrivateData(&cArray->private_data);
cArray->release = default;
}

private unsafe static void* FromDisposable(IDisposable disposable)
private unsafe static void* MakePrivateData(ExportedAllocationOwner sharedOwner)
{
GCHandle gch = GCHandle.Alloc(disposable);
GCHandle gch = GCHandle.Alloc(sharedOwner);
sharedOwner.IncRef();
return (void*)GCHandle.ToIntPtr(gch);
}

private unsafe static void Dispose(void** ptr)
private unsafe static void DisposePrivateData(void** ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)(*ptr));
GCHandle gch = GCHandle.FromIntPtr((IntPtr) (*ptr));
if (!gch.IsAllocated)
{
return;
}
((IDisposable)gch.Target).Dispose();
// We can't call IDisposable.Dispose() here as we create multiple
// GCHandles to the same object. Instead, refcounting ensures
// timely memory deallocation when all GCHandles are freed.
((ExportedAllocationOwner) gch.Target).DecRef();
gch.Free();
*ptr = null;
}
}
}
9 changes: 6 additions & 3 deletions csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static class CArrowSchemaImporter
/// Typically, you will allocate an uninitialized CArrowSchema pointer,
/// pass that to external function, and then use this method to import
/// the result.
///
///
/// <code>
/// CArrowSchema* importedPtr = CArrowSchema.Create();
/// foreign_export_function(importedPtr);
Expand All @@ -62,7 +62,7 @@ public static unsafe ArrowType ImportType(CArrowSchema* ptr)
/// Typically, you will allocate an uninitialized CArrowSchema pointer,
/// pass that to external function, and then use this method to import
/// the result.
///
///
/// <code>
/// CArrowSchema* importedPtr = CArrowSchema.Create();
/// foreign_export_function(importedPtr);
Expand All @@ -87,7 +87,7 @@ public static unsafe Field ImportField(CArrowSchema* ptr)
/// Typically, you will allocate an uninitialized CArrowSchema pointer,
/// pass that to external function, and then use this method to import
/// the result.
///
///
/// <code>
/// CArrowSchema* importedPtr = CArrowSchema.Create();
/// foreign_export_function(importedPtr);
Expand Down Expand Up @@ -241,6 +241,9 @@ public ArrowType GetAsType()
};

string timezone = format.Substring(format.IndexOf(':') + 1);
if (timezone.Length == 0) {
timezone = null;
}
return new TimestampType(timeUnit, timezone);
}

Expand Down
20 changes: 20 additions & 0 deletions csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
// limitations under the License.

using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;

namespace Apache.Arrow.Memory
{
internal sealed class ExportedAllocationOwner : INativeAllocationOwner, IDisposable
{
private readonly List<IntPtr> _pointers = new List<IntPtr>();
private int _allocationSize;
private long _referenceCount;
private bool _disposed;

~ExportedAllocationOwner()
{
Expand All @@ -47,8 +51,23 @@ public void Release(IntPtr ptr, int offset, int length)
throw new InvalidOperationException();
}

public void IncRef()
{
Interlocked.Increment(ref _referenceCount);
}

public void DecRef()
{
if (Interlocked.Decrement(ref _referenceCount) == 0) {
Dispose();
}
}

public void Dispose()
{
if (_disposed) {
return;
}
for (int i = 0; i < _pointers.Count; i++)
{
if (_pointers[i] != IntPtr.Zero)
Expand All @@ -59,6 +78,7 @@ public void Dispose()
}
GC.RemoveMemoryPressure(_allocationSize);
GC.SuppressFinalize(this);
_disposed = true;
}
}
}
2 changes: 1 addition & 1 deletion csharp/src/Apache.Arrow/Memory/NullMemoryOwner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ public void Dispose()
{
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>net7.0</TargetFrameworks>
</PropertyGroup>

Expand All @@ -13,4 +14,4 @@
<ProjectReference Include="..\Apache.Arrow.Tests\Apache.Arrow.Tests.csproj" />
</ItemGroup>

</Project>
</Project>
79 changes: 79 additions & 0 deletions csharp/test/Apache.Arrow.IntegrationTest/CDataInterface.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using Apache.Arrow.C;
using Apache.Arrow.Arrays;
using Apache.Arrow.Types;

namespace Apache.Arrow.IntegrationTest
{
/// <summary>
/// Bridge for C Data Interface integration testing.
/// These methods are called from the Python integration testing
/// harness provided by Archery.
/// </summary>
public static class CDataInterface
{
// Archery uses the `pythonnet` library (*) to invoke .Net DLLs.
// `pythonnet` is only able to marshal simple types such as int and
// str, which is why we provide trivial wrappers around other APIs.
//
// (*) https://pythonnet.github.io/

public static void Initialize()
{
// Allow debugging using Debug.WriteLine()
Trace.Listeners.Add(new ConsoleTraceListener());
}

public static unsafe Schema ImportSchema(long ptr)
{
return CArrowSchemaImporter.ImportSchema((CArrowSchema*) ptr);
}

public static unsafe void ExportSchema(Schema schema, long ptr)
{
CArrowSchemaExporter.ExportSchema(schema, (CArrowSchema*) ptr);
}

public static unsafe RecordBatch ImportRecordBatch(long ptr, Schema schema)
{
return CArrowArrayImporter.ImportRecordBatch((CArrowArray*) ptr, schema);
}

public static unsafe void ExportRecordBatch(RecordBatch batch, long ptr)
{
CArrowArrayExporter.ExportRecordBatch(batch, (CArrowArray*) ptr);
}

public static JsonFile ParseJsonFile(String jsonPath)
{
return JsonFile.Parse(new FileInfo(jsonPath));
}

public static long GetAllocatedBytes()
{
GC.Collect();
// XXX this doesn't seem to give stable and reliable measurements
var gcInfo = GC.GetGCMemoryInfo();
return gcInfo.PromotedBytes;
}
}
}
Loading

0 comments on commit b33c4dd

Please sign in to comment.