Skip to content

Commit

Permalink
hanging around the code fixing things not much heavy!
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed May 14, 2024
1 parent f66084e commit db80297
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 87 deletions.
20 changes: 4 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ rslock = "0.3.0"
tonic = "0.11.0"
futures = "0.3"
deadpool-lapin = { version = "0.12.1", features = ["serde", "rt_tokio_1"] }
deadpool-redis = "0.15.0"
deadpool-redis = "0.15.1"
futures-util = "0.3"
serde_json = "1"
once_cell = "1"
uuid = { version = "1", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
dotenv = "0.15"
rand_chacha = "0.3"
chacha20 = "0.9"
web3 = "0.19"
pretty_env_logger = "0.5.0"
sanitize-filename = "0.5"
log4rs = "1.3.0"
Expand All @@ -69,6 +66,8 @@ prost = "0.12"
prost-types = "0.12"
themis = "0.14.0"
base58 = "0.2.0"
rand_chacha = "0.3"
chacha20 = "0.9"

[[bin]]
name = "hoopoe"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ i'm hoopoe, the social event platform to hoop!
> [!TIP]
> any notification coming from different components or other service actor workers must be done accroding to the following steps:
- **step0)** a register notif api call can be written to register either a producer or a consumer in the bakcground.
- **step0)** a register notif api can be written to register either a producer or a consumer in the bakcground.

- **step1)** producer service actor sends `NotifData` to exchange.

Expand Down Expand Up @@ -198,7 +198,7 @@ cargo run --bin hooper -- --help

- **step9)** each internal image name inside your compose file must be prefixed with your docker hub registry endpoint which currently the hub has setup to `docker.youwho.club` endpoint, doing so tells docker to pull images from there cause as we know this subdoamin is already pointing to the docker registry hosted on `localhost:5000` on VPS.

> **current hub registry is set to `docker.youwho.club`.**
> **current hub registry is set to `docker.youwho.club` and the `infra` folder on the VPS would be the place where the `docker-compose.yml` file is in**
#### β˜• What's happening inside the `cicd.yml` file?

Expand Down
22 changes: 14 additions & 8 deletions hooper/grpc/event/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use wallexerr::misc::Wallet;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use log::*;

// future objects must be pinned to use them as object safe trait along with dynamic dispatching
// response stream contains an item of type Result<EventResponse, Status>, the stream response
// is a pinned future trait object which sendable, think of response stream as an iterator object
// contains multiple future objects of pack of u8 bytes that must get streamed over each by calling
// next() method so this is called streaming over future objects.
// if Future<Output = T> is an asynchronous version of T, then Stream<Item = T> is an asynchronous
// version of Iterator<Item = T>. A stream represents a sequence of value-producing events that occur
// asynchronously to the caller.
/* -Λ‹Λβœ„β”ˆβ”ˆβ”ˆβ”ˆ
future objects must be pinned to use them as object safe trait along with dynamic dispatching
response stream contains an item of type Result<EventResponse, Status>, the stream response
is a pinned future trait object which sendable, think of response stream as an iterator object
contains multiple future objects of pack of u8 bytes that must get streamed over each by calling
next() method so this is called streaming over future objects.
if Future<Output = T> is an asynchronous version of T, then Stream<Item = T> is an asynchronous
version of Iterator<Item = T>. A stream represents a sequence of value-producing events that occur
asynchronously to the caller.
*/
type ResponseStream = std::pin::Pin<Box<dyn Stream<Item = Result<EventResponse, Status>> + Send >>;
type EventResult<T> = Result<TonicResponse<T>, Status>;

