Skip to content

Commit

Permalink
Adding isolated worker model trigger binding
Browse files Browse the repository at this point in the history
  • Loading branch information
tpeczek committed Feb 29, 2024
1 parent 13e7745 commit 684982b
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/continuous-integration.yml
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Restore
run: dotnet restore src/RethinkDb.Azure.WebJobs.Extensions
- name: Build
run: dotnet build src/RethinkDb.Azure.WebJobs.Extensions --configuration Release --no-restore
run: dotnet build src/RethinkDb.Azure.WebJobs.Extensions --configuration Release --no-restore
- name: Test
run: dotnet test test/RethinkDb.Azure.WebJobs.Extensions.Tests --configuration Release
code-scanning:
Expand Down
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using RethinkDb.Azure.Functions.Worker.Extensions;
using RethinkDb.Azure.Functions.Worker.Extensions.Model;
using Demo.Azure.Functions.Worker.RethinkDb.Model;

namespace Demo.Azure.Functions.Worker.RethinkDb
Expand Down Expand Up @@ -128,5 +129,17 @@ public ThreadStatsFunctions(ILogger<ThreadStatsFunctions> logger)
RethinkDbDocuments = documents
};
}

[Function("HandleThreadStatsChange")]
public void HandleThreadStatsChange(
[RethinkDbTrigger(
databaseName: "Demo",
tableName: "ThreadStats",
HostnameSetting = "RethinkDbHostname",
UserSetting = "RethinkDbUser",
PasswordSetting = "RethinkDbPassword")]DocumentChange<ThreadStats> change)
{
_logger.LogInformation($"[ThreadStats Change Received] {change.NewValue}");
}
}
}
10 changes: 10 additions & 0 deletions demos/Demo.Azure.Functions.Worker.RethinkDb/local.settings.json
@@ -0,0 +1,10 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"RethinkDbHostname": "[Enter RethinkDB hostname here]",
"RethinkDbUser": "[Enter RethinkDB user name here]",
"RethinkDbPassword": "[Enter RethinkDB user password here]"
}
}
@@ -0,0 +1,57 @@
using System.Collections.Generic;
using System.Text.Json.Serialization;

namespace RethinkDb.Azure.Functions.Worker.Extensions.Model
{
/// <summary>
/// Represents a document change in RethinkDB table.
/// </summary>
public class DocumentChange
{
#region Properties
/// <summary>
/// Gets or sets the old value. When a document is inserted, old value will be null.
/// </summary>
[JsonPropertyName("old_val")]
public dynamic OldValue { get; set; }

/// <summary>
/// Gets or sets the new value. When a document is deleted, new value will be null.
/// </summary>
[JsonPropertyName("new_val")]
public dynamic NewValue { get; set; }

/// <summary>
/// If <see cref="RethinkDbTriggerAttribute.IncludeTypes"/> is set to true, this property will indicate the type of change.
/// </summary>
[JsonPropertyName("type")]
public DocumentChangeType? Type { get; set; }
#endregion
}

/// <summary>
/// Represents a document change in RethinkDB table.
/// </summary>
public class DocumentChange<T>
{
#region Properties
/// <summary>
/// Gets or sets the old value. When a document is inserted, old value will be null.
/// </summary>
[JsonPropertyName("old_val")]
public T OldValue { get; set; }

/// <summary>
/// Gets or sets the new value. When a document is deleted, new value will be null.
/// </summary>
[JsonPropertyName("new_val")]
public T NewValue { get; set; }

/// <summary>
/// If <see cref="RethinkDbTriggerAttribute.IncludeTypes"/> is set to true, this property will indicate the type of change.
/// </summary>
[JsonPropertyName("type")]
public DocumentChangeType? Type { get; set; }
#endregion
}
}
@@ -0,0 +1,38 @@
using System.Text.Json.Serialization;

namespace RethinkDb.Azure.Functions.Worker.Extensions.Model
{
/// <summary>
/// The type of <see cref="DocumentChange"/>.
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter))]
public enum DocumentChangeType
{
/// <summary>
/// Document was added.
/// </summary>
Add,
/// <summary>
/// Document was removed.
/// </summary>
Remove,
/// <summary>
/// Document was changed.
/// </summary>
Change,
/// <summary>
/// Initial document.
/// </summary>
Initial,
/// <summary>
/// If an initial result for a document has been sent and a change is made to that document that would move it to the unsent part of the result set
/// (e.g., a changefeed monitors the top 100 posters, the first 50 have been sent, and poster 48 has become poster 52),
/// an 'uninitial' notification will be sent, with an old_val field but no new_val field.
/// </summary>
Uninitial,
/// <summary>
/// A state document.
/// </summary>
State
}
}
Expand Up @@ -30,6 +30,7 @@
<PackageReference Include="DotNet.ReproducibleBuilds" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.Text.Json" Version="[6.0.0,)" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="[1.3.0,)" />
</ItemGroup>
</Project>
@@ -0,0 +1,88 @@
using System;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

