Skip to content
Merged
118 changes: 118 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The produced Mango JSON:
* [Indexing](#indexing)
* [Index Options](#index-options)
* [Partial Indexes](#partial-indexes)
* [Partitioned Databases](#partitioned-databases)
* [Database Splitting](#database-splitting)
* [Views](#views)
* [Local (non-replicating) Documents](#local-(non-replicating)-documents)
Expand Down Expand Up @@ -444,11 +445,48 @@ var filter = ChangesFeedFilter.Selector<Rebel>(rebel => rebel.Age == 19);
var filter = ChangesFeedFilter.Design();
// _view
var filter = ChangesFeedFilter.View(view);
// Design document filter with custom query parameters
var filter = ChangesFeedFilter.DesignDocument("replication/by_partition",
new Dictionary<string, string> { { "partition", "skywalker" } });

// Use
ChangesFeedResponse<Rebel> changes = await GetChangesAsync(options: null, filter);
```

#### Design Document Filters with Query Parameters

For partitioned databases or custom filtering logic, you can use design document filters with query parameters:

```csharp
// Create a design document in CouchDB with a filter function
// _design/replication
{
"filters": {
"by_partition": function(doc, req) {
var partition = req.query.partition;
return doc._id.indexOf(partition + ':') === 0;
}
}
}

// Use the filter with query parameters
var filter = ChangesFeedFilter.DesignDocument("replication/by_partition",
new Dictionary<string, string> { { "partition", "businessId123" } });

await foreach (var change in db.GetContinuousChangesAsync(null, filter, cancellationToken))
{
// Process changes from specific partition
}

// Or pass query parameters via options
var options = new ChangesFeedOptions
{
Filter = "replication/by_partition",
QueryParameters = new Dictionary<string, string> { { "partition", "businessId123" } }
};
var changes = await db.GetChangesAsync(options);
```

## Indexing

It is possible to create indexes to use when querying.
Expand Down Expand Up @@ -515,6 +553,86 @@ public class MyDeathStarContext : CouchContext
}
```

## Partitioned Databases

CouchDB partitioned databases allow you to optimize query performance by grouping related documents together using a partition key. This feature is supported in CouchDB 3.0+.

### Creating a Partitioned Database

```csharp
// Create a partitioned database
var rebels = await client.CreateDatabaseAsync<Rebel>("rebels", partitioned: true);

// Or with GetOrCreateDatabaseAsync
var rebels = await client.GetOrCreateDatabaseAsync<Rebel>("rebels", partitioned: true);
```

### Partitioned Document IDs

In partitioned databases, document IDs must follow the format: `{partition_key}:{document_id}`

```csharp
var luke = new Rebel
{
Id = "skywalker:luke", // partition key is "skywalker"
Name = "Luke",
Surname = "Skywalker"
};
await rebels.AddAsync(luke);
```

### Getting Partition Information

```csharp
// Get metadata about a specific partition
var partitionInfo = await rebels.GetPartitionInfoAsync("skywalker");
Console.WriteLine($"Documents in partition: {partitionInfo.DocCount}");
Console.WriteLine($"Partition size: {partitionInfo.Sizes.Active} bytes");
```

### Querying Partitioned Databases

Partition-specific queries are more efficient as they only scan documents within the partition:

```csharp
// Query a partition using Mango selector
var skywalkers = await rebels.QueryPartitionAsync("skywalker", new
{
selector = new { name = new { $gt = "A" } },
sort = new[] { "name" }
});

// Or with JSON string
var json = "{\"selector\": {\"name\": {\"$gt\": \"A\"}}}";
var results = await rebels.QueryPartitionAsync("skywalker", json);

// Get all documents in a partition
var allSkywalkers = await rebels.GetPartitionAllDocsAsync("skywalker");
```

### Checking if Database is Partitioned

```csharp
var dbInfo = await rebels.GetInfoAsync();
bool isPartitioned = dbInfo.Props?.Partitioned ?? false;
```

### Partitioned Indexes

When creating indexes for partitioned databases, specify whether the index should be partitioned or global:

```csharp
// Create a partitioned index (default for partitioned databases)
await rebels.CreateIndexAsync("name_index",
b => b.IndexBy(r => r.Name),
new IndexOptions { Partitioned = true });

// Create a global index (queries across all partitions)
await rebels.CreateIndexAsync("global_index",
b => b.IndexBy(r => r.Age),
new IndexOptions { Partitioned = false });
```

## Database Splitting

It is possible to use the same database for multiple types:
Expand Down
13 changes: 13 additions & 0 deletions src/CouchDB.Driver/ChangesFeed/ChangesFeedFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,18 @@ public static ChangesFeedFilter Design()
/// <returns></returns>
public static ChangesFeedFilter View(string view)
=> new ViewChangesFeedFilter(view);

/// <summary>
/// Create a filter using a design document filter function with optional query parameters.
/// </summary>
/// <remarks>
/// The filter function in the design document can access query parameters via <c>req.query</c>.
/// This is useful for passing partition keys or other custom filtering parameters.
/// </remarks>
/// <param name="filterName">The filter name in format "designdoc/filtername" (e.g., "replication/by_partition").</param>
/// <param name="queryParameters">Optional query parameters to pass to the filter function.</param>
/// <returns></returns>
public static ChangesFeedFilter DesignDocument(string filterName, Dictionary<string, string>? queryParameters = null)
=> new DesignDocumentChangesFeedFilter(filterName, queryParameters);
}
}
35 changes: 35 additions & 0 deletions src/CouchDB.Driver/ChangesFeed/ChangesFeedFilterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public static async Task<ChangesFeedResponse<TSource>> QueryWithFilterAsync<TSou
.GetJsonAsync<ChangesFeedResponse<TSource>>(cancellationToken)
.ConfigureAwait(false);
}

if (filter is DesignDocumentChangesFeedFilter designDocFilter)
{
var req = ApplyDesignDocumentFilterParams(request, designDocFilter);

return await req
.GetJsonAsync<ChangesFeedResponse<TSource>>(cancellationToken)
.ConfigureAwait(false);
}

throw new InvalidOperationException($"Filter of type {filter.GetType().Name} not supported.");
}

Expand Down Expand Up @@ -99,7 +109,32 @@ public static async Task<Stream> QueryContinuousWithFilterAsync<TSource>(this IF
.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
}

if (filter is DesignDocumentChangesFeedFilter designDocFilter)
{
var req = ApplyDesignDocumentFilterParams(request, designDocFilter);

return await req
.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
}

throw new InvalidOperationException($"Filter of type {filter.GetType().Name} not supported.");
}

private static IFlurlRequest ApplyDesignDocumentFilterParams(IFlurlRequest request, DesignDocumentChangesFeedFilter filter)
{
var req = request.SetQueryParam("filter", filter.FilterName);

if (filter.QueryParameters != null)
{
foreach (var param in filter.QueryParameters)
{
req = req.SetQueryParam(param.Key, param.Value);
}
}

return req;
}
}
}
13 changes: 12 additions & 1 deletion src/CouchDB.Driver/ChangesFeed/ChangesFeedOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ο»Ώusing System.ComponentModel;
ο»Ώusing System.Collections.Generic;
using System.ComponentModel;
using Newtonsoft.Json;

