Skip to content

Commit

Permalink
feat(consul): use app environment for service discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
SonicGD committed Dec 2, 2022
1 parent 412e926 commit ddcbc07
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 22 deletions.
24 changes: 18 additions & 6 deletions src/Sitko.Core.Consul.Web/ConsulWebClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Sitko.Core.Consul.Web;

public class ConsulWebClient
{
private readonly IApplication application;
private readonly IOptionsMonitor<ConsulWebModuleOptions> configMonitor;
private readonly IApplicationContext applicationContext;
private readonly IConsulClientProvider consulClientProvider;
private readonly string healthUrl;
private readonly ILogger<ConsulWebClient> logger;
Expand All @@ -22,11 +22,11 @@ public class ConsulWebClient

public ConsulWebClient(IServer server, IConsulClientProvider consulClientProvider,
IOptionsMonitor<ConsulWebModuleOptions> config,
IApplicationContext applicationContext, IApplication application, ILogger<ConsulWebClient> logger)
IApplicationContext applicationContext, ILogger<ConsulWebClient> logger)
{
this.consulClientProvider = consulClientProvider;
configMonitor = config;
this.application = application;
this.applicationContext = applicationContext;
this.logger = logger;

name = applicationContext.Name;
Expand Down Expand Up @@ -96,7 +96,14 @@ public async Task RegisterAsync()
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(Options.DeregisterTimeoutInSeconds),
Interval = TimeSpan.FromSeconds(Options.ChecksIntervalInSeconds)
},
Tags = new[] { "metrics", $"healthUrl:{healthUrl}", $"version:{application.Version}" }
Tags = new[] { "metrics" },
Meta = new Dictionary<string, string>
{
{ "Environment", applicationContext.Environment },
{ "Instance", applicationContext.Id.ToString() },
{ "Version", applicationContext.Version },
{ "HealthUrl", healthUrl },
}
};

logger.LogInformation("Registering in Consul as {Name} on {Host}:{Port}", name,
Expand All @@ -114,7 +121,13 @@ public async Task RegisterAsync()
{
if (serviceResponse.Response.Any())
{
return HealthCheckResult.Healthy();
var services = serviceResponse.Response.Where(catalogService =>
catalogService.ServiceMeta.TryGetValue("Environment", out var env) ||
env == applicationContext.Environment).ToList();
if (services.Any())
{
return HealthCheckResult.Healthy();
}
}

if (Options.AutoFixRegistration)
Expand All @@ -129,4 +142,3 @@ public async Task RegisterAsync()
return HealthCheckResult.Unhealthy($"Error response from consul: {serviceResponse.StatusCode}");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Sitko.Core.App;
using Sitko.Core.Consul;
using Sitko.Core.Grpc.Client.Discovery;

Expand All @@ -15,21 +16,24 @@ public class ConsulGrpcServiceAddressResolver<TClient> : IGrpcServiceAddressReso
private readonly CancellationTokenSource cts = new();
private readonly ILogger<ConsulGrpcServiceAddressResolver<TClient>> logger;
private readonly IOptionsMonitor<ConsulGrpcClientModuleOptions<TClient>> optionsMonitor;
private readonly IApplicationContext applicationContext;

private readonly string serviceName =
typeof(TClient).BaseType!.GenericTypeArguments.First().DeclaringType!.Name;

private ulong lastIndex;

private Task? refreshTask;
private Uri? target;
private List<Uri> target = new();

public ConsulGrpcServiceAddressResolver(IConsulClientProvider consulClientProvider,
IOptionsMonitor<ConsulGrpcClientModuleOptions<TClient>> optionsMonitor,
IApplicationContext applicationContext,
ILogger<ConsulGrpcServiceAddressResolver<TClient>> logger)
{
this.consulClientProvider = consulClientProvider;
this.optionsMonitor = optionsMonitor;
this.applicationContext = applicationContext;
this.logger = logger;
}

Expand All @@ -52,7 +56,7 @@ public async Task InitAsync()
refreshTask = StartRefreshTaskAsync();
}

public Uri? GetAddress() => target;
public Uri? GetAddress() => target.Any() ? target.OrderBy(_ => Guid.NewGuid()).First() : null;

public event EventHandler? OnChange;

Expand Down Expand Up @@ -85,27 +89,36 @@ private async Task LoadTargetAsync()
lastIndex = serviceResponse.LastIndex;
if (serviceResponse.Response.Any())
{
var service = serviceResponse.Response.First();
var serviceUrl =
new Uri(
$"{(Options.EnableHttp2UnencryptedSupport ? "http" : "https")}://{service.ServiceAddress}:{service.ServicePort}");
var services = serviceResponse.Response.Where(catalogService =>
catalogService.ServiceMeta.TryGetValue("Environment", out var env) ||
env == applicationContext.Environment).ToList();
if (!services.Any())
{
logger.LogError("No for services {ServiceName} for environment {Environment}", serviceName,
applicationContext.Environment);
target = new List<Uri>();
}

if (serviceUrl == target)
var serviceUrls = services.Select(service => new Uri(
$"{(Options.EnableHttp2UnencryptedSupport ? "http" : "https")}://{service.ServiceAddress}:{service.ServicePort}"))
.OrderBy(uri => uri).ToList();

if (serviceUrls.SequenceEqual(target))
{
return;
}

target = serviceUrl;
logger.LogInformation("Target for {Type} loaded: {Target}", typeof(TClient), target);
target = serviceUrls;
logger.LogInformation("Target for {Type} loaded: {Urls}", typeof(TClient),
string.Join(", ", serviceUrls));
}
else
{
logger.LogError("Empty response from consul for service {ServiceName}", serviceName);
target = null;
target = new List<Uri>();
}

OnChange?.Invoke(this, EventArgs.Empty);
}
}
}