namespace RethinkDb.Azure.Functions.Worker.Extensions
{
/// <summary>
/// Attribute used to define a trigger that binds to a RethinkDB table.
/// </summary>
public sealed class RethinkDbTriggerAttribute : TriggerBindingAttribute
{
/// <summary>
/// The name of the database containing the table to which the parameter applies (may include binding parameters).
/// </summary>
public string DatabaseName { get; private set; }

/// <summary>
/// The name of the table to which the parameter applies (may include binding parameters).
/// </summary>
public string TableName { get; private set; }

/// <summary>
/// The name of the app setting containing the hostname or IP address of the server to which the parameter applies.
/// </summary>
public string HostnameSetting { get; set; }

/// <summary>
/// The name of the app setting containing the TCP port of the server to which the parameter applies.
/// </summary>
public string PortSetting { get; set; }

/// <summary>
/// The name of the app setting containing the authorization key to the server to which the parameter applies.
/// </summary>
public string AuthorizationKeySetting { get; set; }

/// <summary>
/// The name of the app setting containing the user account to connect as to the server to which the parameter applies.
/// </summary>
public string UserSetting { get; set; }

/// <summary>
/// The name of the app setting containing the user account password to connect as to the server to which the parameter applies.
/// </summary>
public string PasswordSetting { get; set; }

/// <summary>
/// The name of the app setting containing the value indicating if SSL/TLS encryption should be enabled for connection to the server.
/// </summary>
/// <remarks>The underlying driver (RethinkDb.Driver) requires a commercial license for SSL/TLS encryption.</remarks>
public string EnableSslSetting { get; set; }

/// <summary>
/// The name of the app setting containing the "license to" of underlying driver (RethinkDb.Driver) commercial license.
/// </summary>
public string LicenseToSetting { get; set; }

/// <summary>
/// The name of the app setting containing the "license key" of underlying driver (RethinkDb.Driver) commercial license.
/// </summary>
public string LicenseKeySetting { get; set; }

/// <summary>
/// The value indicating if <see cref="DocumentChange.Type"/> field should be included for <see cref="DocumentChange"/>.
/// </summary>
public bool IncludeTypes { get; set; } = false;

/// <summary>
/// Defines a trigger that binds to a RethinkDB table.
/// </summary>
/// <param name="databaseName">Name of the database of the table to which the parameter applies (may include binding parameters).</param>
/// <param name="tableName">Name of the table to which the parameter applies (may include binding parameters).</param>
public RethinkDbTriggerAttribute(string databaseName, string tableName)
{
if (String.IsNullOrWhiteSpace(databaseName))
{
throw new ArgumentNullException(nameof(databaseName));
}

if (String.IsNullOrWhiteSpace(tableName))
{
throw new ArgumentNullException(nameof(tableName));
}

DatabaseName = databaseName;
TableName = tableName;
}
}
}
Expand Up @@ -17,6 +17,21 @@ namespace RethinkDb.Azure.WebJobs.Extensions.Config
[Extension("RethinkDB")]
internal class RethinkDbExtensionConfigProvider : IExtensionConfigProvider
{
#region Classes
private class RethinkDbOpenType : OpenType.Poco
{
public override bool IsMatch(Type type, OpenTypeMatchContext context)
{
if (type.FullName == "System.Object")
{
return true;
}

return base.IsMatch(type, context);
}
}
#endregion

#region Fields
private readonly IConfiguration _configuration;
private readonly RethinkDbOptions _options;
Expand Down Expand Up @@ -47,13 +62,13 @@ public void Initialize(ExtensionConfigContext context)
// RethinkDB Bindings
var bindingAttributeBindingRule = context.AddBindingRule<RethinkDbAttribute>();
bindingAttributeBindingRule.AddValidator(ValidateHost);
bindingAttributeBindingRule.BindToCollector<OpenType.Poco>(typeof(RethinkDbCollectorConverter<>), _options, _rethinkDBConnectionFactory);
bindingAttributeBindingRule.BindToCollector<RethinkDbOpenType>(typeof(RethinkDbCollectorConverter<>), _options, _rethinkDBConnectionFactory);
bindingAttributeBindingRule.WhenIsNotNull(nameof(RethinkDbAttribute.Id))
.BindToValueProvider(CreateValueBinderAsync);

// RethinkDB Trigger
var triggerAttributeBindingRule = context.AddBindingRule<RethinkDbTriggerAttribute>();
triggerAttributeBindingRule.BindToTrigger<DocumentChange>(new RethinkDbTriggerAttributeBindingProvider(_configuration, _options, _rethinkDBConnectionFactory, _nameResolver, _loggerFactory));
triggerAttributeBindingRule.BindToTrigger(new RethinkDbTriggerAttributeBindingProvider(_configuration, _options, _rethinkDBConnectionFactory, _nameResolver, _loggerFactory));
}

