Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added message queues #8

Merged
merged 1 commit into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Server/Data/Models/UserInputResult/UserInputResultDto.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Data.Models
public class UserInputResultDto
{
public string ResultDescription { get; set; } = string.Empty;
public float BuyOrSell { get; set; } // range 0 (sell) - 1 (buy)
public float BuyOrSell { get; set; } // range -1 (sell) to 1 (buy)

public List<KeywordToCount> KeywordToCount { get; set; } = new List<KeywordToCount>();
}
Expand Down
15 changes: 15 additions & 0 deletions Server/MessageQueues/MessageConsumer/IMessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MessageQueues.MessageConsumer
{
public interface IMessageConsumer
{
string Name { get; }
void Start();
void Stop();
}
}
130 changes: 130 additions & 0 deletions Server/MessageQueues/MessageConsumer/MessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using log4net;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using Microsoft.Extensions.Configuration;
using MessageQueues.MessageConsumer;

namespace MessageConsumerService
{
public class MessageConsumer : IMessageConsumer
{
// Message consumer details
private string _name;
private string _queuePath;
private bool _isRunning;

// RabbitMQ config
private string _hostname;
private int _port;
private string _username;
private string _password;
private string _virtualHost;
private IConnection _connection;
private IModel _channel;
private EventingBasicConsumer _consumer;

// callback
Func<string, Task> _onMessageReceived;

public MessageConsumer(string name,
string queuePath,
string hostname,
int port,
string username,
string password,
string virtualHost,
Func<string, Task> onMessageReceived)
{
_onMessageReceived = onMessageReceived;

_name = name;
_queuePath = queuePath;
_isRunning = false;

_hostname = hostname;
_port = port;
_username = username;
_password = password;
_virtualHost = virtualHost;

Initialize();
}

private void Initialize()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = _hostname,
Port = _port,
UserName = _username,
Password = _password,
VirtualHost = _virtualHost,
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(
queue: _queuePath,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);

_consumer = new EventingBasicConsumer(_channel);
}

public string Name
{
get { return _name; }
}

public void Start()
{
if (!_isRunning)
{
_isRunning = true;

}

_consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
await _onMessageReceived(message);

};

_channel.BasicConsume(
queue: _queuePath,
autoAck: false,
consumer: _consumer
);
Console.WriteLine($"Message receiver for {Name} is started.");

}

public void Stop()
{
if (_isRunning)
{
_isRunning = false;
_consumer.Received -= async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
Console.WriteLine($"Message receiver for {Name} is stopped.");
}

}
}
}
13 changes: 13 additions & 0 deletions Server/MessageQueues/MessageProducer/IMessageProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MessageQueues.MessageProducer
{
public interface IMessageProducer
{
void SendMessage<T>(T message);
}
}
70 changes: 70 additions & 0 deletions Server/MessageQueues/MessageProducer/MessageProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using RabbitMQ.Client;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using log4net;
using System.Reflection;
using RabbitMQ.Client.Events;

namespace MessageQueues.MessageProducer
{
public class MessageProducer : IMessageProducer
{
// utils
private ILog _logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

// RabbitMQ config
private string _queuePath;
private string _hostname;
private int _port;
private string _username;
private string _password;
private string _virtualHost;
private IConnection _connection;
private IModel _channel;

public MessageProducer(string queuePath, string hostname, int port, string username, string password, string virtualHost)
{
_queuePath = queuePath;
_hostname = hostname;
_port = port;
_username = username;
_password = password;
_virtualHost = virtualHost;

Initialize();
}


public void SendMessage<T>(T message)
{
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);

_channel.BasicPublish(exchange: string.Empty,
routingKey: _queuePath,
body: body);

}

private void Initialize()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = _hostname,
Port = _port,
UserName = _username,
Password = _password,
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(
queue: _queuePath,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
}
}
}
16 changes: 16 additions & 0 deletions Server/MessageQueues/MessageQueues.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="log4net" Version="2.0.15" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="7.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