Expand Down Expand Up @@ -48,6 +50,10 @@ impl EventServer{

fn restart() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>{

// if there was an already server is started
// then restart it
// ...

Ok(())

}
Expand Down
15 changes: 7 additions & 8 deletions hooper/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ macro_rules! bootstrap_grpc {
// ...
) => {
{
tokio::spawn(async move{
let addr = format!("{}:{}",
std::env::var("HOST").unwrap(),
std::env::var("GRPC_PORT").unwrap().parse::<u16>().unwrap()
).parse::<SocketAddr>().unwrap();
EventServer::start(addr).await;
});
let addr = format!("{}:{}",
std::env::var("HOST").unwrap(),
std::env::var("GRPC_PORT").unwrap().parse::<u16>().unwrap()
).parse::<SocketAddr>().unwrap();

EventServer::start(addr).await;

loop{}
Ok(())
}
};
}
4 changes: 2 additions & 2 deletions hooper/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ macro_rules! bootstrap_tcp {
data decryption we can accept the client connections cause it's
secure and safe.
*/
tokio::spawn(async move{

let (mut secure_cell, wallet) = consts::SECURECELLCONFIG_TCPWALLET.to_owned(); // this must be shared between clients and server to stablish a secure connection
let tcp_addr = format!(
"{}:{}",
Expand All @@ -255,8 +255,8 @@ macro_rules! bootstrap_tcp {
// let tcp_listener_actor_address = listener_actor.start(); // this will be run but shows the above error
// --------------------
listener_actor.start_streaming().await;
});

// keep the app constantly running
loop{}
}
};
Expand Down
10 changes: 3 additions & 7 deletions src/actors/consumers/notif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Actor for NotifConsumerActor{
// check something constantly, schedule to be executed
// at a certain time in the background
// ...

});

});
Expand Down Expand Up @@ -223,12 +223,10 @@ impl NotifConsumerActor{
match get_notif_event{
Ok(notif_event) => {

// caching in redis
match redis_pool.get().await{
Ok(mut redis_conn) => {

// key : String::from(notif_receiver.id)
// value: Vec<NotifData>
// key: String::from(notif_receiver.id) | value: Vec<NotifData>
let redis_notif_key = format!("notif_owner:{}", &notif_event.receiver_info);

// -Λ‹Λβœ„β”ˆβ”ˆβ”ˆβ”ˆ extend notifs
Expand All @@ -252,7 +250,6 @@ impl NotifConsumerActor{
"NotifConsumerActor.decode_serde_redis", // method
Some(&zerlog_producer_actor)
).await;

return; // terminate the caller
}
}
Expand All @@ -270,7 +267,6 @@ impl NotifConsumerActor{
Some(&zerlog_producer_actor)
).await;


// we can't get the key means this is the first time we're creating the key
// or the key is expired already, we'll create a new key either way and put
// the init message in there.
Expand Down Expand Up @@ -318,7 +314,7 @@ impl NotifConsumerActor{
})
.await
{
Ok(_) => {},
Ok(_) => { () },
Err(e) => {
let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let err_instance = crate::error::HoopoeErrorResponse::new(
Expand Down
8 changes: 3 additions & 5 deletions src/apis/http/v1/events/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use super::*;


/* -Λ‹Λβœ„β”ˆβ”ˆβ”ˆβ”ˆ
>_ this route is used to either produce a data to RMQ exchange
>_ this route is used mainly to either produce a data to RMQ exchange
or start consuming in the background, look at the postman
collection for more details.
*/
Expand All @@ -23,7 +23,6 @@ pub(self) async fn register_notif(
let get_producer_info = register_notif_req.clone().producer_info;
let get_consumer_info = register_notif_req.clone().consumer_info;


match req.check_token_time(app_state.clone(), "write").await{
Ok(token_time) => {

Expand All @@ -49,7 +48,7 @@ pub(self) async fn register_notif(
match cloned_app_state.clone().actors.as_ref().unwrap()
.producer_actors.notif_actor.send(cloned_notif).await
{
Ok(_) => {},
Ok(_) => { () },
Err(e) => {
let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let err_instance = crate::error::HoopoeErrorResponse::new(
Expand Down Expand Up @@ -94,7 +93,7 @@ pub(self) async fn register_notif(
match cloned_app_state.clone().actors.as_ref().unwrap()
.consumer_actors.notif_actor.send(cloned_notif).await
{
Ok(_) => {},
Ok(_) => { () },
Err(e) => {
let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object
let err_instance = crate::error::HoopoeErrorResponse::new(
Expand Down Expand Up @@ -133,7 +132,6 @@ pub(self) async fn register_notif(
StatusCode::NOT_ACCEPTABLE,
None::<Cookie<'_>>,
}

}

},
Expand Down
4 changes: 3 additions & 1 deletion src/apis/http/v1/health/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ pub(self) async fn mint_demo(

let product = pinfo.to_owned(); // received product info from user

let notif_producer_actor = app_state.as_ref().actors.clone().unwrap().producer_actors.notif_actor;

tokio::spawn(async move{

// some lock-free logics:
Expand All @@ -163,7 +165,7 @@ pub(self) async fn mint_demo(

});

let (minting_exclusion, mut product_receiver) = product.atomic_purchase_status().await;
let (minting_exclusion, mut product_receiver) = product.atomic_purchase_status(notif_producer_actor).await;

match minting_exclusion{
true => { // product is being minted and is locked
Expand Down
7 changes: 4 additions & 3 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ async fn main() -> std::io::Result<()>{
dotenv::dotenv().expect("expected .env file be there!");
env::set_var("RUST_LOG", "trace");
env_logger::init_from_env(Env::default().default_filter_or("info"));
// env::set_var("RUST_LOG", "actix_web=debug");


/* -Λ‹Λβœ„β”ˆβ”ˆβ”ˆβ”ˆ initializing appstate actor workers
>_ run actor workers, app_state contains the whole app data
which will be used globally during the execution of the app
Expand All @@ -187,7 +187,8 @@ async fn main() -> std::io::Result<()>{
} else{
Migrator::up(&connection, None).await.unwrap();
}



/* -Λ‹Λβœ„β”ˆβ”ˆβ”ˆβ”ˆ bootstrapping http server
>_
*/
Expand All @@ -197,7 +198,7 @@ async fn main() -> std::io::Result<()>{
chrono::Local::now().naive_local());

bootsteap_http!{
app_state.clone(),
app_state.clone(), // sharing the whole app state data between actix threads and apis
}


Expand Down
6 changes: 4 additions & 2 deletions src/interfaces/purchase.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@


use actix::Addr;
use crate::*;
use self::actors::producers::notif::NotifProducerActor;



#[trait_variant::make(ProductExtSend: Send)]
pub trait ProductExt{
type Product;
async fn atomic_purchase_status(&self) -> (bool, tokio::sync::mpsc::Receiver<Self::Product>);
async fn mint(&mut self) -> (bool, Self::Product);
async fn atomic_purchase_status(&self, notif_producer_actor: Addr<NotifProducerActor>) -> (bool, tokio::sync::mpsc::Receiver<Self::Product>);
async fn mint(&mut self, notif_producer_actor: Addr<NotifProducerActor>) -> (bool, Self::Product);
}
Loading

0 comments on commit db80297

Please sign in to comment.