Skip to content

Commit

Permalink
initial bytea conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
deankarn committed Oct 4, 2023
1 parent 52c321f commit 8bddf7f
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 269 deletions.
31 changes: 15 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ description = "A no nonsense, horizontally scalable, ordered job runner backed b
clap = "4"
tokio = { version = "1.32.0", default-features = false }
anyhow = "1.0.75"
thiserror = "1.0.48"
thiserror = "1.0.49"
serde = { version = "1.0.188" }
serde_json = { version = "1.0.107" }
tracing = "0.1.37"
Expand Down
3 changes: 1 addition & 2 deletions relay-backend-postgres/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "relay-backend-postgres"
version = "0.8.1"
version = "0.9.0"
edition.workspace = true
authors.workspace = true
description.workspace = true
Expand All @@ -12,7 +12,6 @@ relay-core.workspace = true
tracing.workspace = true
metrics.workspace = true
chrono.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
async-trait.workspace = true
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
deadpool-postgres = "0.10.5"
Expand Down
46 changes: 18 additions & 28 deletions relay-backend-postgres/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use deadpool_postgres::{
};
use metrics::{counter, histogram, increment_counter};
use pg_interval::Interval;
use relay_core::{Backend, Error, Job, Result};
use relay_core::{Backend, Error, Job as CoreJob, Result};
use rustls::client::{ServerCertVerified, ServerCertVerifier, WebPkiVerifier};
use rustls::{Certificate, OwnedTrustAnchor, RootCertStore, ServerName};
use serde_json::value::RawValue;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::io;
Expand All @@ -36,8 +35,8 @@ const MIGRATIONS: [Migration; 2] = [
),
];

/// `RawJob` represents a Relay Job for the Postgres backend.
type RawJob = Job<Box<RawValue>, Box<RawValue>>;
/// Is the Postgres backend Job type.
pub type Job = CoreJob<Vec<u8>, Vec<u8>>;