</Project>
8 changes: 7 additions & 1 deletion Server/Server.sln
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.4.33122.133
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebAPI", "WebAPI\WebAPI.csproj", "{688EFA86-9788-48BD-AF10-0000AEBBA808}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebAPI", "WebAPI\WebAPI.csproj", "{688EFA86-9788-48BD-AF10-0000AEBBA808}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Data", "Data\Data.csproj", "{3BF27425-9FBD-474F-BACA-5A3C928FE387}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessageQueues", "MessageQueues\MessageQueues.csproj", "{35FE8C9C-885E-4601-A0A2-7FBA7F475D04}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -21,6 +23,10 @@ Global
{3BF27425-9FBD-474F-BACA-5A3C928FE387}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3BF27425-9FBD-474F-BACA-5A3C928FE387}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3BF27425-9FBD-474F-BACA-5A3C928FE387}.Release|Any CPU.Build.0 = Release|Any CPU
{35FE8C9C-885E-4601-A0A2-7FBA7F475D04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{35FE8C9C-885E-4601-A0A2-7FBA7F475D04}.Debug|Any CPU.Build.0 = Debug|Any CPU
{35FE8C9C-885E-4601-A0A2-7FBA7F475D04}.Release|Any CPU.ActiveCfg = Release|Any CPU
{35FE8C9C-885E-4601-A0A2-7FBA7F475D04}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions Server/WebAPI/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Data;
using MessageQueues.MessageProducer;
using Microsoft.EntityFrameworkCore;
using WebAPI.Services;

Expand Down
32 changes: 32 additions & 0 deletions Server/WebAPI/Services/Business/UserInputService.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,43 @@
using Data.Models;
using MessageQueues.MessageProducer;
using Newtonsoft.Json;
using RabbitMQ.Client;

namespace WebAPI.Services
{
public class UserInputService : IUserInputService
{

private readonly IMessageProducer _messageProducer;
private readonly IConfiguration _configuration;

public UserInputService(IConfiguration configuration)
{
_configuration = configuration;

_messageProducer = new MessageProducer(
queuePath: "userId",
hostname: string.IsNullOrEmpty(_configuration["RabbitMQHostname"]) ? ConnectionFactory.DefaultUser : _configuration["RabbitMQHostname"] ?? ConnectionFactory.DefaultUser,
port: string.IsNullOrEmpty(_configuration["RabbitMQPort"]) ? Protocols.DefaultProtocol.DefaultPort : int.Parse(_configuration["RabbitMQPort"] ?? Protocols.DefaultProtocol.DefaultPort.ToString()),
username: string.IsNullOrEmpty(_configuration["RabbitMQUsername"]) ? ConnectionFactory.DefaultUser : _configuration["RabbitMQUsername"] ?? ConnectionFactory.DefaultUser,
password: string.IsNullOrEmpty(_configuration["RabbitMQPassword"]) ? ConnectionFactory.DefaultPass : _configuration["RabbitMQPassword"] ?? ConnectionFactory.DefaultPass,
virtualHost: string.IsNullOrEmpty(_configuration["RabbitMQVirtualHost"]) ? ConnectionFactory.DefaultVHost : _configuration["RabbitMQVirtualHost"] ?? ConnectionFactory.DefaultVHost
);
}

public async Task<bool> HandleCreateInput(UserInput userInput)
{
var jsonMessage = JsonConvert.SerializeObject(userInput, new JsonSerializerSettings { ContractResolver = new Newtonsoft.Json.Serialization.CamelCasePropertyNamesContractResolver(), ReferenceLoopHandling = ReferenceLoopHandling.Ignore });

try
{
_messageProducer.SendMessage(jsonMessage);
}
catch
{
return false;
}

return true;
}
}
Expand Down
1 change: 1 addition & 0 deletions Server/WebAPI/WebAPI.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

<ItemGroup>
<ProjectReference Include="..\Data\Data.csproj" />
<ProjectReference Include="..\MessageQueues\MessageQueues.csproj" />
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions Server/WebAPI/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
"ConnectionStrings": {
"DefaultDB": "Data Source=host.docker.internal,42168; Initial Catalog=NotifyMe; Persist Security Info=True; User ID=iop; Password=ioppw; TrustServerCertificate=True"
},
"RabbitMQHostname": "host.docker.internal",
"RabbitMQPort": 5672,
"RabbitMQUsername": "helloworld",
"RabbitMQPassword": "iHMn#Q2tBAfCV8c",
"Host": "host.docker.internal",
"Logging": {
"LogLevel": {
"Default": "Information",
Expand Down