/
Startup.cs
242 lines (217 loc) · 9.75 KB
/
Startup.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using DurableFunctionsMonitor.DotNetBackend;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Data.SqlClient;
[assembly: WebJobsStartup(typeof(Dfm.MsSql.Startup))]
namespace Dfm.MsSql
{
public class Startup : IWebJobsStartup
{
private static readonly string ConnString;
private static readonly string SchemaName = "dt";
static Startup()
{
ConnString = Environment.GetEnvironmentVariable("DFM_SQL_CONNECTION_STRING");
// Getting custom schema name passed to us by VsCode ext
string schemaName = Environment.GetEnvironmentVariable("AzureFunctionsJobHost__extensions__durableTask__storageProvider__schemaName");
if (!string.IsNullOrEmpty(schemaName))
{
SchemaName = schemaName;
}
}
public void Configure(IWebJobsBuilder builder)
{
DfmEndpoint.Setup(null, new DfmExtensionPoints
{
GetInstanceHistoryRoutine = (client, connName, hubName, instanceId) => Task.FromResult(GetInstanceHistory(client, connName, hubName, instanceId)),
GetParentInstanceIdRoutine = GetParentInstanceId,
GetTaskHubNamesRoutine = GetTaskHubNames
});
}
/// <summary>
/// Custom routine for fetching Task Hub names
/// </summary>
public static async Task<IEnumerable<string>> GetTaskHubNames(string connName)
{
var result = new List<string>();
string sql =
$@"SELECT DISTINCT
i.TaskHub as TaskHub
FROM
[{SchemaName}].Instances i";
using (var conn = new SqlConnection(ConnString))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
using (var reader = cmd.ExecuteReader())
{
while (await reader.ReadAsync())
{
result.Add(reader["TaskHub"].ToString());
}
}
}
}
return result;
}
/// <summary>
/// Custom routine for fetching parent orchestration id
/// </summary>
public static async Task<string> GetParentInstanceId(IDurableClient durableClient, string connName, string hubName, string instanceId)
{
string sql =
$@"SELECT
i.ParentInstanceID as ParentInstanceID
FROM
[{SchemaName}].Instances i
WHERE
i.InstanceID = @OrchestrationInstanceId AND i.TaskHub = @TaskHub";
using (var conn = new SqlConnection(ConnString))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.Add(new SqlParameter("@OrchestrationInstanceId", SqlDbType.VarChar, 256) { Value = instanceId });
cmd.Parameters.Add(new SqlParameter("@TaskHub", SqlDbType.VarChar, 50) { Value = hubName });
using (var reader = cmd.ExecuteReader())
{
if (await reader.ReadAsync())
{
var parentInstanceId = reader["ParentInstanceID"];
if (parentInstanceId != null)
{
string parentInstanceIdString = parentInstanceId.ToString();
if (!string.IsNullOrWhiteSpace(parentInstanceIdString))
{
return parentInstanceIdString;
}
}
}
}
}
}
return null;
}
/// <summary>
/// Custom routine for fetching orchestration history
/// </summary>
public static IEnumerable<HistoryEvent> GetInstanceHistory(IDurableClient durableClient, string connName, string hubName, string instanceId)
{
string sql =
$@"SELECT
IIF(h2.TaskID IS NULL, h.Timestamp, h2.Timestamp) as Timestamp,
IIF(h2.TaskID IS NULL, h.EventType, h2.EventType) as EventType,
h.TaskID as EventId,
h.Name as Name,
IIF(h2.TaskID IS NULL, NULL, h.Timestamp) as ScheduledTime,
p1.Text as Input,
p2.Text as Result,
p2.Reason as Details,
cih.InstanceID as SubOrchestrationId
FROM
[{SchemaName}].History h
LEFT JOIN
[{SchemaName}].History h2
ON
(
h.EventType IN ('TaskScheduled', 'SubOrchestrationInstanceCreated')
AND
h2.EventType IN ('SubOrchestrationInstanceCompleted', 'SubOrchestrationInstanceFailed', 'TaskCompleted', 'TaskFailed')
AND
h.InstanceID = h2.InstanceID AND h.ExecutionID = h2.ExecutionID AND h.TaskHub = h2.TaskHub AND h.TaskID = h2.TaskID AND h.SequenceNumber != h2.SequenceNumber
)
LEFT JOIN
[{SchemaName}].Payloads p1
ON
p1.PayloadID = h.DataPayloadID AND p1.TaskHub = h.TaskHub AND p1.InstanceID = h.InstanceID
LEFT JOIN
[{SchemaName}].Payloads p2
ON
p2.PayloadID = h2.DataPayloadID AND p2.TaskHub = h2.TaskHub AND p2.InstanceID = h2.InstanceID
LEFT JOIN
(
select
cii.ParentInstanceID,
cii.InstanceID,
cii.TaskHub,
chh.TaskID
from
[{SchemaName}].Instances cii
INNER JOIN
[{SchemaName}].History chh
ON
(chh.InstanceID = cii.InstanceID AND chh.TaskHub = cii.TaskHub AND chh.EventType = 'ExecutionStarted')
) cih
ON
(cih.ParentInstanceID = h.InstanceID AND cih.TaskHub = h.TaskHub AND cih.TaskID = h.TaskID AND h.EventType = 'SubOrchestrationInstanceCreated')
WHERE
h.EventType IN
(
'ExecutionStarted', 'ExecutionCompleted', 'ExecutionFailed', 'ExecutionTerminated', 'TaskScheduled', 'SubOrchestrationInstanceCreated',
'ContinueAsNew', 'TimerCreated', 'TimerFired', 'EventRaised', 'EventSent'
)
AND
h.InstanceID = @OrchestrationInstanceId AND h.TaskHub = @TaskHub
ORDER BY
h.SequenceNumber";
using (var conn = new SqlConnection(ConnString))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.Add(new SqlParameter("@OrchestrationInstanceId", SqlDbType.VarChar, 256) { Value = instanceId });
cmd.Parameters.Add(new SqlParameter("@TaskHub", SqlDbType.VarChar, 50) { Value = hubName });
using (var reader = cmd.ExecuteReader())
{
// Memorizing 'ExecutionStarted' event, to further correlate with 'ExecutionCompleted'
DateTimeOffset? executionStartedTimestamp = null;
while (reader.Read())
{
var evt = ToHistoryEvent(reader, executionStartedTimestamp);
if (evt.EventType == "ExecutionStarted")
{
executionStartedTimestamp = evt.Timestamp;
}
yield return evt;
}
}
}
}
}
private static HistoryEvent ToHistoryEvent(SqlDataReader reader, DateTimeOffset? executionStartTime)
{
var evt = new HistoryEvent
{
Timestamp = ((DateTime)reader["Timestamp"]).ToUniversalTime(),
EventType = reader["EventType"].ToString(),
EventId = reader["EventId"] is DBNull ? null : (int?)reader["EventId"],
Name = reader["Name"].ToString(),
Result = reader["Result"].ToString(),
Details = reader["Details"].ToString(),
SubOrchestrationId = reader["SubOrchestrationId"].ToString(),
};
var rawScheduledTime = reader["ScheduledTime"];
if (!(rawScheduledTime is DBNull))
{
evt.ScheduledTime = ((DateTime)rawScheduledTime).ToUniversalTime();
}
else if(evt.EventType == "ExecutionCompleted")
{
evt.ScheduledTime = executionStartTime?.ToUniversalTime();
}
if (evt.ScheduledTime.HasValue)
{
evt.DurationInMs = (evt.Timestamp - evt.ScheduledTime.Value).TotalMilliseconds;
}
return evt;
}
}
}