// TestAkka.cs
using Akka.Actor;
using Akka.Routing;
using AkkaDotBootApi.Actor;
using AkkaDotModule.ActorSample;
using AkkaDotModule.ActorUtils;
using AkkaDotModule.ActorUtils.Confluent;
using AkkaDotModule.Config;
using AkkaDotModule.Kafka;
using AkkaDotModule.Models;
using Confluent.Kafka;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace AkkaDotBootApi.Test
{
public class TestAkka
{
/// <summary>
/// Creates and registers the sample actors (hello, throttle and worker),
/// wires the throttle to its worker, and then starts the ping-pong load test.
/// </summary>
/// <param name="app">Application builder; forwarded unchanged to the load test.</param>
/// <param name="actorSystem">Actor system in which every actor is created.</param>
public static void Start(IApplicationBuilder app, ActorSystem actorSystem)
{
    // http://wiki.webnori.com/display/AKKA/Actors
    // HelloActor: the most basic actor.
    var helloProps = Props.Create(() => new HelloActor("webnori"));
    var helloRef = actorSystem.ActorOf(helloProps, "helloActor" /* path name known to Akka */);
    AkkaLoad.RegisterActor("helloActor" /* unique name known to AkkaLoad */, helloRef);

    // The same actor is addressed twice: once by Akka path, once by its AkkaLoad name.
    var helloByPath = actorSystem.ActorSelection("user/helloActor");
    var helloByName = AkkaLoad.ActorSelect("helloActor");
    helloByPath.Tell("hello");
    helloByName.Tell("hello");

    // Valve (throttle): limits how much work is released per second.
    int windowSeconds = 1;
    int elementsPerWindow = 5;
    var throttleProps = Props.Create(() => new ThrottleWork(elementsPerWindow, windowSeconds));
    var throttleRef = actorSystem.ActorOf(throttleProps, "throttleWork");
    var throttleWork = AkkaLoad.RegisterActor("throttleWork", throttleRef);

    // Actual worker: handles each task released by the valve individually.
    var workerRef = actorSystem.ActorOf(Props.Create<WorkActor>(), "worker");
    var worker = AkkaLoad.RegisterActor("worker", workerRef);

    // Tell the valve which worker it feeds.
    throttleWork.Tell(new SetTarget(worker));

    // Alternative scenarios, currently disabled:
    //StartLoadTestByRouter(app,actorSystem);
    //StartKafkaTest(app, actorSystem);
    StartLoadTestByPingPong(app, actorSystem);
}
/// <summary>
/// TPS measurement (actor performance), ping-pong variant: creates pairs of
/// <c>InfiniteReflectionActor</c>s, cross-links each pair, and serves the
/// configured number of initial messages to the first actor of every pair.
/// </summary>
/// <param name="app">Application builder; not used by this method.</param>
/// <param name="actorSystem">Actor system in which the actor pairs are created.</param>
public static void StartLoadTestByPingPong(IApplicationBuilder app, ActorSystem actorSystem)
{
    // Tuning knobs.
    // Dispatcher choices: custom-dispatcher, custom-task-dispatcher, default-fork-join-dispatcher
    string dispatcherId = "custom-task-dispatcher";
    int tableCount = 3;     // number of ping-pong groups ("tables"); each is one pair of actors
    int ballsPerTable = 0;  // number of balls served per table; with 0 no message is sent

    // Builds a fresh Props for every reflection actor.
    Props NewReflectorProps() =>
        Props.Create(() => new InfiniteReflectionActor()).WithDispatcher(dispatcherId);

    // One infinite-reflection set per table.
    for (int table = 0; table < tableCount; table++)
    {
        string nameA = "infiniteReflectionActorA" + table;
        string nameB = "infiniteReflectionActorB" + table;

        // Create and register both players of this table.
        var playerA = AkkaLoad.RegisterActor(nameA, actorSystem.ActorOf(NewReflectorProps(), nameA));
        var playerB = AkkaLoad.RegisterActor(nameB, actorSystem.ActorOf(NewReflectorProps(), nameB));

        // Cross-link the reply targets so the two actors answer each other.
        playerA.Tell(playerB);
        playerB.Tell(playerA);

        // Serve the initial balls to player A.
        int served = 0;
        while (served < ballsPerTable)
        {
            var serve = new InfiniteMessage()
            {
                Message = "서브A",
                Count = 0
            };
            playerA.Tell(serve);
            served++;
        }
    }
}
/// <summary>
/// TPS measurement (actor performance), router variant: creates a round-robin
/// pool of <c>InfiniteReflectionActor</c>s and floods it with
/// <c>DistributedMessage</c>s from several background tasks.
/// </summary>
/// <remarks>
/// The method returns as soon as the sender tasks have been started. Completion
/// (or any sender failure) is reported on the console by a continuation, so the
/// "Completed,Tell" line is only printed once every message has actually been told.
/// </remarks>
/// <param name="app">Application builder; not used by this method.</param>
/// <param name="actorSystem">Actor system in which the router pool is created.</param>
public static void StartLoadTestByRouter(IApplicationBuilder app, ActorSystem actorSystem)
{
    int testHitCount = 400000;   // messages sent by each sender task
    int distributedCnt = 50;     // number of routees and of sender tasks

    // Tuning knob.
    // Dispatcher choices: custom-dispatcher, custom-task-dispatcher, default-fork-join-dispatcher
    string disPacther = "custom-task-dispatcher";

    var roundPool = AkkaLoad.RegisterActor("roundPool",
        actorSystem.ActorOf(
            Props.Create(() => new InfiniteReflectionActor())
                .WithDispatcher(disPacther)
                .WithRouter(new RoundRobinPool(distributedCnt)),
            "roundPool"));

    // Start one sender task per routee.
    Task[] tasks = new Task[distributedCnt];
    for (int i = 0; i < distributedCnt; i++)
    {
        tasks[i] = Task.Run(() =>
        {
            for (int hitidx = 0; hitidx < testHitCount; hitidx++)
            {
                roundPool.Tell(new DistributedMessage()
                {
                    Message = "test"
                });
            }
        });
    }

    // Report the outcome without blocking the caller: blocking here would stall
    // whoever invoked the test until every message has been sent.
    Task.WhenAll(tasks).ContinueWith(finished =>
    {
        AggregateException failure = finished.Exception;
        if (failure != null)
        {
            Console.WriteLine("One or more exceptions occurred: ");
            foreach (var ex in failure.Flatten().InnerExceptions)
            {
                Console.WriteLine(" {0}", ex.Message);
            }
            return;
        }

        Console.WriteLine("Completed,Tell");
    }, TaskScheduler.Default);
}
/// <summary>
/// Demonstrates the two Kafka integrations: actor-based Confluent.Kafka with a
/// SASL/SSL connection, and the DI-provided <c>ConsumerSystem</c> /
/// <c>ProducerSystem</c> against an unsecured broker.
/// </summary>
/// <param name="app">Application builder whose service provider supplies the Kafka systems.</param>
/// <param name="actorSystem">Actor system in which the producer and consumer actors are created.</param>
/// <exception cref="InvalidOperationException">
/// The SASL password environment variable is not set, or <c>ConsumerSystem</c> /
/// <c>ProducerSystem</c> is not registered in the service provider.
/// </exception>
public static void StartKafkaTest(IApplicationBuilder app, ActorSystem actorSystem)
{
    // Two Kafka integrations with slightly different usage are shown; pick whichever you prefer.

    //##################################################################
    //##### Confluent.Kafka wired up in Akka actor mode.
    //##### Secure connections are supported, so a SaaS-hosted Kafka can be used.
    //##### Useful when you write custom actors and wire Akka Streams yourself.
    //##################################################################

    //ProducerActor
    var producerAkkaOption = new ProducerAkkaOption()
    {
        BootstrapServers = "webnori.servicebus.windows.net:9093",
        ProducerName = "webnori-kafka",
        SecurityOption = CreateKafkaSecurityOption()
    };

    string producerActorName = "producerActor";
    var producerActor = AkkaLoad.RegisterActor(producerActorName /* unique name known to AkkaLoad */,
        actorSystem.ActorOf(
            Props.Create(() => new ProducerActor(producerAkkaOption)),
            producerActorName /* path name known to Akka */));

    producerActor.Tell(new BatchData()
    {
        Data = new KafkaTextMessage()
        {
            Topic = "akka100",
            Message = "testData"
        }
    });

    //ConsumerActor
    var consumerAkkaOption = new ConsumerAkkaOption()
    {
        BootstrapServers = "webnori.servicebus.windows.net:9093",
        Topics = "akka100",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        KafkaGroupId = "akakTestGroup",
        RelayActor = null, // attach a worker actor here to have consumed messages forwarded to it (separates consuming from processing)
        SecurityOption = CreateKafkaSecurityOption()
    };

    string consumerActorName = "consumerActor";
    var consumerActor = AkkaLoad.RegisterActor(consumerActorName /* unique name known to AkkaLoad */,
        actorSystem.ActorOf(
            Props.Create(() => new ConsumerActor(consumerAkkaOption)),
            consumerActorName /* path name known to Akka */));

    // Start the consumer.
    consumerActor.Tell(new ConsumerStart());

    //##################################################################
    //##### Akka.Streams.Kafka (depends on Confluent.Kafka) mode; security (SSL) is not supported yet.
    //##### Usable against a privately deployed Kafka.
    //##### Worth using when you want the streams that Akka.Streams.Kafka provides.
    //##################################################################

    // Each system is a singleton, so controllers can obtain the same instances through DI.
    // GetRequiredService fails with a descriptive error when a system is not registered,
    // instead of returning null and failing later on the first member access.
    var consumerSystem = app.ApplicationServices.GetRequiredService<ConsumerSystem>();
    var producerSystem = app.ApplicationServices.GetRequiredService<ProducerSystem>();

    // Consumer: several consumers may be started.
    consumerSystem.Start(new ConsumerAkkaOption()
    {
        KafkaGroupId = "testGroup",
        BootstrapServers = "kafka:9092",
        RelayActor = null, // consumed messages are forwarded to the given actor, so handlers are written as actors
        Topics = "akka100",
    });

    // Producer: several producers may be started.
    producerSystem.Start(new ProducerAkkaOption()
    {
        BootstrapServers = "kafka:9092",
        ProducerName = "producer1",
    });

    List<string> messages = new List<string>();
    for (int i = 0; i < 10; i++)
    {
        messages.Add($"message-{i}");
    }

    // Bonus: the production rate can be throttled.
    int tps = 10;
    producerSystem.SinkMessage("producer1", "akka100", messages, tps);
}

/// <summary>
/// Builds the SASL/SSL security option shared by the secured producer and consumer.
/// </summary>
/// <remarks>
/// The password is read from the <c>KAFKA_SASL_PASSWORD</c> environment variable so
/// that no credential lives in source control. NOTE(review): the key that was
/// previously committed in this file should be treated as leaked and rotated.
/// </remarks>
/// <returns>A new <c>KafkaSecurityOption</c> instance on every call.</returns>
/// <exception cref="InvalidOperationException">The environment variable is missing or blank.</exception>
private static KafkaSecurityOption CreateKafkaSecurityOption()
{
    const string passwordVariable = "KAFKA_SASL_PASSWORD";

    var saslPassword = Environment.GetEnvironmentVariable(passwordVariable);
    if (string.IsNullOrWhiteSpace(saslPassword))
    {
        throw new InvalidOperationException(
            $"Environment variable '{passwordVariable}' must contain the Kafka SASL password (connection string).");
    }

    return new KafkaSecurityOption()
    {
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.Plain,
        SaslUsername = "$ConnectionString",
        SaslPassword = saslPassword,
        SslCaLocation = "./cacert.pem"
    };
}
}
}