-
Notifications
You must be signed in to change notification settings - Fork 135
/
ChangeFeedFunction.cs
91 lines (78 loc) · 3.5 KB
/
ChangeFeedFunction.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StockService.Documents;
using StockService.Entities;
namespace StockService.StockProcessor
{
/// <summary>
/// Azure Function triggered by the Cosmos DB change feed of the "StockTransaction" collection.
/// For each batch it flattens the received <see cref="StockDocument"/>s into <see cref="StockEntity"/> rows,
/// saves them through <see cref="StockDbContext"/>, optionally sends SignalR update messages,
/// and finally writes one Application Insights trace per document.
/// </summary>
public class ChangeFeedFunction
{
    /// <summary>Name of the app setting / environment variable holding the SignalR Service connection string.</summary>
    private const string SignalRConnectionSetting = "SignalRConnection";

    private readonly StockDbContext _context;
    private readonly TelemetryClient _telemetryClient;

    /// <summary>
    /// Creates the function instance.
    /// </summary>
    /// <param name="context">Database context used to store the stock rows.</param>
    /// <param name="telemetryConfiguration">Configuration used to build the <see cref="TelemetryClient"/>.</param>
    public ChangeFeedFunction(StockDbContext context, TelemetryConfiguration telemetryConfiguration)
    {
        _context = context;
        _telemetryClient = new TelemetryClient(telemetryConfiguration);
    }

    /// <summary>
    /// Processes one change feed batch.
    /// </summary>
    /// <param name="input">Changed documents delivered by the Cosmos DB trigger; a null or empty batch is ignored.</param>
    /// <param name="binder">Runtime binder used to create the SignalR output binding only when it is configured.</param>
    /// <param name="log">Function logger; used to report documents that were skipped.</param>
    /// <returns>A task that completes when the batch has been stored and all notifications have been queued.</returns>
    [FunctionName(nameof(ChangeFeedFunction))]
    public async Task Run([CosmosDBTrigger("StockBackend", "StockTransaction", ConnectionStringSetting = "CosmosDBConnection", LeaseCollectionName = "leases", LeaseDatabaseName = "StockBackend",
            LeasesCollectionThroughput = 400, CreateLeaseCollectionIfNotExists = true, FeedPollDelay = 500)]
        JArray input,
        IBinder binder,
        ILogger log)
    {
        if (input == null || input.Count == 0)
        {
            return;
        }

        // Deserialize the batch into StockDocument instances.
        var documents = input.ToObject<StockDocument[]>();

        // A document without an Items collection would otherwise abort the whole batch with a
        // NullReferenceException; skip such documents and report them instead.
        var documentsWithItems = documents.Where(doc => doc.Items != null).ToList();
        if (documentsWithItems.Count != documents.Length)
        {
            log?.LogWarning(
                "Skipped {SkippedCount} stock document(s) that contain no item collection.",
                documents.Length - documentsWithItems.Count);
        }

        // Write the stock information to the SQL database: one row per document line.
        // The query is materialized once so the entities handed to EF are a fixed set.
        var entities = documentsWithItems
            .SelectMany(doc => doc.Items.Select(line => new StockEntity
            {
                DocumentId = doc.Id,
                TransactionId = doc.TransactionId,
                TransactionDate = doc.TransactionDate,
                TransactionType = doc.TransactionType,
                LocationCode = doc.LocationCode,
                CompanyCode = doc.CompanyCode,
                StoreCode = doc.StoreCode,
                TerminalCode = doc.TerminalCode,
                LineNo = line.LineNo,
                ItemCode = line.ItemCode,
                // The quantity is stored with its sign inverted.
                // NOTE(review): presumably because a transaction reduces stock - confirm.
                Quantity = -line.Quantity
            }))
            .ToList();

        await _context.Stocks.AddRangeAsync(entities);
        await _context.SaveChangesAsync();

        // The SignalR notification is enabled only when a SignalR Service connection string is set.
        if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable(SignalRConnectionSetting, EnvironmentVariableTarget.Process)))
        {
            // Bind asynchronously: the synchronous Bind<T> extension blocks on BindAsync,
            // which should be avoided inside an async method.
            var signalRMessages = await binder.BindAsync<IAsyncCollector<SignalRMessage>>(
                new SignalRAttribute { ConnectionStringSetting = SignalRConnectionSetting, HubName = "monitor" });

            // Send one change notification per document line via SignalR.
            foreach (var document in documentsWithItems)
            {
                foreach (var item in document.Items)
                {
                    await signalRMessages.AddAsync(new SignalRMessage
                    {
                        Target = "update",
                        Arguments = new object[] { document.TerminalCode, item.ItemCode }
                    });
                }
            }
        }

        // Notify Application Insights: one trace per received document, tagged with its ActivityId.
        foreach (var document in documents)
        {
            _telemetryClient.TrackTrace("End Stock Processor", new Dictionary<string, string> { { "ActivityId", document.ActivityId } });
        }
    }
}
}