namespace CouchDB.Driver.ChangesFeed
Expand Down Expand Up @@ -128,5 +129,15 @@ public class ChangesFeedOptions
[JsonProperty("seq_interval")]
[DefaultValue(null)]
public int? SeqInterval { get; set; }

/// <summary>
/// Custom query parameters to pass to design document filter functions.
/// </summary>
/// <remarks>
/// These parameters are accessible in design document filter functions via <c>req.query</c>.
/// Useful for passing partition keys or other custom filtering parameters to filter functions.
/// </remarks>
[JsonIgnore]
public Dictionary<string, string>? QueryParameters { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;

namespace CouchDB.Driver.ChangesFeed.Filters
{
internal class DesignDocumentChangesFeedFilter : ChangesFeedFilter
{
public string FilterName { get; }
public Dictionary<string, string>? QueryParameters { get; }

public DesignDocumentChangesFeedFilter(string filterName, Dictionary<string, string>? queryParameters = null)
{
if (string.IsNullOrWhiteSpace(filterName))
{
throw new ArgumentException("Filter name cannot be null or empty.", nameof(filterName));
}

FilterName = filterName;
QueryParameters = queryParameters;
}
}
}
113 changes: 105 additions & 8 deletions src/CouchDB.Driver/CouchDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,7 @@ public async Task<ChangesFeedResponse<TSource>> GetChangesAsync(ChangesFeedOptio
_ = request.SetQueryParam("feed", "longpoll");
}

if (options != null)
{
request = request.ApplyQueryParametersOptions(options);
}
request = ApplyChangesFeedOptions(request, options);

ChangesFeedResponse<TSource>? response = filter == null
? await request.GetJsonAsync<ChangesFeedResponse<TSource>>(cancellationToken)
Expand Down Expand Up @@ -458,10 +455,7 @@ public async IAsyncEnumerable<ChangesFeedResponseResult<TSource>> GetContinuousC
.AppendPathSegment("_changes")
.SetQueryParam("feed", "continuous");

if (options != null)
{
request = request.ApplyQueryParametersOptions(options);
}
request = ApplyChangesFeedOptions(request, options);

var lastSequence = options?.Since ?? "0";

Expand Down Expand Up @@ -722,6 +716,88 @@ public async Task<CouchDatabaseInfo> GetInfoAsync(CancellationToken cancellation
}

/// <inheritdoc />
public async Task<CouchPartitionInfo> GetPartitionInfoAsync(string partitionKey, CancellationToken cancellationToken = default)
{
Check.NotNull(partitionKey, nameof(partitionKey));

return await NewRequest()
.AppendPathSegment("_partition")
.AppendPathSegment(Uri.EscapeDataString(partitionKey))
.GetJsonAsync<CouchPartitionInfo>(cancellationToken)
.SendRequestAsync()
.ConfigureAwait(false);
}

/// <inheritdoc />
public Task<List<TSource>> QueryPartitionAsync(string partitionKey, string mangoQueryJson, CancellationToken cancellationToken = default)
{
Check.NotNull(partitionKey, nameof(partitionKey));
Check.NotNull(mangoQueryJson, nameof(mangoQueryJson));

return QueryPartitionInternalAsync(partitionKey, r => r
.WithHeader("Content-Type", "application/json")
.PostStringAsync(mangoQueryJson, cancellationToken));
}

/// <inheritdoc />
public Task<List<TSource>> QueryPartitionAsync(string partitionKey, object mangoQuery, CancellationToken cancellationToken = default)
{
Check.NotNull(partitionKey, nameof(partitionKey));
Check.NotNull(mangoQuery, nameof(mangoQuery));

return QueryPartitionInternalAsync(partitionKey, r => r
.PostJsonAsync(mangoQuery, cancellationToken));
}

/// <inheritdoc />
public async Task<List<TSource>> GetPartitionAllDocsAsync(string partitionKey, CancellationToken cancellationToken = default)
{
Check.NotNull(partitionKey, nameof(partitionKey));

var result = await NewRequest()
.AppendPathSegment("_partition")
.AppendPathSegment(Uri.EscapeDataString(partitionKey))
.AppendPathSegment("_all_docs")
.SetQueryParam("include_docs", "true")
.GetJsonAsync<AllDocsResult<TSource>>(cancellationToken)
.SendRequestAsync()
.ConfigureAwait(false);

var documents = result.Rows
.Where(r => r.Doc != null)
.Select(r => r.Doc!)
.ToList();

foreach (var document in documents)
{
InitAttachments(document);
}

return documents;
}

private async Task<List<TSource>> QueryPartitionInternalAsync(string partitionKey, Func<IFlurlRequest, Task<IFlurlResponse>> requestFunc)
{
IFlurlRequest request = NewRequest()
.AppendPathSegment("_partition")
.AppendPathSegment(Uri.EscapeDataString(partitionKey))
.AppendPathSegment("_find");

Task<IFlurlResponse> message = requestFunc(request);

FindResult<TSource> findResult = await message
.ReceiveJson<FindResult<TSource>>()
.SendRequestAsync()
.ConfigureAwait(false);

var documents = findResult.Docs.ToList();

foreach (TSource document in documents)
{
InitAttachments(document);
}

return documents;
public async Task<int> GetRevisionLimitAsync(CancellationToken cancellationToken = default)
{
return Convert.ToInt32(await NewRequest()
Expand Down Expand Up @@ -791,6 +867,27 @@ internal IndexBuilder<TSource> NewIndexBuilder(
return builder;
}

private static IFlurlRequest ApplyChangesFeedOptions(IFlurlRequest request, ChangesFeedOptions? options)
{
if (options == null)
{
return request;
}

request = request.ApplyQueryParametersOptions(options);

// Apply custom query parameters for design document filters
if (options.QueryParameters != null)
{
foreach (var param in options.QueryParameters)
{
request = request.SetQueryParam(param.Key, param.Value);
}
}

return request;
}

private static IFlurlRequest SetFindOptions(IFlurlRequest request, FindOptions options)
{
if (options.Attachments)
Expand Down
Loading
Loading