15 changes: 10 additions & 5 deletions src/Sitko.Core.Grpc.Server.Consul/ConsulGrpcServicesRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace Sitko.Core.Grpc.Server.Consul;

public class ConsulGrpcServicesRegistrar : IGrpcServicesRegistrar, IAsyncDisposable
{
private readonly IApplication application;
private readonly IConsulClientProvider? consulClient;
private readonly string host = "127.0.0.1";
private readonly bool inContainer = DockerHelper.IsRunningInDocker();
private readonly ILogger<ConsulGrpcServicesRegistrar> logger;
private readonly IOptionsMonitor<ConsulDiscoveryGrpcServerModuleOptions> optionsMonitor;
private readonly IApplicationContext applicationContext;
private readonly int port;

private readonly ConcurrentDictionary<string, string> registeredServices = new();
Expand All @@ -30,12 +30,12 @@ public class ConsulGrpcServicesRegistrar : IGrpcServicesRegistrar, IAsyncDisposa
private IScheduledTask? updateTtlTask;

public ConsulGrpcServicesRegistrar(IOptionsMonitor<ConsulDiscoveryGrpcServerModuleOptions> optionsMonitor,
IApplication application,
IApplicationContext applicationContext,
IServer server, IScheduler scheduler, ILogger<ConsulGrpcServicesRegistrar> logger,
IConsulClientProvider? consulClient = null)
{
this.optionsMonitor = optionsMonitor;
this.application = application;
this.applicationContext = applicationContext;
this.consulClient = consulClient;
this.logger = logger;
if (!string.IsNullOrEmpty(Options.Host))
Expand Down Expand Up @@ -138,7 +138,13 @@ public async ValueTask DisposeAsync()
DeregisterCriticalServiceAfter =
TimeSpan.FromSeconds(Options.DeregisterTimeoutInSeconds)
},
Tags = new[] { "grpc", $"version:{application.Version}" }
Tags = new[] { "grpc" },
Meta = new Dictionary<string, string>
{
{ "Environment", applicationContext.Environment },
{ "Instance", applicationContext.Id.ToString() },
{ "Version", applicationContext.Version }
}
};
logger.LogInformation("Register grpc service {ServiceName} on {Address}:{Port}", serviceName, host,
port);
Expand Down Expand Up @@ -228,4 +234,3 @@ private async Task UpdateServicesTtlAsync(CancellationToken token)
}
}
}

0 comments on commit ddcbc07

Please sign in to comment.