Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions AkkaDotBootApi/Controllers/KafkaController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AkkaDotModule.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;

namespace AkkaDotBootApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class KafkaController : ControllerBase
{
private ProducerSystem producerSystem;

public KafkaController(ProducerSystem _producerSystem)
{
producerSystem = _producerSystem;
}

/// <summary>
/// Kafka 메시지 생성
/// 개수와 tps조절가능
///
/// testTopic : akka100
/// </summary>
[HttpGet("HelloActor-Tell")]
public int Kafka_ProducerMessage(int count,int tps)
{
List<string> messages = new List<string>();
for (int i = 0; i < count; i++)
{
messages.Add($"message-{i}");
}
producerSystem.SinkMessage("producer1", "akka100", messages, tps);
return 1;
}
}
}
65 changes: 6 additions & 59 deletions AkkaDotBootApi/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Reflection;
using Akka.Actor;
using AkkaDotBootApi.Actor;
using AkkaDotModule.ActorSample;
using AkkaDotModule.ActorUtils;
using Akka.Actor;
using AkkaDotBootApi.Test;
using AkkaDotModule.Config;
using AkkaDotModule.Kafka;
using AkkaDotModule.Models;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using System;
using System.IO;
using System.Reflection;
using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime;

namespace AkkaDotBootApi
Expand Down Expand Up @@ -131,56 +127,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IApplica

lifetime.ApplicationStarted.Register(() =>
{
// HelloActor 기본액터
AkkaLoad.RegisterActor("helloActor" /*AkkaLoad가 인식하는 유니크명*/,
actorSystem.ActorOf(Props.Create(() => new HelloActor("webnori")), "helloActor" /*AKKA가 인식하는 Path명*/
));

var helloActor = actorSystem.ActorSelection("user/helloActor");
var helloActor2 = AkkaLoad.ActorSelect("helloActor");

helloActor.Tell("hello");
helloActor2.Tell("hello");


// 밸브 Work : 초당 작업량을 조절
int timeSec = 1;
int elemntPerSec = 5;
var throttleWork = AkkaLoad.RegisterActor("throttleWork",
actorSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork"));

// 실제 Work : 밸브에 방출되는 Task를 개별로 처리
var worker = AkkaLoad.RegisterActor("worker", actorSystem.ActorOf(Props.Create<WorkActor>(), "worker"));

// 배브의 작업자를 지정
throttleWork.Tell(new SetTarget(worker));

// KAFKA 셋팅
// 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능
var consumerSystem = app.ApplicationServices.GetService<ConsumerSystem>();
var producerSystem = app.ApplicationServices.GetService<ProducerSystem>();

//소비자 : 복수개의 소비자 생성가능
consumerSystem.Start(new ConsumerAkkaOption()
{
KafkaGroupId = "testGroup",
KafkaUrl = "kafka:9092",
RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현
Topics = "akka100"
});

//생산자 : 복수개의 생산자 생성가능
producerSystem.Start(new ProducerAkkaOption()
{
KafkaUrl = "kafka:9092",
ProducerName = "producer1"
});

List<string> messages = new List<string>();
//보너스 : 생산의 속도를 조절할수 있습니다.
int tps = 10;
producerSystem.SinkMessage("producer1", "akka100", messages, tps);

TestAkka.Run(app, actorSystem);
});
}
}
Expand Down
75 changes: 75 additions & 0 deletions AkkaDotBootApi/Test/TestAkka.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using Akka.Actor;
using AkkaDotBootApi.Actor;
using AkkaDotModule.ActorSample;
using AkkaDotModule.ActorUtils;
using AkkaDotModule.Config;
using AkkaDotModule.Kafka;
using AkkaDotModule.Models;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;

