diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml
index a416d469..48589d77 100644
--- a/.github/workflows/actions.yml
+++ b/.github/workflows/actions.yml
@@ -40,7 +40,7 @@ jobs:
run: dotnet jb cleanupcode Cassandra.DistributedTaskQueue.sln --profile=CatalogueCleanup --verbosity=WARN && git diff --exit-code -- ':!./cassandra-distributed-task-queue-ui/.yarn'
- name: Check front code
run: yarn --cwd cassandra-distributed-task-queue-ui lint
- - name: Build docker compose environment
+ - name: Build docker-compose environment
run: docker compose -f docker-compose.yaml up -d --build
env:
ES_VERSION: ${{ matrix.es-version }}
@@ -50,7 +50,7 @@ jobs:
run: dotnet test --no-build --configuration Release
env:
ES_VERSION: ${{ matrix.es-version }}
- - name: Stop docker compose
+ - name: Stop
if: always()
run: docker compose -f docker-compose.yaml down
env:
diff --git a/.start-all.cmd b/.start-all.cmd
index 9c8c78c8..170f51f2 100644
--- a/.start-all.cmd
+++ b/.start-all.cmd
@@ -1,2 +1,2 @@
wsl -u root sh -c "service docker status || (service docker start && echo 'artificially waiting 20s for docker to warmup...' && sleep 20s)"
-wsl docker-compose -f docker-compose.linux.yaml up -d --build
\ No newline at end of file
+wsl docker compose -f docker-compose.linux.yaml up -d --build
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3015036a..4c4b1669 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog
+## v3.4.0 - 2025.06.04
+- Migrate backend to System.Text.Json
+
## v3.3.0 - 2025.05.28
- Update react-ui
- Update edi-ui
diff --git a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj
index de0b9da6..86ebe200 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj
+++ b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Cassandra.DistributedTaskQueue.Monitoring.TestService.csproj
@@ -9,7 +9,6 @@
-
diff --git a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs
index 7b2cabc3..6b9e04bc 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring.TestService/Startup.cs
@@ -6,23 +6,22 @@
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TestService
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.TestService;
+
+public class Startup
{
- public class Startup
+ public void ConfigureServices(IServiceCollection services)
{
- public void ConfigureServices(IServiceCollection services)
- {
- services.AddControllers().AddNewtonsoftJson(options => options.SerializerSettings.Converters.Add(new LongToStringConverter()));
- services.AddSingleton(new GroboControllerFactory());
- }
+ services.AddControllers().AddJsonOptions(options => options.JsonSerializerOptions.Converters.Add(new LongToStringConverter()));
+ services.AddSingleton(new GroboControllerFactory());
+ }
- public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
- {
- if (env.IsDevelopment())
- app.UseDeveloperExceptionPage();
+ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
+ {
+ if (env.IsDevelopment())
+ app.UseDeveloperExceptionPage();
- app.UseRouting();
- app.UseEndpoints(endpoints => endpoints.MapControllers());
- }
+ app.UseRouting();
+ app.UseEndpoints(endpoints => endpoints.MapControllers());
}
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs
index b1a6f537..ecdaac56 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchRequest.cs
@@ -1,33 +1,32 @@
-using JetBrains.Annotations;
+using System.Text.Json.Serialization;
-using Newtonsoft.Json;
+using JetBrains.Annotations;
using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api;
+
+public class RtqMonitoringSearchRequest
{
- public class RtqMonitoringSearchRequest
- {
- [NotNull]
- [JsonProperty("enqueueTimestampRange")]
- public TimestampRange EnqueueTimestampRange { get; set; }
-
- [CanBeNull]
- [JsonProperty("queryString")]
- public string QueryString { get; set; }
-
- [CanBeNull]
- [JsonProperty("states")]
- public TaskState[] States { get; set; }
-
- [CanBeNull, ItemNotNull]
- [JsonProperty("names")]
- public string[] Names { get; set; }
-
- [JsonProperty("offset")]
- public int? Offset { get; set; }
-
- [JsonProperty("count")]
- public int? Count { get; set; }
- }
+ [NotNull]
+ [JsonPropertyName("enqueueTimestampRange")]
+ public TimestampRange EnqueueTimestampRange { get; set; }
+
+ [CanBeNull]
+ [JsonPropertyName("queryString")]
+ public string QueryString { get; set; }
+
+ [CanBeNull]
+ [JsonPropertyName("states")]
+ public TaskState[] States { get; set; }
+
+ [CanBeNull, ItemNotNull]
+ [JsonPropertyName("names")]
+ public string[] Names { get; set; }
+
+ [JsonPropertyName("offset")]
+ public int? Offset { get; set; }
+
+ [JsonPropertyName("count")]
+ public int? Count { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs
index d1942b38..0be8bbe0 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringSearchResults.cs
@@ -1,16 +1,15 @@
-using JetBrains.Annotations;
+using System.Text.Json.Serialization;
-using Newtonsoft.Json;
+using JetBrains.Annotations;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api;
+
+public class RtqMonitoringSearchResults
{
- public class RtqMonitoringSearchResults
- {
- [JsonProperty("totalCount")]
- public long TotalCount { get; set; }
+ [JsonPropertyName("totalCount")]
+ public long TotalCount { get; set; }
- [NotNull, ItemNotNull]
- [JsonProperty("taskMetas")]
- public RtqMonitoringTaskMeta[] TaskMetas { get; set; }
- }
+ [NotNull, ItemNotNull]
+ [JsonPropertyName("taskMetas")]
+ public RtqMonitoringTaskMeta[] TaskMetas { get; set; } = null!;
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs
index 97635402..acccc947 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskMeta.cs
@@ -1,67 +1,65 @@
-using JetBrains.Annotations;
+using System.Text.Json.Serialization;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Converters;
+using JetBrains.Annotations;
using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api;
+
+public class RtqMonitoringTaskMeta
{
- public class RtqMonitoringTaskMeta
- {
- [NotNull]
- [JsonProperty("name")]
- public string Name { get; set; }
-
- [NotNull]
- [JsonProperty("id")]
- public string Id { get; set; }
-
- [JsonProperty("ticks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long Ticks { get; set; }
-
- [JsonProperty("minimalStartTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long MinimalStartTicks { get; set; }
-
- [JsonProperty("startExecutingTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long? StartExecutingTicks { get; set; }
-
- [JsonProperty("finishExecutingTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long? FinishExecutingTicks { get; set; }
-
- [JsonProperty("lastModificationTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long? LastModificationTicks { get; set; }
-
- [JsonProperty("expirationTimestampTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long? ExpirationTimestampTicks { get; set; }
-
- [JsonProperty("expirationModificationTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long? ExpirationModificationTicks { get; set; }
-
- [JsonProperty("executionDurationTicks")]
- [JsonConverter(typeof(LongToStringConverter))]
- public long? ExecutionDurationTicks { get; set; }
-
- [JsonProperty("state")]
- [JsonConverter(typeof(StringEnumConverter))]
- public TaskState State { get; set; }
-
- [CanBeNull]
- [JsonProperty("taskActions")]
- public TaskActions TaskActions { get; set; }
-
- [JsonProperty("attempts")]
- public int Attempts { get; set; }
-
- [JsonProperty("parentTaskId")]
- public string ParentTaskId { get; set; }
- }
+ [NotNull]
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = null!;
+
+ [NotNull]
+ [JsonPropertyName("id")]
+ public string Id { get; set; } = null!;
+
+ [JsonPropertyName("ticks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long Ticks { get; set; }
+
+ [JsonPropertyName("minimalStartTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long MinimalStartTicks { get; set; }
+
+ [JsonPropertyName("startExecutingTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long? StartExecutingTicks { get; set; }
+
+ [JsonPropertyName("finishExecutingTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long? FinishExecutingTicks { get; set; }
+
+ [JsonPropertyName("lastModificationTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long? LastModificationTicks { get; set; }
+
+ [JsonPropertyName("expirationTimestampTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long? ExpirationTimestampTicks { get; set; }
+
+ [JsonPropertyName("expirationModificationTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long? ExpirationModificationTicks { get; set; }
+
+ [JsonPropertyName("executionDurationTicks")]
+ [JsonConverter(typeof(LongToStringConverter))]
+ public long? ExecutionDurationTicks { get; set; }
+
+ [JsonPropertyName("state")]
+ [JsonConverter(typeof(JsonStringEnumConverter))]
+ public TaskState State { get; set; }
+
+ [CanBeNull]
+ [JsonPropertyName("taskActions")]
+ public TaskActions TaskActions { get; set; }
+
+ [JsonPropertyName("attempts")]
+ public int Attempts { get; set; }
+
+ [JsonPropertyName("parentTaskId")]
+ public string ParentTaskId { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs
index 567a5c1b..a2421862 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/RtqMonitoringTaskModel.cs
@@ -1,31 +1,30 @@
using JetBrains.Annotations;
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
using SkbKontur.Cassandra.DistributedTaskQueue.Handling;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api;
+
+public class RtqMonitoringTaskModel
{
- public class RtqMonitoringTaskModel
- {
- public static RtqMonitoringTaskModel Empty => new RtqMonitoringTaskModel();
-
- [NotNull]
- [JsonProperty("taskMeta")]
- public RtqMonitoringTaskMeta TaskMeta { get; set; }
-
- [NotNull]
- [JsonProperty("taskData")]
- [JsonConverter(typeof(TaskDataJsonSerializer))]
- public IRtqTaskData TaskData { get; set; }
-
- [NotNull, ItemNotNull]
- [JsonProperty("childTaskIds")]
- public string[] ChildTaskIds { get; set; }
-
- [NotNull, ItemNotNull]
- [JsonProperty("exceptionInfos")]
- public string[] ExceptionInfos { get; set; }
- }
+ public static RtqMonitoringTaskModel Empty => new RtqMonitoringTaskModel();
+
+ [NotNull]
+ [JsonPropertyName("taskMeta")]
+ public RtqMonitoringTaskMeta TaskMeta { get; set; }
+
+ [NotNull]
+ [JsonPropertyName("taskData")]
+ [JsonConverter(typeof(TaskDataJsonSerializer))]
+ public IRtqTaskData TaskData { get; set; }
+
+ [NotNull, ItemNotNull]
+ [JsonPropertyName("childTaskIds")]
+ public string[] ChildTaskIds { get; set; }
+
+ [NotNull, ItemNotNull]
+ [JsonPropertyName("exceptionInfos")]
+ public string[] ExceptionInfos { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs
index 4c9e6fd1..d47f8efd 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/TaskActions.cs
@@ -1,12 +1,12 @@
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api;
public class TaskActions
{
- [JsonProperty("canCancel")]
+ [JsonPropertyName("canCancel")]
public bool CanCancel { get; set; }
- [JsonProperty("canRerun")]
+ [JsonPropertyName("canRerun")]
public bool CanRerun { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs b/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs
index 37e36a65..ab4b8c8c 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Api/TimestampRange.cs
@@ -1,22 +1,21 @@
-using JetBrains.Annotations;
+using System.Text.Json.Serialization;
-using Newtonsoft.Json;
+using JetBrains.Annotations;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
using SkbKontur.Cassandra.TimeBasedUuid;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Api;
+
+public class TimestampRange
{
- public class TimestampRange
- {
- [NotNull]
- [JsonProperty("lowerBound")]
- [JsonConverter(typeof(TimestampJsonConverter))]
- public Timestamp LowerBound { get; set; }
+ [NotNull]
+ [JsonPropertyName("lowerBound")]
+ [JsonConverter(typeof(TimestampJsonConverter))]
+ public Timestamp LowerBound { get; set; } = null!;
- [NotNull]
- [JsonProperty("upperBound")]
- [JsonConverter(typeof(TimestampJsonConverter))]
- public Timestamp UpperBound { get; set; }
- }
+ [NotNull]
+ [JsonPropertyName("upperBound")]
+ [JsonConverter(typeof(TimestampJsonConverter))]
+ public Timestamp UpperBound { get; set; } = null!;
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj b/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj
index 72686f4d..45d41ca0 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Cassandra.DistributedTaskQueue.Monitoring.csproj
@@ -10,8 +10,8 @@
-
+
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs
index b953129d..71322bc8 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/ElasticsearchResponseWrapperExtensions.cs
@@ -1,74 +1,69 @@
-using System;
+#nullable enable
+
+using System;
using System.Linq;
using System.Text;
+using System.Text.Json;
using Elasticsearch.Net;
-using JetBrains.Annotations;
-
-using Newtonsoft.Json;
-
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions;
+
+// todo (andrew, 11.11.2018): maybe enable ConnectionConfiguration.ThrowExceptions() instead of all this boilerplate
+internal static class ElasticsearchResponseWrapperExtensions
{
- // todo (andrew, 11.11.2018): maybe enable ConnectionConfiguration.ThrowExceptions() instead of all this boilerplate
- internal static class ElasticsearchResponseWrapperExtensions
+ public static T EnsureSuccess(this T response)
+ where T : ElasticsearchResponseBase
{
- [NotNull]
- public static T EnsureSuccess([NotNull] this T response)
- where T : ElasticsearchResponseBase
- {
- if (!response.Success)
- throw new InvalidOperationException(response.ExtendErrorMessageWithElasticInfo("Unsuccessful response"), response.OriginalException);
- return response;
- }
+ if (!response.Success)
+ throw new InvalidOperationException(response.ExtendErrorMessageWithElasticInfo("Unsuccessful response"), response.OriginalException);
+ return response;
+ }
- public static void DieIfBulkRequestFailed([NotNull] this StringResponse response)
+ public static void DieIfBulkRequestFailed(this StringResponse response)
+ {
+ response.EnsureSuccess();
+ var bulkResponse = JsonSerializer.Deserialize(response.Body);
+ if (bulkResponse!.HasErrors)
{
- response.EnsureSuccess();
- var bulkResponse = JsonConvert.DeserializeObject(response.Body);
- if (bulkResponse.HasErrors)
- {
- var innerExceptions = bulkResponse
- .Items
- .Select(x => x.Index ?? x.Create ?? x.Update ?? x.Delete)
- .Select(CreateExceptionIfError)
- .Where(x => x != null)
- .ToList();
- var message = response.ExtendErrorMessageWithElasticInfo($"Bulk request failure [{innerExceptions.Count}]");
- throw new InvalidOperationException(message, new AggregateException(innerExceptions.Take(100)));
- }
+ var innerExceptions = bulkResponse
+ .Items
+ .Select(x => x.Index ?? x.Create ?? x.Update ?? x.Delete)
+ .Select(CreateExceptionIfError)
+ .Where(x => x != null)
+ .ToList();
+ var message = response.ExtendErrorMessageWithElasticInfo($"Bulk request failure [{innerExceptions.Count}]");
+ throw new InvalidOperationException(message, new AggregateException(innerExceptions.Take(100)));
}
+ }
- [NotNull]
- private static string ExtendErrorMessageWithElasticInfo([CanBeNull] this T response, string errorMessage)
- where T : IElasticsearchResponse
- {
- var fullErrorMessage = new StringBuilder($"ElasticSearch error: '{errorMessage}'").AppendLine();
-
- // note ConnectionConfiguration.EnableMetrics() is gone (https://github.com/elastic/elasticsearch-net/issues/1762)
- // note IElasticSearchResponse.NumberOfRetries is gone (https://stackoverflow.com/questions/38602670/what-is-the-elasticsearch-net-2-x-equivalent-for-ielasticsearchresponse-numberof)
+ private static string ExtendErrorMessageWithElasticInfo(this T? response, string errorMessage)
+ where T : IElasticsearchResponse
+ {
+ var fullErrorMessage = new StringBuilder($"ElasticSearch error: '{errorMessage}'").AppendLine();
- if (response != null)
- {
- if (response.TryGetServerErrorReason(out var serverErrorReason))
- fullErrorMessage.AppendLine($"ServerErrorReason: '{serverErrorReason}'");
+ // note ConnectionConfiguration.EnableMetrics() is gone (https://github.com/elastic/elasticsearch-net/issues/1762)
+ // note IElasticSearchResponse.NumberOfRetries is gone (https://stackoverflow.com/questions/38602670/what-is-the-elasticsearch-net-2-x-equivalent-for-ielasticsearchresponse-numberof)
- fullErrorMessage.AppendLine($"For response: '{response}'");
- }
+ if (response != null)
+ {
+ if (response.TryGetServerErrorReason(out var serverErrorReason))
+ fullErrorMessage.AppendLine($"ServerErrorReason: '{serverErrorReason}'");
- return fullErrorMessage.ToString();
+ fullErrorMessage.AppendLine($"For response: '{response}'");
}
- [CanBeNull]
- private static Exception CreateExceptionIfError([NotNull] ResponseBase responseItem, int requestNumber)
- {
- return responseItem.Status >= 400 && responseItem.Status < 600
- ? new InvalidOperationException($"Request number #{requestNumber} failed: '{responseItem.ToPrettyJson()}'")
- : null;
- }
+ return fullErrorMessage.ToString();
+ }
+
+ private static Exception? CreateExceptionIfError(ResponseBase responseItem, int requestNumber)
+ {
+ return responseItem.Status >= 400 && responseItem.Status < 600
+ ? new InvalidOperationException($"Request number #{requestNumber} failed: '{responseItem.ToPrettyJson()}'")
+ : null;
}
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs
index e4f85809..9a3fee97 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/BulkResponse.cs
@@ -1,16 +1,15 @@
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk;
+
+internal class BulkResponse
{
- internal class BulkResponse
- {
- [JsonProperty(PropertyName = "took")]
- public int TimeMs { get; set; }
+ [JsonPropertyName("took")]
+ public int TimeMs { get; set; }
- [JsonProperty(PropertyName = "errors")]
- public bool HasErrors { get; set; }
+ [JsonPropertyName("errors")]
+ public bool HasErrors { get; set; }
- [JsonProperty(PropertyName = "items")]
- public ItemInfo[] Items { get; set; }
- }
+ [JsonPropertyName("items")]
+ public ItemInfo[] Items { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs
index 2d318121..8e300c66 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/IndexBulkResponse.cs
@@ -1,19 +1,18 @@
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk;
+
+internal class IndexBulkResponse : ResponseBase
{
- internal class IndexBulkResponse : ResponseBase
- {
- [JsonProperty(PropertyName = "_index")]
- public string Index { get; set; }
+ [JsonPropertyName("_index")]
+ public string Index { get; set; }
- [JsonProperty(PropertyName = "_type")]
- public string Type { get; set; }
+ [JsonPropertyName("_type")]
+ public string Type { get; set; }
- [JsonProperty(PropertyName = "_id")]
- public string Id { get; set; }
+ [JsonPropertyName("_id")]
+ public string Id { get; set; }
- [JsonProperty(PropertyName = "_version")]
- public long Version { get; set; }
- }
+ [JsonPropertyName("_version")]
+ public long Version { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs
index f21e68da..9b47e6c7 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Bulk/ItemInfo.cs
@@ -1,19 +1,18 @@
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Bulk;
+
+internal class ItemInfo
{
- internal class ItemInfo
- {
- [JsonProperty(PropertyName = "index")]
- public IndexBulkResponse Index { get; set; }
+ [JsonPropertyName("index")]
+ public IndexBulkResponse Index { get; set; }
- [JsonProperty(PropertyName = "create")]
- public ResponseBase Create { get; set; }
+ [JsonPropertyName("create")]
+ public ResponseBase Create { get; set; }
- [JsonProperty(PropertyName = "update")]
- public ResponseBase Update { get; set; }
+ [JsonPropertyName("update")]
+ public ResponseBase Update { get; set; }
- [JsonProperty(PropertyName = "delete")]
- public ResponseBase Delete { get; set; }
- }
+ [JsonPropertyName("delete")]
+ public ResponseBase Delete { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs
index 02873470..fab04199 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Get/GetResponse.cs
@@ -1,25 +1,24 @@
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Get
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Get;
+
+internal class GetResponse
{
- internal class GetResponse
- {
- [JsonProperty(PropertyName = "_index")]
- public string Index { get; set; }
+ [JsonPropertyName("_index")]
+ public string Index { get; set; }
- [JsonProperty(PropertyName = "_type")]
- public string Type { get; set; }
+ [JsonPropertyName("_type")]
+ public string Type { get; set; }
- [JsonProperty(PropertyName = "_id")]
- public string Id { get; set; }
+ [JsonPropertyName("_id")]
+ public string Id { get; set; }
- [JsonProperty(PropertyName = "_version")]
- public long Version { get; set; }
+ [JsonPropertyName("_version")]
+ public long Version { get; set; }
- [JsonProperty(PropertyName = "found")]
- public bool Found { get; set; }
+ [JsonPropertyName("found")]
+ public bool Found { get; set; }
- [JsonProperty(PropertyName = "_source")]
- public T Source { get; set; }
- }
+ [JsonPropertyName("_source")]
+ public T Source { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs
index e096d783..398e8803 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/ResponseBase.cs
@@ -1,13 +1,12 @@
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses;
+
+internal class ResponseBase
{
- internal class ResponseBase
- {
- [JsonProperty(PropertyName = "status")]
- public int Status { get; set; }
+ [JsonPropertyName("status")]
+ public int Status { get; set; }
- [JsonProperty(PropertyName = "error")]
- public object Error { get; set; }
- }
+ [JsonPropertyName("error")]
+ public object Error { get; set; }
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs
index 9598ffed..75fe33b6 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitInfo.cs
@@ -1,13 +1,11 @@
-using JetBrains.Annotations;
+#nullable enable
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search;
+
+internal class HitInfo
{
- internal class HitInfo
- {
- [NotNull]
- [JsonProperty(PropertyName = "_id")]
- public string Id { get; set; }
- }
+ [JsonPropertyName("_id")]
+ public string Id { get; set; } = null!;
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs
index ae6e9609..70330525 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/HitsCollection.cs
@@ -1,19 +1,17 @@
-using JetBrains.Annotations;
+#nullable enable
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search;
+
+internal class HitsCollection
{
- internal class HitsCollection
- {
- [JsonProperty(PropertyName = "total")]
- [JsonConverter(typeof(TotalCountCompatibilityConverter))]
- public long TotalCount { get; set; }
+ [JsonPropertyName("total")]
+ [JsonConverter(typeof(TotalCountCompatibilityConverter))]
+ public long TotalCount { get; set; }
- [NotNull, ItemNotNull]
- [JsonProperty(PropertyName = "hits")]
- public HitInfo[] Hits { get; set; }
- }
+ [JsonPropertyName("hits")]
+ public HitInfo[] Hits { get; set; } = null!;
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs
index 71e17bca..48dbdbbf 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/ElasticsearchClientExtensions/Responses/Search/SearchResponse.cs
@@ -1,13 +1,11 @@
-using JetBrains.Annotations;
+#nullable enable
-using Newtonsoft.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.ElasticsearchClientExtensions.Responses.Search;
+
+internal class SearchResponse
{
- internal class SearchResponse
- {
- [NotNull]
- [JsonProperty(PropertyName = "hits")]
- public HitsCollection Hits { get; set; }
- }
+ [JsonPropertyName("hits")]
+ public HitsCollection Hits { get; set; } = null!;
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs
index b48bfd70..6cef3801 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/RtqElasticsearchIndexerSettings.cs
@@ -1,80 +1,69 @@
-using System;
+#nullable enable
-using JetBrains.Annotations;
-
-using Newtonsoft.Json;
-using Newtonsoft.Json.Converters;
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
using SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Storage.Writing;
using SkbKontur.Cassandra.TimeBasedUuid;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer;
+
+public class RtqElasticsearchIndexerSettings
{
- public class RtqElasticsearchIndexerSettings
+ public RtqElasticsearchIndexerSettings(string eventFeedKey, string rtqGraphitePathPrefix)
{
- public RtqElasticsearchIndexerSettings([NotNull] string eventFeedKey, [NotNull] string rtqGraphitePathPrefix)
- {
- if (string.IsNullOrEmpty(eventFeedKey))
- throw new InvalidOperationException("eventFeedKey is empty");
- if (string.IsNullOrEmpty(rtqGraphitePathPrefix))
- throw new InvalidOperationException("rtqGraphitePathPrefix is empty");
+ if (string.IsNullOrEmpty(eventFeedKey))
+ throw new InvalidOperationException("eventFeedKey is empty");
+ if (string.IsNullOrEmpty(rtqGraphitePathPrefix))
+ throw new InvalidOperationException("rtqGraphitePathPrefix is empty");
- EventFeedKey = eventFeedKey;
- RtqGraphitePathPrefix = rtqGraphitePathPrefix;
- }
-
- [NotNull]
- public string EventFeedKey { get; }
+ EventFeedKey = eventFeedKey;
+ RtqGraphitePathPrefix = rtqGraphitePathPrefix;
+ }
- [NotNull]
- public string RtqGraphitePathPrefix { get; }
+ public static JsonSerializerOptions GetJsonOptions()
+ {
+ var defaultJsonSerializerOptions = new JsonSerializerOptions();
+ defaultJsonSerializerOptions.Converters.Add(new TruncateLongStringsConverter(500));
+ defaultJsonSerializerOptions.Converters.Add(new JsonStringEnumConverter());
+ defaultJsonSerializerOptions.Converters.Add(new TimestampJsonConverter());
+ defaultJsonSerializerOptions.Converters.Add(new TimeGuidJsonConverter());
+ defaultJsonSerializerOptions.Converters.Add(new OmitBinaryAndAbstractPropertyConverter());
+
+ return defaultJsonSerializerOptions;
+ }
- [NotNull]
- public string PerfGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.Perf";
+ public string EventFeedKey { get; }
- [NotNull]
- public string EventFeedGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.EventFeed";
+ public string RtqGraphitePathPrefix { get; }
- [NotNull]
- public Timestamp InitialIndexingStartTimestamp { get; set; } = new Timestamp(new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc));
+ public string PerfGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.Perf";
- public TimeSpan MaxEventsProcessingTimeWindow { get; set; } = TimeSpan.FromHours(1);
+ public string EventFeedGraphitePathPrefix => $"{RtqGraphitePathPrefix}.ElasticsearchIndexer.EventFeed";
- public int MaxEventsProcessingTasksCount { get; set; } = 60000;
+ public Timestamp InitialIndexingStartTimestamp { get; } = new(new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc));
- public int TaskIdsProcessingBatchSize { get; set; } = 4000;
+ public TimeSpan MaxEventsProcessingTimeWindow { get; } = TimeSpan.FromHours(1);
- public int IndexingThreadsCount { get; set; } = 2;
+ public int MaxEventsProcessingTasksCount { get; set; } = 60000;
- public TimeSpan BulkIndexRequestTimeout { get; set; } = TimeSpan.FromMinutes(5);
+ public int TaskIdsProcessingBatchSize { get; set; } = 4000;
- public TimeSpan InitialIndexingOffsetFromNow { get; set; } = TimeSpan.FromMinutes(30);
+ public int IndexingThreadsCount { get; set; } = 2;
- [NotNull]
- public JsonSerializerSettings JsonSerializerSettings { get; } = DefaultJsonSerializerSettings;
+ public TimeSpan BulkIndexRequestTimeout { get; set; } = TimeSpan.FromMinutes(5);
- public override string ToString()
- {
- return $"InitialIndexingStartTimestamp: {InitialIndexingStartTimestamp}, " +
- $"MaxEventsProcessingTimeWindow: {MaxEventsProcessingTimeWindow}, " +
- $"MaxEventsProcessingTasksCount: {MaxEventsProcessingTasksCount}, " +
- $"TaskIdsProcessingBatchSize: {TaskIdsProcessingBatchSize}, " +
- $"IndexingThreadsCount: {IndexingThreadsCount}, " +
- $"BulkIndexRequestTimeout: {BulkIndexRequestTimeout}";
- }
+ public TimeSpan InitialIndexingOffsetFromNow { get; set; } = TimeSpan.FromMinutes(30);
- [NotNull]
- public static readonly JsonSerializerSettings DefaultJsonSerializerSettings = new JsonSerializerSettings
- {
- ContractResolver = new OmitBinaryAndAbstractPropertiesContractResolver(),
- Converters = new JsonConverter[]
- {
- new TruncateLongStringsConverter(500),
- new StringEnumConverter(),
- new TimestampJsonConverter(),
- new TimeGuidJsonConverter()
- },
- };
+ public override string ToString()
+ {
+ return $"InitialIndexingStartTimestamp: {InitialIndexingStartTimestamp}, " +
+ $"MaxEventsProcessingTimeWindow: {MaxEventsProcessingTimeWindow}, " +
+ $"MaxEventsProcessingTasksCount: {MaxEventsProcessingTasksCount}, " +
+ $"TaskIdsProcessingBatchSize: {TaskIdsProcessingBatchSize}, " +
+ $"IndexingThreadsCount: {IndexingThreadsCount}, " +
+ $"BulkIndexRequestTimeout: {BulkIndexRequestTimeout}";
}
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs
index 6cc74e23..52ceb0d7 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Indexer/TaskMetaProcessor.cs
@@ -1,17 +1,16 @@
-using System;
+#nullable enable
+
+using System;
using System.Collections.Generic;
using System.Linq;
+using System.Text.Json;
using Elasticsearch.Net;
using GroBuf;
-using JetBrains.Annotations;
-
using MoreLinqInlined;
-using Newtonsoft.Json;
-
using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Entities;
using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Repositories;
using SkbKontur.Cassandra.DistributedTaskQueue.Cassandra.Repositories.BlobStorages;
@@ -24,146 +23,143 @@
using Vostok.Logging.Abstractions;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Indexer;
+
+public class TaskMetaProcessor
{
- public class TaskMetaProcessor
+ public TaskMetaProcessor(ILog logger,
+ RtqElasticsearchIndexerSettings settings,
+ IRtqElasticsearchClient elasticClient,
+ RemoteTaskQueue remoteTaskQueue,
+ RtqMonitoringPerfGraphiteReporter perfGraphiteReporter)
{
- public TaskMetaProcessor(ILog logger,
- RtqElasticsearchIndexerSettings settings,
- IRtqElasticsearchClient elasticClient,
- RemoteTaskQueue remoteTaskQueue,
- RtqMonitoringPerfGraphiteReporter perfGraphiteReporter)
- {
- this.logger = logger.ForContext(nameof(TaskMetaProcessor));
- this.settings = settings;
- handleTasksMetaStorage = remoteTaskQueue.HandleTasksMetaStorage;
- taskDataRegistry = remoteTaskQueue.TaskDataRegistry;
- taskDataStorage = remoteTaskQueue.TaskDataStorage;
- taskExceptionInfoStorage = remoteTaskQueue.TaskExceptionInfoStorage;
- serializer = remoteTaskQueue.Serializer;
- this.perfGraphiteReporter = perfGraphiteReporter;
- this.elasticClient = elasticClient;
- bulkRequestTimeout = new BulkRequestParameters
- {
- Timeout = this.settings.BulkIndexRequestTimeout,
- RequestConfiguration = new RequestConfiguration {RequestTimeout = this.settings.BulkIndexRequestTimeout}
- };
- }
+ this.logger = logger.ForContext(nameof(TaskMetaProcessor));
+ this.settings = settings;
+ handleTasksMetaStorage = remoteTaskQueue.HandleTasksMetaStorage;
+ taskDataRegistry = remoteTaskQueue.TaskDataRegistry;
+ taskDataStorage = remoteTaskQueue.TaskDataStorage;
+ taskExceptionInfoStorage = remoteTaskQueue.TaskExceptionInfoStorage;
+ serializer = remoteTaskQueue.Serializer;
+ this.perfGraphiteReporter = perfGraphiteReporter;
+ this.elasticClient = elasticClient;
+ bulkRequestTimeout = new BulkRequestParameters
+ {
+ Timeout = this.settings.BulkIndexRequestTimeout,
+ RequestConfiguration = new RequestConfiguration {RequestTimeout = this.settings.BulkIndexRequestTimeout}
+ };
+ }
- public void ProcessTasks([NotNull, ItemNotNull] List taskIdsToProcess)
- {
- logger.Info("Processing tasks: {ProcessingTasksCount}", new {ProcessingTasksCount = taskIdsToProcess.Count});
- taskIdsToProcess.Batch(settings.TaskIdsProcessingBatchSize, Enumerable.ToArray)
- .AsParallel()
- .WithDegreeOfParallelism(settings.IndexingThreadsCount)
- .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
- .ForAll(taskIds =>
- {
- var taskMetas = perfGraphiteReporter.ReportTiming("ReadTaskMetas", () => handleTasksMetaStorage.GetMetas(taskIds));
- var taskMetasToIndex = taskMetas.Values.Where(x => x.Ticks > settings.InitialIndexingStartTimestamp.Ticks).ToArray();
- if (taskMetasToIndex.Any())
- IndexMetas(taskMetasToIndex);
- });
- }
+ public void ProcessTasks(List taskIdsToProcess)
+ {
+ logger.Info("Processing tasks: {ProcessingTasksCount}", new {ProcessingTasksCount = taskIdsToProcess.Count});
+ taskIdsToProcess.Batch(settings.TaskIdsProcessingBatchSize, Enumerable.ToArray)
+ .AsParallel()
+ .WithDegreeOfParallelism(settings.IndexingThreadsCount)
+ .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
+ .ForAll(taskIds =>
+ {
+ var taskMetas = perfGraphiteReporter.ReportTiming("ReadTaskMetas", () => handleTasksMetaStorage.GetMetas(taskIds));
+ var taskMetasToIndex = taskMetas.Values.Where(x => x.Ticks > settings.InitialIndexingStartTimestamp.Ticks).ToArray();
+ if (taskMetasToIndex.Any())
+ IndexMetas(taskMetasToIndex);
+ });
+ }
- private void IndexMetas([NotNull, ItemNotNull] TaskMetaInformation[] batch)
+ private void IndexMetas(TaskMetaInformation[] batch)
+ {
+ var taskDatas = perfGraphiteReporter.ReportTiming("ReadTaskDatas", () => taskDataStorage.Read(batch));
+ var taskExceptionInfos = perfGraphiteReporter.ReportTiming("ReadTaskExceptionInfos", () => taskExceptionInfoStorage.Read(batch));
+ var enrichedBatch = new ( TaskMetaInformation TaskMeta, TaskExceptionInfo[] TaskExceptionInfos, object? TaskData)[batch.Length];
+ for (var i = 0; i < batch.Length; i++)
{
- var taskDatas = perfGraphiteReporter.ReportTiming("ReadTaskDatas", () => taskDataStorage.Read(batch));
- var taskExceptionInfos = perfGraphiteReporter.ReportTiming("ReadTaskExceptionInfos", () => taskExceptionInfoStorage.Read(batch));
- var enrichedBatch = new ( /*[NotNull]*/ TaskMetaInformation TaskMeta, /*[NotNull, ItemNotNull]*/ TaskExceptionInfo[] TaskExceptionInfos, /*[CanBeNull]*/ object TaskData)[batch.Length];
- for (var i = 0; i < batch.Length; i++)
+ var taskMeta = batch[i];
+ object? taskDataObj = null;
+ if (taskDatas.TryGetValue(taskMeta.Id, out var taskData))
{
- var taskMeta = batch[i];
- object taskDataObj = null;
- if (taskDatas.TryGetValue(taskMeta.Id, out var taskData))
- {
- if (taskDataRegistry.TryGetTaskType(taskMeta.Name, out var taskType))
- taskDataObj = TryDeserializeTaskData(taskType, taskData, taskMeta);
- }
- enrichedBatch[i] = (taskMeta, taskExceptionInfos[taskMeta.Id], taskDataObj);
+ if (taskDataRegistry.TryGetTaskType(taskMeta.Name, out var taskType))
+ taskDataObj = TryDeserializeTaskData(taskType!, taskData, taskMeta);
}
- perfGraphiteReporter.ReportTiming("IndexBatch", () => IndexBatch(enrichedBatch));
+ enrichedBatch[i] = (taskMeta, taskExceptionInfos[taskMeta.Id], taskDataObj);
}
+ perfGraphiteReporter.ReportTiming("IndexBatch", () => IndexBatch(enrichedBatch));
+ }
- private void IndexBatch([NotNull] ( /*[NotNull]*/ TaskMetaInformation TaskMeta, /*[NotNull, ItemNotNull]*/ TaskExceptionInfo[] TaskExceptionInfos, /*[CanBeNull]*/ object TaskData)[] batch)
+ private void IndexBatch((TaskMetaInformation TaskMeta, TaskExceptionInfo[] TaskExceptionInfos, object? TaskData)[] batch)
+ {
+ logger.Info("IndexBatch: {BatchLength} tasks", new {BatchLength = batch.Length});
+ var payload = new string[batch.Length * 2];
+ for (var i = 0; i < batch.Length; i++)
{
- logger.Info("IndexBatch: {BatchLength} tasks", new {BatchLength = batch.Length});
- var payload = new string[batch.Length * 2];
- for (var i = 0; i < batch.Length; i++)
- {
- payload[2 * i] = JsonConvert.SerializeObject(new {index = CreateIndexInfo(batch[i].TaskMeta)});
- var taskIndexedInfo = BuildTaskIndexedInfo(batch[i].TaskMeta, batch[i].TaskExceptionInfos, batch[i].TaskData);
- payload[2 * i + 1] = JsonConvert.SerializeObject(taskIndexedInfo, settings.JsonSerializerSettings);
- }
- perfGraphiteReporter.ReportTiming("ElasticsearchClient_Bulk", () => elasticClient.Bulk(PostData.MultiJson(payload), bulkRequestTimeout).DieIfBulkRequestFailed());
+ payload[2 * i] = JsonSerializer.Serialize(new {index = CreateIndexInfo(batch[i].TaskMeta)});
+ var taskIndexedInfo = BuildTaskIndexedInfo(batch[i].TaskMeta, batch[i].TaskExceptionInfos, batch[i].TaskData);
+ payload[2 * i + 1] = JsonSerializer.Serialize(taskIndexedInfo, RtqElasticsearchIndexerSettings.GetJsonOptions());
}
+ perfGraphiteReporter.ReportTiming("ElasticsearchClient_Bulk", () => elasticClient.Bulk(PostData.MultiJson(payload), bulkRequestTimeout).DieIfBulkRequestFailed());
+ }
- private object CreateIndexInfo(TaskMetaInformation taskMeta)
+ private object CreateIndexInfo(TaskMetaInformation taskMeta)
+ {
+ var indexName = DateTimeFormatter.DateFromTicks(taskMeta.Ticks).ToString(RtqElasticsearchConsts.DataIndexNameFormat);
+ if (elasticClient.UseElastic7)
{
- var indexName = DateTimeFormatter.DateFromTicks(taskMeta.Ticks).ToString(RtqElasticsearchConsts.DataIndexNameFormat);
- if (elasticClient.UseElastic7)
- {
- return new
- {
- _index = indexName,
- _id = taskMeta.Id,
- };
- }
return new
{
_index = indexName,
- _type = RtqElasticsearchConsts.RtqIndexTypeName, // see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html
_id = taskMeta.Id,
};
}
+ return new
+ {
+ _index = indexName,
+ _type = RtqElasticsearchConsts.RtqIndexTypeName, // see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html
+ _id = taskMeta.Id,
+ };
+ }
- [CanBeNull]
- private object TryDeserializeTaskData([NotNull] Type taskType, [NotNull] byte[] taskData, [NotNull] TaskMetaInformation taskMetaInformation)
+ private object TryDeserializeTaskData(Type taskType, byte[] taskData, TaskMetaInformation taskMetaInformation)
+ {
+ try
{
- try
- {
- return serializer.Deserialize(taskType, taskData);
- }
- catch (Exception e)
- {
- logger.Error(e, "Failed to deserialize taskData for: {RtqTaskMeta}", new {RtqTaskMeta = taskMetaInformation});
- return null;
- }
+ return serializer.Deserialize(taskType, taskData);
}
-
- [NotNull]
- private static object BuildTaskIndexedInfo([NotNull] TaskMetaInformation taskMeta, [NotNull, ItemNotNull] TaskExceptionInfo[] taskExceptionInfos, [CanBeNull] object taskData)
+ catch (Exception e)
{
- var executionDurationTicks = taskMeta.ExecutionDurationTicks;
- var meta = new MetaIndexedInfo
- {
- Id = taskMeta.Id,
- Name = taskMeta.Name,
- State = taskMeta.State.ToString(),
- Attempts = taskMeta.Attempts,
- ParentTaskId = taskMeta.ParentTaskId,
- TaskGroupLock = taskMeta.TaskGroupLock,
- EnqueueTime = taskMeta.Ticks,
- FinishExecutingTime = taskMeta.FinishExecutingTicks,
- LastModificationTime = taskMeta.LastModificationTicks ?? 0,
- MinimalStartTime = taskMeta.MinimalStartTicks,
- StartExecutingTime = taskMeta.StartExecutingTicks,
- ExpirationTime = taskMeta.ExpirationTimestampTicks ?? 0,
- LastExecutionDurationInMs = executionDurationTicks != null ? TimeSpan.FromTicks(executionDurationTicks.Value).TotalMilliseconds : (double?)null
- };
- var exceptionInfo = string.Join("\r\n", taskExceptionInfos.Reverse().Select(x => x.ExceptionMessageInfo));
- return new TaskIndexedInfo(meta, exceptionInfo, taskData);
+ logger.Error(e, "Failed to deserialize taskData for: {RtqTaskMeta}", new {RtqTaskMeta = taskMetaInformation});
+ return null;
}
+ }
- private readonly ILog logger;
- private readonly RtqElasticsearchIndexerSettings settings;
- private readonly IHandleTasksMetaStorage handleTasksMetaStorage;
- private readonly IRtqTaskDataRegistry taskDataRegistry;
- private readonly ITaskDataStorage taskDataStorage;
- private readonly ITaskExceptionInfoStorage taskExceptionInfoStorage;
- private readonly ISerializer serializer;
- private readonly RtqMonitoringPerfGraphiteReporter perfGraphiteReporter;
- private readonly IRtqElasticsearchClient elasticClient;
- private readonly BulkRequestParameters bulkRequestTimeout;
+ private static object BuildTaskIndexedInfo(TaskMetaInformation taskMeta, TaskExceptionInfo[] taskExceptionInfos, object? taskData)
+ {
+ var executionDurationTicks = taskMeta.ExecutionDurationTicks;
+ var meta = new MetaIndexedInfo
+ {
+ Id = taskMeta.Id,
+ Name = taskMeta.Name,
+ State = taskMeta.State.ToString(),
+ Attempts = taskMeta.Attempts,
+ ParentTaskId = taskMeta.ParentTaskId,
+ TaskGroupLock = taskMeta.TaskGroupLock,
+ EnqueueTime = taskMeta.Ticks,
+ FinishExecutingTime = taskMeta.FinishExecutingTicks,
+ LastModificationTime = taskMeta.LastModificationTicks ?? 0,
+ MinimalStartTime = taskMeta.MinimalStartTicks,
+ StartExecutingTime = taskMeta.StartExecutingTicks,
+ ExpirationTime = taskMeta.ExpirationTimestampTicks ?? 0,
+ LastExecutionDurationInMs = executionDurationTicks != null ? TimeSpan.FromTicks(executionDurationTicks.Value).TotalMilliseconds : (double?)null
+ };
+ var exceptionInfo = string.Join("\r\n", taskExceptionInfos.Reverse().Select(x => x.ExceptionMessageInfo));
+ return new TaskIndexedInfo(meta, exceptionInfo, taskData);
}
+
+ private readonly ILog logger;
+ private readonly RtqElasticsearchIndexerSettings settings;
+ private readonly IHandleTasksMetaStorage handleTasksMetaStorage;
+ private readonly IRtqTaskDataRegistry taskDataRegistry;
+ private readonly ITaskDataStorage taskDataStorage;
+ private readonly ITaskExceptionInfoStorage taskExceptionInfoStorage;
+ private readonly ISerializer serializer;
+ private readonly RtqMonitoringPerfGraphiteReporter perfGraphiteReporter;
+ private readonly IRtqElasticsearchClient elasticClient;
+ private readonly BulkRequestParameters bulkRequestTimeout;
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs
index 0da88e30..114bcc11 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/JsonObjectExtensions.cs
@@ -1,24 +1,19 @@
-using System.Linq;
+#nullable enable
-using JetBrains.Annotations;
+using System.Text.Json;
+using System.Text.Json.Serialization;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Converters;
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json
+internal static class JsonObjectExtensions
{
- internal static class JsonObjectExtensions
+ public static string ToPrettyJson(this T obj)
{
- [NotNull]
- public static string ToPrettyJson([CanBeNull] this T o, [NotNull] params JsonConverter[] converters)
- {
- converters = converters.Concat(new JsonConverter[]
- {
- new StringEnumConverter(),
- new TimestampJsonConverter(),
- new TimeGuidJsonConverter()
- }).ToArray();
- return JsonConvert.SerializeObject(o, Formatting.Indented, converters);
- }
+ var options = new JsonSerializerOptions();
+ options.Converters.Add(new JsonStringEnumConverter());
+ options.Converters.Add(new TimestampJsonConverter());
+ options.Converters.Add(new TimeGuidJsonConverter());
+
+ return JsonSerializer.Serialize(obj, options);
}
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs
index ec0d4f96..3f1ef6cb 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/LongToStringConverter.cs
@@ -1,40 +1,36 @@
-using System;
+#nullable enable
-using Newtonsoft.Json;
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
+
+///
+/// Конвертер, которые сериализует long-и в строки вместо чисел,
+/// ибо JSON.parse() на длинных long-ах теряет последние разряды
+///
+public class LongToStringConverter : JsonConverter
{
- ///
- /// Конвертер, которые сериализует long-и в строки вместо чисел,
- /// ибо JSON.parse() на длинных long-ах теряет последние разряды
- ///
- public class LongToStringConverter : JsonConverter
+ public override long? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
- public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
+ var value = JsonSerializer.Deserialize(ref reader);
+ if (value == null)
{
- if (value == null)
- {
- writer.WriteNull();
- }
- else
- {
- writer.WriteValue(value.ToString());
- }
+ return null;
}
+ return long.Parse(value);
+ }
- public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
+ public override void Write(Utf8JsonWriter writer, long? value, JsonSerializerOptions options)
+ {
+ if (value == null)
{
- var value = serializer.Deserialize(reader);
- if (value == null)
- {
- return null;
- }
- return long.Parse(value);
+ writer.WriteNullValue();
}
-
- public override bool CanConvert(Type objectType)
+ else
{
- return typeof(long) == objectType || typeof(long?) == objectType;
+ writer.WriteStringValue(value.ToString());
}
}
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs
index 0fd4af15..13e528a2 100644
--- a/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/ObjectJsonSerializer.cs
@@ -1,34 +1,25 @@
using System;
-
-using Newtonsoft.Json;
-using Newtonsoft.Json.Converters;
+using System.Text.Json;
+using System.Text.Json.Serialization;
using SkbKontur.Cassandra.DistributedTaskQueue.Handling;
-namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
+
+public class TaskDataJsonSerializer : JsonConverter
{
- public class TaskDataJsonSerializer : JsonConverter
+ public override IRtqTaskData Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
- public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
- {
- staticSerializer.Serialize(writer, value);
- }
-
- public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
- {
- throw new NotImplementedException();
- }
-
- public override bool CanConvert(Type objectType)
- {
- return typeof(IRtqTaskData).IsAssignableFrom(objectType);
- }
+ throw new NotImplementedException();
+ }
- public override bool CanRead => false;
+ public override void Write(Utf8JsonWriter writer, IRtqTaskData value, JsonSerializerOptions options)
+ {
+ JsonSerializerOptions jsonSerializerOptions = new();
+ jsonSerializerOptions.Converters.Add(new JsonStringEnumConverter());
+ jsonSerializerOptions.Converters.Add(new TimeGuidJsonConverter());
+ jsonSerializerOptions.Converters.Add(new TimestampJsonConverter());
- private static readonly JsonSerializer staticSerializer = new JsonSerializer
- {
- Converters = {new StringEnumConverter(), new TimeGuidJsonConverter(), new TimestampJsonConverter()}
- };
+ JsonSerializer.Serialize(writer, value, jsonSerializerOptions);
}
}
\ No newline at end of file
diff --git a/Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs b/Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs
new file mode 100644
index 00000000..844d6dd1
--- /dev/null
+++ b/Cassandra.DistributedTaskQueue.Monitoring/Json/OmitBinaryAndAbstractPropertyConverter.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+namespace SkbKontur.Cassandra.DistributedTaskQueue.Monitoring.Json;
+
+public class OmitBinaryAndAbstractPropertyConverter : JsonConverter