Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl MagicValidator {
let task_scheduler = TaskSchedulerService::new(
&task_scheduler_db_path,
&config.task_scheduler,
dispatch.transaction_scheduler.clone(),
config.listen.http(),
dispatch
.tasks_service
.take()
Expand Down
24 changes: 24 additions & 0 deletions magicblock-config/src/types/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ impl Default for BindAddress {
}
}

impl BindAddress {
fn as_connect_addr(&self) -> SocketAddr {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

match self.0.ip() {
IpAddr::V4(ip) if ip.is_unspecified() => {
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), self.0.port())
}
IpAddr::V6(ip) if ip.is_unspecified() => {
SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), self.0.port())
}
_ => self.0,
}
}

pub fn http(&self) -> String {
format!("http://{}", self.as_connect_addr())
}

pub fn websocket(&self) -> String {
format!("ws://{}", self.as_connect_addr())
}
}

/// A connection to one or more remote clusters (e.g., "devnet").
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "kebab-case", untagged)]
Expand Down
9 changes: 1 addition & 8 deletions magicblock-task-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@ magicblock-program = { workspace = true }
rusqlite = { workspace = true }
solana-instruction = { workspace = true }
solana-message = { workspace = true }
solana-program = { workspace = true }
solana-pubkey = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-rpc-client = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-signature = { workspace = true }
solana-transaction = { workspace = true }
solana-transaction-error = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true, features = ["time"] }

[dev-dependencies]
magicblock-magic-program-api = { workspace = true }
test-kit = { workspace = true }
guinea = { workspace = true }
solana-account = { workspace = true }
32 changes: 13 additions & 19 deletions magicblock-task-scheduler/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use solana_instruction::Instruction;
use solana_pubkey::Pubkey;
use tokio::sync::Mutex;

use crate::errors::TaskSchedulerError;
use crate::errors::TaskSchedulerResult;

/// Represents a task in the database
/// Uses i64 for all timestamps and IDs to avoid overflows
Expand Down Expand Up @@ -65,7 +65,7 @@ impl SchedulerDatabase {
path.join("task_scheduler.sqlite")
}

pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, TaskSchedulerError> {
pub fn new<P: AsRef<Path>>(path: P) -> TaskSchedulerResult<Self> {
let conn = Connection::open(path)?;

// Create tables
Expand Down Expand Up @@ -108,10 +108,7 @@ impl SchedulerDatabase {
})
}