namespace AkkaDotBootApi.Test
{
public class TestAkka
{
static public void Run(IApplicationBuilder app, ActorSystem actorSystem)
{
// HelloActor 기본액터
AkkaLoad.RegisterActor("helloActor" /*AkkaLoad가 인식하는 유니크명*/,
actorSystem.ActorOf(Props.Create(() => new HelloActor("webnori")), "helloActor" /*AKKA가 인식하는 Path명*/
));

var helloActor = actorSystem.ActorSelection("user/helloActor");
var helloActor2 = AkkaLoad.ActorSelect("helloActor");

helloActor.Tell("hello");
helloActor2.Tell("hello");


// 밸브 Work : 초당 작업량을 조절
int timeSec = 1;
int elemntPerSec = 5;
var throttleWork = AkkaLoad.RegisterActor("throttleWork",
actorSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork"));

// 실제 Work : 밸브에 방출되는 Task를 개별로 처리
var worker = AkkaLoad.RegisterActor("worker", actorSystem.ActorOf(Props.Create<WorkActor>(), "worker"));

// 배브의 작업자를 지정
throttleWork.Tell(new SetTarget(worker));

// KAFKA 셋팅
// 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능
var consumerSystem = app.ApplicationServices.GetService<ConsumerSystem>();
var producerSystem = app.ApplicationServices.GetService<ProducerSystem>();

//소비자 : 복수개의 소비자 생성가능
consumerSystem.Start(new ConsumerAkkaOption()
{
KafkaGroupId = "testGroup",
KafkaUrl = "kafka:9092",
RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현
Topics = "akka100"
});

//생산자 : 복수개의 생산자 생성가능
producerSystem.Start(new ProducerAkkaOption()
{
KafkaUrl = "kafka:9092",
ProducerName = "producer1"
});

List<string> messages = new List<string>();
for(int i=0; i < 10; i++)
{
messages.Add($"message-{i}");
}

//보너스 : 생산의 속도를 조절할수 있습니다.
int tps = 10;
producerSystem.SinkMessage("producer1", "akka100", messages, tps);

}
}
}
6 changes: 6 additions & 0 deletions AkkaDotModule.sln
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestAkkaDotModule", "TestAk
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AkkaDotBootApi", "AkkaDotBootApi\AkkaDotBootApi.csproj", "{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}"
EndProject
Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose-infra", "Compose\docker-compose-infra.dcproj", "{A325C35E-28E0-43CE-B28F-516DC1568E18}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -36,6 +38,10 @@ Global
{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Release|Any CPU.Build.0 = Release|Any CPU
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
2 changes: 1 addition & 1 deletion AkkaDotModule/AkkaDotModule.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageId>AkkaDotModule.Webnori</PackageId>
<Version>1.0.5</Version>
<Version>1.0.6</Version>
<RepositoryUrl>https://github.com/psmon/AkkaDotModule</RepositoryUrl>
<PackageProjectUrl>https://github.com/psmon/AkkaDotModule</PackageProjectUrl>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion AkkaDotModule/Kafka/ProducerSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public ProducerSystem()
public void Start(ProducerAkkaOption producerAkkaOption)
{
materializer_producer = producerSystem.Materializer();
producerList[producerAkkaOption.KafkaUrl] = ProducerSettings<Null, string>.Create(producerSystem, null, null)
producerList[producerAkkaOption.ProducerName] = ProducerSettings<Null, string>.Create(producerSystem, null, null)
.WithBootstrapServers(producerAkkaOption.KafkaUrl);
}

Expand Down
15 changes: 15 additions & 0 deletions Compose/docker-compose-infra.dcproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" Sdk="Microsoft.Docker.Sdk">
<PropertyGroup Label="Globals">
<ProjectVersion>2.1</ProjectVersion>
<DockerTargetOS>Linux</DockerTargetOS>
<ProjectGuid>A325C35E-28E0-43CE-B28F-516DC1568E18</ProjectGuid>
<DockerLaunchAction>None</DockerLaunchAction>
<DockerServiceUrl>{Scheme}://localhost:{ServicePort}/swagger</DockerServiceUrl>
<DockerServiceName>akka-infra</DockerServiceName>
</PropertyGroup>
<ItemGroup>
<None Include="docker-compose.yml" />
<None Include=".dockerignore" />
</ItemGroup>
</Project>
55 changes: 55 additions & 0 deletions Compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: '3.4'

services:
mysql:
image: mysql:5.7
command: --default-authentication-plugin=mysql_native_password
restart: always
ports:
- 13306:3306
environment:
MYSQL_ROOT_PASSWORD: "root"
TZ: "Asia/Seoul"

postgres:
image: postgres:9.6
container_name: postgres
environment:
- POSTGRES_USER=docker
- POSTGRES_PASSWORD=docker
ports:
- '5432:5432'
volumes:
- postgre-data:/usr/local/psql/data

redis_auth:
image: bitnami/redis:5.0
environment:
- ALLOW_EMPTY_PASSWORD=yes
ports:
- 7000:6379
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
labels:
io.rancher.scheduler.affinity:host_label: platform=etc01
io.rancher.container.hostname_override: container_name
kafka:
hostname: kafka
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes

volumes:
elasticsearch-data:
driver: local
postgre-data:
driver: local

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ AKKA의 버전업에 항상 대응하는것이아닌, 유닛테스트를 통해

# 주요 릴리즈 노트

- 1.0.6 : Kafka 도커 인프라추가및, TestAPI 샘플 추가
- 1.0.5 : Kafka Stream 지원 : 액터시스템을 이용하여 Kafka를 더 심플하고 강력하게 사용가능합니다.
- 1.0.4 : AKKA 1.4.7 버전사용
- 1.0.3 : 메시지 우선순위([PriorityMessageMailbox](TestAkkaDotModule/TestActors/PriorityMessageMailboxTest.cs)) 처리기 추가
Expand Down