From 889a7027c027121ef29f3b61c829a666e51ba840 Mon Sep 17 00:00:00 2001 From: Michelangelo Partipilo Date: Wed, 4 Jun 2025 23:32:59 +0200 Subject: [PATCH 1/3] Batch Inserts --- src/Example/Program.cs | 19 +- .../Integration/Batch.cs | 41 ++++ .../Integration/Datasets.cs | 81 ++++++++ .../Integration/_Integration.cs | 11 +- src/Weaviate.Client/DataClient.cs | 175 ++++++++++++++++++ src/Weaviate.Client/Models/Batch.cs | 11 ++ src/Weaviate.Client/Models/Property.cs | 73 ++++---- src/Weaviate.Client/Models/QueryReference.cs | 11 +- src/Weaviate.Client/gRPC/Batch.cs | 15 ++ 9 files changed, 392 insertions(+), 45 deletions(-) create mode 100644 src/Weaviate.Client.Tests/Integration/Batch.cs create mode 100644 src/Weaviate.Client/Models/Batch.cs create mode 100644 src/Weaviate.Client/gRPC/Batch.cs diff --git a/src/Example/Program.cs b/src/Example/Program.cs index 6dd58b2b..4e9c3e18 100644 --- a/src/Example/Program.cs +++ b/src/Example/Program.cs @@ -97,7 +97,7 @@ static async Task Main() { Name = "Cat", Description = "Lots of Cats of multiple breeds", - Properties = Property.FromType(), + Properties = Property.FromCollection(), VectorConfig = VectorConfigs, }; @@ -108,12 +108,19 @@ static async Task Main() Console.WriteLine($"Collection: {c.Name}"); } - foreach (var cat in cats) - { - var vectors = new NamedVectors() { { "default", cat.Vector } }; + // // Normal Insertion Demo + // foreach (var cat in cats) + // { + // var vectors = new NamedVectors() { { "default", cat.Vector } }; - var inserted = await collection.Data.Insert(cat.Data, vectors: vectors); - } + // var inserted = await collection.Data.Insert(cat.Data, vectors: vectors); + // } + + // Batch Insertion Demo + var batchInsertions = await collection.Data.InsertMany(add => + { + cats.ForEach(c => add(c.Data, vectors: new() { { "default", c.Vector } })); + }); // Get all objects and sum up the counter property var result = await collection.Query.List(limit: 250); diff --git a/src/Weaviate.Client.Tests/Integration/Batch.cs b/src/Weaviate.Client.Tests/Integration/Batch.cs new file mode 100644 index 00000000..f6fa8501 --- /dev/null +++ b/src/Weaviate.Client.Tests/Integration/Batch.cs @@ -0,0 +1,41 @@ +using System.Diagnostics; +using System.Text.Json; +using Weaviate.Client.Models; +using Xunit.v3; + +namespace Weaviate.Client.Tests.Integration; + +public partial class BasicTests +{ + [Theory] + [ClassData(typeof(DatasetBatchInsertMany))] + public async Task InsertMany(string key) + { + ( + int expectedObjects, + int expectedErrors, + Action.InsertDelegate>[] batcher + ) = DatasetBatchInsertMany.Cases[key]; + + var client = await CollectionFactory( + description: "Testing Batch InsertMany", + properties: + [ + Property.For("Name"), + Property.For("Size"), + Property.For("Price"), + Property.For("IsAvailable"), + Property.For("AvailableSince"), + ] + ); + + await client.AddReference(Property.Reference("ref", client.Name)); + + var result = await client.Data.InsertMany(batcher); + + var data = await client.Query.List(references: [new("ref")]); + + Assert.Equal(expectedObjects, data.Count()); + Assert.Equal(expectedErrors, result.Count(r => r.Error != null)); + } +} diff --git a/src/Weaviate.Client.Tests/Integration/Datasets.cs b/src/Weaviate.Client.Tests/Integration/Datasets.cs index 245b305c..e425470c 100644 --- a/src/Weaviate.Client.Tests/Integration/Datasets.cs +++ b/src/Weaviate.Client.Tests/Integration/Datasets.cs @@ -86,4 +86,85 @@ public static Dictionary< public DatasetTimeFilter() : base(Cases.Keys) { } } + + public class DatasetBatchInsertMany : TheoryData + { + public static Dictionary< + string, + ( + int expectedObjects, + int expectedErrors, + Action.InsertDelegate>[] batcher + ) + > Cases => + new() + { + ["2 simple objects, no errors"] = ( + 2, + 0, + [ + add => + { + add( + new { Name = "some name" }, + vectors: new() { { "default", 1, 2, 3 } } + ); + add(new { Name = "some other name" }, id: _reusableUuids[0]); + }, + ] + ), + ["all data types"] = ( + 1, + 0, + [ + add => + { + add( + new + { + Name = "some name", + Size = 3, + Price = 10.5, + IsAvailable = true, + AvailableSince = new DateTime(2023, 1, 1), + } + ); + }, + ] + ), + ["wrong type for property"] = ( + 0, + 1, + [ + add => + { + add(new { Name = 1 }); + }, + ] + ), + ["batch with self-reference"] = ( + 5, + 0, + [ + add => + { + add(new { Name = "Name 1" }, id: _reusableUuids[0]); + add(new { Name = "Name 2" }, id: _reusableUuids[1]); + add(new { Name = "Name 3" }, id: _reusableUuids[2]); + add(new { Name = "Name 4" }, id: _reusableUuids[3]); + }, + add => + { + add( + new { Name = "Name 5" }, + references: [new ObjectReference("ref", _reusableUuids[1])] + ); + }, + ] + ), + }; + + public DatasetBatchInsertMany() + : base(Cases.Keys) { } + } } diff --git a/src/Weaviate.Client.Tests/Integration/_Integration.cs b/src/Weaviate.Client.Tests/Integration/_Integration.cs index 0b32e510..ecb42468 100644 --- a/src/Weaviate.Client.Tests/Integration/_Integration.cs +++ b/src/Weaviate.Client.Tests/Integration/_Integration.cs @@ -21,6 +21,7 @@ internal class TestDataValue public partial class BasicTests : IAsyncDisposable { const bool _deleteCollectionsAfterTest = true; + List _collections = new(); WeaviateClient _weaviate; HttpClient _httpClient; @@ -50,9 +51,9 @@ public BasicTests() public async ValueTask DisposeAsync() { - if (_deleteCollectionsAfterTest && TestContext.Current.TestMethod?.MethodName is not null) + if (_deleteCollectionsAfterTest && _collections.Count > 0) { - await _weaviate.Collections.Delete(TestContext.Current.TestMethod!.MethodName); + await Task.WhenAll(_collections.Select(c => _weaviate.Collections.Delete(c))); } _weaviate.Dispose(); @@ -73,7 +74,7 @@ async Task> CollectionFactory( name = $"{TestContext.Current.TestMethod?.MethodName ?? string.Empty}_{name}"; - properties ??= Property.FromType(); + properties ??= Property.FromCollection(); ArgumentException.ThrowIfNullOrEmpty(name); @@ -89,7 +90,7 @@ async Task> CollectionFactory( }, }; - references = references ?? []; + references ??= []; var c = new Collection { @@ -104,6 +105,8 @@ async Task> CollectionFactory( var collectionClient = await _weaviate.Collections.Create(c); + _collections.Add(name); + return collectionClient; } diff --git a/src/Weaviate.Client/DataClient.cs b/src/Weaviate.Client/DataClient.cs index a5b436d1..de6beb20 100644 --- a/src/Weaviate.Client/DataClient.cs +++ b/src/Weaviate.Client/DataClient.cs @@ -1,5 +1,7 @@ +using System.Collections.Frozen; using System.Dynamic; using System.Text.Json; +using Google.Protobuf.WellKnownTypes; using Weaviate.Client.Models; using Weaviate.Client.Rest.Dto; @@ -61,6 +63,63 @@ public static IDictionary[] MakeBeacons(params Guid[] guids) return obj; } + // Helper method to convert C# objects to protobuf Values + public static Value ConvertToValue(object obj) + { + return obj switch + { + null => Value.ForNull(), + bool b => Value.ForBool(b), + int i => Value.ForNumber(i), + long l => Value.ForNumber(l), + float f => Value.ForNumber(f), + double d => Value.ForNumber(d), + decimal dec => Value.ForNumber((double)dec), + string s => Value.ForString(s), + DateTime dt => Value.ForString(dt.ToUniversalTime().ToString("o")), + Guid uuid => Value.ForString(uuid.ToString()), + // Dictionary dict => Value.ForStruct(CreateStructFromDictionary(dict)), + // IEnumerable enumerable => CreateListValue(enumerable), + _ => throw new ArgumentException($"Unsupported type: {obj.GetType()}"), + }; + } + + private V1.BatchObject.Types.Properties BuildBatchProperties(TProps data) + { + var props = new V1.BatchObject.Types.Properties(); + + if (data is null) + { + return props; + } + + Google.Protobuf.WellKnownTypes.Struct? nonRefProps = null; + + foreach (var propertyInfo in data.GetType().GetProperties()) + { + if (!propertyInfo.CanRead) + continue; // skip non-readable properties + + var value = propertyInfo.GetValue(data); + + if (value is null) + { + continue; + } + + if (propertyInfo.PropertyType.IsNativeType()) + { + nonRefProps ??= new(); + + nonRefProps.Fields.Add(propertyInfo.Name, ConvertToValue(value)); + } + } + + props.NonRefProperties = nonRefProps; + + return props; + } + public async Task Insert( TData data, Guid? id = null, @@ -93,6 +152,122 @@ public async Task Insert( return response.Id!.Value; } + public delegate void InsertDelegate( + TData data, + Guid? id = null, + NamedVectors? vectors = null, + IEnumerable? references = null, + string? tenant = null + ); + + public async Task> InsertMany( + params BatchInsertRequest[] requests + ) + { + var objects = requests + .Select( + (r, idx) => + { + var o = new V1.BatchObject + { + Collection = _collectionName, + Uuid = (r.ID ?? Guid.NewGuid()).ToString(), + Properties = BuildBatchProperties(r.Data), + }; + + if (r.References?.Any() ?? false) + { + foreach (var reference in r.References!) + { + var strp = new Weaviate.V1.BatchObject.Types.SingleTargetRefProps() + { + PropName = reference.Name, + Uuids = { reference.TargetID.Select(id => id.ToString()) }, + }; + + o.Properties.SingleTargetRefProps.Add(strp); + } + } + + if (r.Vectors != null) + { + o.Vectors.AddRange( + r.Vectors.Select(v => new V1.Vectors + { + Name = v.Key, + VectorBytes = v.Value.ToByteString(), + }) + ); + } + + return new { Index = idx, BatchObject = o }; + } + ) + .ToList(); + + var inserts = await _client.GrpcClient.InsertMany(objects.Select(o => o.BatchObject)); + + var dictErr = inserts.Errors.ToFrozenDictionary(kv => kv.Index, kv => kv.Error); + var dictUuid = objects + .Select(o => new { o.Index, o.BatchObject.Uuid }) + .Where(o => !dictErr.ContainsKey(o.Index)) + .ToDictionary(kv => kv.Index, kv => Guid.Parse(kv.Uuid)); + + var results = new List(); + + foreach (int r in Enumerable.Range(0, objects.Count)) + { + results.Add( + new BatchInsertResponse( + Index: r, + dictUuid.TryGetValue(r, out Guid uuid) ? uuid : (Guid?)null, + dictErr.TryGetValue(r, out string? error) ? new WeaviateException(error) : null + ) + ); + } + + return results; + } + + public async Task> InsertMany( + params Action[] inserterList + ) + { + var responses = new List(); + + foreach (var inserter in inserterList) + { + IList> requests = []; + + InsertDelegate _inserter = ( + TData data, + Guid? id = null, + NamedVectors? vectors = null, + IEnumerable? references = null, + string? tenant = null + ) => + { + requests.Add(new BatchInsertRequest(data, id, vectors, references, tenant)); + }; + + inserter(_inserter); + + var response = await InsertMany([.. requests]); + + responses.AddRange( + [ + .. response.Select(r => new BatchInsertResponse( + r.Index + responses.Count, + r.ID, + r.Error + )), + ] + ); + } + + return responses; + } + public async Task Delete(Guid id) { await _client.RestClient.DeleteObject(_collectionName, id); diff --git a/src/Weaviate.Client/Models/Batch.cs b/src/Weaviate.Client/Models/Batch.cs new file mode 100644 index 00000000..dc141683 --- /dev/null +++ b/src/Weaviate.Client/Models/Batch.cs @@ -0,0 +1,11 @@ +namespace Weaviate.Client.Models; + +public record BatchInsertRequest( + TData Data, + Guid? ID = null, + NamedVectors? Vectors = null, + IEnumerable? References = null, + string? Tenant = null +); + +public record BatchInsertResponse(int Index, Guid? ID = null, WeaviateException? Error = null); diff --git a/src/Weaviate.Client/Models/Property.cs b/src/Weaviate.Client/Models/Property.cs index af37b5b0..0812323e 100644 --- a/src/Weaviate.Client/Models/Property.cs +++ b/src/Weaviate.Client/Models/Property.cs @@ -5,6 +5,7 @@ public static class DataType public static string Date { get; } = "date"; public static string GeoCoordinate { get; } = "geo"; public static string Int { get; } = "int"; + public static string Bool { get; } = "boolean"; public static string List { get; } = "list"; public static string Number { get; } = "number"; public static string Object { get; } = "object"; @@ -60,44 +61,52 @@ public static Property Number(string name) return new Property { Name = name, DataType = { Models.DataType.Number } }; } + internal static Property Bool(string name) + { + return new Property { Name = name, DataType = { Models.DataType.Bool } }; + } + public static ReferenceProperty Reference(string name, string targetCollection) { return new ReferenceProperty { Name = name, TargetCollection = targetCollection }; } + private static Property For(Type t, string name) + { + Func? f = Type.GetTypeCode(t) switch + { + TypeCode.String => Text, + TypeCode.Int16 => Int, + TypeCode.UInt16 => Int, + TypeCode.Int32 => Int, + TypeCode.UInt32 => Int, + TypeCode.Int64 => Int, + TypeCode.UInt64 => Int, + TypeCode.DateTime => Date, + TypeCode.Boolean => Bool, + TypeCode.Char => Text, + TypeCode.SByte => null, + TypeCode.Byte => null, + TypeCode.Single => Number, + TypeCode.Double => Number, + TypeCode.Decimal => Number, + TypeCode.Empty => null, + TypeCode.Object => null, + TypeCode.DBNull => null, + _ => null, + }; + + return f!(name); + } + + public static Property For(string name) + { + return For(typeof(TField), name); + } + // Extract collection properties from type specified by TData. - public static IList FromType() + public static IList FromCollection() { - return - [ - .. typeof(TData) - .GetProperties() - .Select(x => - Type.GetTypeCode(x.PropertyType) switch - { - TypeCode.String => Property.Text(x.Name), - TypeCode.Int16 => Property.Int(x.Name), - TypeCode.UInt16 => Property.Int(x.Name), - TypeCode.Int32 => Property.Int(x.Name), - TypeCode.UInt32 => Property.Int(x.Name), - TypeCode.Int64 => Property.Int(x.Name), - TypeCode.UInt64 => Property.Int(x.Name), - TypeCode.DateTime => Property.Date(x.Name), - TypeCode.Boolean => null, - TypeCode.Char => null, - TypeCode.SByte => null, - TypeCode.Byte => null, - TypeCode.Single => null, - TypeCode.Double => null, - TypeCode.Decimal => null, - TypeCode.Empty => null, - TypeCode.Object => null, - TypeCode.DBNull => null, - _ => null, - } - ) - .Where(p => p is not null) - .Select(p => p!), - ]; + return [.. typeof(TData).GetProperties().Select(x => For(x.PropertyType, x.Name))]; } } diff --git a/src/Weaviate.Client/Models/QueryReference.cs b/src/Weaviate.Client/Models/QueryReference.cs index 6c691c95..73662bd9 100644 --- a/src/Weaviate.Client/Models/QueryReference.cs +++ b/src/Weaviate.Client/Models/QueryReference.cs @@ -7,11 +7,16 @@ public record QueryReference public MetadataQuery? Metadata { get; init; } public IList? References { get; init; } - public QueryReference(string linkOn, string[] fields, MetadataQuery? metadata = null, params QueryReference[]? references) + public QueryReference( + string linkOn, + string[]? fields = null, + MetadataQuery? metadata = null, + params QueryReference[]? references + ) { LinkOn = linkOn; - Fields = fields; + Fields = fields ?? []; Metadata = metadata; References = references; } -} \ No newline at end of file +} diff --git a/src/Weaviate.Client/gRPC/Batch.cs b/src/Weaviate.Client/gRPC/Batch.cs new file mode 100644 index 00000000..f834a896 --- /dev/null +++ b/src/Weaviate.Client/gRPC/Batch.cs @@ -0,0 +1,15 @@ +using Weaviate.V1; + +namespace Weaviate.Client.Grpc; + +public partial class WeaviateGrpcClient +{ + internal async Task InsertMany(IEnumerable objects) + { + var request = new BatchObjectsRequest { Objects = { objects } }; + + BatchObjectsReply reply = await _grpcClient.BatchObjectsAsync(request); + + return reply; + } +} From 1f53b516eb66610ba5e23b978bbaa3d4f38691a9 Mon Sep 17 00:00:00 2001 From: Michelangelo Partipilo Date: Thu, 5 Jun 2025 09:38:59 +0200 Subject: [PATCH 2/3] Assert references and link to multiple objects --- .../Integration/Batch.cs | 7 ++++ .../Integration/Datasets.cs | 39 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/Weaviate.Client.Tests/Integration/Batch.cs b/src/Weaviate.Client.Tests/Integration/Batch.cs index f6fa8501..ab3ea574 100644 --- a/src/Weaviate.Client.Tests/Integration/Batch.cs +++ b/src/Weaviate.Client.Tests/Integration/Batch.cs @@ -14,6 +14,8 @@ public async Task InsertMany(string key) ( int expectedObjects, int expectedErrors, + int expectedReferences, + int expectedReferencedObjects, Action.InsertDelegate>[] batcher ) = DatasetBatchInsertMany.Cases[key]; @@ -37,5 +39,10 @@ public async Task InsertMany(string key) Assert.Equal(expectedObjects, data.Count()); Assert.Equal(expectedErrors, result.Count(r => r.Error != null)); + Assert.Equal(expectedReferences, data.Count(r => r.References.Any())); + Assert.Equal( + expectedReferencedObjects, + data.Select(d => d.References.Sum(r => r.Value.Count)).Sum() + ); } } diff --git a/src/Weaviate.Client.Tests/Integration/Datasets.cs b/src/Weaviate.Client.Tests/Integration/Datasets.cs index e425470c..3bd4a0d1 100644 --- a/src/Weaviate.Client.Tests/Integration/Datasets.cs +++ b/src/Weaviate.Client.Tests/Integration/Datasets.cs @@ -94,6 +94,8 @@ public static Dictionary< ( int expectedObjects, int expectedErrors, + int expectedReferences, + int expectedReferencedObjects, Action.InsertDelegate>[] batcher ) > Cases => @@ -102,6 +104,8 @@ public static Dictionary< ["2 simple objects, no errors"] = ( 2, 0, + 0, + 0, [ add => { @@ -116,6 +120,8 @@ public static Dictionary< ["all data types"] = ( 1, 0, + 0, + 0, [ add => { @@ -135,6 +141,8 @@ public static Dictionary< ["wrong type for property"] = ( 0, 1, + 0, + 0, [ add => { @@ -145,6 +153,8 @@ public static Dictionary< ["batch with self-reference"] = ( 5, 0, + 1, + 1, [ add => { @@ -162,6 +172,35 @@ public static Dictionary< }, ] ), + ["batch with multiple self-references"] = ( + 5, + 0, + 1, + 2, + [ + add => + { + add(new { Name = "Name 1" }, id: _reusableUuids[0]); + add(new { Name = "Name 2" }, id: _reusableUuids[1]); + add(new { Name = "Name 3" }, id: _reusableUuids[2]); + add(new { Name = "Name 4" }, id: _reusableUuids[3]); + }, + add => + { + add( + new { Name = "Name 5" }, + references: + [ + new ObjectReference( + "ref", + _reusableUuids[1], + _reusableUuids[2] + ), + ] + ); + }, + ] + ), }; public DatasetBatchInsertMany() From 9458e620bc327f33ac9df8ae6c1f7b6779743234 Mon Sep 17 00:00:00 2001 From: Michelangelo Partipilo Date: Thu, 5 Jun 2025 09:57:55 +0200 Subject: [PATCH 3/3] Test cases with complex references --- .../Integration/Batch.cs | 6 ++-- .../Integration/Datasets.cs | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/Weaviate.Client.Tests/Integration/Batch.cs b/src/Weaviate.Client.Tests/Integration/Batch.cs index ab3ea574..13dd802b 100644 --- a/src/Weaviate.Client.Tests/Integration/Batch.cs +++ b/src/Weaviate.Client.Tests/Integration/Batch.cs @@ -1,7 +1,4 @@ -using System.Diagnostics; -using System.Text.Json; using Weaviate.Client.Models; -using Xunit.v3; namespace Weaviate.Client.Tests.Integration; @@ -32,10 +29,11 @@ public async Task InsertMany(string key) ); await client.AddReference(Property.Reference("ref", client.Name)); + await client.AddReference(Property.Reference("ref2", client.Name)); var result = await client.Data.InsertMany(batcher); - var data = await client.Query.List(references: [new("ref")]); + var data = await client.Query.List(references: [new("ref"), new("ref2")]); Assert.Equal(expectedObjects, data.Count()); Assert.Equal(expectedErrors, result.Count(r => r.Error != null)); diff --git a/src/Weaviate.Client.Tests/Integration/Datasets.cs b/src/Weaviate.Client.Tests/Integration/Datasets.cs index 3bd4a0d1..ee7c4f67 100644 --- a/src/Weaviate.Client.Tests/Integration/Datasets.cs +++ b/src/Weaviate.Client.Tests/Integration/Datasets.cs @@ -201,6 +201,40 @@ public static Dictionary< }, ] ), + ["batch with multiple self-reference properties"] = ( + 7, + 0, + 3, + 4, + [ + add => + { + add(new { Name = "Name 1" }, id: _reusableUuids[0]); + add(new { Name = "Name 2" }, id: _reusableUuids[1]); + add(new { Name = "Name 3" }, id: _reusableUuids[2]); + add(new { Name = "Name 4" }, id: _reusableUuids[3]); + }, + add => + { + add( + new { Name = "Name 5" }, + references: [new("ref", _reusableUuids[1])] + ); + add( + new { Name = "Name 6" }, + references: [new("ref2", _reusableUuids[2])] + ); + add( + new { Name = "Name 7" }, + references: + [ + new("ref", _reusableUuids[1]), + new("ref2", _reusableUuids[2]), + ] + ); + }, + ] + ), }; public DatasetBatchInsertMany()