
Commit b19b946

handle broker run service exception handling (actions#3163)
* handle run service exception handling
* force fail always
* format
* format
1 parent 3db5c90 commit b19b946

1 file changed (+63, −65 lines)

src/Runner.Listener/JobDispatcher.cs

Lines changed: 63 additions & 65 deletions
@@ -35,7 +35,7 @@ public interface IJobDispatcher : IRunnerService
     // This implementation of IJobDispatcher is not thread safe.
     // It is based on the fact that the current design of the runner is a dequeue
     // and processes one message from the message queue at a time.
-    // In addition, it only executes one job every time, 
+    // In addition, it only executes one job every time,
     // and the server will not send another job while this one is still running.
     public sealed class JobDispatcher : RunnerService, IJobDispatcher
     {
@@ -546,13 +546,27 @@ await processChannel.SendAsync(
                         Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");

                         var jobServer = await InitializeJobServerAsync(systemConnection);
-                        await LogWorkerProcessUnhandledException(jobServer, message, detailInfo);
-
-                        // Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space.
-                        if (detailInfo.Contains(typeof(System.IO.IOException).ToString(), StringComparison.OrdinalIgnoreCase))
+                        var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
+                        unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
+                        switch (jobServer)
                         {
-                            Trace.Info($"Finish job with result 'Failed' due to IOException.");
-                            await ForceFailJob(jobServer, message, detailInfo);
+                            case IJobServer js:
+                                {
+                                    await LogWorkerProcessUnhandledException(js, message, unhandledExceptionIssue);
+                                    // Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space.
+                                    if (detailInfo.Contains(typeof(System.IO.IOException).ToString(), StringComparison.OrdinalIgnoreCase))
+                                    {
+                                        Trace.Info($"Finish job with result 'Failed' due to IOException.");
+                                        await ForceFailJob(js, message);
+                                    }
+
+                                    break;
+                                }
+                            case IRunServer rs:
+                                await ForceFailJob(rs, message, unhandledExceptionIssue);
+                                break;
+                            default:
+                                throw new NotSupportedException($"JobServer type '{jobServer.GetType().Name}' is not supported.");
                         }
                     }

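Note on the hunk above: InitializeJobServerAsync can now return either an IJobServer (timeline-based) or an IRunServer (broker run-service) connection, so the crash handling dispatches with a C# type-pattern switch instead of the old if/else, and the worker-crash Issue is built once up front. On the IJobServer path the job is still only force-failed when the worker's stderr contains System.IO.IOException; on the IRunServer path ForceFailJob is now called unconditionally (the "force fail always" bullet in the commit message). Below is a minimal, self-contained C# sketch of the same dispatch shape; the backend interfaces and messages are hypothetical stand-ins, not the runner's IJobServer/IRunServer types.

using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for the two server interfaces; illustrative only,
// not the actions/runner types.
interface IRunnerBackend { }
interface ITimelineBackend : IRunnerBackend { }   // plays the role of IJobServer
interface IResultsBackend : IRunnerBackend { }    // plays the role of IRunServer

class TimelineBackend : ITimelineBackend { }
class ResultsBackend : IResultsBackend { }

static class CrashDispatchSketch
{
    // Mirrors the shape of the new switch in JobDispatcher: one case per
    // concrete server interface, plus a default that fails loudly.
    static Task HandleWorkerCrashAsync(IRunnerBackend backend, string detailInfo)
    {
        switch (backend)
        {
            case ITimelineBackend timeline:
                Console.WriteLine($"timeline path ({timeline.GetType().Name}): log issue, force-fail on IOException: {detailInfo}");
                return Task.CompletedTask;
            case IResultsBackend results:
                Console.WriteLine($"results path ({results.GetType().Name}): complete job as Failed: {detailInfo}");
                return Task.CompletedTask;
            default:
                throw new NotSupportedException($"Backend type '{backend.GetType().Name}' is not supported.");
        }
    }

    static async Task Main()
    {
        await HandleWorkerCrashAsync(new TimelineBackend(), "System.IO.IOException: no space left on device");
        await HandleWorkerCrashAsync(new ResultsBackend(), "worker exited with unhandled exception");
    }
}
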
@@ -644,7 +658,7 @@ await processChannel.SendAsync(
                        }
                    }

-                    // wait worker to exit 
+                    // wait worker to exit
                    // if worker doesn't exit within timeout, then kill worker.
                    completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));

@@ -1131,86 +1145,70 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
         }

         // log an error issue to job level timeline record
-        private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string detailInfo)
-        {
-            if (server is IJobServer jobServer)
-            {
-                try
-                {
-                    var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
-                    ArgUtil.NotNull(timeline, nameof(timeline));
-
-                    TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
-                    ArgUtil.NotNull(jobRecord, nameof(jobRecord));
-
-                    var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
-                    unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
-                    jobRecord.ErrorCount++;
-                    jobRecord.Issues.Add(unhandledExceptionIssue);
-
-                    if (message.Variables.TryGetValue("DistributedTask.MarkJobAsFailedOnWorkerCrash", out var markJobAsFailedOnWorkerCrash) &&
-                        StringUtil.ConvertToBoolean(markJobAsFailedOnWorkerCrash?.Value))
-                    {
-                        Trace.Info("Mark the job as failed since the worker crashed");
-                        jobRecord.Result = TaskResult.Failed;
-                        // mark the job as completed so service will pickup the result
-                        jobRecord.State = TimelineRecordState.Completed;
-                    }
-
-                    await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
-                }
-                catch (Exception ex)
-                {
-                    Trace.Error("Fail to report unhandled exception from Runner.Worker process");
-                    Trace.Error(ex);
-                }
-            }
-            else
-            {
-                Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", detailInfo);
-                return;
-            }
-        }
+        private async Task LogWorkerProcessUnhandledException(IJobServer jobServer, Pipelines.AgentJobRequestMessage message, Issue issue)
+        {
+            try
+            {
+                var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
+                ArgUtil.NotNull(timeline, nameof(timeline));
+
+                TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
+                ArgUtil.NotNull(jobRecord, nameof(jobRecord));
+
+
+                jobRecord.ErrorCount++;
+                jobRecord.Issues.Add(issue);
+
+                if (message.Variables.TryGetValue("DistributedTask.MarkJobAsFailedOnWorkerCrash", out var markJobAsFailedOnWorkerCrash) &&
+                    StringUtil.ConvertToBoolean(markJobAsFailedOnWorkerCrash?.Value))
+                {
+                    Trace.Info("Mark the job as failed since the worker crashed");
+                    jobRecord.Result = TaskResult.Failed;
+                    // mark the job as completed so service will pickup the result
+                    jobRecord.State = TimelineRecordState.Completed;
+                }
+
+                await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
+            }
+            catch (Exception ex)
+            {
+                Trace.Error("Fail to report unhandled exception from Runner.Worker process");
+                Trace.Error(ex);
+            }
+        }

         // raise job completed event to fail the job.
-        private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message, string detailInfo)
-        {
-            if (server is IJobServer jobServer)
-            {
-                try
-                {
-                    var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
-                    await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
-                }
-                catch (Exception ex)
-                {
-                    Trace.Error("Fail to raise JobCompletedEvent back to service.");
-                    Trace.Error(ex);
-                }
-            }
-            else if (server is IRunServer runServer)
-            {
-                try
-                {
-                    var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
-                    var unhandledAnnotation = unhandledExceptionIssue.ToAnnotation();
-                    var jobAnnotations = new List<Annotation>();
-                    if (unhandledAnnotation.HasValue)
-                    {
-                        jobAnnotations.Add(unhandledAnnotation.Value);
-                    }
-
-                    await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, CancellationToken.None);
-                }
-                catch (Exception ex)
-                {
-                    Trace.Error("Fail to raise job completion back to service.");
-                    Trace.Error(ex);
-                }
-            }
-            else
-            {
-                throw new NotSupportedException($"Server type {server.GetType().FullName} is not supported.");
-            }
-        }
+        private async Task ForceFailJob(IJobServer jobServer, Pipelines.AgentJobRequestMessage message)
+        {
+            try
+            {
+                var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
+                await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
+            }
+            catch (Exception ex)
+            {
+                Trace.Error("Fail to raise JobCompletedEvent back to service.");
+                Trace.Error(ex);
+            }
+        }
+
+        private async Task ForceFailJob(IRunServer runServer, Pipelines.AgentJobRequestMessage message, Issue issue)
+        {
+            try
+            {
+                var annotation = issue.ToAnnotation();
+                var jobAnnotations = new List<Annotation>();
+                if (annotation.HasValue)
+                {
+                    jobAnnotations.Add(annotation.Value);
+                }
+
+                await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, CancellationToken.None);
+            }
+            catch (Exception ex)
+            {
+                Trace.Error("Fail to raise job completion back to service.");
+                Trace.Error(ex);
+            }
+        }

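Taken together with the earlier hunk, the refactor replaces the single ForceFailJob(IRunnerService, ...) that branched internally with one overload per concrete server interface, and the worker-crash Issue is now created once by the caller and threaded through both LogWorkerProcessUnhandledException and the IRunServer overload, where it is converted to an annotation before the job is completed as Failed. A minimal, self-contained sketch of that overload-per-backend shape follows; IssueSketch, Annotation, ITimelineApi, and IResultsApi are hypothetical stand-ins, not the runner's Issue, IJobServer, or IRunServer types.

using System;
using System.Collections.Generic;

// Hypothetical stand-ins; none of these are the runner's types.
struct Annotation
{
    public string Text;
}

class IssueSketch
{
    public string Message = "";
    public Dictionary<string, string> Data { get; } = new Dictionary<string, string>();

    // Plays the role of Issue.ToAnnotation(): returns a nullable struct, so the
    // caller checks HasValue before using Value, as the diff does.
    public Annotation? ToAnnotation()
        => string.IsNullOrEmpty(Message) ? (Annotation?)null : new Annotation { Text = Message };
}

interface ITimelineApi { void AppendIssue(string jobId, IssueSketch issue); }
interface IResultsApi { void CompleteJobAsFailed(string jobId, IReadOnlyList<Annotation> annotations); }

static class ForceFailSketch
{
    // One overload per backend, instead of a single method that branches on a base interface.
    static void ForceFail(ITimelineApi timeline, string jobId, IssueSketch issue)
        => timeline.AppendIssue(jobId, issue);

    static void ForceFail(IResultsApi results, string jobId, IssueSketch issue)
    {
        // Convert the issue to an annotation and complete the job as Failed,
        // mirroring the shape of the new IRunServer overload.
        var annotation = issue.ToAnnotation();
        var jobAnnotations = new List<Annotation>();
        if (annotation.HasValue)
        {
            jobAnnotations.Add(annotation.Value);
        }

        results.CompleteJobAsFailed(jobId, jobAnnotations);
    }

    class ConsoleTimeline : ITimelineApi
    {
        public void AppendIssue(string jobId, IssueSketch issue)
            => Console.WriteLine($"[timeline] {jobId}: {issue.Message}");
    }

    class ConsoleResults : IResultsApi
    {
        public void CompleteJobAsFailed(string jobId, IReadOnlyList<Annotation> annotations)
            => Console.WriteLine($"[results] {jobId}: Failed with {annotations.Count} annotation(s)");
    }

    static void Main()
    {
        // Build the crash issue once, tag it for telemetry, then hand the same
        // object to whichever overload matches the connected backend.
        var issue = new IssueSketch { Message = "Runner.Worker crashed: System.IO.IOException" };
        issue.Data["telemetry"] = "WorkerCrash";   // illustrative key, not the runner's constant

        ForceFail(new ConsoleTimeline(), "job-1", issue);
        ForceFail(new ConsoleResults(), "job-1", issue);
    }
}

The real methods also wrap each path in try/catch and report failures via Trace.Error rather than letting the exception escape; the sketch omits that to keep the overload dispatch visible.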