/// Postgres backing store
pub struct PgStore {
Expand Down Expand Up @@ -159,7 +158,7 @@ impl PgStore {
}

#[async_trait]
impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
impl Backend<Vec<u8>, Vec<u8>> for PgStore {
/// Creates a batch of Jobs to be processed in a single write transaction.
///
/// NOTES: If the number of jobs passed is '1' then those will return a `JobExists` error
Expand All @@ -172,7 +171,7 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
///
/// Will return `Err` if there is any communication issues with the backend Postgres DB.
#[tracing::instrument(name = "pg_enqueue", level = "debug", skip_all, fields(jobs = jobs.len()))]
async fn enqueue(&self, jobs: &[RawJob]) -> Result<()> {
async fn enqueue(&self, jobs: &[Job]) -> Result<()> {
if jobs.len() == 1 {
let job = jobs.first().unwrap();
let now = Utc::now().naive_utc();
Expand Down Expand Up @@ -217,8 +216,8 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
&job.queue,
&Interval::from_duration(chrono::Duration::seconds(i64::from(job.timeout))),
&job.max_retries,
&Json(&job.payload),
&job.state.as_ref().map(|state| Some(Json(state))),
&job.payload.as_slice(),
&job.state.as_ref().map(|state| Some(state.as_slice())),
&now,
&run_at,
],
Expand Down Expand Up @@ -290,8 +289,8 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
job.timeout,
))),
&job.max_retries,
&Json(&job.payload),
&job.state.as_ref().map(|state| Some(Json(state))),
&job.payload.as_slice(),
&job.state.as_ref().map(|state| Some(state.as_slice())),
&now,
&run_at,
],
Expand Down Expand Up @@ -331,7 +330,7 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
///
/// Will return `Err` if there is any communication issues with the backend Postgres DB.
#[tracing::instrument(name = "pg_get", level = "debug", skip_all, fields(job_id=%job_id, queue=%queue))]
async fn get(&self, queue: &str, job_id: &str) -> Result<Option<RawJob>> {
async fn get(&self, queue: &str, job_id: &str) -> Result<Option<Job>> {
let client = self.pool.get().await.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable_pool(e),
Expand Down Expand Up @@ -429,12 +428,7 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
/// Will return `Err` if there is any communication issues with the backend Postgres DB or the
/// Job attempting to be updated cannot be found.
#[tracing::instrument(name = "pg_heartbeat", level = "debug", skip_all, fields(job_id=%job_id, queue=%queue))]
async fn heartbeat(
&self,
queue: &str,
job_id: &str,
state: Option<Box<RawValue>>,
) -> Result<()> {
async fn heartbeat(&self, queue: &str, job_id: &str, state: Option<Vec<u8>>) -> Result<()> {
let client = self.pool.get().await.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable_pool(e),
Expand Down Expand Up @@ -538,7 +532,7 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
///
/// Will return `Err` if there is any communication issues with the backend Postgres DB.
#[tracing::instrument(name = "pg_next", level = "debug", skip_all, fields(num_jobs=num_jobs, queue=%queue))]
async fn next(&self, queue: &str, num_jobs: u32) -> Result<Option<Vec<RawJob>>> {
async fn next(&self, queue: &str, num_jobs: u32) -> Result<Option<Vec<Job>>> {
let client = self.pool.get().await.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable_pool(e),
Expand Down Expand Up @@ -652,7 +646,7 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
///
/// Will return `Err` if there is any communication issues with the backend Postgres DB.
#[tracing::instrument(name = "pg_reschedule", level = "debug", skip_all, fields(job_id=%job.id, queue=%job.queue))]
async fn reschedule(&self, job: &RawJob) -> Result<()> {
async fn reschedule(&self, job: &Job) -> Result<()> {
let now = Utc::now().naive_utc();
let run_at = if let Some(run_at) = job.run_at {
run_at.naive_utc()
Expand Down Expand Up @@ -700,7 +694,7 @@ impl Backend<Box<RawValue>, Box<RawValue>> for PgStore {
&Interval::from_duration(chrono::Duration::seconds(i64::from(job.timeout))),
&job.max_retries,
&Json(&job.payload),
&job.state.as_ref().map(|state| Some(Json(state))),
&job.state.as_ref().map(|state| Some(state.as_slice())),
&now,
&run_at,
],
Expand Down Expand Up @@ -944,18 +938,14 @@ fn is_retryable_pool(e: PoolError) -> bool {
}

#[inline]
fn row_to_job(row: &Row) -> RawJob {
RawJob {
fn row_to_job(row: &Row) -> Job {
Job {
id: row.get(0),
queue: row.get(1),
timeout: interval_seconds(row.get::<usize, Interval>(2)),
max_retries: row.get(3),
payload: row.get::<usize, Json<Box<RawValue>>>(4).0,
state: row
.get::<usize, Option<Json<Box<RawValue>>>>(5)
.map(|state| match state {
Json(state) => state,
}),
payload: row.get(4),
state: row.get(5),
run_at: Some(Utc.from_utc_datetime(&row.get(6))),
updated_at: Some(Utc.from_utc_datetime(&row.get(7))),
}
Expand Down
3 changes: 2 additions & 1 deletion relay-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "relay-core"
version = "0.5.0"
version = "0.6.0"
edition.workspace = true
authors.workspace = true
description.workspace = true
Expand All @@ -10,6 +10,7 @@ description.workspace = true
[dependencies]
thiserror.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["raw_value"] }
anydate = { workspace = true, features = ["serde"] }
chrono = { workspace = true, features = ["serde"] }
async-trait.workspace = true
1 change: 1 addition & 0 deletions relay-core/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anydate::serde::deserialize::anydate_utc_option;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;

/// Job defines all information needed to process a job.
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand Down
4 changes: 2 additions & 2 deletions relay-frontend-http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "relay-frontend-http"
version = "0.8.0"
version = "0.9.0"
edition.workspace = true
authors.workspace = true
description.workspace = true
Expand All @@ -12,10 +12,10 @@ relay-core.workspace = true
anyhow.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["time", "macros","signal"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["raw_value"] }
tracing.workspace = true
metrics.workspace = true
serde = { workspace = true, features = ["derive"] }
reqwest = { workspace = true, features = ["rustls-tls", "json"] }
backoff-rs.workspace = true
percent-encoding.workspace = true
Expand Down

0 comments on commit 8bddf7f

Please sign in to comment.