pub async fn insert_task(
&self,
task: &DbTask,
) -> Result<(), TaskSchedulerError> {
pub async fn insert_task(&self, task: &DbTask) -> TaskSchedulerResult<()> {
let instructions_bin = bincode::serialize(&task.instructions)?;
let authority_str = task.authority.to_string();
let now = Utc::now().timestamp_millis();
Expand Down Expand Up @@ -139,7 +136,7 @@ impl SchedulerDatabase {
&self,
task_id: i64,
last_execution: i64,
) -> Result<(), TaskSchedulerError> {
) -> TaskSchedulerResult<()> {
let now = Utc::now().timestamp_millis();

self.conn.lock().await.execute(
Expand All @@ -158,7 +155,7 @@ impl SchedulerDatabase {
&self,
task_id: i64,
error: String,
) -> Result<(), TaskSchedulerError> {
) -> TaskSchedulerResult<()> {
self.conn.lock().await.execute(
"INSERT INTO failed_scheduling (timestamp, task_id, error) VALUES (?, ?, ?)",
params![Utc::now().timestamp_millis(), task_id, error],
Expand All @@ -171,7 +168,7 @@ impl SchedulerDatabase {
&self,
task_id: i64,
error: String,
) -> Result<(), TaskSchedulerError> {
) -> TaskSchedulerResult<()> {
self.conn.lock().await.execute(
"INSERT INTO failed_tasks (timestamp, task_id, error) VALUES (?, ?, ?)",
params![Utc::now().timestamp_millis(), task_id, error],
Expand All @@ -183,7 +180,7 @@ impl SchedulerDatabase {
pub async fn unschedule_task(
&self,
task_id: i64,
) -> Result<(), TaskSchedulerError> {
) -> TaskSchedulerResult<()> {
self.conn.lock().await.execute(
"UPDATE tasks SET executions_left = 0 WHERE id = ?",
[task_id],
Expand All @@ -192,10 +189,7 @@ impl SchedulerDatabase {
Ok(())
}

pub async fn remove_task(
&self,
task_id: i64,
) -> Result<(), TaskSchedulerError> {
pub async fn remove_task(&self, task_id: i64) -> TaskSchedulerResult<()> {
self.conn
.lock()
.await
Expand All @@ -207,7 +201,7 @@ impl SchedulerDatabase {
pub async fn get_task(
&self,
task_id: i64,
) -> Result<Option<DbTask>, TaskSchedulerError> {
) -> TaskSchedulerResult<Option<DbTask>> {
let db = self.conn.lock().await;
let mut stmt = db.prepare(
"SELECT id, instructions, authority, execution_interval_millis, executions_left, last_execution_millis
Expand Down Expand Up @@ -244,7 +238,7 @@ impl SchedulerDatabase {
Ok(rows.next().transpose()?)
}

pub async fn get_tasks(&self) -> Result<Vec<DbTask>, TaskSchedulerError> {
pub async fn get_tasks(&self) -> TaskSchedulerResult<Vec<DbTask>> {
let db = self.conn.lock().await;
let mut stmt = db.prepare(
"SELECT id, instructions, authority, execution_interval_millis, executions_left, last_execution_millis
Expand Down Expand Up @@ -286,7 +280,7 @@ impl SchedulerDatabase {
Ok(tasks)
}

pub async fn get_task_ids(&self) -> Result<Vec<i64>, TaskSchedulerError> {
pub async fn get_task_ids(&self) -> TaskSchedulerResult<Vec<i64>> {
let db = self.conn.lock().await;
let mut stmt = db.prepare(
"SELECT id
Expand All @@ -300,7 +294,7 @@ impl SchedulerDatabase {

pub async fn get_failed_schedulings(
&self,
) -> Result<Vec<FailedScheduling>, TaskSchedulerError> {
) -> TaskSchedulerResult<Vec<FailedScheduling>> {
let db = self.conn.lock().await;
let mut stmt = db.prepare(
"SELECT *
Expand All @@ -321,7 +315,7 @@ impl SchedulerDatabase {

pub async fn get_failed_tasks(
&self,
) -> Result<Vec<FailedTask>, TaskSchedulerError> {
) -> TaskSchedulerResult<Vec<FailedTask>> {
let db = self.conn.lock().await;
let mut stmt = db.prepare(
"SELECT *
Expand Down
34 changes: 1 addition & 33 deletions magicblock-task-scheduler/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,15 @@ pub enum TaskSchedulerError {
#[error(transparent)]
DatabaseConnection(#[from] rusqlite::Error),

#[error(transparent)]
Pubsub(
Box<
solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
>,
),

#[error(transparent)]
Bincode(#[from] bincode::Error),

#[error("Task not found: {0}")]
TaskNotFound(i64),

#[error(transparent)]
Transaction(#[from] solana_transaction_error::TransactionError),

#[error("Task context not found")]
TaskContextNotFound,
Rpc(#[from] Box<solana_rpc_client_api::client_error::Error>),

#[error(transparent)]
Io(#[from] std::io::Error),

#[error("Failed to process some context requests: {0:?}")]
SchedulingRequests(Vec<TaskSchedulerError>),

#[error("Failed to serialize task context: {0:?}")]
ContextSerialization(Vec<u8>),

#[error("Failed to deserialize task context: {0:?}")]
ContextDeserialization(Vec<u8>),

#[error("Task {0} already exists and is owned by {1}, not {2}")]
UnauthorizedReplacing(i64, String, String),
}

impl From<solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError>
for TaskSchedulerError
{
fn from(
e: solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
) -> Self {
Self::Pubsub(Box::new(e))
}
}
29 changes: 15 additions & 14 deletions magicblock-task-scheduler/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::{
use futures_util::StreamExt;
use log::*;
use magicblock_config::config::TaskSchedulerConfig;
use magicblock_core::link::transactions::{
ScheduledTasksRx, TransactionSchedulerHandle,
};
use magicblock_core::link::transactions::ScheduledTasksRx;
use magicblock_ledger::LatestBlock;
use magicblock_program::{
args::{CancelTaskRequest, TaskRequest},
Expand All @@ -18,6 +16,7 @@ use magicblock_program::{
};
use solana_instruction::Instruction;
use solana_message::Message;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_signature::Signature;
use solana_transaction::Transaction;
use tokio::{select, task::JoinHandle, time::Duration};
Expand All @@ -34,8 +33,8 @@ use crate::{
pub struct TaskSchedulerService {
/// Database for persisting tasks
db: SchedulerDatabase,
/// Used to send transactions for execution
tx_scheduler: TransactionSchedulerHandle,
/// RPC client used to send transactions
rpc_client: RpcClient,
/// Used to receive scheduled tasks from the transaction executor
scheduled_tasks: ScheduledTasksRx,
/// Provides latest blockhash for signing transactions
Expand All @@ -52,7 +51,7 @@ pub struct TaskSchedulerService {

enum ProcessingOutcome {
Success,
Recoverable(TaskSchedulerError),
Recoverable(Box<TaskSchedulerError>),
}

// SAFETY: TaskSchedulerService is moved into a single Tokio task in `start()` and never cloned.
Expand All @@ -65,11 +64,11 @@ impl TaskSchedulerService {
pub fn new(
path: &Path,
config: &TaskSchedulerConfig,
tx_scheduler: TransactionSchedulerHandle,
rpc_url: String,
scheduled_tasks: ScheduledTasksRx,
block: LatestBlock,
token: CancellationToken,
) -> Result<Self, TaskSchedulerError> {
) -> TaskSchedulerResult<Self> {
if config.reset {
match std::fs::remove_file(path) {
Ok(_) => {}
Expand All @@ -87,7 +86,7 @@ impl TaskSchedulerService {
let db = SchedulerDatabase::new(path)?;
Ok(Self {
db,
tx_scheduler,
rpc_client: RpcClient::new(rpc_url),
scheduled_tasks,
block,
task_queue: DelayQueue::new(),
Expand Down Expand Up @@ -139,7 +138,7 @@ impl TaskSchedulerService {
schedule_request.id, e
);

return Ok(ProcessingOutcome::Recoverable(e));
return Ok(ProcessingOutcome::Recoverable(Box::new(e)));
}
}
TaskRequest::Cancel(cancel_request) => {
Expand All @@ -157,7 +156,7 @@ impl TaskSchedulerService {
cancel_request.task_id, e
);

return Ok(ProcessingOutcome::Recoverable(e));
return Ok(ProcessingOutcome::Recoverable(Box::new(e)));
}
}
};
Expand Down Expand Up @@ -321,8 +320,10 @@ impl TaskSchedulerService {
blockhash,
);

let sig = tx.signatures[0];
self.tx_scheduler.execute(tx).await?;
Ok(sig)
Ok(self
.rpc_client
.send_transaction(&tx)
.await
.map_err(Box::new)?)
}
}
Loading