private void ValidateHost(RethinkDbAttribute attribute, Type paramType)
Expand Down
Expand Up @@ -15,6 +15,10 @@ namespace RethinkDb.Azure.WebJobs.Extensions.Trigger
internal class RethinkDbTriggerAttributeBindingProvider : ITriggerBindingProvider
{
#region Fields
private static readonly Type DOCUMENTCHANGE_TYPE = typeof(DocumentChange);
private static readonly Type OBJECT_TYPE = typeof(object);
private static readonly Type STRING_TYPE = typeof(string);

private const string UNABLE_TO_RESOLVE_APP_SETTING_FORMAT = "Unable to resolve app setting for property '{0}.{1}'. Make sure the app setting exists and has a valid value.";

private readonly IConfiguration _configuration;
Expand Down Expand Up @@ -45,20 +49,23 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
throw new ArgumentNullException(nameof(context));
}

ParameterInfo parameter = context.Parameter;

RethinkDbTriggerAttribute triggerAttribute = parameter.GetCustomAttribute<RethinkDbTriggerAttribute>(inherit: false);
RethinkDbTriggerAttribute triggerAttribute = context.Parameter.GetCustomAttribute<RethinkDbTriggerAttribute>(inherit: false);
if (triggerAttribute is null)
{
return _nullTriggerBindingTask;
}

if ((context.Parameter.ParameterType != DOCUMENTCHANGE_TYPE) && (context.Parameter.ParameterType != OBJECT_TYPE) && (context.Parameter.ParameterType != STRING_TYPE))
{
return _nullTriggerBindingTask;
}

ConnectionOptions triggerConnectionOptions = ResolveTriggerConnectionOptions(triggerAttribute);
Task<IConnection> triggerConnectionTask = _rethinkDBConnectionFactory.GetConnectionAsync(triggerConnectionOptions);

TableOptions triggerTableOptions = ResolveTriggerTableOptions(triggerAttribute);

return Task.FromResult<ITriggerBinding>(new RethinkDbTriggerBinding(parameter, triggerConnectionTask, triggerTableOptions, triggerAttribute.IncludeTypes));
return Task.FromResult<ITriggerBinding>(new RethinkDbTriggerBinding(context.Parameter, triggerConnectionTask, triggerTableOptions, triggerAttribute.IncludeTypes));
}

private ConnectionOptions ResolveTriggerConnectionOptions(RethinkDbTriggerAttribute triggerAttribute)
Expand Down
Expand Up @@ -14,17 +14,18 @@ namespace RethinkDb.Azure.WebJobs.Extensions.Trigger
internal class RethinkDbTriggerBinding : ITriggerBinding
{
#region Fields
private static readonly Type DOCUMENTCHANGE_TYPE = typeof(DocumentChange);
private static readonly IReadOnlyDictionary<string, object> EMPTY_BINDING_DATA = new Dictionary<string, object>();

private readonly ParameterInfo _parameter;
private readonly Task<IConnection> _rethinkDbConnectionTask;
private readonly TableOptions _rethinkDbTableOptions;
private readonly Driver.Ast.Table _rethinkDbTable;
private readonly bool _includeTypes;

private readonly Task<ITriggerData> _emptyBindingDataTask = Task.FromResult<ITriggerData>(new TriggerData(null, new Dictionary<string, object>()));
#endregion

#region Properties
public Type TriggerValueType => typeof(DocumentChange);
public Type TriggerValueType => DOCUMENTCHANGE_TYPE;

public IReadOnlyDictionary<string, Type> BindingDataContract { get; } = new Dictionary<string, Type>();
#endregion
Expand All @@ -43,8 +44,9 @@ public RethinkDbTriggerBinding(ParameterInfo parameter, Task<IConnection> rethin
#region Methods
public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
// ValueProvider is via binding rules.
return _emptyBindingDataTask;
IValueProvider valueBinder = new RethinkDbTriggerValueBinder(_parameter, value);

return Task.FromResult<ITriggerData>(new TriggerData(valueBinder, EMPTY_BINDING_DATA));
}

public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
Expand Down
@@ -0,0 +1,46 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Newtonsoft.Json;

namespace RethinkDb.Azure.WebJobs.Extensions.Trigger
{
internal class RethinkDbTriggerValueBinder : IValueProvider
{
#region Fields
private static readonly Type STRING_TYPE = typeof(string);

private readonly ParameterInfo _parameter;
private readonly object _value;
private readonly bool _parameterTypeIsString;
#endregion

#region Properties
public Type Type => _parameter.ParameterType;
#endregion

#region Constructor
public RethinkDbTriggerValueBinder(ParameterInfo parameter, object value)
{
_parameter = parameter;
_value = value;
_parameterTypeIsString = parameter.ParameterType == STRING_TYPE;
}
#endregion

#region Methods
public Task<object> GetValueAsync()
{
if (_parameterTypeIsString)
{
return Task.FromResult((object)JsonConvert.SerializeObject(_value));
}

return Task.FromResult(_value);
}

public string ToInvokeString() => String.Empty;
#endregion
}
}

0 comments on commit 684982b

Please sign in to comment.