diff --git a/src/ApiService/ApiService/Functions/QueueJobResult.cs b/src/ApiService/ApiService/Functions/QueueJobResult.cs index 5e3bec0048..31b39802d6 100644 --- a/src/ApiService/ApiService/Functions/QueueJobResult.cs +++ b/src/ApiService/ApiService/Functions/QueueJobResult.cs @@ -31,7 +31,12 @@ public class QueueJobResult { var job = await _jobs.Get(task.JobId); if (job == null) { - _log.LogWarning("invalid {JobId}", task.JobId); + _log.LogWarning("invalid message {JobId}", task.JobId); + return; + } + + if (jr.CreatedAt == null) { + _log.LogWarning("invalid message, no created_at field {JobId}", task.JobId); return; } @@ -52,7 +57,7 @@ public class QueueJobResult { return; } - var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jobResultType, value); + var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jr.TaskId, jr.MachineId, jr.CreatedAt.Value, jr.Version, jobResultType, value); if (!jobResult.IsOk) { _log.LogError("failed to create or update with job result {JobId}", job.JobId); } diff --git a/src/ApiService/ApiService/OneFuzzTypes/Model.cs b/src/ApiService/ApiService/OneFuzzTypes/Model.cs index ab41853a74..4dd4000283 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Model.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Model.cs @@ -34,19 +34,6 @@ public enum HeartbeatType { TaskAlive, } -[SkipRename] -public enum JobResultType { - NewCrashingInput, - NoReproCrashingInput, - NewReport, - NewUniqueReport, - NewRegressionReport, - NewCoverage, - NewCrashDump, - CoverageData, - RuntimeStats, -} - public record HeartbeatData(HeartbeatType Type); public record TaskHeartbeatEntry( @@ -55,12 +42,14 @@ public record TaskHeartbeatEntry( [property: Required] Guid MachineId, HeartbeatData[] Data); -public record JobResultData(JobResultType Type); +public record JobResultData(string Type); public record TaskJobResultEntry( Guid TaskId, Guid? JobId, Guid MachineId, + DateTime? CreatedAt, + double Version, JobResultData Data, Dictionary Value ); @@ -921,26 +910,24 @@ public record SecretAddress(Uri Url) : ISecret { public record SecretData(ISecret Secret) { } +[SkipRename] +public enum JobResultType { + CoverageData, + RuntimeStats, +} + public record JobResult( - [PartitionKey][RowKey] Guid JobId, + [PartitionKey] Guid JobId, + [RowKey] string TaskIdMachineIdMetric, + Guid TaskId, + Guid MachineId, + DateTime CreatedAt, string Project, string Name, - double NewCrashingInput = 0, - double NoReproCrashingInput = 0, - double NewReport = 0, - double NewUniqueReport = 0, - double NewRegressionReport = 0, - double NewCrashDump = 0, - double InstructionsCovered = 0, - double TotalInstructions = 0, - double CoverageRate = 0, - double IterationCount = 0 -) : EntityBase() { - public JobResult(Guid JobId, string Project, string Name) : this( - JobId: JobId, - Project: Project, - Name: Name, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) { } -} + string Type, + double Version, + Dictionary MetricValue +) : EntityBase(); public record JobConfig( string Project, diff --git a/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs b/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs index 1166cf91d4..b39c654642 100644 --- a/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs @@ -2,99 +2,75 @@ using Microsoft.Extensions.Logging; using Polly; namespace Microsoft.OneFuzz.Service; +using System.Net; public interface IJobResultOperations : IOrm { - Async.Task GetJobResult(Guid jobId); - Async.Task CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary resultValue); + Async.Task GetJobResult(Guid jobId, Guid taskId, Guid machineId, string metricType); + Async.Task CreateOrUpdate(Guid jobId, Guid taskId, Guid machineId, DateTime createdAt, double version, string resultType, Dictionary resultValue); } public class JobResultOperations : Orm, IJobResultOperations { + const string COVERAGE_DATA = "CoverageData"; + const string RUNTIME_STATS = "RuntimeStats"; + public JobResultOperations(ILogger log, IOnefuzzContext context) : base(log, context) { } - public async Async.Task GetJobResult(Guid jobId) { - return await SearchByPartitionKeys(new[] { jobId.ToString() }).SingleOrDefaultAsync(); + public async Async.Task GetJobResult(Guid jobId, Guid taskId, Guid machineId, string metricType) { + var data = QueryAsync(Query.SingleEntity(jobId.ToString(), string.Concat(taskId, "-", machineId, "-", metricType))); + return await data.FirstOrDefaultAsync(); } - private JobResult UpdateResult(JobResult result, JobResultType type, Dictionary resultValue) { - - var newResult = result; - double newValue; - switch (type) { - case JobResultType.NewCrashingInput: - newValue = result.NewCrashingInput + resultValue["count"]; - newResult = result with { NewCrashingInput = newValue }; - break; - case JobResultType.NewReport: - newValue = result.NewReport + resultValue["count"]; - newResult = result with { NewReport = newValue }; - break; - case JobResultType.NewUniqueReport: - newValue = result.NewUniqueReport + resultValue["count"]; - newResult = result with { NewUniqueReport = newValue }; - break; - case JobResultType.NewRegressionReport: - newValue = result.NewRegressionReport + resultValue["count"]; - newResult = result with { NewRegressionReport = newValue }; - break; - case JobResultType.NewCrashDump: - newValue = result.NewCrashDump + resultValue["count"]; - newResult = result with { NewCrashDump = newValue }; - break; - case JobResultType.CoverageData: - double newCovered = resultValue["covered"]; - double newTotalCovered = resultValue["features"]; - double newCoverageRate = resultValue["rate"]; - newResult = result with { InstructionsCovered = newCovered, TotalInstructions = newTotalCovered, CoverageRate = newCoverageRate }; - break; - case JobResultType.RuntimeStats: - double newTotalIterations = resultValue["total_count"]; - newResult = result with { IterationCount = newTotalIterations }; - break; - default: - _logTracer.LogWarning($"Invalid Field {type}."); - break; - } - _logTracer.LogInformation($"Attempting to log new result: {newResult}"); - return newResult; - } - - private async Async.Task TryUpdate(Job job, JobResultType resultType, Dictionary resultValue) { + private async Async.Task TryUpdate(Job job, Guid taskId, Guid machineId, DateTime createdAt, double version, string resultType, Dictionary resultValue) { var jobId = job.JobId; + var taskIdMachineIdMetric = string.Concat(taskId, "-", machineId, "-", resultType); - var jobResult = await GetJobResult(jobId); - - if (jobResult == null) { - _logTracer.LogInformation("Creating new JobResult for Job {JobId}", jobId); - - var entry = new JobResult(JobId: jobId, Project: job.Config.Project, Name: job.Config.Name); + var oldEntry = await GetJobResult(jobId, taskId, machineId, resultType); - jobResult = UpdateResult(entry, resultType, resultValue); - - var r = await Insert(jobResult); - if (!r.IsOk) { - throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}"); + if (oldEntry == null) { + _logTracer.LogInformation($"attempt to insert new job result {taskId} and taskId+machineId+metricType {taskIdMachineIdMetric}"); + var newEntry = new JobResult(JobId: jobId, TaskIdMachineIdMetric: taskIdMachineIdMetric, TaskId: taskId, MachineId: machineId, CreatedAt: createdAt, Project: job.Config.Project, Name: job.Config.Name, resultType, Version: version, resultValue); + var result = await Insert(newEntry); + if (!result.IsOk) { + throw new InvalidOperationException($"failed to insert job result with taskId {taskId} and taskId+machineId+metricType {taskIdMachineIdMetric}"); } - _logTracer.LogInformation("created job result {JobId}", jobResult.JobId); - } else { - _logTracer.LogInformation("Updating existing JobResult entry for Job {JobId}", jobId); - - jobResult = UpdateResult(jobResult, resultType, resultValue); + return true; + } - var r = await Update(jobResult); - if (!r.IsOk) { - throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}"); - } - _logTracer.LogInformation("updated job result {JobId}", jobResult.JobId); + ResultVoid<(HttpStatusCode Status, string Reason)> r; + switch (resultType) { + case COVERAGE_DATA: + case RUNTIME_STATS: + if (oldEntry.CreatedAt < createdAt) { + oldEntry = oldEntry with { CreatedAt = createdAt, MetricValue = resultValue }; + r = await Update(oldEntry); + if (!r.IsOk) { + throw new InvalidOperationException($"failed to replace job result with taskId {taskId} and machineId+metricType {taskIdMachineIdMetric}"); + } + } else { + _logTracer.LogInformation($"received an out-of-date metric. skipping."); + } + break; + default: + _logTracer.LogInformation($"attempt to update job result {taskId} and taskId+machineId+metricType {taskIdMachineIdMetric}"); + oldEntry.MetricValue["count"]++; + oldEntry = oldEntry with { MetricValue = oldEntry.MetricValue }; + r = await Update(oldEntry); + if (!r.IsOk) { + throw new InvalidOperationException($"failed to update job result with taskId {taskId} and machineId+metricType {taskIdMachineIdMetric}"); + } + break; } + return true; + } - public async Async.Task CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary resultValue) { + public async Async.Task CreateOrUpdate(Guid jobId, Guid taskId, Guid machineId, DateTime createdAt, double version, string resultType, Dictionary resultValue) { var job = await _context.JobOperations.Get(jobId); if (job == null) { @@ -106,7 +82,7 @@ public JobResultOperations(ILogger log, IOnefuzzContext con _logTracer.LogInformation("attempt to update job result {JobId}", job.JobId); var policy = Policy.Handle().WaitAndRetryAsync(50, _ => new TimeSpan(0, 0, 5)); await policy.ExecuteAsync(async () => { - success = await TryUpdate(job, resultType, resultValue); + success = await TryUpdate(job, taskId, machineId, createdAt, version, resultType, resultValue); _logTracer.LogInformation("attempt {success}", success); }); return OneFuzzResultVoid.Ok; diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 5393e9b767..e4143473e3 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -463,6 +463,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", + "serde", "winapi 0.3.9", ] @@ -2241,6 +2242,7 @@ version = "0.2.0" dependencies = [ "anyhow", "async-trait", + "chrono", "log", "onefuzz-telemetry", "reqwest", diff --git a/src/agent/onefuzz-result/Cargo.toml b/src/agent/onefuzz-result/Cargo.toml index 7c7de6615c..7e156ac91d 100644 --- a/src/agent/onefuzz-result/Cargo.toml +++ b/src/agent/onefuzz-result/Cargo.toml @@ -9,10 +9,14 @@ license = "MIT" [dependencies] anyhow = { version = "1.0", features = ["backtrace"] } async-trait = "0.1" +chrono = { version = "0.4", default-features = false, features = [ + "clock", + "std", + "serde" +] } reqwest = "0.11" serde = "1.0" storage-queue = { path = "../storage-queue" } uuid = { version = "1.4", features = ["serde", "v4"] } onefuzz-telemetry = { path = "../onefuzz-telemetry" } log = "0.4" - diff --git a/src/agent/onefuzz-result/src/job_result.rs b/src/agent/onefuzz-result/src/job_result.rs index 08f7bbc1ee..e6b4f50377 100644 --- a/src/agent/onefuzz-result/src/job_result.rs +++ b/src/agent/onefuzz-result/src/job_result.rs @@ -3,6 +3,8 @@ use anyhow::Result; use async_trait::async_trait; +use chrono::DateTime; +pub use chrono::Utc; use onefuzz_telemetry::warn; use reqwest::Url; use serde::{self, Deserialize, Serialize}; @@ -32,6 +34,8 @@ struct JobResult { job_id: Uuid, machine_id: Uuid, machine_name: String, + created_at: DateTime, + version: f64, data: JobResultData, value: HashMap, } @@ -103,7 +107,8 @@ impl JobResultSender for TaskJobResultClient { let job_id = self.context.state.job_id; let machine_id = self.context.state.machine_id; let machine_name = self.context.state.machine_name.clone(); - + let created_at = chrono::Utc::now(); + let version = 1.0; let _ = self .context .queue_client @@ -112,6 +117,8 @@ impl JobResultSender for TaskJobResultClient { job_id, machine_id, machine_name, + created_at, + version, data, value, })