Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
fix(core): Fix async panic on blocking #19
Browse files Browse the repository at this point in the history
Replaced internal threads with tokio spawn functions
  • Loading branch information
kjuulh committed Feb 20, 2023
1 parent 45d6462 commit 75bc17e
Show file tree
Hide file tree
Showing 21 changed files with 66 additions and 61 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ sha2 = "0.10.6"
tar = "0.4.38"
tempfile = "3.3.0"
color-eyre = "0.6.2"
tokio = { version = "1.25.0", features = ["full"] }
1 change: 1 addition & 0 deletions crates/dagger-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ serde_json = "1.0.93"
sha2 = "0.10.6"
tar = "0.4.38"
tempfile = "3.3.0"
tokio = { version = "1.25.0", features = ["full"] }
22 changes: 11 additions & 11 deletions crates/dagger-core/src/cli_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
io::{BufRead, BufReader},
path::PathBuf,
process::{Child, Stdio},
sync::{mpsc::sync_channel, Arc},
sync::Arc,
};

use crate::{config::Config, connect_params::ConnectParams};
Expand All @@ -20,26 +20,26 @@ impl CliSession {
}
}

pub fn connect(
pub async fn connect(
&self,
config: &Config,
cli_path: &PathBuf,
) -> eyre::Result<(ConnectParams, Child)> {
self.inner.connect(config, cli_path)
self.inner.connect(config, cli_path).await
}
}

#[derive(Debug)]
struct InnerCliSession {}

