diff --git a/AkkaDotBootApi/Controllers/KafkaController.cs b/AkkaDotBootApi/Controllers/KafkaController.cs new file mode 100644 index 0000000..b78d077 --- /dev/null +++ b/AkkaDotBootApi/Controllers/KafkaController.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using AkkaDotModule.Kafka; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; + +namespace AkkaDotBootApi.Controllers +{ + [Route("api/[controller]")] + [ApiController] + public class KafkaController : ControllerBase + { + private ProducerSystem producerSystem; + + public KafkaController(ProducerSystem _producerSystem) + { + producerSystem = _producerSystem; + } + + /// + /// Kafka 메시지 생성 + /// 개수와 tps조절가능 + /// + /// testTopic : akka100 + /// + [HttpGet("HelloActor-Tell")] + public int Kafka_ProducerMessage(int count,int tps) + { + List messages = new List(); + for (int i = 0; i < count; i++) + { + messages.Add($"message-{i}"); + } + producerSystem.SinkMessage("producer1", "akka100", messages, tps); + return 1; + } + } +} diff --git a/AkkaDotBootApi/Startup.cs b/AkkaDotBootApi/Startup.cs index b71defc..fa06d18 100644 --- a/AkkaDotBootApi/Startup.cs +++ b/AkkaDotBootApi/Startup.cs @@ -1,20 +1,16 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Reflection; -using Akka.Actor; -using AkkaDotBootApi.Actor; -using AkkaDotModule.ActorSample; -using AkkaDotModule.ActorUtils; +using Akka.Actor; +using AkkaDotBootApi.Test; using AkkaDotModule.Config; using AkkaDotModule.Kafka; -using AkkaDotModule.Models; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.OpenApi.Models; +using System; +using System.IO; +using System.Reflection; using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime; namespace AkkaDotBootApi @@ -131,56 +127,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IApplica lifetime.ApplicationStarted.Register(() => { - // HelloActor 기본액터 - AkkaLoad.RegisterActor("helloActor" /*AkkaLoad가 인식하는 유니크명*/, - actorSystem.ActorOf(Props.Create(() => new HelloActor("webnori")), "helloActor" /*AKKA가 인식하는 Path명*/ - )); - - var helloActor = actorSystem.ActorSelection("user/helloActor"); - var helloActor2 = AkkaLoad.ActorSelect("helloActor"); - - helloActor.Tell("hello"); - helloActor2.Tell("hello"); - - - // 밸브 Work : 초당 작업량을 조절 - int timeSec = 1; - int elemntPerSec = 5; - var throttleWork = AkkaLoad.RegisterActor("throttleWork", - actorSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork")); - - // 실제 Work : 밸브에 방출되는 Task를 개별로 처리 - var worker = AkkaLoad.RegisterActor("worker", actorSystem.ActorOf(Props.Create(), "worker")); - - // 배브의 작업자를 지정 - throttleWork.Tell(new SetTarget(worker)); - - // KAFKA 셋팅 - // 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능 - var consumerSystem = app.ApplicationServices.GetService(); - var producerSystem = app.ApplicationServices.GetService(); - - //소비자 : 복수개의 소비자 생성가능 - consumerSystem.Start(new ConsumerAkkaOption() - { - KafkaGroupId = "testGroup", - KafkaUrl = "kafka:9092", - RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현 - Topics = "akka100" - }); - - //생산자 : 복수개의 생산자 생성가능 - producerSystem.Start(new ProducerAkkaOption() - { - KafkaUrl = "kafka:9092", - ProducerName = "producer1" - }); - - List messages = new List(); - //보너스 : 생산의 속도를 조절할수 있습니다. - int tps = 10; - producerSystem.SinkMessage("producer1", "akka100", messages, tps); - + TestAkka.Run(app, actorSystem); }); } } diff --git a/AkkaDotBootApi/Test/TestAkka.cs b/AkkaDotBootApi/Test/TestAkka.cs new file mode 100644 index 0000000..e28499c --- /dev/null +++ b/AkkaDotBootApi/Test/TestAkka.cs @@ -0,0 +1,75 @@ +using Akka.Actor; +using AkkaDotBootApi.Actor; +using AkkaDotModule.ActorSample; +using AkkaDotModule.ActorUtils; +using AkkaDotModule.Config; +using AkkaDotModule.Kafka; +using AkkaDotModule.Models; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using System.Collections.Generic; + +namespace AkkaDotBootApi.Test +{ + public class TestAkka + { + static public void Run(IApplicationBuilder app, ActorSystem actorSystem) + { + // HelloActor 기본액터 + AkkaLoad.RegisterActor("helloActor" /*AkkaLoad가 인식하는 유니크명*/, + actorSystem.ActorOf(Props.Create(() => new HelloActor("webnori")), "helloActor" /*AKKA가 인식하는 Path명*/ + )); + + var helloActor = actorSystem.ActorSelection("user/helloActor"); + var helloActor2 = AkkaLoad.ActorSelect("helloActor"); + + helloActor.Tell("hello"); + helloActor2.Tell("hello"); + + + // 밸브 Work : 초당 작업량을 조절 + int timeSec = 1; + int elemntPerSec = 5; + var throttleWork = AkkaLoad.RegisterActor("throttleWork", + actorSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork")); + + // 실제 Work : 밸브에 방출되는 Task를 개별로 처리 + var worker = AkkaLoad.RegisterActor("worker", actorSystem.ActorOf(Props.Create(), "worker")); + + // 배브의 작업자를 지정 + throttleWork.Tell(new SetTarget(worker)); + + // KAFKA 셋팅 + // 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능 + var consumerSystem = app.ApplicationServices.GetService(); + var producerSystem = app.ApplicationServices.GetService(); + + //소비자 : 복수개의 소비자 생성가능 + consumerSystem.Start(new ConsumerAkkaOption() + { + KafkaGroupId = "testGroup", + KafkaUrl = "kafka:9092", + RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현 + Topics = "akka100" + }); + + //생산자 : 복수개의 생산자 생성가능 + producerSystem.Start(new ProducerAkkaOption() + { + KafkaUrl = "kafka:9092", + ProducerName = "producer1" + }); + + List messages = new List(); + for(int i=0; i < 10; i++) + { + messages.Add($"message-{i}"); + } + + //보너스 : 생산의 속도를 조절할수 있습니다. + int tps = 10; + producerSystem.SinkMessage("producer1", "akka100", messages, tps); + + } + } +} diff --git a/AkkaDotModule.sln b/AkkaDotModule.sln index 521943d..4309ee8 100644 --- a/AkkaDotModule.sln +++ b/AkkaDotModule.sln @@ -18,6 +18,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestAkkaDotModule", "TestAk EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AkkaDotBootApi", "AkkaDotBootApi\AkkaDotBootApi.csproj", "{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}" EndProject +Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose-infra", "Compose\docker-compose-infra.dcproj", "{A325C35E-28E0-43CE-B28F-516DC1568E18}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -36,6 +38,10 @@ Global {FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Debug|Any CPU.Build.0 = Debug|Any CPU {FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Release|Any CPU.ActiveCfg = Release|Any CPU {FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Release|Any CPU.Build.0 = Release|Any CPU + {A325C35E-28E0-43CE-B28F-516DC1568E18}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A325C35E-28E0-43CE-B28F-516DC1568E18}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A325C35E-28E0-43CE-B28F-516DC1568E18}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A325C35E-28E0-43CE-B28F-516DC1568E18}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/AkkaDotModule/AkkaDotModule.csproj b/AkkaDotModule/AkkaDotModule.csproj index 20b4401..e163f7c 100644 --- a/AkkaDotModule/AkkaDotModule.csproj +++ b/AkkaDotModule/AkkaDotModule.csproj @@ -4,7 +4,7 @@ netstandard2.0 true AkkaDotModule.Webnori - 1.0.5 + 1.0.6 https://github.com/psmon/AkkaDotModule https://github.com/psmon/AkkaDotModule diff --git a/AkkaDotModule/Kafka/ProducerSystem.cs b/AkkaDotModule/Kafka/ProducerSystem.cs index d414e77..2b3ba61 100644 --- a/AkkaDotModule/Kafka/ProducerSystem.cs +++ b/AkkaDotModule/Kafka/ProducerSystem.cs @@ -39,7 +39,7 @@ public ProducerSystem() public void Start(ProducerAkkaOption producerAkkaOption) { materializer_producer = producerSystem.Materializer(); - producerList[producerAkkaOption.KafkaUrl] = ProducerSettings.Create(producerSystem, null, null) + producerList[producerAkkaOption.ProducerName] = ProducerSettings.Create(producerSystem, null, null) .WithBootstrapServers(producerAkkaOption.KafkaUrl); } diff --git a/Compose/docker-compose-infra.dcproj b/Compose/docker-compose-infra.dcproj new file mode 100644 index 0000000..2f83150 --- /dev/null +++ b/Compose/docker-compose-infra.dcproj @@ -0,0 +1,15 @@ + + + + 2.1 + Linux + A325C35E-28E0-43CE-B28F-516DC1568E18 + None + {Scheme}://localhost:{ServicePort}/swagger + akka-infra + + + + + + \ No newline at end of file diff --git a/Compose/docker-compose.yml b/Compose/docker-compose.yml new file mode 100644 index 0000000..6c55e21 --- /dev/null +++ b/Compose/docker-compose.yml @@ -0,0 +1,55 @@ +version: '3.4' + +services: + mysql: + image: mysql:5.7 + command: --default-authentication-plugin=mysql_native_password + restart: always + ports: + - 13306:3306 + environment: + MYSQL_ROOT_PASSWORD: "root" + TZ: "Asia/Seoul" + + postgres: + image: postgres:9.6 + container_name: postgres + environment: + - POSTGRES_USER=docker + - POSTGRES_PASSWORD=docker + ports: + - '5432:5432' + volumes: + - postgre-data:/usr/local/psql/data + + redis_auth: + image: bitnami/redis:5.0 + environment: + - ALLOW_EMPTY_PASSWORD=yes + ports: + - 7000:6379 + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + labels: + io.rancher.scheduler.affinity:host_label: platform=etc01 + io.rancher.container.hostname_override: container_name + kafka: + hostname: kafka + image: 'bitnami/kafka:latest' + ports: + - '9092:9092' + environment: + - KAFKA_ADVERTISED_HOST_NAME=kafka + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + +volumes: + elasticsearch-data: + driver: local + postgre-data: + driver: local + \ No newline at end of file diff --git a/README.md b/README.md index 0f3472a..d11ff98 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ AKKA의 버전업에 항상 대응하는것이아닌, 유닛테스트를 통해 # 주요 릴리즈 노트 +- 1.0.6 : Kafka 도커 인프라추가및, TestAPI 샘플 추가 - 1.0.5 : Kafka Stream 지원 : 액터시스템을 이용하여 Kafka를 더 심플하고 강력하게 사용가능합니다. - 1.0.4 : AKKA 1.4.7 버전사용 - 1.0.3 : 메시지 우선순위([PriorityMessageMailbox](TestAkkaDotModule/TestActors/PriorityMessageMailboxTest.cs)) 처리기 추가