From e6f2d3904ba0c310f158d2945fc9efc0ba406214 Mon Sep 17 00:00:00 2001 From: stasput Date: Fri, 15 Nov 2024 13:13:21 +0300 Subject: [PATCH 1/2] EDITSKS-5515: migrate to system.text.json --- .github/workflows/actions.yml | 4 +- .start-all.cmd | 2 +- ...tedTaskQueue.Monitoring.TestService.csproj | 1 - .../Startup.cs | 27 +- .../Api/RtqMonitoringSearchRequest.cs | 53 ++- .../Api/RtqMonitoringSearchResults.cs | 21 +- .../Api/RtqMonitoringTaskMeta.cs | 118 +++--- .../Api/RtqMonitoringTaskModel.cs | 45 ++- .../Api/TaskActions.cs | 6 +- .../Api/TimestampRange.cs | 27 +- ...dra.DistributedTaskQueue.Monitoring.csproj | 2 +- .../ElasticsearchResponseWrapperExtensions.cs | 99 +++-- .../Responses/Bulk/BulkResponse.cs | 21 +- .../Responses/Bulk/IndexBulkResponse.cs | 25 +- .../Responses/Bulk/ItemInfo.cs | 25 +- .../Responses/Get/GetResponse.cs | 33 +- .../Responses/ResponseBase.cs | 17 +- .../Responses/Search/HitInfo.cs | 16 +- .../Responses/Search/HitsCollection.cs | 22 +- .../Responses/Search/SearchResponse.cs | 16 +- .../RtqElasticsearchIndexerSettings.cs | 101 +++-- .../Indexer/TaskMetaProcessor.cs | 238 ++++++------ .../Json/JsonObjectExtensions.cs | 29 +- .../Json/LongToStringConverter.cs | 50 ++- .../Json/ObjectJsonSerializer.cs | 39 +- .../OmitBinaryAndAbstractPropertyConverter.cs | 28 ++ .../Json/TimeGuidJsonConverter.cs | 40 +- .../Json/TimestampJsonConverter.cs | 43 +-- .../Json/TotalCountCompatibilityConverter.cs | 33 +- .../Json/TwoKeysDictionaryConvertor.cs | 40 ++ .../Storage/Client/TaskSearchClient.cs | 165 ++++----- .../Storage/RtqElasticsearchSchema.cs | 349 +++++++++--------- .../Storage/Utils/UtcTicksJsonConverter.cs | 91 +++-- .../Storage/Writing/MetaIndexedInfo.cs | 47 ++- ...ryAndAbstractPropertiesContractResolver.cs | 38 -- .../Writing/RtqElasticsearchOffsetStorage.cs | 123 +++--- .../Storage/Writing/TaskIndexedInfo.cs | 42 +-- .../Writing/TruncateLongStringsConverter.cs | 48 +-- .../TaskCounter/RtqTaskCounters.cs | 26 +- ...ibutedTaskQueue.TestExchangeService.csproj | 4 - .../Startup.cs | 27 +- .../HttpClientForTestsBase.cs | 121 +++--- .../RtqTaskCountersForTests.cs | 19 +- .../TaskIndexInfoSerializationTest.cs | 154 ++++---- .../Monitoring/UtcTicksJsonConverterTest.cs | 231 ++++++------ ...andra.DistributedTaskQueue.sln.DotSettings | 3 +- .../Configuration/IRtqTaskDataRegistry.cs | 25 +- .../Configuration/RtqTaskDataRegistryBase.cs | 129 ++++--- 48 files changed, 1393 insertions(+), 1470 deletions(-) create mode 100644 Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs create mode 100644 Cassandra.DistributedTaskQueue.Monitoring/Json/TwoKeysDictionaryConvertor.cs delete mode 100644 Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/OmitBinaryAndAbstractPropertiesContractResolver.cs diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml index a416d469..48589d77 100644 --- a/.github/workflows/actions.yml +++ b/.github/workflows/actions.yml @@ -40,7 +40,7 @@ jobs: run: dotnet jb cleanupcode Cassandra.DistributedTaskQueue.sln --profile=CatalogueCleanup --verbosity=WARN && git diff --exit-code -- ':!./cassandra-distributed-task-queue-ui/.yarn' - name: Check front code run: yarn --cwd cassandra-distributed-task-queue-ui lint - - name: Build docker compose environment + - name: Build docker-compose environment run: docker compose -f docker-compose.yaml up -d --build env: ES_VERSION: ${{ matrix.es-version }} @@ -50,7 +50,7 @@ jobs: run: dotnet test --no-build --configuration Release env: ES_VERSION: ${{ matrix.es-version }} - - name: Stop docker compose + - name: Stop if: always() run: docker compose -f docker-compose.yaml down env: diff --git a/.start-all.cmd b/.start-all.cmd index 9c8c78c8..170f51f2 100644 --- a/.start-all.cmd +++ b/.start-all.cmd @@ -1,2 +1,2 @@ wsl -u root sh -c "service docker status || (service docker start && echo 'artificially waiting 20s for docker to warmup...' && sleep 20s)" -wsl docker-compose -f docker-compose.linux.yaml up -d --build \ No newline at end of file +wsl docker compose -f docker-compose.linux.yaml up -d --build \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj index de0b9da6..86ebe200 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj +++ b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj @@ -9,7 +9,6 @@ - diff --git a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs index 7b2cabc3..6b9e04bc 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs @@ -6,23 +6,22 @@ using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TestService +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TestService; + +public class Startup { - public class Startup + public void ConfigureServices(IServiceCollection services) { - public void ConfigureServices(IServiceCollection services) - { - services.AddControllers().AddNewtonsoftJson(options => options.SerializerSettings.Converters.Add(new LongToStringConverter())); - services.AddSingleton(new GroboControllerFactory()); - } + services.AddControllers().AddJsonOptions(options => options.JsonSerializerOptions.Converters.Add(new LongToStringConverter())); + services.AddSingleton(new GroboControllerFactory()); + } - public void Configure(IApplicationBuilder app, IWebHostEnvironment env) - { - if (env.IsDevelopment()) - app.UseDeveloperExceptionPage(); + public void Configure(IApplicationBuilder app, IWebHostEnvironment env) + { + if (env.IsDevelopment()) + app.UseDeveloperExceptionPage(); - app.UseRouting(); - app.UseEndpoints(endpoints => endpoints.MapControllers()); - } + app.UseRouting(); + app.UseEndpoints(endpoints => endpoints.MapControllers()); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs index b1a6f537..ecdaac56 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs @@ -1,33 +1,32 @@ -using JetBrains.Annotations; +using System.Text.Json.Serialization; -using Newtonsoft.Json; +using JetBrains.Annotations; using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api; + +public class RtqMonitoringSearchRequest { - public class RtqMonitoringSearchRequest - { - [NotNull] - [JsonProperty("enqueueTimestampRange")] - public TimestampRange EnqueueTimestampRange { get; set; } - - [CanBeNull] - [JsonProperty("queryString")] - public string QueryString { get; set; } - - [CanBeNull] - [JsonProperty("states")] - public TaskState[] States { get; set; } - - [CanBeNull, ItemNotNull] - [JsonProperty("names")] - public string[] Names { get; set; } - - [JsonProperty("offset")] - public int? Offset { get; set; } - - [JsonProperty("count")] - public int? Count { get; set; } - } + [NotNull] + [JsonPropertyName("enqueueTimestampRange")] + public TimestampRange EnqueueTimestampRange { get; set; } + + [CanBeNull] + [JsonPropertyName("queryString")] + public string QueryString { get; set; } + + [CanBeNull] + [JsonPropertyName("states")] + public TaskState[] States { get; set; } + + [CanBeNull, ItemNotNull] + [JsonPropertyName("names")] + public string[] Names { get; set; } + + [JsonPropertyName("offset")] + public int? Offset { get; set; } + + [JsonPropertyName("count")] + public int? Count { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs index d1942b38..0be8bbe0 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs @@ -1,16 +1,15 @@ -using JetBrains.Annotations; +using System.Text.Json.Serialization; -using Newtonsoft.Json; +using JetBrains.Annotations; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api; + +public class RtqMonitoringSearchResults { - public class RtqMonitoringSearchResults - { - [JsonProperty("totalCount")] - public long TotalCount { get; set; } + [JsonPropertyName("totalCount")] + public long TotalCount { get; set; } - [NotNull, ItemNotNull] - [JsonProperty("taskMetas")] - public RtqMonitoringTaskMeta[] TaskMetas { get; set; } - } + [NotNull, ItemNotNull] + [JsonPropertyName("taskMetas")] + public RtqMonitoringTaskMeta[] TaskMetas { get; set; } = null!; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs index 97635402..acccc947 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs @@ -1,67 +1,65 @@ -using JetBrains.Annotations; +using System.Text.Json.Serialization; -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; +using JetBrains.Annotations; using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api; + +public class RtqMonitoringTaskMeta { - public class RtqMonitoringTaskMeta - { - [NotNull] - [JsonProperty("name")] - public string Name { get; set; } - - [NotNull] - [JsonProperty("id")] - public string Id { get; set; } - - [JsonProperty("ticks")] - [JsonConverter(typeof(LongToStringConverter))] - public long Ticks { get; set; } - - [JsonProperty("minimalStartTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long MinimalStartTicks { get; set; } - - [JsonProperty("startExecutingTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long? StartExecutingTicks { get; set; } - - [JsonProperty("finishExecutingTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long? FinishExecutingTicks { get; set; } - - [JsonProperty("lastModificationTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long? LastModificationTicks { get; set; } - - [JsonProperty("expirationTimestampTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long? ExpirationTimestampTicks { get; set; } - - [JsonProperty("expirationModificationTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long? ExpirationModificationTicks { get; set; } - - [JsonProperty("executionDurationTicks")] - [JsonConverter(typeof(LongToStringConverter))] - public long? ExecutionDurationTicks { get; set; } - - [JsonProperty("state")] - [JsonConverter(typeof(StringEnumConverter))] - public TaskState State { get; set; } - - [CanBeNull] - [JsonProperty("taskActions")] - public TaskActions TaskActions { get; set; } - - [JsonProperty("attempts")] - public int Attempts { get; set; } - - [JsonProperty("parentTaskId")] - public string ParentTaskId { get; set; } - } + [NotNull] + [JsonPropertyName("name")] + public string Name { get; set; } = null!; + + [NotNull] + [JsonPropertyName("id")] + public string Id { get; set; } = null!; + + [JsonPropertyName("ticks")] + [JsonConverter(typeof(LongToStringConverter))] + public long Ticks { get; set; } + + [JsonPropertyName("minimalStartTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long MinimalStartTicks { get; set; } + + [JsonPropertyName("startExecutingTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long? StartExecutingTicks { get; set; } + + [JsonPropertyName("finishExecutingTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long? FinishExecutingTicks { get; set; } + + [JsonPropertyName("lastModificationTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long? LastModificationTicks { get; set; } + + [JsonPropertyName("expirationTimestampTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long? ExpirationTimestampTicks { get; set; } + + [JsonPropertyName("expirationModificationTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long? ExpirationModificationTicks { get; set; } + + [JsonPropertyName("executionDurationTicks")] + [JsonConverter(typeof(LongToStringConverter))] + public long? ExecutionDurationTicks { get; set; } + + [JsonPropertyName("state")] + [JsonConverter(typeof(JsonStringEnumConverter))] + public TaskState State { get; set; } + + [CanBeNull] + [JsonPropertyName("taskActions")] + public TaskActions TaskActions { get; set; } + + [JsonPropertyName("attempts")] + public int Attempts { get; set; } + + [JsonPropertyName("parentTaskId")] + public string ParentTaskId { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs index 567a5c1b..a2421862 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs @@ -1,31 +1,30 @@ using JetBrains.Annotations; -using Newtonsoft.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.DistributedTaskQueue.Handling; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api; + +public class RtqMonitoringTaskModel { - public class RtqMonitoringTaskModel - { - public static RtqMonitoringTaskModel Empty => new RtqMonitoringTaskModel(); - - [NotNull] - [JsonProperty("taskMeta")] - public RtqMonitoringTaskMeta TaskMeta { get; set; } - - [NotNull] - [JsonProperty("taskData")] - [JsonConverter(typeof(TaskDataJsonSerializer))] - public IRtqTaskData TaskData { get; set; } - - [NotNull, ItemNotNull] - [JsonProperty("childTaskIds")] - public string[] ChildTaskIds { get; set; } - - [NotNull, ItemNotNull] - [JsonProperty("exceptionInfos")] - public string[] ExceptionInfos { get; set; } - } + public static RtqMonitoringTaskModel Empty => new RtqMonitoringTaskModel(); + + [NotNull] + [JsonPropertyName("taskMeta")] + public RtqMonitoringTaskMeta TaskMeta { get; set; } + + [NotNull] + [JsonPropertyName("taskData")] + [JsonConverter(typeof(TaskDataJsonSerializer))] + public IRtqTaskData TaskData { get; set; } + + [NotNull, ItemNotNull] + [JsonPropertyName("childTaskIds")] + public string[] ChildTaskIds { get; set; } + + [NotNull, ItemNotNull] + [JsonPropertyName("exceptionInfos")] + public string[] ExceptionInfos { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs index 4c9e6fd1..d47f8efd 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs @@ -1,12 +1,12 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api; public class TaskActions { - [JsonProperty("canCancel")] + [JsonPropertyName("canCancel")] public bool CanCancel { get; set; } - [JsonProperty("canRerun")] + [JsonPropertyName("canRerun")] public bool CanRerun { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs index 37e36a65..ab4b8c8c 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs @@ -1,22 +1,21 @@ -using JetBrains.Annotations; +using System.Text.Json.Serialization; -using Newtonsoft.Json; +using JetBrains.Annotations; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; using SkbKontur.Cassandra.TimeBasedUuid; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api; + +public class TimestampRange { - public class TimestampRange - { - [NotNull] - [JsonProperty("lowerBound")] - [JsonConverter(typeof(TimestampJsonConverter))] - public Timestamp LowerBound { get; set; } + [NotNull] + [JsonPropertyName("lowerBound")] + [JsonConverter(typeof(TimestampJsonConverter))] + public Timestamp LowerBound { get; set; } = null!; - [NotNull] - [JsonProperty("upperBound")] - [JsonConverter(typeof(TimestampJsonConverter))] - public Timestamp UpperBound { get; set; } - } + [NotNull] + [JsonPropertyName("upperBound")] + [JsonConverter(typeof(TimestampJsonConverter))] + public Timestamp UpperBound { get; set; } = null!; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj b/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj index 72686f4d..45d41ca0 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj +++ b/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj @@ -10,8 +10,8 @@ - + diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs index b953129d..71322bc8 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs @@ -1,74 +1,69 @@ -using System; +#nullable enable + +using System; using System.Linq; using System.Text; +using System.Text.Json; using Elasticsearch.Net; -using JetBrains.Annotations; - -using Newtonsoft.Json; - using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions; + +// todo (andrew, 11.11.2018): maybe enable ConnectionConfiguration.ThrowExceptions() instead of all this boilerplate +internal static class ElasticsearchResponseWrapperExtensions { - // todo (andrew, 11.11.2018): maybe enable ConnectionConfiguration.ThrowExceptions() instead of all this boilerplate - internal static class ElasticsearchResponseWrapperExtensions + public static T EnsureSuccess(this T response) + where T : ElasticsearchResponseBase { - [NotNull] - public static T EnsureSuccess([NotNull] this T response) - where T : ElasticsearchResponseBase - { - if (!response.Success) - throw new InvalidOperationException(response.ExtendErrorMessageWithElasticInfo("Unsuccessful response"), response.OriginalException); - return response; - } + if (!response.Success) + throw new InvalidOperationException(response.ExtendErrorMessageWithElasticInfo("Unsuccessful response"), response.OriginalException); + return response; + } - public static void DieIfBulkRequestFailed([NotNull] this StringResponse response) + public static void DieIfBulkRequestFailed(this StringResponse response) + { + response.EnsureSuccess(); + var bulkResponse = JsonSerializer.Deserialize(response.Body); + if (bulkResponse!.HasErrors) { - response.EnsureSuccess(); - var bulkResponse = JsonConvert.DeserializeObject(response.Body); - if (bulkResponse.HasErrors) - { - var innerExceptions = bulkResponse - .Items - .Select(x => x.Index ?? x.Create ?? x.Update ?? x.Delete) - .Select(CreateExceptionIfError) - .Where(x => x != null) - .ToList(); - var message = response.ExtendErrorMessageWithElasticInfo($"Bulk request failure [{innerExceptions.Count}]"); - throw new InvalidOperationException(message, new AggregateException(innerExceptions.Take(100))); - } + var innerExceptions = bulkResponse + .Items + .Select(x => x.Index ?? x.Create ?? x.Update ?? x.Delete) + .Select(CreateExceptionIfError) + .Where(x => x != null) + .ToList(); + var message = response.ExtendErrorMessageWithElasticInfo($"Bulk request failure [{innerExceptions.Count}]"); + throw new InvalidOperationException(message, new AggregateException(innerExceptions.Take(100))); } + } - [NotNull] - private static string ExtendErrorMessageWithElasticInfo([CanBeNull] this T response, string errorMessage) - where T : IElasticsearchResponse - { - var fullErrorMessage = new StringBuilder($"ElasticSearch error: '{errorMessage}'").AppendLine(); - - // note ConnectionConfiguration.EnableMetrics() is gone (https://github.com/elastic/elasticsearch-net/issues/1762) - // note IElasticSearchResponse.NumberOfRetries is gone (https://stackoverflow.com/questions/38602670/what-is-the-elasticsearch-net-2-x-equivalent-for-ielasticsearchresponse-numberof) + private static string ExtendErrorMessageWithElasticInfo(this T? response, string errorMessage) + where T : IElasticsearchResponse + { + var fullErrorMessage = new StringBuilder($"ElasticSearch error: '{errorMessage}'").AppendLine(); - if (response != null) - { - if (response.TryGetServerErrorReason(out var serverErrorReason)) - fullErrorMessage.AppendLine($"ServerErrorReason: '{serverErrorReason}'"); + // note ConnectionConfiguration.EnableMetrics() is gone (https://github.com/elastic/elasticsearch-net/issues/1762) + // note IElasticSearchResponse.NumberOfRetries is gone (https://stackoverflow.com/questions/38602670/what-is-the-elasticsearch-net-2-x-equivalent-for-ielasticsearchresponse-numberof) - fullErrorMessage.AppendLine($"For response: '{response}'"); - } + if (response != null) + { + if (response.TryGetServerErrorReason(out var serverErrorReason)) + fullErrorMessage.AppendLine($"ServerErrorReason: '{serverErrorReason}'"); - return fullErrorMessage.ToString(); + fullErrorMessage.AppendLine($"For response: '{response}'"); } - [CanBeNull] - private static Exception CreateExceptionIfError([NotNull] ResponseBase responseItem, int requestNumber) - { - return responseItem.Status >= 400 && responseItem.Status < 600 - ? new InvalidOperationException($"Request number #{requestNumber} failed: '{responseItem.ToPrettyJson()}'") - : null; - } + return fullErrorMessage.ToString(); + } + + private static Exception? CreateExceptionIfError(ResponseBase responseItem, int requestNumber) + { + return responseItem.Status >= 400 && responseItem.Status < 600 + ? new InvalidOperationException($"Request number #{requestNumber} failed: '{responseItem.ToPrettyJson()}'") + : null; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs index e4f85809..9a3fee97 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs @@ -1,16 +1,15 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk; + +internal class BulkResponse { - internal class BulkResponse - { - [JsonProperty(PropertyName = "took")] - public int TimeMs { get; set; } + [JsonPropertyName("took")] + public int TimeMs { get; set; } - [JsonProperty(PropertyName = "errors")] - public bool HasErrors { get; set; } + [JsonPropertyName("errors")] + public bool HasErrors { get; set; } - [JsonProperty(PropertyName = "items")] - public ItemInfo[] Items { get; set; } - } + [JsonPropertyName("items")] + public ItemInfo[] Items { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs index 2d318121..8e300c66 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs @@ -1,19 +1,18 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk; + +internal class IndexBulkResponse : ResponseBase { - internal class IndexBulkResponse : ResponseBase - { - [JsonProperty(PropertyName = "_index")] - public string Index { get; set; } + [JsonPropertyName("_index")] + public string Index { get; set; } - [JsonProperty(PropertyName = "_type")] - public string Type { get; set; } + [JsonPropertyName("_type")] + public string Type { get; set; } - [JsonProperty(PropertyName = "_id")] - public string Id { get; set; } + [JsonPropertyName("_id")] + public string Id { get; set; } - [JsonProperty(PropertyName = "_version")] - public long Version { get; set; } - } + [JsonPropertyName("_version")] + public long Version { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs index f21e68da..9b47e6c7 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs @@ -1,19 +1,18 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk; + +internal class ItemInfo { - internal class ItemInfo - { - [JsonProperty(PropertyName = "index")] - public IndexBulkResponse Index { get; set; } + [JsonPropertyName("index")] + public IndexBulkResponse Index { get; set; } - [JsonProperty(PropertyName = "create")] - public ResponseBase Create { get; set; } + [JsonPropertyName("create")] + public ResponseBase Create { get; set; } - [JsonProperty(PropertyName = "update")] - public ResponseBase Update { get; set; } + [JsonPropertyName("update")] + public ResponseBase Update { get; set; } - [JsonProperty(PropertyName = "delete")] - public ResponseBase Delete { get; set; } - } + [JsonPropertyName("delete")] + public ResponseBase Delete { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs index 02873470..fab04199 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs @@ -1,25 +1,24 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Get +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Get; + +internal class GetResponse { - internal class GetResponse - { - [JsonProperty(PropertyName = "_index")] - public string Index { get; set; } + [JsonPropertyName("_index")] + public string Index { get; set; } - [JsonProperty(PropertyName = "_type")] - public string Type { get; set; } + [JsonPropertyName("_type")] + public string Type { get; set; } - [JsonProperty(PropertyName = "_id")] - public string Id { get; set; } + [JsonPropertyName("_id")] + public string Id { get; set; } - [JsonProperty(PropertyName = "_version")] - public long Version { get; set; } + [JsonPropertyName("_version")] + public long Version { get; set; } - [JsonProperty(PropertyName = "found")] - public bool Found { get; set; } + [JsonPropertyName("found")] + public bool Found { get; set; } - [JsonProperty(PropertyName = "_source")] - public T Source { get; set; } - } + [JsonPropertyName("_source")] + public T Source { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs index e096d783..398e8803 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs @@ -1,13 +1,12 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses; + +internal class ResponseBase { - internal class ResponseBase - { - [JsonProperty(PropertyName = "status")] - public int Status { get; set; } + [JsonPropertyName("status")] + public int Status { get; set; } - [JsonProperty(PropertyName = "error")] - public object Error { get; set; } - } + [JsonPropertyName("error")] + public object Error { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs index 9598ffed..75fe33b6 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs @@ -1,13 +1,11 @@ -using JetBrains.Annotations; +#nullable enable -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search; + +internal class HitInfo { - internal class HitInfo - { - [NotNull] - [JsonProperty(PropertyName = "_id")] - public string Id { get; set; } - } + [JsonPropertyName("_id")] + public string Id { get; set; } = null!; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs index ae6e9609..70330525 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs @@ -1,19 +1,17 @@ -using JetBrains.Annotations; +#nullable enable -using Newtonsoft.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search; + +internal class HitsCollection { - internal class HitsCollection - { - [JsonProperty(PropertyName = "total")] - [JsonConverter(typeof(TotalCountCompatibilityConverter))] - public long TotalCount { get; set; } + [JsonPropertyName("total")] + [JsonConverter(typeof(TotalCountCompatibilityConverter))] + public long TotalCount { get; set; } - [NotNull, ItemNotNull] - [JsonProperty(PropertyName = "hits")] - public HitInfo[] Hits { get; set; } - } + [JsonPropertyName("hits")] + public HitInfo[] Hits { get; set; } = null!; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs index 71e17bca..48dbdbbf 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs @@ -1,13 +1,11 @@ -using JetBrains.Annotations; +#nullable enable -using Newtonsoft.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search; + +internal class SearchResponse { - internal class SearchResponse - { - [NotNull] - [JsonProperty(PropertyName = "hits")] - public HitsCollection Hits { get; set; } - } + [JsonPropertyName("hits")] + public HitsCollection Hits { get; set; } = null!; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs index b48bfd70..6cef3801 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs @@ -1,80 +1,69 @@ -using System; +#nullable enable -using JetBrains.Annotations; - -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; +using System; +using System.Text.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing; using SkbKontur.Cassandra.TimeBasedUuid; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer; + +public class RtqElasticsearchIndexerSettings { - public class RtqElasticsearchIndexerSettings + public RtqElasticsearchIndexerSettings(string eventFeedKey, string rtqGraphitePathPrefix) { - public RtqElasticsearchIndexerSettings([NotNull] string eventFeedKey, [NotNull] string rtqGraphitePathPrefix) - { - if (string.IsNullOrEmpty(eventFeedKey)) - throw new InvalidOperationException("eventFeedKey is empty"); - if (string.IsNullOrEmpty(rtqGraphitePathPrefix)) - throw new InvalidOperationException("rtqGraphitePathPrefix is empty"); + if (string.IsNullOrEmpty(eventFeedKey)) + throw new InvalidOperationException("eventFeedKey is empty"); + if (string.IsNullOrEmpty(rtqGraphitePathPrefix)) + throw new InvalidOperationException("rtqGraphitePathPrefix is empty"); - EventFeedKey = eventFeedKey; - RtqGraphitePathPrefix = rtqGraphitePathPrefix; - } - - [NotNull] - public string EventFeedKey { get; } + EventFeedKey = eventFeedKey; + RtqGraphitePathPrefix = rtqGraphitePathPrefix; + } - [NotNull] - public string RtqGraphitePathPrefix { get; } + public static JsonSerializerOptions GetJsonOptions() + { + var defaultJsonSerializerOptions = new JsonSerializerOptions(); + defaultJsonSerializerOptions.Converters.Add(new TruncateLongStringsConverter(500)); + defaultJsonSerializerOptions.Converters.Add(new JsonStringEnumConverter()); + defaultJsonSerializerOptions.Converters.Add(new TimestampJsonConverter()); + defaultJsonSerializerOptions.Converters.Add(new TimeGuidJsonConverter()); + defaultJsonSerializerOptions.Converters.Add(new OmitBinaryAndAbstractPropertyConverter()); + + return defaultJsonSerializerOptions; + } - [NotNull] - public string PerfGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.Perf"; + public string EventFeedKey { get; } - [NotNull] - public string EventFeedGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.EventFeed"; + public string RtqGraphitePathPrefix { get; } - [NotNull] - public Timestamp InitialIndexingStartTimestamp { get; set; } = new Timestamp(new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc)); + public string PerfGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.Perf"; - public TimeSpan MaxEventsProcessingTimeWindow { get; set; } = TimeSpan.FromHours(1); + public string EventFeedGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.EventFeed"; - public int MaxEventsProcessingTasksCount { get; set; } = 60000; + public Timestamp InitialIndexingStartTimestamp { get; } = new(new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc)); - public int TaskIdsProcessingBatchSize { get; set; } = 4000; + public TimeSpan MaxEventsProcessingTimeWindow { get; } = TimeSpan.FromHours(1); - public int IndexingThreadsCount { get; set; } = 2; + public int MaxEventsProcessingTasksCount { get; set; } = 60000; - public TimeSpan BulkIndexRequestTimeout { get; set; } = TimeSpan.FromMinutes(5); + public int TaskIdsProcessingBatchSize { get; set; } = 4000; - public TimeSpan InitialIndexingOffsetFromNow { get; set; } = TimeSpan.FromMinutes(30); + public int IndexingThreadsCount { get; set; } = 2; - [NotNull] - public JsonSerializerSettings JsonSerializerSettings { get; } = DefaultJsonSerializerSettings; + public TimeSpan BulkIndexRequestTimeout { get; set; } = TimeSpan.FromMinutes(5); - public override string ToString() - { - return $"InitialIndexingStartTimestamp: {InitialIndexingStartTimestamp}, " + - $"MaxEventsProcessingTimeWindow: {MaxEventsProcessingTimeWindow}, " + - $"MaxEventsProcessingTasksCount: {MaxEventsProcessingTasksCount}, " + - $"TaskIdsProcessingBatchSize: {TaskIdsProcessingBatchSize}, " + - $"IndexingThreadsCount: {IndexingThreadsCount}, " + - $"BulkIndexRequestTimeout: {BulkIndexRequestTimeout}"; - } + public TimeSpan InitialIndexingOffsetFromNow { get; set; } = TimeSpan.FromMinutes(30); - [NotNull] - public static readonly JsonSerializerSettings DefaultJsonSerializerSettings = new JsonSerializerSettings - { - ContractResolver = new OmitBinaryAndAbstractPropertiesContractResolver(), - Converters = new JsonConverter[] - { - new TruncateLongStringsConverter(500), - new StringEnumConverter(), - new TimestampJsonConverter(), - new TimeGuidJsonConverter() - }, - }; + public override string ToString() + { + return $"InitialIndexingStartTimestamp: {InitialIndexingStartTimestamp}, " + + $"MaxEventsProcessingTimeWindow: {MaxEventsProcessingTimeWindow}, " + + $"MaxEventsProcessingTasksCount: {MaxEventsProcessingTasksCount}, " + + $"TaskIdsProcessingBatchSize: {TaskIdsProcessingBatchSize}, " + + $"IndexingThreadsCount: {IndexingThreadsCount}, " + + $"BulkIndexRequestTimeout: {BulkIndexRequestTimeout}"; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs index 6cc74e23..52ceb0d7 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs @@ -1,17 +1,16 @@ -using System; +#nullable enable + +using System; using System.Collections.Generic; using System.Linq; +using System.Text.Json; using Elasticsearch.Net; using GroBuf; -using JetBrains.Annotations; - using MoreLinqInlined; -using Newtonsoft.Json; - using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities; using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Repositories; using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Repositories.BlobStorages; @@ -24,146 +23,143 @@ using Vostok.Logging.Abstractions; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer; + +public class TaskMetaProcessor { - public class TaskMetaProcessor + public TaskMetaProcessor(ILog logger, + RtqElasticsearchIndexerSettings settings, + IRtqElasticsearchClient elasticClient, + RemoteTaskQueue remoteTaskQueue, + RtqMonitoringPerfGraphiteReporter perfGraphiteReporter) { - public TaskMetaProcessor(ILog logger, - RtqElasticsearchIndexerSettings settings, - IRtqElasticsearchClient elasticClient, - RemoteTaskQueue remoteTaskQueue, - RtqMonitoringPerfGraphiteReporter perfGraphiteReporter) - { - this.logger = logger.ForContext(nameof(TaskMetaProcessor)); - this.settings = settings; - handleTasksMetaStorage = remoteTaskQueue.HandleTasksMetaStorage; - taskDataRegistry = remoteTaskQueue.TaskDataRegistry; - taskDataStorage = remoteTaskQueue.TaskDataStorage; - taskExceptionInfoStorage = remoteTaskQueue.TaskExceptionInfoStorage; - serializer = remoteTaskQueue.Serializer; - this.perfGraphiteReporter = perfGraphiteReporter; - this.elasticClient = elasticClient; - bulkRequestTimeout = new BulkRequestParameters - { - Timeout = this.settings.BulkIndexRequestTimeout, - RequestConfiguration = new RequestConfiguration {RequestTimeout = this.settings.BulkIndexRequestTimeout} - }; - } + this.logger = logger.ForContext(nameof(TaskMetaProcessor)); + this.settings = settings; + handleTasksMetaStorage = remoteTaskQueue.HandleTasksMetaStorage; + taskDataRegistry = remoteTaskQueue.TaskDataRegistry; + taskDataStorage = remoteTaskQueue.TaskDataStorage; + taskExceptionInfoStorage = remoteTaskQueue.TaskExceptionInfoStorage; + serializer = remoteTaskQueue.Serializer; + this.perfGraphiteReporter = perfGraphiteReporter; + this.elasticClient = elasticClient; + bulkRequestTimeout = new BulkRequestParameters + { + Timeout = this.settings.BulkIndexRequestTimeout, + RequestConfiguration = new RequestConfiguration {RequestTimeout = this.settings.BulkIndexRequestTimeout} + }; + } - public void ProcessTasks([NotNull, ItemNotNull] List taskIdsToProcess) - { - logger.Info("Processing tasks: {ProcessingTasksCount}", new {ProcessingTasksCount = taskIdsToProcess.Count}); - taskIdsToProcess.Batch(settings.TaskIdsProcessingBatchSize, Enumerable.ToArray) - .AsParallel() - .WithDegreeOfParallelism(settings.IndexingThreadsCount) - .WithExecutionMode(ParallelExecutionMode.ForceParallelism) - .ForAll(taskIds => - { - var taskMetas = perfGraphiteReporter.ReportTiming("ReadTaskMetas", () => handleTasksMetaStorage.GetMetas(taskIds)); - var taskMetasToIndex = taskMetas.Values.Where(x => x.Ticks > settings.InitialIndexingStartTimestamp.Ticks).ToArray(); - if (taskMetasToIndex.Any()) - IndexMetas(taskMetasToIndex); - }); - } + public void ProcessTasks(List taskIdsToProcess) + { + logger.Info("Processing tasks: {ProcessingTasksCount}", new {ProcessingTasksCount = taskIdsToProcess.Count}); + taskIdsToProcess.Batch(settings.TaskIdsProcessingBatchSize, Enumerable.ToArray) + .AsParallel() + .WithDegreeOfParallelism(settings.IndexingThreadsCount) + .WithExecutionMode(ParallelExecutionMode.ForceParallelism) + .ForAll(taskIds => + { + var taskMetas = perfGraphiteReporter.ReportTiming("ReadTaskMetas", () => handleTasksMetaStorage.GetMetas(taskIds)); + var taskMetasToIndex = taskMetas.Values.Where(x => x.Ticks > settings.InitialIndexingStartTimestamp.Ticks).ToArray(); + if (taskMetasToIndex.Any()) + IndexMetas(taskMetasToIndex); + }); + } - private void IndexMetas([NotNull, ItemNotNull] TaskMetaInformation[] batch) + private void IndexMetas(TaskMetaInformation[] batch) + { + var taskDatas = perfGraphiteReporter.ReportTiming("ReadTaskDatas", () => taskDataStorage.Read(batch)); + var taskExceptionInfos = perfGraphiteReporter.ReportTiming("ReadTaskExceptionInfos", () => taskExceptionInfoStorage.Read(batch)); + var enrichedBatch = new ( TaskMetaInformation TaskMeta, TaskExceptionInfo[] TaskExceptionInfos, object? TaskData)[batch.Length]; + for (var i = 0; i < batch.Length; i++) { - var taskDatas = perfGraphiteReporter.ReportTiming("ReadTaskDatas", () => taskDataStorage.Read(batch)); - var taskExceptionInfos = perfGraphiteReporter.ReportTiming("ReadTaskExceptionInfos", () => taskExceptionInfoStorage.Read(batch)); - var enrichedBatch = new ( /*[NotNull]*/ TaskMetaInformation TaskMeta, /*[NotNull, ItemNotNull]*/ TaskExceptionInfo[] TaskExceptionInfos, /*[CanBeNull]*/ object TaskData)[batch.Length]; - for (var i = 0; i < batch.Length; i++) + var taskMeta = batch[i]; + object? taskDataObj = null; + if (taskDatas.TryGetValue(taskMeta.Id, out var taskData)) { - var taskMeta = batch[i]; - object taskDataObj = null; - if (taskDatas.TryGetValue(taskMeta.Id, out var taskData)) - { - if (taskDataRegistry.TryGetTaskType(taskMeta.Name, out var taskType)) - taskDataObj = TryDeserializeTaskData(taskType, taskData, taskMeta); - } - enrichedBatch[i] = (taskMeta, taskExceptionInfos[taskMeta.Id], taskDataObj); + if (taskDataRegistry.TryGetTaskType(taskMeta.Name, out var taskType)) + taskDataObj = TryDeserializeTaskData(taskType!, taskData, taskMeta); } - perfGraphiteReporter.ReportTiming("IndexBatch", () => IndexBatch(enrichedBatch)); + enrichedBatch[i] = (taskMeta, taskExceptionInfos[taskMeta.Id], taskDataObj); } + perfGraphiteReporter.ReportTiming("IndexBatch", () => IndexBatch(enrichedBatch)); + } - private void IndexBatch([NotNull] ( /*[NotNull]*/ TaskMetaInformation TaskMeta, /*[NotNull, ItemNotNull]*/ TaskExceptionInfo[] TaskExceptionInfos, /*[CanBeNull]*/ object TaskData)[] batch) + private void IndexBatch((TaskMetaInformation TaskMeta, TaskExceptionInfo[] TaskExceptionInfos, object? TaskData)[] batch) + { + logger.Info("IndexBatch: {BatchLength} tasks", new {BatchLength = batch.Length}); + var payload = new string[batch.Length * 2]; + for (var i = 0; i < batch.Length; i++) { - logger.Info("IndexBatch: {BatchLength} tasks", new {BatchLength = batch.Length}); - var payload = new string[batch.Length * 2]; - for (var i = 0; i < batch.Length; i++) - { - payload[2 * i] = JsonConvert.SerializeObject(new {index = CreateIndexInfo(batch[i].TaskMeta)}); - var taskIndexedInfo = BuildTaskIndexedInfo(batch[i].TaskMeta, batch[i].TaskExceptionInfos, batch[i].TaskData); - payload[2 * i + 1] = JsonConvert.SerializeObject(taskIndexedInfo, settings.JsonSerializerSettings); - } - perfGraphiteReporter.ReportTiming("ElasticsearchClient_Bulk", () => elasticClient.Bulk(PostData.MultiJson(payload), bulkRequestTimeout).DieIfBulkRequestFailed()); + payload[2 * i] = JsonSerializer.Serialize(new {index = CreateIndexInfo(batch[i].TaskMeta)}); + var taskIndexedInfo = BuildTaskIndexedInfo(batch[i].TaskMeta, batch[i].TaskExceptionInfos, batch[i].TaskData); + payload[2 * i + 1] = JsonSerializer.Serialize(taskIndexedInfo, RtqElasticsearchIndexerSettings.GetJsonOptions()); } + perfGraphiteReporter.ReportTiming("ElasticsearchClient_Bulk", () => elasticClient.Bulk(PostData.MultiJson(payload), bulkRequestTimeout).DieIfBulkRequestFailed()); + } - private object CreateIndexInfo(TaskMetaInformation taskMeta) + private object CreateIndexInfo(TaskMetaInformation taskMeta) + { + var indexName = DateTimeFormatter.DateFromTicks(taskMeta.Ticks).ToString(RtqElasticsearchConsts.DataIndexNameFormat); + if (elasticClient.UseElastic7) { - var indexName = DateTimeFormatter.DateFromTicks(taskMeta.Ticks).ToString(RtqElasticsearchConsts.DataIndexNameFormat); - if (elasticClient.UseElastic7) - { - return new - { - _index = indexName, - _id = taskMeta.Id, - }; - } return new { _index = indexName, - _type = RtqElasticsearchConsts.RtqIndexTypeName, // see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html _id = taskMeta.Id, }; } + return new + { + _index = indexName, + _type = RtqElasticsearchConsts.RtqIndexTypeName, // see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html + _id = taskMeta.Id, + }; + } - [CanBeNull] - private object TryDeserializeTaskData([NotNull] Type taskType, [NotNull] byte[] taskData, [NotNull] TaskMetaInformation taskMetaInformation) + private object TryDeserializeTaskData(Type taskType, byte[] taskData, TaskMetaInformation taskMetaInformation) + { + try { - try - { - return serializer.Deserialize(taskType, taskData); - } - catch (Exception e) - { - logger.Error(e, "Failed to deserialize taskData for: {RtqTaskMeta}", new {RtqTaskMeta = taskMetaInformation}); - return null; - } + return serializer.Deserialize(taskType, taskData); } - - [NotNull] - private static object BuildTaskIndexedInfo([NotNull] TaskMetaInformation taskMeta, [NotNull, ItemNotNull] TaskExceptionInfo[] taskExceptionInfos, [CanBeNull] object taskData) + catch (Exception e) { - var executionDurationTicks = taskMeta.ExecutionDurationTicks; - var meta = new MetaIndexedInfo - { - Id = taskMeta.Id, - Name = taskMeta.Name, - State = taskMeta.State.ToString(), - Attempts = taskMeta.Attempts, - ParentTaskId = taskMeta.ParentTaskId, - TaskGroupLock = taskMeta.TaskGroupLock, - EnqueueTime = taskMeta.Ticks, - FinishExecutingTime = taskMeta.FinishExecutingTicks, - LastModificationTime = taskMeta.LastModificationTicks ?? 0, - MinimalStartTime = taskMeta.MinimalStartTicks, - StartExecutingTime = taskMeta.StartExecutingTicks, - ExpirationTime = taskMeta.ExpirationTimestampTicks ?? 0, - LastExecutionDurationInMs = executionDurationTicks != null ? TimeSpan.FromTicks(executionDurationTicks.Value).TotalMilliseconds : (double?)null - }; - var exceptionInfo = string.Join("\r\n", taskExceptionInfos.Reverse().Select(x => x.ExceptionMessageInfo)); - return new TaskIndexedInfo(meta, exceptionInfo, taskData); + logger.Error(e, "Failed to deserialize taskData for: {RtqTaskMeta}", new {RtqTaskMeta = taskMetaInformation}); + return null; } + } - private readonly ILog logger; - private readonly RtqElasticsearchIndexerSettings settings; - private readonly IHandleTasksMetaStorage handleTasksMetaStorage; - private readonly IRtqTaskDataRegistry taskDataRegistry; - private readonly ITaskDataStorage taskDataStorage; - private readonly ITaskExceptionInfoStorage taskExceptionInfoStorage; - private readonly ISerializer serializer; - private readonly RtqMonitoringPerfGraphiteReporter perfGraphiteReporter; - private readonly IRtqElasticsearchClient elasticClient; - private readonly BulkRequestParameters bulkRequestTimeout; + private static object BuildTaskIndexedInfo(TaskMetaInformation taskMeta, TaskExceptionInfo[] taskExceptionInfos, object? taskData) + { + var executionDurationTicks = taskMeta.ExecutionDurationTicks; + var meta = new MetaIndexedInfo + { + Id = taskMeta.Id, + Name = taskMeta.Name, + State = taskMeta.State.ToString(), + Attempts = taskMeta.Attempts, + ParentTaskId = taskMeta.ParentTaskId, + TaskGroupLock = taskMeta.TaskGroupLock, + EnqueueTime = taskMeta.Ticks, + FinishExecutingTime = taskMeta.FinishExecutingTicks, + LastModificationTime = taskMeta.LastModificationTicks ?? 0, + MinimalStartTime = taskMeta.MinimalStartTicks, + StartExecutingTime = taskMeta.StartExecutingTicks, + ExpirationTime = taskMeta.ExpirationTimestampTicks ?? 0, + LastExecutionDurationInMs = executionDurationTicks != null ? TimeSpan.FromTicks(executionDurationTicks.Value).TotalMilliseconds : (double?)null + }; + var exceptionInfo = string.Join("\r\n", taskExceptionInfos.Reverse().Select(x => x.ExceptionMessageInfo)); + return new TaskIndexedInfo(meta, exceptionInfo, taskData); } + + private readonly ILog logger; + private readonly RtqElasticsearchIndexerSettings settings; + private readonly IHandleTasksMetaStorage handleTasksMetaStorage; + private readonly IRtqTaskDataRegistry taskDataRegistry; + private readonly ITaskDataStorage taskDataStorage; + private readonly ITaskExceptionInfoStorage taskExceptionInfoStorage; + private readonly ISerializer serializer; + private readonly RtqMonitoringPerfGraphiteReporter perfGraphiteReporter; + private readonly IRtqElasticsearchClient elasticClient; + private readonly BulkRequestParameters bulkRequestTimeout; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs index 0da88e30..114bcc11 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs @@ -1,24 +1,19 @@ -using System.Linq; +#nullable enable -using JetBrains.Annotations; +using System.Text.Json; +using System.Text.Json.Serialization; -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json +internal static class JsonObjectExtensions { - internal static class JsonObjectExtensions + public static string ToPrettyJson(this T obj) { - [NotNull] - public static string ToPrettyJson([CanBeNull] this T o, [NotNull] params JsonConverter[] converters) - { - converters = converters.Concat(new JsonConverter[] - { - new StringEnumConverter(), - new TimestampJsonConverter(), - new TimeGuidJsonConverter() - }).ToArray(); - return JsonConvert.SerializeObject(o, Formatting.Indented, converters); - } + var options = new JsonSerializerOptions(); + options.Converters.Add(new JsonStringEnumConverter()); + options.Converters.Add(new TimestampJsonConverter()); + options.Converters.Add(new TimeGuidJsonConverter()); + + return JsonSerializer.Serialize(obj, options); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs index ec0d4f96..3f1ef6cb 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs @@ -1,40 +1,36 @@ -using System; +#nullable enable -using Newtonsoft.Json; +using System; +using System.Text.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; + +/// +/// Конвертер, которые сериализует long-и в строки вместо чисел, +/// ибо JSON.parse() на длинных long-ах теряет последние разряды +/// +public class LongToStringConverter : JsonConverter { - /// - /// Конвертер, которые сериализует long-и в строки вместо чисел, - /// ибо JSON.parse() на длинных long-ах теряет последние разряды - /// - public class LongToStringConverter : JsonConverter + public override long? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + var value = JsonSerializer.Deserialize(ref reader); + if (value == null) { - if (value == null) - { - writer.WriteNull(); - } - else - { - writer.WriteValue(value.ToString()); - } + return null; } + return long.Parse(value); + } - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + public override void Write(Utf8JsonWriter writer, long? value, JsonSerializerOptions options) + { + if (value == null) { - var value = serializer.Deserialize(reader); - if (value == null) - { - return null; - } - return long.Parse(value); + writer.WriteNullValue(); } - - public override bool CanConvert(Type objectType) + else { - return typeof(long) == objectType || typeof(long?) == objectType; + writer.WriteStringValue(value.ToString()); } } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs index 0fd4af15..13e528a2 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs @@ -1,34 +1,25 @@ using System; - -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; +using System.Text.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.DistributedTaskQueue.Handling; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; + +public class TaskDataJsonSerializer : JsonConverter { - public class TaskDataJsonSerializer : JsonConverter + public override IRtqTaskData Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - staticSerializer.Serialize(writer, value); - } - - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) - { - throw new NotImplementedException(); - } - - public override bool CanConvert(Type objectType) - { - return typeof(IRtqTaskData).IsAssignableFrom(objectType); - } + throw new NotImplementedException(); + } - public override bool CanRead => false; + public override void Write(Utf8JsonWriter writer, IRtqTaskData value, JsonSerializerOptions options) + { + JsonSerializerOptions jsonSerializerOptions = new(); + jsonSerializerOptions.Converters.Add(new JsonStringEnumConverter()); + jsonSerializerOptions.Converters.Add(new TimeGuidJsonConverter()); + jsonSerializerOptions.Converters.Add(new TimestampJsonConverter()); - private static readonly JsonSerializer staticSerializer = new JsonSerializer - { - Converters = {new StringEnumConverter(), new TimeGuidJsonConverter(), new TimestampJsonConverter()} - }; + JsonSerializer.Serialize(writer, value, jsonSerializerOptions); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs new file mode 100644 index 00000000..844d6dd1 --- /dev/null +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs @@ -0,0 +1,28 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; + +public class OmitBinaryAndAbstractPropertyConverter : JsonConverter +{ + public override bool CanConvert(Type typeToConvert) + { + return typeToConvert.IsAbstract || IsBinaryType(typeToConvert); + } + + public override object Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + return JsonSerializer.Deserialize(ref reader, typeToConvert); + } + + public override void Write(Utf8JsonWriter writer, object value, JsonSerializerOptions options) + { + writer.WriteNullValue(); + } + + private static bool IsBinaryType(Type elementType) + { + return elementType == typeof(byte[]); + } +} \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/TimeGuidJsonConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/TimeGuidJsonConverter.cs index a98fc6c4..e2bf8b03 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Json/TimeGuidJsonConverter.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/TimeGuidJsonConverter.cs @@ -1,34 +1,28 @@ using System; - -using Newtonsoft.Json; +using System.Text.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.TimeBasedUuid; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; + +internal class TimeGuidJsonConverter : JsonConverter { - internal class TimeGuidJsonConverter : JsonConverter + public override TimeGuid Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - if (value == null) - writer.WriteNull(); - else - writer.WriteValue(((TimeGuid)value).ToGuid().ToString()); - } - - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + if (reader.TokenType == JsonTokenType.String) { - if (reader.TokenType == JsonToken.String) - { - var readAsString = (string)reader.Value; - return TimeGuid.Parse(readAsString); - } - return null; + var readAsString = reader.GetString(); + return TimeGuid.Parse(readAsString); } + return null; + } - public override bool CanConvert(Type objectType) - { - return objectType == typeof(TimeGuid); - } + public override void Write(Utf8JsonWriter writer, TimeGuid value, JsonSerializerOptions options) + { + if (value == null) + writer.WriteNullValue(); + else + writer.WriteStringValue(value.ToGuid()); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/TimestampJsonConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/TimestampJsonConverter.cs index 5183b264..43809038 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Json/TimestampJsonConverter.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/TimestampJsonConverter.cs @@ -1,38 +1,27 @@ using System; - -using Newtonsoft.Json; +using System.Text.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.TimeBasedUuid; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; + +internal class TimestampJsonConverter : JsonConverter { - internal class TimestampJsonConverter : JsonConverter + public override Timestamp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - if (value == null) - writer.WriteNull(); - else - writer.WriteValue(((Timestamp)value).ToDateTime()); - } - - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + if (reader.TryGetDateTime(out var dateTime)) { - switch (reader.TokenType) - { - case JsonToken.Null: - return null; - case JsonToken.Date: - return new Timestamp((DateTime)reader.Value); - case JsonToken.Integer: - return new Timestamp((long)reader.Value); - } - throw new JsonSerializationException($"Unexpected token when parsing timestamp. Expected Date or Integer with value type long, got {reader.TokenType}"); + return new Timestamp(dateTime); } + throw new JsonException($"Unexpected token when parsing timestamp. Expected Date or Integer with value type long, got {reader.TokenType}"); + } - public override bool CanConvert(Type objectType) - { - return objectType == typeof(Timestamp); - } + public override void Write(Utf8JsonWriter writer, Timestamp value, JsonSerializerOptions options) + { + if (value == null) + writer.WriteNullValue(); + else + writer.WriteStringValue(value.ToDateTime()); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/TotalCountCompatibilityConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/TotalCountCompatibilityConverter.cs index 40c54e1e..00a661d9 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Json/TotalCountCompatibilityConverter.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/TotalCountCompatibilityConverter.cs @@ -1,27 +1,24 @@ +#nullable enable + using System; +using System.Text.Json; +using System.Text.Json.Serialization; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -#nullable enable -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json +internal class TotalCountCompatibilityConverter : JsonConverter { - internal class TotalCountCompatibilityConverter : JsonConverter + public override long Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer) - { - throw new NotImplementedException(); - } + if (reader.TokenType != JsonTokenType.StartObject) + return JsonSerializer.Deserialize(ref reader); - public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer) - { - if (reader.TokenType == JsonToken.StartObject) - return JObject.Load(reader)["value"]?.ToObject(objectType); - return reader.Value; - } - - public override bool CanWrite => false; + var jsonDocument = JsonDocument.ParseValue(ref reader).RootElement; + return jsonDocument.GetProperty("value").Deserialize(); + } - public override bool CanConvert(Type objectType) => true; + public override void Write(Utf8JsonWriter writer, long value, JsonSerializerOptions options) + { + throw new NotImplementedException(); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/TwoKeysDictionaryConvertor.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/TwoKeysDictionaryConvertor.cs new file mode 100644 index 00000000..c90cac45 --- /dev/null +++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/TwoKeysDictionaryConvertor.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; + +// need type like {"PendingTaskCountsByNameAndTopic":{"(key1, key2)": object}} +public class TwoKeysDictionaryConvertor : JsonConverter> +{ + public override Dictionary<(string, string), T> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var intermediateDictionary = JsonSerializer.Deserialize>(ref reader); + var actualDictionary = new Dictionary<(string, string), T>(); + + foreach (var keyValue in intermediateDictionary) + { + // stringKeys = "key1, key2" + var stringKey = keyValue.Key.Substring(1, keyValue.Key.Length - 2); + var stringKeys = stringKey.Split(',', ' '); + var firstKey = stringKeys.First(); + var secondKey = stringKeys.Last(); + + actualDictionary.Add((firstKey, secondKey), keyValue.Value); + } + + return actualDictionary; + } + + public override void Write(Utf8JsonWriter writer, Dictionary<(string, string), T> value, JsonSerializerOptions options) + { + var convertedDictionary = new Dictionary(); + foreach (var keyValue in value) + { + convertedDictionary.Add(keyValue.Key.ToString(), keyValue.Value); + } + JsonSerializer.Serialize(writer, convertedDictionary); + } +} \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Client/TaskSearchClient.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Client/TaskSearchClient.cs index 3550c08c..4ba5a708 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Client/TaskSearchClient.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Client/TaskSearchClient.cs @@ -1,112 +1,107 @@ using System.Collections.Generic; using System.Linq; +using System.Text.Json; using Elasticsearch.Net; -using JetBrains.Annotations; - -using Newtonsoft.Json; - using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Search; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Client +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Client; + +public class TaskSearchClient { - public class TaskSearchClient + public TaskSearchClient(IRtqElasticsearchClient elasticClient) { - public TaskSearchClient(IRtqElasticsearchClient elasticClient) + this.elasticClient = elasticClient; + } + + public TaskSearchResponse Search(TaskSearchRequest taskSearchRequest, int from, int size) + { + var mustClause = new List + { + new + { + query_string = new + { + query = taskSearchRequest.QueryString, + lenient = true, + allow_leading_wildcard = false + }, + } + }; + var matchByName = false; + var shouldClauses = new List(); + foreach (var taskName in taskSearchRequest.TaskNames ?? new string[0]) { - this.elasticClient = elasticClient; + matchByName = true; + shouldClauses.Add(new + { + term = new Dictionary + { + {"Meta.Name", taskName}, + } + }); } - - [NotNull] - public TaskSearchResponse Search([NotNull] TaskSearchRequest taskSearchRequest, int from, int size) + var matchByState = false; + foreach (var taskState in taskSearchRequest.TaskStates ?? new string[0]) { - var mustClause = new List + matchByState = true; + shouldClauses.Add(new { - new + term = new Dictionary { - query_string = new - { - query = taskSearchRequest.QueryString, - lenient = true, - allow_leading_wildcard = false - }, + {"Meta.State", taskState}, } - }; - var matchByName = false; - var shouldClauses = new List(); - foreach (var taskName in taskSearchRequest.TaskNames ?? new string[0]) - { - matchByName = true; - shouldClauses.Add(new - { - term = new Dictionary + }); + } + var query = shouldClauses.Any() + ? new { - {"Meta.Name", taskName}, + must = mustClause, + should = shouldClauses, + minimum_should_match = (matchByName ? 1 : 0) + (matchByState ? 1 : 0) } - }); - } - var matchByState = false; - foreach (var taskState in taskSearchRequest.TaskStates ?? new string[0]) + : (object)new + { + must = mustClause + }; + + var indexForTimeRange = SearchIndexNameFactory.GetIndexForTimeRange(taskSearchRequest.FromTicksUtc, taskSearchRequest.ToTicksUtc); + var request = new { - matchByState = true; - shouldClauses.Add(new + @from, + size, + version = true, + _source = false, + query = new { - term = new Dictionary + @bool = query + }, + sort = new[] + { + new Dictionary { - {"Meta.State", taskState}, + {"Meta.EnqueueTime", new {order = "desc", unmapped_type = "long"}} } - }); - } - var query = shouldClauses.Any() - ? new - { - must = mustClause, - should = shouldClauses, - minimum_should_match = (matchByName ? 1 : 0) + (matchByState ? 1 : 0) - } - : (object)new - { - must = mustClause - }; - - var indexForTimeRange = SearchIndexNameFactory.GetIndexForTimeRange(taskSearchRequest.FromTicksUtc, taskSearchRequest.ToTicksUtc); - var request = new - { - @from, - size, - version = true, - _source = false, - query = new - { - @bool = query - }, - sort = new[] - { - new Dictionary - { - {"Meta.EnqueueTime", new {order = "desc", unmapped_type = "long"}} - } - } - }; - var body = PostData.String(JsonConvert.SerializeObject(request)); - var stringResponse = elasticClient.Search(indexForTimeRange, body, ignoreUnavailableIndices).EnsureSuccess(); - var searchResponse = JsonConvert.DeserializeObject(stringResponse.Body); - return new TaskSearchResponse - { - Ids = searchResponse.Hits.Hits.Select(x => x.Id).ToArray(), - TotalCount = searchResponse.Hits.TotalCount - }; - } - - private readonly IRtqElasticsearchClient elasticClient; - - private readonly SearchRequestParameters ignoreUnavailableIndices = new SearchRequestParameters + } + }; + var body = PostData.String(JsonSerializer.Serialize(request)); + var stringResponse = elasticClient.Search(indexForTimeRange, body, ignoreUnavailableIndices).EnsureSuccess(); + var searchResponse = JsonSerializer.Deserialize(stringResponse.Body); + return new TaskSearchResponse { - IgnoreUnavailable = true, - TrackTotalHits = true, + Ids = searchResponse.Hits.Hits.Select(x => x.Id).ToArray(), + TotalCount = searchResponse.Hits.TotalCount }; } + + private readonly IRtqElasticsearchClient elasticClient; + + private readonly SearchRequestParameters ignoreUnavailableIndices = new() + { + IgnoreUnavailable = true, + TrackTotalHits = true, + }; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/RtqElasticsearchSchema.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/RtqElasticsearchSchema.cs index 903a91ce..e82a9dcd 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/RtqElasticsearchSchema.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/RtqElasticsearchSchema.cs @@ -1,196 +1,195 @@ #nullable enable + using System.Collections.Generic; +using System.Text.Json; using Elasticsearch.Net; using Elasticsearch.Net.Specification.IndicesApi; -using Newtonsoft.Json; - using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage; + +public class RtqElasticsearchSchema { - public class RtqElasticsearchSchema + public RtqElasticsearchSchema(IRtqElasticsearchClient elasticClient) { - public RtqElasticsearchSchema(IRtqElasticsearchClient elasticClient) - { - this.elasticClient = elasticClient; - } + this.elasticClient = elasticClient; + } - public void Actualize(bool local, bool bulkLoad) - { - var indexSettings = new {settings = GetIndexingProgressIndexSettings(local, bulkLoad)}; - var indexSettingsPostData = PostData.String(JsonConvert.SerializeObject(indexSettings)); - elasticClient.Indices.Create(RtqElasticsearchConsts.IndexingProgressIndexName, indexSettingsPostData, allowResourceAlreadyExistsStatus).EnsureSuccess(); + public void Actualize(bool local, bool bulkLoad) + { + var indexSettings = new {settings = GetIndexingProgressIndexSettings(local, bulkLoad)}; + var indexSettingsPostData = PostData.String(JsonSerializer.Serialize(indexSettings)); + elasticClient.Indices.Create(RtqElasticsearchConsts.IndexingProgressIndexName, indexSettingsPostData, allowResourceAlreadyExistsStatus).EnsureSuccess(); - var templateSettings = elasticClient.UseElastic7 ? GetTaskIndicesTemplateV2Settings(local, bulkLoad) : GetTaskIndicesTemplateSettings(local, bulkLoad); - var templateSettingsPostData = PostData.String(JsonConvert.SerializeObject(templateSettings)); + var templateSettings = elasticClient.UseElastic7 ? GetTaskIndicesTemplateV2Settings(local, bulkLoad) : GetTaskIndicesTemplateSettings(local, bulkLoad); + var templateSettingsPostData = PostData.String(JsonSerializer.Serialize(templateSettings)); - if (elasticClient.UseElastic7) - elasticClient.Indices.PutTemplateV2ForAll(RtqElasticsearchConsts.TemplateName, templateSettingsPostData).EnsureSuccess(); - else - elasticClient.Indices.PutTemplateForAll(RtqElasticsearchConsts.TemplateName, templateSettingsPostData).EnsureSuccess(); - } + if (elasticClient.UseElastic7) + elasticClient.Indices.PutTemplateV2ForAll(RtqElasticsearchConsts.TemplateName, templateSettingsPostData).EnsureSuccess(); + else + elasticClient.Indices.PutTemplateForAll(RtqElasticsearchConsts.TemplateName, templateSettingsPostData).EnsureSuccess(); + } - private static object GetIndexingProgressIndexSettings(bool local, bool bulkLoad) - { - return new - { - index = new - { - number_of_shards = 6, - number_of_replicas = local || bulkLoad ? 0 : 1, - refresh_interval = bulkLoad ? "-1" : (local ? "1s" : "10s"), - merge = new {scheduler = new {max_thread_count = 1}}, // see https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-merge.html - query = new - { - default_field = "DataAsText", - } - } - }; - } - - private static object GetTaskIndicesTemplateV2Settings(bool local, bool bulkLoad) - { - return new - { - index_patterns = RtqElasticsearchConsts.AllIndicesWildcard, - template = new - { - settings = GetIndexingProgressIndexSettings(local, bulkLoad), - mappings = GetTaskIndexMappings() - }, - priority = 500, - }; - } - - private static object GetTaskIndicesTemplateSettings(bool local, bool bulkLoad) - { - return new - { - index_patterns = RtqElasticsearchConsts.AllIndicesWildcard, - settings = GetIndexingProgressIndexSettings(local, bulkLoad), - mappings = new Dictionary(1) {[RtqElasticsearchConsts.RtqIndexTypeName] = GetTaskIndexMappings()} - }; - } - - private static object GetTaskIndexMappings() - { - return new - { - date_detection = false, - dynamic_templates = new object[] - { - new - { - template_strings = new - { - path_match = "Data.*", - match_mapping_type = "string", - mapping = KeywordTypeWithCopy(), - } - }, - new - { - template_integer = new - { - path_match = "Data.*", - match_mapping_type = "long", - mapping = KeywordType() - } - }, - new - { - template_double = new - { - path_match = "Data.*", - match_mapping_type = "double", - mapping = KeywordType() - } - }, - new - { - template_boolean = new - { - path_match = "Data.*", - match_mapping_type = "boolean", - mapping = KeywordType() - } - } - }, - properties = new - { - Data = new - { - type = "object" - }, - DataAsText = new - { - type = "text", - store = false, - index = true - }, - ExceptionInfo = new - { - type = "text", - store = false, - index = true, - copy_to = "DataAsText", - }, - Meta = new - { - properties = new - { - Name = KeywordTypeWithCopy(), - Id = KeywordTypeWithCopy(), - State = KeywordTypeWithCopy(), - ParentTaskId = KeywordTypeWithCopy(), - TaskGroupLock = KeywordTypeWithCopy(), - Attempts = new {type = "integer", store = false}, - EnqueueTime = DateType(), - MinimalStartTime = DateType(), - StartExecutingTime = DateType(), - FinishExecutingTime = DateType(), - LastModificationTime = DateType(), - ExpirationTime = DateType(), - LastExecutionDurationInMs = DoubleType(), - } - } - } - }; - } - - private static object DateType() - { - return new {type = "date", store = false, format = "dateOptionalTime"}; - } + private static object GetIndexingProgressIndexSettings(bool local, bool bulkLoad) + { + return new + { + index = new + { + number_of_shards = 6, + number_of_replicas = local || bulkLoad ? 0 : 1, + refresh_interval = bulkLoad ? "-1" : (local ? "1s" : "10s"), + merge = new {scheduler = new {max_thread_count = 1}}, // see https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-merge.html + query = new + { + default_field = "DataAsText", + } + } + }; + } - private static object KeywordType() - { - return new {type = "keyword", store = false, index = true}; - } + private static object GetTaskIndicesTemplateV2Settings(bool local, bool bulkLoad) + { + return new + { + index_patterns = RtqElasticsearchConsts.AllIndicesWildcard, + template = new + { + settings = GetIndexingProgressIndexSettings(local, bulkLoad), + mappings = GetTaskIndexMappings() + }, + priority = 500, + }; + } - private static object DoubleType() - { - return new {type = "double", store = false}; - } + private static object GetTaskIndicesTemplateSettings(bool local, bool bulkLoad) + { + return new + { + index_patterns = RtqElasticsearchConsts.AllIndicesWildcard, + settings = GetIndexingProgressIndexSettings(local, bulkLoad), + mappings = new Dictionary(1) {[RtqElasticsearchConsts.RtqIndexTypeName] = GetTaskIndexMappings()} + }; + } - private static object KeywordTypeWithCopy() - { - return new - { - type = "keyword", - store = false, - index = true, - copy_to = "DataAsText", - }; - } - - private readonly IRtqElasticsearchClient elasticClient; - - private readonly CreateIndexRequestParameters allowResourceAlreadyExistsStatus = new CreateIndexRequestParameters + private static object GetTaskIndexMappings() + { + return new { - RequestConfiguration = new RequestConfiguration {AllowedStatusCodes = new[] {400}} + date_detection = false, + dynamic_templates = new object[] + { + new + { + template_strings = new + { + path_match = "Data.*", + match_mapping_type = "string", + mapping = KeywordTypeWithCopy(), + } + }, + new + { + template_integer = new + { + path_match = "Data.*", + match_mapping_type = "long", + mapping = KeywordType() + } + }, + new + { + template_double = new + { + path_match = "Data.*", + match_mapping_type = "double", + mapping = KeywordType() + } + }, + new + { + template_boolean = new + { + path_match = "Data.*", + match_mapping_type = "boolean", + mapping = KeywordType() + } + } + }, + properties = new + { + Data = new + { + type = "object" + }, + DataAsText = new + { + type = "text", + store = false, + index = true + }, + ExceptionInfo = new + { + type = "text", + store = false, + index = true, + copy_to = "DataAsText", + }, + Meta = new + { + properties = new + { + Name = KeywordTypeWithCopy(), + Id = KeywordTypeWithCopy(), + State = KeywordTypeWithCopy(), + ParentTaskId = KeywordTypeWithCopy(), + TaskGroupLock = KeywordTypeWithCopy(), + Attempts = new {type = "integer", store = false}, + EnqueueTime = DateType(), + MinimalStartTime = DateType(), + StartExecutingTime = DateType(), + FinishExecutingTime = DateType(), + LastModificationTime = DateType(), + ExpirationTime = DateType(), + LastExecutionDurationInMs = DoubleType(), + } + } + } }; } + + private static object DateType() + { + return new {type = "date", store = false, format = "dateOptionalTime"}; + } + + private static object KeywordType() + { + return new {type = "keyword", store = false, index = true}; + } + + private static object DoubleType() + { + return new {type = "double", store = false}; + } + + private static object KeywordTypeWithCopy() + { + return new + { + type = "keyword", + store = false, + index = true, + copy_to = "DataAsText", + }; + } + + private readonly IRtqElasticsearchClient elasticClient; + + private readonly CreateIndexRequestParameters allowResourceAlreadyExistsStatus = new() + { + RequestConfiguration = new RequestConfiguration {AllowedStatusCodes = new[] {400}} + }; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Utils/UtcTicksJsonConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Utils/UtcTicksJsonConverter.cs index 2aa73162..2c8d1504 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Utils/UtcTicksJsonConverter.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Utils/UtcTicksJsonConverter.cs @@ -1,59 +1,68 @@ -using System; -using System.Globalization; +#nullable enable -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; +using System; +using System.Text.Json; +using System.Text.Json.Serialization; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Utils +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Utils; + +public class UtcTicksJsonConverter : JsonConverter { - public class UtcTicksJsonConverter : IsoDateTimeConverter + public override bool CanConvert(Type objectType) { - public UtcTicksJsonConverter() + if (objectType == typeof(DateTime) || objectType == typeof(DateTime?)) { - Culture = CultureInfo.InvariantCulture; + return true; } - - public override bool CanConvert(Type objectType) +#if HAVE_DATE_TIME_OFFSET + if (objectType == typeof(DateTimeOffset) || objectType == typeof(DateTimeOffset?)) { - return objectType == typeof(long) || base.CanConvert(objectType); + return true; } +#endif + + return objectType == typeof(long) || objectType == typeof(long?); + } - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + public override bool HandleNull => true; + + public override object? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var underlyingType = Nullable.GetUnderlyingType(typeToConvert); + var type = underlyingType ?? typeToConvert; + + if (type == typeof(long)) { - var underlyingType = Nullable.GetUnderlyingType(objectType); - var type = underlyingType ?? objectType; - if (type == typeof(long)) + var dateTime = JsonSerializer.Deserialize(ref reader); + if (dateTime == null) { - var res = base.ReadJson(reader, typeof(DateTime?), existingValue, serializer); - if (res == null) - { - if (underlyingType != null) - return null; - return 0L; - } - return ((DateTime)res).Ticks; + if (underlyingType != null) + return null; + return 0L; } - return base.ReadJson(reader, objectType, existingValue, serializer); + return dateTime.Value.Ticks; } - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - if (value is long) - base.WriteJson(writer, CreateDateTime((long)value), serializer); - else - base.WriteJson(writer, value, serializer); - } + return JsonSerializer.Deserialize(ref reader); + } - private static DateTime CreateDateTime(long value) - { - if (value < minTicks) - value = minTicks; - if (value > maxTicks) - value = maxTicks; - return new DateTime(value, DateTimeKind.Utc); - } + public override void Write(Utf8JsonWriter writer, object? value, JsonSerializerOptions options) + { + if (value is long ticks) + JsonSerializer.Serialize(writer, CreateDateTime(ticks), options); + else + JsonSerializer.Serialize(writer, value, options); + } - private static readonly long minTicks = DateTime.MinValue.Ticks; - private static readonly long maxTicks = DateTime.MaxValue.Ticks; + private static DateTime CreateDateTime(long value) + { + if (value < minTicks) + value = minTicks; + if (value > maxTicks) + value = maxTicks; + return new DateTime(value, DateTimeKind.Utc); } + + private static readonly long minTicks = DateTime.MinValue.Ticks; + private static readonly long maxTicks = DateTime.MaxValue.Ticks; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/MetaIndexedInfo.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/MetaIndexedInfo.cs index a20b62f4..13198ac6 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/MetaIndexedInfo.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/MetaIndexedInfo.cs @@ -1,36 +1,35 @@ -using Newtonsoft.Json; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Utils; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing; + +public class MetaIndexedInfo { - public class MetaIndexedInfo - { - public string Name { get; set; } - public string Id { get; set; } - public string State { get; set; } - public string ParentTaskId { get; set; } - public string TaskGroupLock { get; set; } - public int Attempts { get; set; } + public string Name { get; set; } + public string Id { get; set; } + public string State { get; set; } + public string ParentTaskId { get; set; } + public string TaskGroupLock { get; set; } + public int Attempts { get; set; } - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long EnqueueTime { get; set; } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long EnqueueTime { get; set; } - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long MinimalStartTime { get; set; } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long MinimalStartTime { get; set; } - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long? StartExecutingTime { get; set; } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long? StartExecutingTime { get; set; } - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long? FinishExecutingTime { get; set; } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long? FinishExecutingTime { get; set; } - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long LastModificationTime { get; set; } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long LastModificationTime { get; set; } - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long ExpirationTime { get; set; } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long ExpirationTime { get; set; } - public double? LastExecutionDurationInMs { get; set; } - } + public double? LastExecutionDurationInMs { get; set; } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/OmitBinaryAndAbstractPropertiesContractResolver.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/OmitBinaryAndAbstractPropertiesContractResolver.cs deleted file mode 100644 index 51a06b7f..00000000 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/OmitBinaryAndAbstractPropertiesContractResolver.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System; -using System.Reflection; - -using Newtonsoft.Json; -using Newtonsoft.Json.Serialization; - -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing -{ - public class OmitBinaryAndAbstractPropertiesContractResolver : DefaultContractResolver - { - protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization) - { - var propertyInfo = member as PropertyInfo; - if (propertyInfo != null) - { - if (propertyInfo.PropertyType.IsArray) - { - var elementType = propertyInfo.PropertyType.GetElementType(); - if (IsBinaryType(elementType) || IsAbstractType(elementType)) - return null; - } - if (IsBinaryType(propertyInfo.PropertyType) || IsAbstractType(propertyInfo.PropertyType)) - return null; - } - return base.CreateProperty(member, memberSerialization); - } - - private static bool IsBinaryType(Type elementType) - { - return elementType == typeof(byte[]); - } - - private static bool IsAbstractType(Type elementType) - { - return elementType.IsAbstract || elementType == typeof(object); - } - } -} \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/RtqElasticsearchOffsetStorage.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/RtqElasticsearchOffsetStorage.cs index f591cd4e..c9535378 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/RtqElasticsearchOffsetStorage.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/RtqElasticsearchOffsetStorage.cs @@ -1,90 +1,85 @@ -using System; +#nullable enable -using Elasticsearch.Net; - -using JetBrains.Annotations; +using System; +using System.Text.Json; -using Newtonsoft.Json; +using Elasticsearch.Net; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Get; using SkbKontur.Cassandra.TimeBasedUuid; using SkbKontur.EventFeeds; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing; + +public class RtqElasticsearchOffsetStorage : IOffsetStorage { - public class RtqElasticsearchOffsetStorage : IOffsetStorage + public RtqElasticsearchOffsetStorage(IRtqElasticsearchClient elasticsearchClient, + RtqEventLogOffsetInterpreter offsetInterpreter, + string bladeKey, + TimeSpan initialIndexingOffsetFromNow) { - public RtqElasticsearchOffsetStorage(IRtqElasticsearchClient elasticsearchClient, - RtqEventLogOffsetInterpreter offsetInterpreter, - [NotNull] string bladeKey, - TimeSpan initialIndexingOffsetFromNow) - { - this.elasticsearchClient = elasticsearchClient; - this.offsetInterpreter = offsetInterpreter; - this.bladeKey = bladeKey; - this.initialIndexingOffsetFromNow = initialIndexingOffsetFromNow; - } - - [NotNull] - public string GetDescription() - { - return $"RtqElasticsearchOffsetStorage with IndexName: {elasticIndexName}, BladeKey: {bladeKey}"; - } + this.elasticsearchClient = elasticsearchClient; + this.offsetInterpreter = offsetInterpreter; + this.bladeKey = bladeKey; + this.initialIndexingOffsetFromNow = initialIndexingOffsetFromNow; + } - public void Write([CanBeNull] string newOffset) - { - var payload = new OffsetStorageElement {Offset = newOffset}; - var postData = PostData.String(JsonConvert.SerializeObject(payload)); + public string GetDescription() + { + return $"RtqElasticsearchOffsetStorage with IndexName: {elasticIndexName}, BladeKey: {bladeKey}"; + } + + public void Write(string? newOffset) + { + var payload = new OffsetStorageElement {Offset = newOffset}; + var postData = PostData.String(JsonSerializer.Serialize(payload)); - if (elasticsearchClient.UseElastic7) - elasticsearchClient.Index(elasticIndexName, bladeKey, postData).EnsureSuccess(); - else + if (elasticsearchClient.UseElastic7) + elasticsearchClient.Index(elasticIndexName, bladeKey, postData).EnsureSuccess(); + else #pragma warning disable CS0618 - elasticsearchClient.IndexUsingType(elasticIndexName, elasticTypeName, bladeKey, postData).EnsureSuccess(); + elasticsearchClient.IndexUsingType(elasticIndexName, elasticTypeName, bladeKey, postData).EnsureSuccess(); #pragma warning restore CS0618 - } + } - [CanBeNull] - public string Read() - { - var stringResponse = elasticsearchClient.UseElastic7 - ? elasticsearchClient.Get(elasticIndexName, bladeKey, allowNotFoundStatusCode).EnsureSuccess() + public string? Read() + { + var stringResponse = elasticsearchClient.UseElastic7 + ? elasticsearchClient.Get(elasticIndexName, bladeKey, allowNotFoundStatusCode).EnsureSuccess() #pragma warning disable CS0618 - : elasticsearchClient.GetUsingType(elasticIndexName, elasticTypeName, bladeKey, allowNotFoundStatusCode).EnsureSuccess(); + : elasticsearchClient.GetUsingType(elasticIndexName, elasticTypeName, bladeKey, allowNotFoundStatusCode).EnsureSuccess(); #pragma warning restore CS0618 - if (string.IsNullOrEmpty(stringResponse.Body)) - return GetDefaultOffset(); - - var elasticResponse = JsonConvert.DeserializeObject>(stringResponse.Body); - if (elasticResponse?.Source == null || !elasticResponse.Found) - return GetDefaultOffset(); + if (string.IsNullOrEmpty(stringResponse.Body)) + return GetDefaultOffset(); - return elasticResponse.Source.Offset; - } + var elasticResponse = JsonSerializer.Deserialize>(stringResponse.Body); + if (elasticResponse?.Source == null || !elasticResponse.Found) + return GetDefaultOffset(); - [CanBeNull] - private string GetDefaultOffset() - { - return offsetInterpreter.GetMaxOffsetForTimestamp(Timestamp.Now - initialIndexingOffsetFromNow); - } + return elasticResponse.Source.Offset; + } - private const string elasticIndexName = RtqElasticsearchConsts.IndexingProgressIndexName; - private const string elasticTypeName = "MultiRazorEventFeedOffset"; + private string GetDefaultOffset() + { + return offsetInterpreter.GetMaxOffsetForTimestamp(Timestamp.Now - initialIndexingOffsetFromNow); + } - private readonly IRtqElasticsearchClient elasticsearchClient; - private readonly RtqEventLogOffsetInterpreter offsetInterpreter; - private readonly string bladeKey; - private readonly TimeSpan initialIndexingOffsetFromNow; + private const string elasticIndexName = RtqElasticsearchConsts.IndexingProgressIndexName; + private const string elasticTypeName = "MultiRazorEventFeedOffset"; - private readonly GetRequestParameters allowNotFoundStatusCode = new GetRequestParameters - { - RequestConfiguration = new RequestConfiguration {AllowedStatusCodes = new[] {404}} - }; + private readonly IRtqElasticsearchClient elasticsearchClient; + private readonly RtqEventLogOffsetInterpreter offsetInterpreter; + private readonly string bladeKey; + private readonly TimeSpan initialIndexingOffsetFromNow; - private class OffsetStorageElement + private readonly GetRequestParameters allowNotFoundStatusCode = new() { - public string Offset { get; set; } - } + RequestConfiguration = new RequestConfiguration {AllowedStatusCodes = new[] {404}} + }; + + private class OffsetStorageElement + { + public string? Offset { get; set; } } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TaskIndexedInfo.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TaskIndexedInfo.cs index a03623de..7fc1ac52 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TaskIndexedInfo.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TaskIndexedInfo.cs @@ -1,32 +1,30 @@ -using System.Collections.Generic; +#nullable enable -using JetBrains.Annotations; +using System.Collections.Generic; +using System.Text.Json.Serialization; -using Newtonsoft.Json; +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing +public class TaskIndexedInfo { - public class TaskIndexedInfo + public TaskIndexedInfo() { - public TaskIndexedInfo() - { - } + } - public TaskIndexedInfo([NotNull] MetaIndexedInfo meta, [NotNull] string exceptionInfo, [CanBeNull] object data) - { - Meta = meta; - ExceptionInfo = exceptionInfo; - if (data != null) - Data = new Dictionary {{meta.Name, data}}; - } + public TaskIndexedInfo(MetaIndexedInfo meta, string exceptionInfo, object? data) + { + Meta = meta; + ExceptionInfo = exceptionInfo; + if (data != null) + Data = new Dictionary {{meta.Name, data}}; + } - public MetaIndexedInfo Meta { get; set; } + public MetaIndexedInfo Meta { get; set; } = null!; - [JsonConverter(typeof(TruncateLongStringsConverter2K))] - public string ExceptionInfo { get; set; } + [JsonConverter(typeof(TruncateLongStringsConverter2K))] + public string ExceptionInfo { get; set; } = null!; - // NOTE! Using TaskTypeName->TaskData dictionary here to avoid type conflicts between fields with the same name in different TaskData contracts - // since we must index all TaskData types into single elasticsearch mapping type (see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html) - public Dictionary Data { get; set; } - } + // NOTE! Using TaskTypeName->TaskData dictionary here to avoid type conflicts between fields with the same name in different TaskData contracts + // since we must index all TaskData types into single elasticsearch mapping type (see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html) + public Dictionary Data { get; set; } = null!; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TruncateLongStringsConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TruncateLongStringsConverter.cs index a05e361e..385bbfeb 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TruncateLongStringsConverter.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/Storage/Writing/TruncateLongStringsConverter.cs @@ -1,37 +1,29 @@ using System; +using System.Text.Json; +using System.Text.Json.Serialization; -using Newtonsoft.Json; +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing +public class TruncateLongStringsConverter : JsonConverter { - public class TruncateLongStringsConverter : JsonConverter + public TruncateLongStringsConverter(int maxStringLength) { - public TruncateLongStringsConverter(int maxStringLength) - { - this.maxStringLength = maxStringLength; - } - - public override bool CanRead => false; - - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - var str = value as string; - if (str == null || str.Length <= maxStringLength) - writer.WriteValue(str); - else - writer.WriteValue(str.Substring(0, maxStringLength)); - } - - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) - { - throw new InvalidOperationException("Operation is not supported"); - } + this.maxStringLength = maxStringLength; + } - public override bool CanConvert(Type objectType) - { - return objectType == typeof(string); - } + public override string Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + throw new NotImplementedException(); + } - private readonly int maxStringLength; + public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) + { + var str = value; + if (str == null || str.Length <= maxStringLength) + writer.WriteStringValue(str); + else + writer.WriteStringValue(str.Substring(0, maxStringLength)); } + + private readonly int maxStringLength; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Monitoring/TaskCounter/RtqTaskCounters.cs b/Cassandra.DistributedTaskQueue.Monitoring/TaskCounter/RtqTaskCounters.cs index 4eb62fa3..8ebcab1c 100644 --- a/Cassandra.DistributedTaskQueue.Monitoring/TaskCounter/RtqTaskCounters.cs +++ b/Cassandra.DistributedTaskQueue.Monitoring/TaskCounter/RtqTaskCounters.cs @@ -1,23 +1,25 @@ -using System.Collections.Generic; +#nullable enable + +using System.Collections.Generic; using System.Linq; +using System.Text.Json.Serialization; using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities; +using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json; -#nullable enable +namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TaskCounter; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TaskCounter +public class RtqTaskCounters { - public class RtqTaskCounters - { - public int LostTasksCount { get; set; } + public int LostTasksCount { get; set; } - public Dictionary PendingTaskCountsTotal { get; set; } = null!; + public Dictionary PendingTaskCountsTotal { get; set; } = null!; - public Dictionary<(string TaskName, string TaskTopic), Dictionary> PendingTaskCountsByNameAndTopic { get; set; } = null!; + [JsonConverter(typeof(TwoKeysDictionaryConvertor>))] + public Dictionary<(string TaskName, string TaskTopic), Dictionary> PendingTaskCountsByNameAndTopic { get; set; } = null!; - public int GetPendingTaskTotalCount() - { - return PendingTaskCountsTotal.Values.Sum(); - } + public int GetPendingTaskTotalCount() + { + return PendingTaskCountsTotal.Values.Sum(); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.TestExchangeService/Cassandra.DistributedTaskQueue.TestExchangeService.csproj b/Cassandra.DistributedTaskQueue.TestExchangeService/Cassandra.DistributedTaskQueue.TestExchangeService.csproj index eed873c1..3b1b01d7 100644 --- a/Cassandra.DistributedTaskQueue.TestExchangeService/Cassandra.DistributedTaskQueue.TestExchangeService.csproj +++ b/Cassandra.DistributedTaskQueue.TestExchangeService/Cassandra.DistributedTaskQueue.TestExchangeService.csproj @@ -8,10 +8,6 @@ bin\ - - - - diff --git a/Cassandra.DistributedTaskQueue.TestExchangeService/Startup.cs b/Cassandra.DistributedTaskQueue.TestExchangeService/Startup.cs index 1c8f4db5..d0ba8cc3 100644 --- a/Cassandra.DistributedTaskQueue.TestExchangeService/Startup.cs +++ b/Cassandra.DistributedTaskQueue.TestExchangeService/Startup.cs @@ -4,23 +4,22 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -namespace SkbKontur.Cassandra.DistributedTaskQueue.TestExchangeService +namespace SkbKontur.Cassandra.DistributedTaskQueue.TestExchangeService; + +public class Startup { - public class Startup + public void ConfigureServices(IServiceCollection services) { - public void ConfigureServices(IServiceCollection services) - { - services.AddControllers().AddNewtonsoftJson(); - services.AddSingleton(new GroboControllerFactory()); - } + services.AddControllers(); + services.AddSingleton(new GroboControllerFactory()); + } - public void Configure(IApplicationBuilder app, IWebHostEnvironment env) - { - if (env.IsDevelopment()) - app.UseDeveloperExceptionPage(); + public void Configure(IApplicationBuilder app, IWebHostEnvironment env) + { + if (env.IsDevelopment()) + app.UseDeveloperExceptionPage(); - app.UseRouting(); - app.UseEndpoints(endpoints => endpoints.MapControllers()); - } + app.UseRouting(); + app.UseEndpoints(endpoints => endpoints.MapControllers()); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Tests/HttpClientForTestsBase.cs b/Cassandra.DistributedTaskQueue.Tests/HttpClientForTestsBase.cs index 881c0c76..f52d91d3 100644 --- a/Cassandra.DistributedTaskQueue.Tests/HttpClientForTestsBase.cs +++ b/Cassandra.DistributedTaskQueue.Tests/HttpClientForTestsBase.cs @@ -1,9 +1,6 @@ using System; using System.Text; - -using JetBrains.Annotations; - -using Newtonsoft.Json; +using System.Text.Json; using SkbKontur.Cassandra.DistributedTaskQueue.FunctionalTests.Common; @@ -14,77 +11,77 @@ using Vostok.Clusterclient.Transport; using Vostok.Logging.Abstractions; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests +namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests; + +public abstract class HttpClientForTestsBase { - public abstract class HttpClientForTestsBase + protected HttpClientForTestsBase(int devPort, TimeSpan defaultRequestTimeout) { - protected HttpClientForTestsBase(int devPort, TimeSpan defaultRequestTimeout) - { - var clusterClientLogger = Log.For("HttpClientForTests").WithMinimumLevel(LogLevel.Warn); - clusterClient = new ClusterClient(clusterClientLogger, - configuration => - { - configuration.SetupUniversalTransport(new UniversalTransportSettings - { - AllowAutoRedirect = false, - TcpKeepAliveEnabled = true, - MaxConnectionsPerEndpoint = 4096, - }); - configuration.DefaultConnectionTimeout = TimeSpan.FromMilliseconds(750); - configuration.DefaultTimeout = defaultRequestTimeout; + var clusterClientLogger = Log.For("HttpClientForTests").WithMinimumLevel(LogLevel.Warn); + clusterClient = new ClusterClient(clusterClientLogger, + configuration => + { + configuration.SetupUniversalTransport(new UniversalTransportSettings + { + AllowAutoRedirect = false, + TcpKeepAliveEnabled = true, + MaxConnectionsPerEndpoint = 4096, + }); + configuration.DefaultConnectionTimeout = TimeSpan.FromMilliseconds(750); + configuration.DefaultTimeout = defaultRequestTimeout; - configuration.ClusterProvider = new FixedClusterProvider(new Uri($"http://localhost:{devPort}")); - configuration.DefaultRequestStrategy = Strategy.Sequential1; + configuration.ClusterProvider = new FixedClusterProvider(new Uri($"http://localhost:{devPort}")); + configuration.DefaultRequestStrategy = Strategy.Sequential1; - configuration.AddRequestTransform(request => request.Content == null || request.Content.Length == 0 - ? request.WithHeader("Content", "no") - : request); - }); - } + configuration.AddRequestTransform(request => request.Content == null || request.Content.Length == 0 + ? request.WithHeader("Content", "no") + : request); + }); + } - [NotNull] - protected RequestResult Post([NotNull] string methodName) - { - return Post(methodName, requestContent : null); - } + protected RequestResult Post(string methodName) + { + return Post(methodName, requestContent : null); + } - [NotNull] - protected RequestResult Post([NotNull] string methodName, [CanBeNull] T1 arg1) - { - return Post(methodName, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(arg1))); - } + protected RequestResult Post(string methodName, T1 arg1) + { + return Post(methodName, Encoding.UTF8.GetBytes(JsonSerializer.Serialize(arg1))); + } - [NotNull] - private RequestResult Post([NotNull] string methodName, [CanBeNull] byte[] requestContent) - { - var request = Request.Post(methodName); - if (requestContent != null) - request = request.WithContent(requestContent).WithHeader("Content-Type", "application/json"); + private RequestResult Post(string methodName, byte[] requestContent) + { + var request = Request.Post(methodName); + if (requestContent != null) + request = request.WithContent(requestContent).WithHeader("Content-Type", "application/json"); - using (var result = clusterClient.SendAsync(request).GetAwaiter().GetResult()) - { - if (result.Status != ClusterResultStatus.Success || result.Response.Code != ResponseCode.Ok) - throw new InvalidOperationException($"Request failed: Request: {result.Request}\nStatus: {result.Status}\nSelected response: {result.Response}"); - return new RequestResult(result.Response.Content.ToArray()); - } + using (var result = clusterClient.SendAsync(request).GetAwaiter().GetResult()) + { + if (result.Status != ClusterResultStatus.Success || result.Response.Code != ResponseCode.Ok) + throw new InvalidOperationException($"Request failed: Request: {result.Request}\nStatus: {result.Status}\nSelected response: {result.Response}"); + return new RequestResult(result.Response.Content.ToArray()); } - - private readonly IClusterClient clusterClient; } - public class RequestResult + private readonly IClusterClient clusterClient; +} + +public class RequestResult +{ + public RequestResult(byte[] serializedResult) { - public RequestResult([NotNull] byte[] serializedResult) - { - this.serializedResult = serializedResult; - } + this.serializedResult = serializedResult; + } - [NotNull] - public T ThenReturn() - { - return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(serializedResult)); - } + public T ThenReturn() + { + var options = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; - private readonly byte[] serializedResult; + return JsonSerializer.Deserialize(Encoding.UTF8.GetString(serializedResult), options); } + + private readonly byte[] serializedResult; } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Tests/RtqTaskCountersForTests.cs b/Cassandra.DistributedTaskQueue.Tests/RtqTaskCountersForTests.cs index b8155a9b..2de8c549 100644 --- a/Cassandra.DistributedTaskQueue.Tests/RtqTaskCountersForTests.cs +++ b/Cassandra.DistributedTaskQueue.Tests/RtqTaskCountersForTests.cs @@ -1,21 +1,20 @@ +#nullable enable + using System.Collections.Generic; using System.Linq; using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities; -#nullable enable +namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests +public class RtqTaskCountersForTests { - public class RtqTaskCountersForTests - { - public int LostTasksCount { get; set; } + public int LostTasksCount { get; set; } - public Dictionary PendingTaskCountsTotal { get; set; } = null!; + public Dictionary PendingTaskCountsTotal { get; set; } = null!; - public int GetPendingTaskTotalCount() - { - return PendingTaskCountsTotal.Values.Sum(); - } + public int GetPendingTaskTotalCount() + { + return PendingTaskCountsTotal.Values.Sum(); } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/TaskIndexInfoSerializationTest.cs b/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/TaskIndexInfoSerializationTest.cs index ba03deff..fa55e950 100644 --- a/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/TaskIndexInfoSerializationTest.cs +++ b/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/TaskIndexInfoSerializationTest.cs @@ -1,100 +1,92 @@ using System.Collections.Generic; -using System.IO; -using System.Text; - -using Newtonsoft.Json; +using System.Text.Json; using NUnit.Framework; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests.UnitTests.Monitoring +namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests.UnitTests.Monitoring; + +public class TaskIndexInfoSerializationTest { - public class TaskIndexInfoSerializationTest + [Test] + public void Simple() { - [Test] - public void Simple() - { - var source = new TaskIndexedInfo(new MetaIndexedInfo {Name = "zzz"}, "exc", new Data - { - A = 1, - S = "222" - }); - Check(source, source); - } + var source = new TaskIndexedInfo(new MetaIndexedInfo {Name = "zzz"}, "exc", new Data + { + A = 1, + S = "222" + }); + Check(source, source); + } - [Test] - public void TruncateStrings() - { - var source = new TaskIndexedInfo(new MetaIndexedInfo {Name = new string('q', 502)}, new string('e', 2049), new Data - { - A = 1, - S = new string('z', 501) - }); - var expected = new TaskIndexedInfo - { - Meta = new MetaIndexedInfo {Name = new string('q', 500)}, - ExceptionInfo = new string('e', 2048), - Data = new Dictionary + [Test] + public void TruncateStrings() + { + var source = new TaskIndexedInfo(new MetaIndexedInfo {Name = new string('q', 502)}, new string('e', 2049), new Data + { + A = 1, + S = new string('z', 501) + }); + var expected = new TaskIndexedInfo + { + Meta = new MetaIndexedInfo {Name = new string('q', 500)}, + ExceptionInfo = new string('e', 2048), + Data = new Dictionary + { { - { - new string('q', 502), new Data - { - A = 1, - S = new string('z', 500) - } - } + new string('q', 502), new Data + { + A = 1, + S = new string('z', 500) + } } - }; - Check(source, expected); - } - - [Test] - public void IgnoreBinaryFields() - { - var source = new TaskIndexedInfo(new MetaIndexedInfo {Name = "zzz"}, string.Empty, new Data2 - { - B = 1, - BA = new byte[] {1, 2, 3, 4}, - O = "qxx", - OO = new object[] {1, "zzz"} - }); - Check(source, new TaskIndexedInfo(new MetaIndexedInfo {Name = "zzz"}, string.Empty, new Data2 - { - B = 1, - BA = null, - O = "qxx", - OO = new object[] {1, "zzz"} - })); - } + } + }; + Check(source, expected); + } - private void Check(TaskIndexedInfo source, TaskIndexedInfo expected) - { - Assert.That(Serialize(source), Is.EqualTo(Serialize(expected))); - } + [Test] + public void IgnoreBinaryFields() + { + var source = new TaskIndexedInfo(new MetaIndexedInfo {Name = "zzz"}, string.Empty, new Data2 + { + B = 1, + BA = new byte[] {1, 2, 3, 4}, + O = "qxx", + OO = new object[] {1, "zzz"} + }); + Check(source, new TaskIndexedInfo(new MetaIndexedInfo {Name = "zzz"}, string.Empty, new Data2 + { + B = 1, + BA = null, + O = "qxx", + OO = new object[] {1, "zzz"} + })); + } - private string Serialize(object obj) - { - var sb = new StringBuilder(); - jsonSerializer.Serialize(new StringWriter(sb), obj); - return sb.ToString(); - } + private void Check(TaskIndexedInfo source, TaskIndexedInfo expected) + { + Assert.That(Serialize(source), Is.EqualTo(Serialize(expected))); + } - private readonly JsonSerializer jsonSerializer = JsonSerializer.Create(RtqElasticsearchIndexerSettings.DefaultJsonSerializerSettings); + private string Serialize(object obj) + { + return JsonSerializer.Serialize(obj, obj.GetType(), RtqElasticsearchIndexerSettings.GetJsonOptions()); + } - private class Data - { - public int A { get; set; } - public string S { get; set; } - } + private class Data + { + public int A { get; set; } + public string S { get; set; } + } - private class Data2 - { - public byte B { get; set; } - public byte[] BA { get; set; } - public object O { get; set; } - public object[] OO { get; set; } - } + private class Data2 + { + public byte B { get; set; } + public byte[] BA { get; set; } + public object O { get; set; } + public object[] OO { get; set; } } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/UtcTicksJsonConverterTest.cs b/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/UtcTicksJsonConverterTest.cs index 62f4137a..cf051fcf 100644 --- a/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/UtcTicksJsonConverterTest.cs +++ b/Cassandra.DistributedTaskQueue.Tests/UnitTests/Monitoring/UtcTicksJsonConverterTest.cs @@ -1,129 +1,128 @@ using System; +using System.Text.Json; using FluentAssertions; -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; +using System.Text.Json.Serialization; using NUnit.Framework; using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Utils; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests.UnitTests.Monitoring +namespace SkbKontur.Cassandra.DistributedTaskQueue.Tests.UnitTests.Monitoring; + +public class UtcTicksDateConverterTest { - public class UtcTicksDateConverterTest + [Test] + public void TestSimple() + { + var d = new DateTime(2010, 1, 2, 3, 4, 5, 6, DateTimeKind.Utc); + var d2 = new DateTime(2013, 1, 2, 3, 4, 5, 6, DateTimeKind.Utc); + var c1 = new C1 {UtcTicks = d.Ticks, UtcTicksNullable = d2.Ticks}; + var s = JsonSerializer.Serialize(c1); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c1); + var c2 = new C2 + { + UtcTicks = d, + UtcTicksNullable = d2 + }; + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c2); + JsonSerializer.Deserialize(JsonSerializer.Serialize(c2)).Should().BeEquivalentTo(c1); + } + + [Test] + public void TestFormat() + { + var d = new DateTime(635633850005256502, DateTimeKind.Utc); + var d2 = new DateTime(2013, 1, 2, 3, 4, 5, 6, DateTimeKind.Utc); + var c1 = new C1 {UtcTicks = d.Ticks, UtcTicksNullable = d2.Ticks}; + var s = JsonSerializer.Serialize(c1); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c1); + Assert.AreEqual("{\"UtcTicks\":\"2015-03-31T07:50:00.5256502Z\",\"UtcTicksNullable\":\"2013-01-02T03:04:05.006Z\"}", s); + } + + [Test] + public void TestNull() + { + var c1 = new C1 {UtcTicks = 0, UtcTicksNullable = null}; + var s = JsonSerializer.Serialize(c1); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c1); + var c2 = new C2 + { + UtcTicks = new DateTime(0, DateTimeKind.Utc), + UtcTicksNullable = null + }; + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c2); + JsonSerializer.Deserialize(JsonSerializer.Serialize(c2)).Should().BeEquivalentTo(c1); + } + + [Test] + public void TestNullDt() + { + var c1 = new C1 {UtcTicks = 0, UtcTicksNullable = null}; + var s = JsonSerializer.Serialize(c1); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c1); + var c3 = new C3 + { + UtcTicks = 0, + UtcTicksNullable = 0 + }; + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c3); + JsonSerializer.Deserialize(JsonSerializer.Serialize(c3)).Should().BeEquivalentTo(new C1 {UtcTicks = 0, UtcTicksNullable = 0}); + } + + [Test] + public void TestZero() + { + var c1 = new C1 {UtcTicksNullable = 0}; + var s = JsonSerializer.Serialize(c1); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c1); + var c2 = new C2 + { + UtcTicks = new DateTime(0, DateTimeKind.Utc), + UtcTicksNullable = new DateTime(0, DateTimeKind.Utc), + }; + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(c2); + JsonSerializer.Deserialize(JsonSerializer.Serialize(c2)).Should().BeEquivalentTo(c1); + } + + [Test] + public void TestBadValues() + { + var c1 = new C1 {UtcTicks = -1, UtcTicksNullable = DateTime.MaxValue.Ticks + 1}; + var s = JsonSerializer.Serialize(c1); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(new C1 {UtcTicks = DateTime.MinValue.Ticks, UtcTicksNullable = DateTime.MaxValue.Ticks}); + JsonSerializer.Deserialize(s).Should().BeEquivalentTo(new C2 + { + UtcTicks = DateTime.MinValue, + UtcTicksNullable = DateTime.MaxValue + }); + } + + private class C1 + { + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long UtcTicks { get; set; } + + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long? UtcTicksNullable { get; set; } + } + + private class C2 { - [Test] - public void TestSimple() - { - var d = new DateTime(2010, 1, 2, 3, 4, 5, 6, DateTimeKind.Utc); - var d2 = new DateTime(2013, 1, 2, 3, 4, 5, 6, DateTimeKind.Utc); - var c1 = new C1 {UtcTicks = d.Ticks, UtcTicksNullable = d2.Ticks}; - var s = JsonConvert.SerializeObject(c1); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c1); - var c2 = new C2() - { - UtcTicks = d, - UtcTicksNullable = d2 - }; - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c2); - JsonConvert.DeserializeObject(JsonConvert.SerializeObject(c2)).Should().BeEquivalentTo(c1); - } - - [Test] - public void TestFormat() - { - var d = new DateTime(635633850005256502, DateTimeKind.Utc); - var d2 = new DateTime(2013, 1, 2, 3, 4, 5, 6, DateTimeKind.Utc); - var c1 = new C1 {UtcTicks = d.Ticks, UtcTicksNullable = d2.Ticks}; - var s = JsonConvert.SerializeObject(c1); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c1); - Assert.AreEqual("{\"UtcTicks\":\"2015-03-31T07:50:00.5256502Z\",\"UtcTicksNullable\":\"2013-01-02T03:04:05.006Z\"}", s); - } - - [Test] - public void TestNull() - { - var c1 = new C1 {UtcTicks = 0, UtcTicksNullable = null}; - var s = JsonConvert.SerializeObject(c1); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c1); - var c2 = new C2() - { - UtcTicks = new DateTime(0, DateTimeKind.Utc), - UtcTicksNullable = null - }; - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c2); - JsonConvert.DeserializeObject(JsonConvert.SerializeObject(c2)).Should().BeEquivalentTo(c1); - } - - [Test] - public void TestNullDt() - { - var c1 = new C1 {UtcTicks = 0, UtcTicksNullable = null}; - var s = JsonConvert.SerializeObject(c1); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c1); - var c3 = new C3() - { - UtcTicks = 0, - UtcTicksNullable = 0 - }; - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c3); - JsonConvert.DeserializeObject(JsonConvert.SerializeObject(c3)).Should().BeEquivalentTo(new C1() {UtcTicks = 0, UtcTicksNullable = 0}); - } - - [Test] - public void TestZero() - { - var c1 = new C1 {UtcTicksNullable = 0}; - var s = JsonConvert.SerializeObject(c1); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c1); - var c2 = new C2() - { - UtcTicks = new DateTime(0, DateTimeKind.Utc), - UtcTicksNullable = new DateTime(0, DateTimeKind.Utc), - }; - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(c2); - JsonConvert.DeserializeObject(JsonConvert.SerializeObject(c2)).Should().BeEquivalentTo(c1); - } - - [Test] - public void TestBadValues() - { - var c1 = new C1 {UtcTicks = -1, UtcTicksNullable = DateTime.MaxValue.Ticks + 1}; - var s = JsonConvert.SerializeObject(c1); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(new C1 {UtcTicks = DateTime.MinValue.Ticks, UtcTicksNullable = DateTime.MaxValue.Ticks}); - JsonConvert.DeserializeObject(s).Should().BeEquivalentTo(new C2() - { - UtcTicks = DateTime.MinValue, - UtcTicksNullable = DateTime.MaxValue - }); - } - - private class C1 - { - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long UtcTicks { get; set; } - - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long? UtcTicksNullable { get; set; } - } - - private class C2 - { - [JsonConverter(typeof(IsoDateTimeConverter))] - public DateTime UtcTicks { get; set; } - - [JsonConverter(typeof(IsoDateTimeConverter))] - public DateTime? UtcTicksNullable { get; set; } - } - - private class C3 - { - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long? UtcTicks { get; set; } - - [JsonConverter(typeof(UtcTicksJsonConverter))] - public long UtcTicksNullable { get; set; } - } + [JsonConverter(typeof(UtcTicksJsonConverter))] + public DateTime UtcTicks { get; set; } + + [JsonConverter(typeof(UtcTicksJsonConverter))] + public DateTime? UtcTicksNullable { get; set; } + } + + private class C3 + { + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long? UtcTicks { get; set; } + + [JsonConverter(typeof(UtcTicksJsonConverter))] + public long UtcTicksNullable { get; set; } } } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue.sln.DotSettings b/Cassandra.DistributedTaskQueue.sln.DotSettings index 6f38970b..04459619 100644 --- a/Cassandra.DistributedTaskQueue.sln.DotSettings +++ b/Cassandra.DistributedTaskQueue.sln.DotSettings @@ -7,4 +7,5 @@ 1 True True - True \ No newline at end of file + True + HINT \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue/Configuration/IRtqTaskDataRegistry.cs b/Cassandra.DistributedTaskQueue/Configuration/IRtqTaskDataRegistry.cs index 15c33856..91fe4af9 100644 --- a/Cassandra.DistributedTaskQueue/Configuration/IRtqTaskDataRegistry.cs +++ b/Cassandra.DistributedTaskQueue/Configuration/IRtqTaskDataRegistry.cs @@ -1,23 +1,22 @@ -using System; +#nullable enable -#nullable enable +using System; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Configuration +namespace SkbKontur.Cassandra.DistributedTaskQueue.Configuration; + +public interface IRtqTaskDataRegistry { - public interface IRtqTaskDataRegistry - { - string[] GetAllTaskNames(); + string[] GetAllTaskNames(); - string GetTaskName(Type type); + string GetTaskName(Type type); - Type GetTaskType(string taskName); + Type GetTaskType(string taskName); - bool TryGetTaskType(string taskName, out Type? taskType); + bool TryGetTaskType(string taskName, out Type? taskType); - string[] GetAllTaskTopics(); + string[] GetAllTaskTopics(); - (string TaskName, string TopicName)[] GetAllTaskNamesWithTopics(); + (string TaskName, string TopicName)[] GetAllTaskNamesWithTopics(); - string GetTaskTopic(string taskName); - } + string GetTaskTopic(string taskName); } \ No newline at end of file diff --git a/Cassandra.DistributedTaskQueue/Configuration/RtqTaskDataRegistryBase.cs b/Cassandra.DistributedTaskQueue/Configuration/RtqTaskDataRegistryBase.cs index d0c81c17..cc6b7f6b 100644 --- a/Cassandra.DistributedTaskQueue/Configuration/RtqTaskDataRegistryBase.cs +++ b/Cassandra.DistributedTaskQueue/Configuration/RtqTaskDataRegistryBase.cs @@ -1,4 +1,6 @@ -using System; +#nullable enable + +using System; using System.Collections.Generic; using System.Globalization; using System.Linq; @@ -6,81 +8,78 @@ using SkbKontur.Cassandra.DistributedTaskQueue.Commons; using SkbKontur.Cassandra.DistributedTaskQueue.Handling; -#nullable enable +namespace SkbKontur.Cassandra.DistributedTaskQueue.Configuration; -namespace SkbKontur.Cassandra.DistributedTaskQueue.Configuration +public abstract class RtqTaskDataRegistryBase : IRtqTaskDataRegistry { - public abstract class RtqTaskDataRegistryBase : IRtqTaskDataRegistry + protected RtqTaskDataRegistryBase(bool allTasksShouldHaveTopic = false) { - protected RtqTaskDataRegistryBase(bool allTasksShouldHaveTopic = false) - { - this.allTasksShouldHaveTopic = allTasksShouldHaveTopic; - } - - protected void Register() where T : IRtqTaskData - { - var taskType = typeof(T); - var taskName = taskType.GetTaskName(); - if (nameToType.ContainsKey(taskName)) - throw new InvalidOperationException($"Duplicate taskName: {taskName}"); - typeToName.Add(taskType, taskName); - nameToType.Add(taskName, taskType); - nameToTopic.Add(taskName, ResolveTopic(taskType, taskName, allTasksShouldHaveTopic)); - } + this.allTasksShouldHaveTopic = allTasksShouldHaveTopic; + } - private static string ResolveTopic(Type taskType, string taskName, bool taskTopicIsRequired) - { - var taskTopic = taskType.TryGetTaskTopic(taskTopicIsRequired); - if (!string.IsNullOrWhiteSpace(taskTopic)) - return taskTopic!; - return ShardingHelpers.GetShard(taskName.GetPersistentHashCode(), topicsCount).ToString(CultureInfo.InvariantCulture); - } + protected void Register() where T : IRtqTaskData + { + var taskType = typeof(T); + var taskName = taskType.GetTaskName(); + if (nameToType.ContainsKey(taskName)) + throw new InvalidOperationException($"Duplicate taskName: {taskName}"); + typeToName.Add(taskType, taskName); + nameToType.Add(taskName, taskType); + nameToTopic.Add(taskName, ResolveTopic(taskType, taskName, allTasksShouldHaveTopic)); + } - public string[] GetAllTaskNames() - { - return nameToType.Keys.ToArray(); - } + private static string ResolveTopic(Type taskType, string taskName, bool taskTopicIsRequired) + { + var taskTopic = taskType.TryGetTaskTopic(taskTopicIsRequired); + if (!string.IsNullOrWhiteSpace(taskTopic)) + return taskTopic!; + return ShardingHelpers.GetShard(taskName.GetPersistentHashCode(), topicsCount).ToString(CultureInfo.InvariantCulture); + } - public string GetTaskName(Type type) - { - if (!typeToName.TryGetValue(type, out var taskName)) - throw new InvalidOperationException($"TaskData with type '{type.FullName}' not registered"); - return taskName; - } + public string[] GetAllTaskNames() + { + return nameToType.Keys.ToArray(); + } - public Type GetTaskType(string taskName) - { - if (!nameToType.TryGetValue(taskName, out var taskType)) - throw new InvalidOperationException($"TaskData with name '{taskName}' not registered"); - return taskType; - } + public string GetTaskName(Type type) + { + if (!typeToName.TryGetValue(type, out var taskName)) + throw new InvalidOperationException($"TaskData with type '{type.FullName}' not registered"); + return taskName; + } - public bool TryGetTaskType(string taskName, out Type? taskType) - { - return nameToType.TryGetValue(taskName, out taskType); - } + public Type GetTaskType(string taskName) + { + if (!nameToType.TryGetValue(taskName, out var taskType)) + throw new InvalidOperationException($"TaskData with name '{taskName}' not registered"); + return taskType; + } - public string[] GetAllTaskTopics() - { - return nameToTopic.Values.Distinct().ToArray(); - } + public bool TryGetTaskType(string taskName, out Type? taskType) + { + return nameToType.TryGetValue(taskName, out taskType); + } - public (string TaskName, string TopicName)[] GetAllTaskNamesWithTopics() - { - return nameToTopic.Select(kvp => (kvp.Key, kvp.Value)).OrderBy(x => x).ToArray(); - } + public string[] GetAllTaskTopics() + { + return nameToTopic.Values.Distinct().ToArray(); + } - public string GetTaskTopic(string taskName) - { - if (!nameToTopic.TryGetValue(taskName, out var taskTopic)) - throw new InvalidOperationException($"TaskData with name '{taskName}' not registered"); - return taskTopic; - } + public (string TaskName, string TopicName)[] GetAllTaskNamesWithTopics() + { + return nameToTopic.Select(kvp => (kvp.Key, kvp.Value)).OrderBy(x => x).ToArray(); + } - private const int topicsCount = 2; - private readonly Dictionary typeToName = new Dictionary(); - private readonly Dictionary nameToType = new Dictionary(); - private readonly Dictionary nameToTopic = new Dictionary(); - private readonly bool allTasksShouldHaveTopic; + public string GetTaskTopic(string taskName) + { + if (!nameToTopic.TryGetValue(taskName, out var taskTopic)) + throw new InvalidOperationException($"TaskData with name '{taskName}' not registered"); + return taskTopic; } + + private const int topicsCount = 2; + private readonly Dictionary typeToName = new Dictionary(); + private readonly Dictionary nameToType = new Dictionary(); + private readonly Dictionary nameToTopic = new Dictionary(); + private readonly bool allTasksShouldHaveTopic; } \ No newline at end of file From b9a56622bae3a14d65c95f56b6d8ff3704163654 Mon Sep 17 00:00:00 2001 From: "skvortsov.m" Date: Wed, 4 Jun 2025 12:02:28 +0300 Subject: [PATCH 2/2] EDITSKS-5515: bump version --- CHANGELOG.md | 3 +++ version.json | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3015036a..4c4b1669 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## v3.4.0 - 2025.06.04 +- Migrate backend to System.Text.Json + ## v3.3.0 - 2025.05.28 - Update react-ui - Update edi-ui diff --git a/version.json b/version.json index 94bd6649..365b1368 100644 --- a/version.json +++ b/version.json @@ -1,5 +1,5 @@ { - "version": "3.3", + "version": "3.4-prerelease1", "assemblyVersion": { "precision": "build" },