impl InnerCliSession {
pub fn connect(
pub async fn connect(
&self,
config: &Config,
cli_path: &PathBuf,
) -> eyre::Result<(ConnectParams, Child)> {
let proc = self.start(config, cli_path)?;
let params = self.get_conn(proc)?;
let params = self.get_conn(proc).await?;
Ok(params)
}

Expand Down Expand Up @@ -70,7 +70,7 @@ impl InnerCliSession {
return Ok(proc);
}

fn get_conn(
async fn get_conn(
&self,
mut proc: std::process::Child,
) -> eyre::Result<(ConnectParams, std::process::Child)> {
Expand All @@ -84,22 +84,22 @@ impl InnerCliSession {
.take()
.ok_or(eyre::anyhow!("could not acquire stderr from child process"))?;

let (sender, receiver) = sync_channel(1);
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);

std::thread::spawn(move || {
tokio::spawn(async move {
let stdout_bufr = BufReader::new(stdout);
for line in stdout_bufr.lines() {
let out = line.as_ref().unwrap();
if let Ok(conn) = serde_json::from_str::<ConnectParams>(&out) {
sender.send(conn).unwrap();
sender.send(conn).await.unwrap();
}
if let Ok(line) = line {
println!("dagger: {}", line);
}
}
});

std::thread::spawn(|| {
tokio::spawn(async move {
let stderr_bufr = BufReader::new(stderr);
for line in stderr_bufr.lines() {
if let Ok(line) = line {
Expand All @@ -109,7 +109,7 @@ impl InnerCliSession {
}
});

let conn = receiver.recv()?;
let conn = receiver.recv().await.ok_or(eyre::anyhow!("could not receive ok signal from dagger-engine"))?;

Ok((conn, proc))
}
Expand Down
20 changes: 10 additions & 10 deletions crates/dagger-core/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Downloader {
Ok(path)
}

pub fn get_cli(&self) -> eyre::Result<PathBuf> {
pub async fn get_cli(&self) -> eyre::Result<PathBuf> {
let version = &self.version;
let mut cli_bin_path = self.cache_dir()?;
cli_bin_path.push(format!("{CLI_BIN_PREFIX}{version}"));
Expand All @@ -129,7 +129,7 @@ impl Downloader {

if !cli_bin_path.exists() {
cli_bin_path = self
.download(cli_bin_path)
.download(cli_bin_path).await
.context("failed to download CLI from archive")?;
}

Expand All @@ -145,8 +145,8 @@ impl Downloader {
Ok(cli_bin_path)
}

fn download(&self, path: PathBuf) -> eyre::Result<PathBuf> {
let expected_checksum = self.expected_checksum()?;
async fn download(&self, path: PathBuf) -> eyre::Result<PathBuf> {
let expected_checksum = self.expected_checksum().await?;

let mut bytes = vec![];
let actual_hash = self.extract_cli_archive(&mut bytes)?;
Expand All @@ -165,15 +165,15 @@ impl Downloader {
Ok(path)
}

fn expected_checksum(&self) -> eyre::Result<String> {
async fn expected_checksum(&self) -> eyre::Result<String> {
let archive_url = &self.archive_url();
let archive_path = PathBuf::from(&archive_url);
let archive_name = archive_path
.file_name()
.ok_or(eyre::anyhow!("could not get file_name from archive_url"))?;
let resp = reqwest::blocking::get(self.checksum_url())?;
let resp = reqwest::get(self.checksum_url()).await?;
let resp = resp.error_for_status()?;
for line in resp.text()?.lines() {
for line in resp.text().await?.lines() {
let mut content = line.split_whitespace();
let checksum = content
.next()
Expand Down Expand Up @@ -240,9 +240,9 @@ impl Downloader {
mod test {
use super::Downloader;

#[test]
fn download() {
let cli_path = Downloader::new("0.3.10".into()).unwrap().get_cli().unwrap();
#[tokio::test]
async fn download() {
let cli_path = Downloader::new("0.3.10".into()).unwrap().get_cli().await.unwrap();

assert_eq!(
Some("dagger-0.3.10"),
Expand Down
16 changes: 8 additions & 8 deletions crates/dagger-core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ impl Engine {
Self {}
}

fn from_cli(&self, cfg: &Config) -> eyre::Result<(ConnectParams, Child)> {
let cli = Downloader::new("0.3.12".into())?.get_cli()?;
async fn from_cli(&self, cfg: &Config) -> eyre::Result<(ConnectParams, Child)> {
let cli = Downloader::new("0.3.12".into())?.get_cli().await?;

let cli_session = CliSession::new();

Ok(cli_session.connect(cfg, &cli)?)
Ok(cli_session.connect(cfg, &cli).await?)
}

pub fn start(&self, cfg: &Config) -> eyre::Result<(ConnectParams, Child)> {
pub async fn start(&self, cfg: &Config) -> eyre::Result<(ConnectParams, Child)> {
// TODO: Add from existing session as well
self.from_cli(cfg)
self.from_cli(cfg).await
}
}

Expand All @@ -32,10 +32,10 @@ mod tests {
use super::Engine;

// TODO: these tests potentially have a race condition
#[test]
fn engine_can_start() {
#[tokio::test]
async fn engine_can_start() {
let engine = Engine::new();
let params = engine.start(&Config::new(None, None, None, None)).unwrap();
let params = engine.start(&Config::new(None, None, None, None)).await.unwrap();

assert_ne!(
params.0,
Expand Down
12 changes: 6 additions & 6 deletions crates/dagger-core/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::introspection::IntrospectionResponse;
use crate::{config::Config, engine::Engine, session::Session};

pub fn get_schema() -> eyre::Result<IntrospectionResponse> {
pub async fn get_schema() -> eyre::Result<IntrospectionResponse> {
let cfg = Config::new(None, None, None, None);

//TODO: Implement context for proc
let (conn, _proc) = Engine::new().start(&cfg)?;
let (conn, _proc) = Engine::new().start(&cfg).await?;
let session = Session::new();
let req_builder = session.start(&cfg, &conn)?;
let schema = session.schema(req_builder)?;
let schema = session.schema(req_builder).await?;

Ok(schema)
}
Expand All @@ -17,8 +17,8 @@ pub fn get_schema() -> eyre::Result<IntrospectionResponse> {
mod tests {
use super::get_schema;

#[test]
fn can_get_schema() {
let _ = get_schema().unwrap();
#[tokio::test]
async fn can_get_schema() {
let _ = get_schema().await.unwrap();
}
}
10 changes: 5 additions & 5 deletions crates/dagger-core/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use graphql_client::GraphQLQuery;
use reqwest::{
blocking::{Client, RequestBuilder},
header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE},
Client, RequestBuilder,
};

use crate::{config::Config, connect_params::ConnectParams, introspection::IntrospectionResponse};
Expand Down Expand Up @@ -37,22 +37,22 @@ impl Session {
Ok(req_builder)
}

pub fn schema(&self, req_builder: RequestBuilder) -> eyre::Result<IntrospectionResponse> {
pub async fn schema(&self, req_builder: RequestBuilder) -> eyre::Result<IntrospectionResponse> {
let request_body: graphql_client::QueryBody<()> = graphql_client::QueryBody {
variables: (),
query: introspection_query::QUERY,
operation_name: introspection_query::OPERATION_NAME,
};

let res = req_builder.json(&request_body).send()?;
let res = req_builder.json(&request_body).send().await?;

if res.status().is_success() {
// do nothing
} else if res.status().is_server_error() {
return Err(eyre::anyhow!("server error!"));
} else {
let status = res.status();
let error_message = match res.text() {
let error_message = match res.text().await {
Ok(msg) => match serde_json::from_str::<serde_json::Value>(&msg) {
Ok(json) => {
format!("HTTP {}\n{}", status, serde_json::to_string_pretty(&json)?)
Expand All @@ -64,7 +64,7 @@ impl Session {
return Err(eyre::anyhow!(error_message));
}

let json: IntrospectionResponse = res.json()?;
let json: IntrospectionResponse = res.json().await?;

Ok(json)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/build-the-application/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dagger_sdk::HostDirectoryOpts;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;

let host_source_dir = client.host().directory_opts(
"examples/build-the-application/app",
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/caching/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rand::Rng;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;

let host_source_dir = client.host().directory_opts(
"./examples/caching/app",
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/existing-dockerfile/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rand::Rng;
async fn main() -> eyre::Result<()> {
let mut rng = rand::thread_rng();

let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;

let context_dir = client
.host()
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/first-pipeline/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[tokio::main]
async fn main() -> eyre::Result<()> {
let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;

let version = client
.container()
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/multi-stage-build/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use rand::Rng;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;

let host_source_dir = client.host().directory_opts(
"examples/publish-the-application/app",
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/publish-the-application/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use rand::Rng;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;
let output = "examples/publish-the-application/app/build";

let host_source_dir = client.host().directory_opts(
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/examples/test-the-application/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dagger_sdk::HostDirectoryOpts;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let client = dagger_sdk::connect()?;
let client = dagger_sdk::connect().await?;

let host_source_dir = client.host().directory_opts(
"examples/test-the-application/app",
Expand Down
10 changes: 5 additions & 5 deletions crates/dagger-sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::querybuilder::query;

pub type DaggerConn = Arc<Query>;

pub fn connect() -> eyre::Result<DaggerConn> {
pub async fn connect() -> eyre::Result<DaggerConn> {
let cfg = Config::default();
let (conn, proc) = DaggerEngine::new().start(&cfg)?;
let (conn, proc) = DaggerEngine::new().start(&cfg).await?;

Ok(Arc::new(Query {
conn,
Expand Down Expand Up @@ -44,8 +44,8 @@ pub fn graphql_client(conn: &ConnectParams) -> gql_client::Client {
mod test {
use super::connect;

#[test]
fn test_connect() {
let _ = connect().unwrap();
#[tokio::test]
async fn test_connect() {
let _ = connect().await.unwrap();
}
}
2 changes: 1 addition & 1 deletion crates/dagger-sdk/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub struct SecretId(String);
pub struct SocketId(String);
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct BuildArg {
pub name: String,
pub value: String,
pub name: String,
}
pub struct CacheVolume {
pub proc: Arc<Child>,
Expand Down
2 changes: 1 addition & 1 deletion crates/dagger-sdk/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dagger_sdk::{connect, ContainerExecOptsBuilder};

#[tokio::test]
async fn test_example_container() {
let client = connect().unwrap();
let client = connect().await.unwrap();

let alpine = client.container().from("alpine:3.16.2");

Expand Down
Loading

0 comments on commit 75bc17e

Please sign in to comment.