Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Data/Masa.Contrib.Data.UoW.EF/DbConnectionStringProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Masa.Contrib.Data.UoW.EF;

public class DbConnectionStringProvider : BaseDbConnectionStringProvider
{
private readonly IOptionsMonitor<MasaDbConnectionOptions> _options;

public DbConnectionStringProvider(IOptionsMonitor<MasaDbConnectionOptions> options) => _options = options;

protected override List<DbContextOptions> GetDbContextOptionsList()
{
return new() { new(_options.CurrentValue.DefaultConnection) };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Masa.Contrib.Data.UoW.EF;

public class DefaultConnectionStringProvider : IConnectionStringProvider
{
private readonly IUnitOfWorkAccessor _unitOfWorkAccessor;
private readonly IOptionsSnapshot<MasaDbConnectionOptions> _options;
private readonly ILogger<DefaultConnectionStringProvider>? _logger;

public DefaultConnectionStringProvider(
IUnitOfWorkAccessor unitOfWorkAccessor,
IOptionsSnapshot<MasaDbConnectionOptions> options,
ILogger<DefaultConnectionStringProvider>? logger = null)
{
_unitOfWorkAccessor = unitOfWorkAccessor;
_options = options;
_logger = logger;
}

public Task<string> GetConnectionStringAsync() => Task.FromResult(GetConnectionString());

public string GetConnectionString()
{
if (_unitOfWorkAccessor.CurrentDbContextOptions != null)
return _unitOfWorkAccessor.CurrentDbContextOptions.ConnectionString;

var connectionString = _options.Value.DefaultConnection;
if (string.IsNullOrEmpty(connectionString))
_logger?.LogError("Failed to get database connection string, please check whether the configuration of IOptionsSnapshot<MasaDbConnectionOptions> is abnormal");

_unitOfWorkAccessor.CurrentDbContextOptions = new MasaDbContextConfigurationOptions(connectionString);
return connectionString;
}
}
50 changes: 37 additions & 13 deletions src/Data/Masa.Contrib.Data.UoW.EF/DispatcherOptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,61 @@ namespace Masa.Contrib.Data.UoW.EF;

public static class DispatcherOptionsExtensions
{
public static IEventBusBuilder UseUoW<TDbContext>(
this IEventBusBuilder eventBusBuilder,
Action<MasaDbContextOptionsBuilder>? optionsBuilder = null,
bool disableRollbackOnFailure = false,
bool useTransaction = true)
where TDbContext : MasaDbContext
{
eventBusBuilder.Services.UseUoW<TDbContext>(nameof(eventBusBuilder.Services), optionsBuilder, disableRollbackOnFailure, useTransaction);
return eventBusBuilder;
}

public static IDispatcherOptions UseUoW<TDbContext>(
this IDispatcherOptions options,
Action<MasaDbContextOptionsBuilder>? optionsBuilder = null,
bool disableRollbackOnFailure = false,
bool useTransaction = true)
where TDbContext : MasaDbContext
{
if (options.Services == null)
throw new ArgumentNullException(nameof(options.Services));
options.Services.UseUoW<TDbContext>(nameof(options.Services), optionsBuilder, disableRollbackOnFailure, useTransaction);
return options;
}

if (options.Services.Any(service => service.ImplementationType == typeof(UoWProvider)))
return options;
private static IServiceCollection UseUoW<TDbContext>(
this IServiceCollection services,
string paramName,
Action<MasaDbContextOptionsBuilder>? optionsBuilder = null,
bool disableRollbackOnFailure = false,
bool useTransaction = true)
where TDbContext : MasaDbContext
{
if (services == null)
throw new ArgumentNullException(paramName);

if (services.Any(service => service.ImplementationType == typeof(UoWProvider)))
return services;

options.Services.AddSingleton<UoWProvider>();
services.AddSingleton<UoWProvider>();
services.TryAddScoped<IUnitOfWorkAccessor, UnitOfWorkAccessor>();
services.TryAddSingleton<IUnitOfWorkManager, UnitOfWorkManager>();
services.TryAddScoped<IConnectionStringProvider, DefaultConnectionStringProvider>();
services.TryAddSingleton<IDbConnectionStringProvider, DbConnectionStringProvider>();

options.Services.AddScoped<IUnitOfWork>(serviceProvider => new UnitOfWork<TDbContext>(serviceProvider)
services.AddScoped<IUnitOfWork>(serviceProvider => new UnitOfWork<TDbContext>(serviceProvider)
{
DisableRollbackOnFailure = disableRollbackOnFailure,
UseTransaction = useTransaction
});
if (options.Services.All(service => service.ServiceType != typeof(MasaDbContextOptions<TDbContext>)))
options.Services.AddMasaDbContext<TDbContext>(optionsBuilder);
if (services.All(service => service.ServiceType != typeof(MasaDbContextOptions<TDbContext>)))
services.AddMasaDbContext<TDbContext>(optionsBuilder);

options.Services.AddScoped<ITransaction, Transaction>();

return options;
services.AddScoped<ITransaction, Transaction>();
return services;
}

private class UoWProvider
{

}
}

17 changes: 13 additions & 4 deletions src/Data/Masa.Contrib.Data.UoW.EF/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
[中](README.zh-CN.md) | EN

## Contracts.EF
## UoW.EF

Example:

```C#
Install-Package Masa.Contrib.Dispatcher.Events
Install-Package Masa.Contrib.Data.UoW.EF
Install-Package Masa.Utils.Data.EntityFrameworkCore.SqlServer
```

1. Configure appsettings.json
``` appsettings.json
{
"ConnectionStrings": {
"DefaultConnection": "server=localhost;uid=sa;pwd=P@ssw0rd;database=identity"
}
}
```

2. Use UoW
```C#
builder.Services.AddEventBus(options => {
options.UseUoW<CustomDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=P@ssw0rd;database=identity"));
});
builder.Services.AddEventBus(eventBusBuilder => eventBusBuilder.UseUoW<CustomDbContext>(dbOptions => dbOptions.UseSqlServer()));
```
19 changes: 14 additions & 5 deletions src/Data/Masa.Contrib.Data.UoW.EF/README.zh-CN.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
中 | [EN](README.md)

## Contracts.EF
## UoW.EF

用例:

```C#
Install-Package Masa.Contrib.Dispatcher.Events
Install-Package Masa.Contrib.Data.UoW.EF
Install-Package Masa.Utils.Data.EntityFrameworkCore.SqlServer
```

1. 配置appsettings.json
``` appsettings.json
{
"ConnectionStrings": {
"DefaultConnection": "server=localhost;uid=sa;pwd=P@ssw0rd;database=identity"
}
}
```

2. 使用UoW
```C#
builder.Services.AddEventBus(options => {
options.UseUoW<CustomDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=P@ssw0rd;database=identity"));
});
```
builder.Services.AddEventBus(eventBusBuilder => eventBusBuilder.UseUoW<CustomDbContext>(dbOptions => dbOptions.UseSqlServer()));
```
16 changes: 7 additions & 9 deletions src/Data/Masa.Contrib.Data.UoW.EF/UnitOfWork.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
namespace Masa.Contrib.Data.UoW.EF;

public class UnitOfWork<TDbContext> : IUnitOfWork
where TDbContext : MasaDbContext
public class UnitOfWork<TDbContext> : IUnitOfWork where TDbContext : MasaDbContext
{
private readonly IServiceProvider _serviceProvider;
public IServiceProvider ServiceProvider { get; }

private DbContext? _context;

protected DbContext Context => _context ??= _serviceProvider.GetRequiredService<TDbContext>();
protected DbContext Context;

public DbTransaction Transaction
{
Expand Down Expand Up @@ -35,7 +32,8 @@ public DbTransaction Transaction

public UnitOfWork(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
ServiceProvider = serviceProvider;
Context = serviceProvider.GetRequiredService<TDbContext>();
}

public async Task SaveChangesAsync(CancellationToken cancellationToken = default)
Expand All @@ -61,7 +59,7 @@ public async Task RollbackAsync(CancellationToken cancellationToken = default)
await Context.Database.RollbackTransactionAsync(cancellationToken);
}

public async ValueTask DisposeAsync() => await (_context?.DisposeAsync() ?? ValueTask.CompletedTask);
public async ValueTask DisposeAsync() => await Context.DisposeAsync();

public void Dispose() => _context?.Dispose();
public void Dispose() => Context.Dispose();
}
6 changes: 6 additions & 0 deletions src/Data/Masa.Contrib.Data.UoW.EF/UnitOfWorkAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Masa.Contrib.Data.UoW.EF;

public class UnitOfWorkAccessor : IUnitOfWorkAccessor
{
public MasaDbContextConfigurationOptions? CurrentDbContextOptions { get; set; }
}
26 changes: 26 additions & 0 deletions src/Data/Masa.Contrib.Data.UoW.EF/UnitOfWorkManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace Masa.Contrib.Data.UoW.EF;

public class UnitOfWorkManager : IUnitOfWorkManager
{
private readonly IServiceProvider _serviceProvider;

public UnitOfWorkManager(IServiceProvider serviceProvider) => _serviceProvider = serviceProvider;

public IUnitOfWork CreateDbContext()
{
var scope = _serviceProvider.CreateAsyncScope();
return scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
}

public IUnitOfWork CreateDbContext(MasaDbContextConfigurationOptions options)
{
ArgumentNullException.ThrowIfNull(options, nameof(options));
if (string.IsNullOrEmpty(options.ConnectionString))
throw new ArgumentException($"Invalid {nameof(options)}");

var scope = _serviceProvider.CreateAsyncScope();
var unitOfWorkAccessor = scope.ServiceProvider.GetRequiredService<IUnitOfWorkAccessor>();
unitOfWorkAccessor.CurrentDbContextOptions = options;
return scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
}
}
5 changes: 4 additions & 1 deletion src/Data/Masa.Contrib.Data.UoW.EF/_Imports.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
global using Masa.BuildingBlocks.Data.UoW;
global using Masa.BuildingBlocks.Data.UoW.Options;
global using Masa.BuildingBlocks.Dispatcher.Events;
global using Masa.Utils.Data.EntityFrameworkCore;
global using Microsoft.EntityFrameworkCore;
global using Microsoft.EntityFrameworkCore.Storage;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;
global using Microsoft.Extensions.Options;
global using System.Data.Common;
global using System.Text.Json.Serialization;
global using EntityState = Masa.BuildingBlocks.Data.UoW.EntityState;

global using DbContextOptions = Masa.BuildingBlocks.Data.UoW.Options.MasaDbContextConfigurationOptions;
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class Repository<TDbContext, TEntity> :
{
protected readonly TDbContext Context;

public Repository(TDbContext context, IUnitOfWork unitOfWork)
public Repository(TDbContext context, IUnitOfWork unitOfWork) : base(unitOfWork.ServiceProvider)
{
Context = context;
UnitOfWork = unitOfWork;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ public class DeleteLocalQueueExpiresProcessor : ProcessorBase
{
private readonly IOptions<DispatcherOptions> _options;

public DeleteLocalQueueExpiresProcessor(IOptions<DispatcherOptions> options)
public override int Delay => _options.Value.CleaningLocalQueueExpireInterval;

public DeleteLocalQueueExpiresProcessor(IOptions<DispatcherOptions> options) : base(null)
{
_options = options;
}

/// <summary>
/// Delete expired events
/// </summary>
/// <param name="stoppingToken"></param>
/// <returns></returns>
public override Task ExecuteAsync(CancellationToken stoppingToken)
protected override void Executing()
{
LocalQueueProcessor.Default.Delete(_options.Value.LocalRetryTimes);
return Task.CompletedTask;
}

public override int Delay => _options.Value.CleaningLocalQueueExpireInterval;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,25 @@

public class DeletePublishedExpireEventProcessor : ProcessorBase
{
private readonly IServiceProvider _serviceProvider;
private readonly IOptions<DispatcherOptions> _options;

public DeletePublishedExpireEventProcessor(
IServiceProvider serviceProvider,
IOptions<DispatcherOptions> options)
public override int Delay => _options.Value.CleaningExpireInterval;

public DeletePublishedExpireEventProcessor(IServiceProvider serviceProvider, IOptions<DispatcherOptions> options)
: base(serviceProvider)
{
_serviceProvider = serviceProvider;
_options = options;
}

/// <summary>
/// Delete expired events
/// </summary>
/// <param name="serviceProvider"></param>
/// <param name="stoppingToken"></param>
/// <returns></returns>
public override async Task ExecuteAsync(CancellationToken stoppingToken)
protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
using (var scope = _serviceProvider.CreateScope())
{
var logService = scope.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var expireDate = (_options.Value.GetCurrentTime?.Invoke() ?? DateTime.UtcNow).AddSeconds(-_options.Value.PublishedExpireTime);
await logService.DeleteExpiresAsync(expireDate, _options.Value.DeleteBatchCount, stoppingToken);
}
var logService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();
var expireDate = (_options.Value.GetCurrentTime?.Invoke() ?? DateTime.UtcNow).AddSeconds(-_options.Value.PublishedExpireTime);
await logService.DeleteExpiresAsync(expireDate, _options.Value.DeleteBatchCount, stoppingToken);
}

public override int Delay => _options.Value.CleaningExpireInterval;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ public class InfiniteLoopProcessor : ProcessorBase
private readonly IProcessor _processor;
private readonly ILogger<InfiniteLoopProcessor>? _logger;

public InfiniteLoopProcessor(IProcessor processor, ILogger<InfiniteLoopProcessor>? logger = null)
public InfiniteLoopProcessor(IServiceProvider serviceProvider, IProcessor processor)
: base(serviceProvider)
{
_processor = processor;
_logger = logger;
_logger = serviceProvider.GetService<ILogger<InfiniteLoopProcessor>>();
}

public override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand Down
Loading