Skip to content

Commit

Permalink
actors -> workers
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed May 18, 2024
1 parent f677233 commit fe30de8
Show file tree
Hide file tree
Showing 31 changed files with 91 additions and 56 deletions.
22 changes: 11 additions & 11 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@
"scrollbarSlider.activeBackground": "#288bb3",
"scrollbarSlider.background": "#19241b",
"scrollbarSlider.hoverBackground": "#288bb3",
"activityBar.activeBackground": "#d0c277",
"activityBar.background": "#d0c277",
"activityBar.activeBackground": "#26f0b3",
"activityBar.background": "#26f0b3",
"activityBar.foreground": "#15202b",
"activityBar.inactiveForeground": "#15202b99",
"activityBarBadge.background": "#318e7f",
"activityBarBadge.foreground": "#e7e7e7",
"activityBarBadge.background": "#cb6af5",
"activityBarBadge.foreground": "#15202b",
"commandCenter.border": "#15202b99",
"sash.hoverBorder": "#d0c277",
"statusBar.background": "#c3b151",
"sash.hoverBorder": "#26f0b3",
"statusBar.background": "#0fd499",
"statusBar.foreground": "#15202b",
"statusBarItem.hoverBackground": "#a7963a",
"statusBarItem.remoteBackground": "#c3b151",
"statusBarItem.hoverBackground": "#0ca477",
"statusBarItem.remoteBackground": "#0fd499",
"statusBarItem.remoteForeground": "#15202b",
"titleBar.activeBackground": "#c3b151",
"titleBar.activeBackground": "#0fd499",
"titleBar.activeForeground": "#15202b",
"titleBar.inactiveBackground": "#c3b15199",
"titleBar.inactiveBackground": "#0fd49999",
"titleBar.inactiveForeground": "#15202b99"
},
"rust-analyzer.linkedProjects": [
"./Cargo.toml"
],
"rust-analyzer.showUnlinkedFileNotification": false,
"rust-analyzer.checkOnSave": true,
"peacock.color": "#c3b151"
"peacock.color": "#0fd499"
}
38 changes: 38 additions & 0 deletions logs/error-kind/zerlog.log
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,41 @@ code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or
| time: 1715871583576 | method name: get_notif.redis_pool
code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1715871583957 | method name: get_notif.redis_pool
code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035270529 | method name: check.redis_pool
code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035272278 | method name: check.redis_pool
code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035272837 | method name: check.redis_pool
code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035273388 | method name: check.redis_pool
code: 65535 | message: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035273792 | method name: check.redis_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035511246 | method name: check.redis_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035512974 | method name: check.redis_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035533122 | method name: check.redis_pool
code: 65535 | message: Error occurred while creating a new object: IO error: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035533128 | method name: ZerLogProducerActor.produce_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035533976 | method name: check.redis_pool
code: 65535 | message: Error occurred while creating a new object: IO error: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035533981 | method name: ZerLogProducerActor.produce_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035534466 | method name: check.redis_pool
code: 65535 | message: Error occurred while creating a new object: IO error: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035534473 | method name: ZerLogProducerActor.produce_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035535337 | method name: check.redis_pool
code: 65535 | message: Error occurred while creating a new object: IO error: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035535342 | method name: ZerLogProducerActor.produce_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035535957 | method name: check.redis_pool
code: 65535 | message: Error occurred while creating a new object: IO error: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035535961 | method name: ZerLogProducerActor.produce_pool
code: 65535 | message: REDIS: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035536652 | method name: check.redis_pool
code: 65535 | message: Error occurred while creating a new object: IO error: Connection refused (os error 61) | due to: Redis, RMQ or Seaorm Error
| time: 1716035536656 | method name: ZerLogProducerActor.produce_pool
2 changes: 1 addition & 1 deletion src/apis/http/v1/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@


use crate::{
actors::cqrs::accessors::notif::{NotifAccessorActor, RequestNotifData},
workers::cqrs::accessors::notif::{NotifAccessorActor, RequestNotifData},
consts::{STORAGE_IO_ERROR_CODE},
};
use crate::{
Expand Down
3 changes: 1 addition & 2 deletions src/apis/http/v1/events/get.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@



use crate::{actors::cqrs::accessors::notif::NotifDataResponse, models::event::{NotifData, EventQuery}};
use crate::{workers::cqrs::accessors::notif::NotifDataResponse, models::event::{NotifData, EventQuery}};
use redis::AsyncCommands;
pub use super::*;

Expand Down Expand Up @@ -60,7 +60,6 @@ pub(self) async fn get_notif(
match redis_pool.get().await{
Ok(mut redis_conn) => {


// by default every incoming notif from the producer will be cached in redis
// while the consumer consumes them so we first try to get owner notif from redis
// otherwise we send message to accessor actor to fetch them from db.
Expand Down
11 changes: 4 additions & 7 deletions src/apis/http/v1/health/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ pub(self) async fn test_stream(
pub(self) async fn check(
req: HttpRequest,
app_state: web::Data<AppState>
// probably web::Path, web::Json, web::Query params
// ...
) -> HoopoeHttpResponse{

let app_storage = app_state.clone().app_storage.clone().unwrap();
Expand All @@ -142,9 +140,8 @@ pub(self) async fn check(
let zerlog_producer_actor = actors.producer_actors.zerlog_actor;

// check db health
if db_ping.is_err(){
let e = db_ping.unwrap_err();
let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
if let Err(e) = db_ping{
let source = format!("SEARORM: {}", &e.source().unwrap().to_string()); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let err_instance = crate::error::HoopoeErrorResponse::new(
*STORAGE_IO_ERROR_CODE, // error hex (u16) code
source.as_bytes().to_vec(), // text of error source in form of utf8 bytes
Expand All @@ -157,7 +154,7 @@ pub(self) async fn check(

// check redis health
if let Err(e) = redis_conn{
let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let source = format!("REDIS: {}", &e.source().unwrap().to_string()); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let err_instance = crate::error::HoopoeErrorResponse::new(
*STORAGE_IO_ERROR_CODE, // error hex (u16) code
source.as_bytes().to_vec(), // text of error source in form of utf8 bytes
Expand All @@ -170,7 +167,7 @@ pub(self) async fn check(

// check rmq health
if let Err(e) = rmq_conn{
let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let source = format!("RMQ: {}", &e.source().unwrap().to_string()); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let err_instance = crate::error::HoopoeErrorResponse::new(
*STORAGE_IO_ERROR_CODE, // error hex (u16) code
source.as_bytes().to_vec(), // text of error source in form of utf8 bytes
Expand Down
6 changes: 3 additions & 3 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ Coded by
*/

use crate::actors::consumers::notif::ConsumeNotif;
use crate::actors::producers::notif::ProduceNotif;
use crate::workers::consumers::notif::ConsumeNotif;
use crate::workers::producers::notif::ProduceNotif;
use crate::consts::APP_NAME;
use std::env;
use std::net::SocketAddr;
Expand Down Expand Up @@ -127,7 +127,7 @@ use migration::{Migrator, MigratorTrait};
// so accessing them from other crates and modules is
// like use crate::macro_name;

mod actors;
mod workers;
mod interfaces;
mod s3;
mod config;
Expand Down
16 changes: 8 additions & 8 deletions src/appstate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@

use std::collections::HashMap;
use actix::{Actor, Addr};
use crate::actors::cqrs::accessors::hoop::HoopAccessorActor;
use crate::actors::cqrs::mutators::hoop::HoopMutatorActor;
use crate::actors::cqrs::mutators::notif::NotifMutatorActor;
use crate::actors::cqrs::accessors::notif::NotifAccessorActor;
use crate::workers::cqrs::accessors::hoop::HoopAccessorActor;
use crate::workers::cqrs::mutators::hoop::HoopMutatorActor;
use crate::workers::cqrs::mutators::notif::NotifMutatorActor;
use crate::workers::cqrs::accessors::notif::NotifAccessorActor;
use crate::config::{Env as ConfigEnv, Context};
use crate::config::EnvExt;
use crate::s3::Storage;
use crate::actors::consumers::notif::NotifConsumerActor;
use crate::actors::producers::notif::NotifProducerActor;
use crate::actors::producers::zerlog::ZerLogProducerActor;
use crate::workers::consumers::notif::NotifConsumerActor;
use crate::workers::producers::notif::NotifProducerActor;
use crate::workers::producers::zerlog::ZerLogProducerActor;
use serde::{Serialize, Deserialize};
use crate::types::*;
use crate::consts::*;
use crate::storage;
use crate::actors::ws::servers::hoop::HoopServer;
use crate::workers::ws::servers::hoop::HoopServer;


#[derive(Clone)]
Expand Down
6 changes: 3 additions & 3 deletions src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@


use crate::actors;
use crate::actors::producers::zerlog::ProduceNotif;
use crate::workers;
use crate::workers::producers::zerlog::ProduceNotif;
use crate::consts::APP_NAME;
use crate::models::http::Response;

Expand Down Expand Up @@ -618,7 +618,7 @@ impl From<(Vec<u8>, u16, ErrorKind, String)> for HoopoeErrorResponse{
impl HoopoeErrorResponse{

pub async fn new(code: u16, msg: Vec<u8>, kind: ErrorKind, method_name: &str,
producer_actor: Option<&Addr<actors::producers::zerlog::ZerLogProducerActor>>) -> Self{
producer_actor: Option<&Addr<workers::producers::zerlog::ZerLogProducerActor>>) -> Self{

let string_kind = &kind.to_string();
let mut err = HoopoeErrorResponse::from((msg.clone(), code, kind, method_name.to_string()));
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/purchase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use actix::Addr;
use crate::*;
use self::actors::producers::notif::NotifProducerActor;
use self::workers::producers::notif::NotifProducerActor;



Expand Down
2 changes: 1 addition & 1 deletion src/lockers/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ use serde::{Deserialize, Serialize};
use interfaces::purchase::ProductExt;
use crate::{consts::PURCHASE_DEMO_LOCK_MUTEX, *};

use self::actors::producers::notif::NotifProducerActor;
use self::workers::producers::notif::NotifProducerActor;



Expand Down
5 changes: 2 additions & 3 deletions src/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use self::types::{LapinPoolConnection, RedisPoolConnection};
| shared state storage
|----------------------
| redis
| redis async
| redis actor
| redis async pubsub conn
| redis actix actor
| redis distlock (locker)
| diesel postgres
| seaorm
| rmq
|
Expand Down
6 changes: 3 additions & 3 deletions src/server/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ macro_rules! bootsteap_http {
.wrap(Logger::new("%a %{User-Agent}i %t %P %r %s %b %T %D"))
.wrap(middleware::Compress::default())
/*
INIT WS SUBSCRIBE SERVICE
INIT WS SUBSCRIBE SERVICE CONTROLLER
*/
.service(
actix_web::web::scope("/v1/stream")
.configure(services::stream::init)
)
/*
INIT HEALTH SERIVE
INIT HEALTH SERIVE CONTROLLER
*/
.service(
actix_web::web::scope("/v1/health")
.configure(services::health::init)
)
/*
INIT EVENTS SERIVE
INIT EVENTS SERIVE CONTROLLER
*/
.service(
actix_web::web::scope("/v1/events")
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

use actix::prelude::*;
use deadpool_lapin::lapin::protocol::exchange;
use crate::actors::cqrs::mutators::notif::*;
use crate::actors::producers::zerlog::ZerLogProducerActor;
use crate::actors::producers::notif::ProduceNotif;
use crate::workers::cqrs::mutators::notif::*;
use crate::workers::producers::zerlog::ZerLogProducerActor;
use crate::workers::producers::notif::ProduceNotif;
use crate::models::event::NotifData;
use redis::{AsyncCommands, RedisResult};
use std::error::Error;
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use chrono::{DateTime, FixedOffset};
use deadpool_redis::{Connection, Manager, Pool};
use redis::{AsyncCommands, Commands};
use sea_orm::{ConnectionTrait, DatabaseConnection, EntityTrait, QueryFilter, Statement, Value};
use crate::actors::{consumers, producers::zerlog::ZerLogProducerActor};
use crate::workers::{consumers, producers::zerlog::ZerLogProducerActor};
use crate::types::RedisPoolConnection;
use crate::s3::Storage;
use crate::consts::PING_INTERVAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use consts::STORAGE_IO_ERROR_CODE;
use deadpool_redis::{Connection, Manager, Pool};
use redis::{AsyncCommands, Commands};
use sea_orm::{ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Statement, Value};
use crate::actors::producers::zerlog::ZerLogProducerActor;
use crate::{actors::consumers, models::event::NotifData};
use crate::workers::producers::zerlog::ZerLogProducerActor;
use crate::{workers::consumers, models::event::NotifData};
use crate::types::RedisPoolConnection;
use crate::s3::Storage;
use crate::consts::PING_INTERVAL;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Serialize, Deserialize};
use actix::prelude::*;
use std::sync::Arc;
use actix::{Actor, AsyncContext, Context};
use crate::actors::producers::zerlog::ZerLogProducerActor;
use crate::workers::producers::zerlog::ZerLogProducerActor;
use crate::entities::hoops;
use crate::models::event::HoopEvent;
use crate::s3::Storage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use actix::prelude::*;
use std::future::IntoFuture;
use std::sync::Arc;
use actix::{Actor, AsyncContext, Context};
use crate::actors::consumers::notif;
use crate::actors::producers::zerlog::ZerLogProducerActor;
use crate::workers::consumers::notif;
use crate::workers::producers::zerlog::ZerLogProducerActor;
use crate::entities::hoops;
use crate::actors::producers::notif::ProduceNotif;
use crate::workers::producers::notif::ProduceNotif;
use crate::models::event::NotifData;
use crate::interfaces::passport;
use crate::s3::Storage;
Expand Down Expand Up @@ -165,12 +165,13 @@ impl NotifMutatorActor{

}

pub async fn update(&mut self, notif_id: i32){
pub async fn update(&mut self, notif_id: i32, notif_data: NotifData){

let storage = self.app_storage.as_ref().clone().unwrap();
let db = storage.get_seaorm_pool().await.unwrap();
let redis_pool = storage.get_redis_pool().await.unwrap();


// probably true the is_seen flag
// ...

Expand Down
3 changes: 2 additions & 1 deletion src/actors/mod.rs β†’ src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@


/*
app components based on actor worker:
background actor worker which execute async tasks atomically with
tokio spawn, select, rmq and mpsc jobq channels over mutex:
we have producer and consumer actors per each data streamer
we have mutator and accessor actors per each db model
we have ws session and server actors per each route
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use actix::{Actor, Addr, AsyncContext, Context};
use crate::actors::ws::sessions::hoop::HoopSession;
use crate::workers::ws::sessions::hoop::HoopSession;
use crate::s3::Storage;
use crate::consts::PING_INTERVAL;

Expand Down
File renamed without changes.
File renamed without changes.

0 comments on commit fe30de8

Please sign in to comment.