Commit

feat: logs can be downloaded directly from server/frontend if using shared volume
rubenfiszel committed May 12, 2024
1 parent a3be6e9 commit a3a66d0
Showing 7 changed files with 133 additions and 66 deletions.
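
In short: when a job's logs exceed the compaction threshold, workers persist the older chunks under /tmp/windmill/logs. The server gains a get_logs_from_disk fallback that streams those chunks back whenever it shares that volume with the workers, so direct download no longer requires EE object storage. The compose file now mounts a worker_logs volume into both the server and the workers to make this work out of the box, and the frontend recognizes the new log-compaction message prefixes to offer the download.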
109 changes: 62 additions & 47 deletions backend/windmill-api/src/jobs.rs
@@ -12,13 +12,15 @@ use serde_json::value::RawValue;
use std::collections::HashMap;
#[cfg(feature = "prometheus")]
use std::sync::atomic::Ordering;
use tokio::io::AsyncReadExt;
#[cfg(feature = "prometheus")]
use tokio::time::Instant;
use windmill_common::flow_status::{JobResult, RestartedFrom};
use windmill_common::jobs::{
format_completed_job_result, format_result, CompletedJobWithFormattedResult, FormattedResult,
ENTRYPOINT_OVERRIDE,
};
use windmill_common::worker::TMP_DIR;

#[cfg(all(feature = "enterprise", feature = "parquet"))]
use windmill_common::scripts::PREVIEW_IS_CODEBASE_HASH;
@@ -664,17 +666,17 @@ async fn get_job_internal(
async fn get_logs_from_store(
log_offset: i32,
logs: &str,
log_file_index: Option<Vec<String>>,
log_file_index: &Option<Vec<String>>,
) -> Option<error::Result<Body>> {
if log_offset > 0 {
if let Some(file_index) = log_file_index {
if let Some(file_index) = log_file_index.clone() {
tracing::debug!("Getting logs from store: {file_index:?}");
if let Some(os) = OBJECT_STORE_CACHE_SETTINGS.read().await.clone() {
tracing::debug!("object store client present, streaming from there");

let logs = logs.to_string();
let stream = async_stream::stream! {
for file_p in file_index {
for file_p in file_index.clone() {
let file_p_2 = file_p.clone();
let file = os.get(&object_store::path::Path::from(file_p)).await;
if let Ok(file) = file {
@@ -696,6 +698,40 @@
}
return None;
}

async fn get_logs_from_disk(
log_offset: i32,
logs: &str,
log_file_index: &Option<Vec<String>>,
) -> Option<error::Result<Body>> {
if log_offset > 0 {
if let Some(file_index) = log_file_index.clone() {
for file_p in &file_index {
if tokio::fs::metadata(format!("{TMP_DIR}/{file_p}"))
.await
.is_err()
{
return None;
}
}

let logs = logs.to_string();
let stream = async_stream::stream! {
for file_p in file_index.clone() {
let mut file = tokio::fs::File::open(format!("{TMP_DIR}/{file_p}")).await.map_err(to_anyhow)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await.map_err(to_anyhow)?;
yield Ok(bytes::Bytes::from(buffer)) as anyhow::Result<bytes::Bytes>;
}

yield Ok(bytes::Bytes::from(logs))
};
return Some(Ok(Body::from_stream(stream)));
}
}
return None;
}
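
The helper above mirrors get_logs_from_store: if every indexed chunk exists under TMP_DIR, it streams the chunks in order and then appends the tail still held in the database; otherwise it returns None so the caller can fall through. A condensed sketch of that reassembly pattern, not part of the commit (it assumes the async-stream, anyhow, bytes, futures, and tokio crates; the path root and helper name are illustrative):

    use anyhow::Result;
    use bytes::Bytes;
    use futures::Stream;

    // Reassemble a compacted log: stream each chunk file named in the index,
    // then emit the tail that never left the database.
    fn assemble_logs(
        file_index: Vec<String>,
        db_tail: String,
    ) -> impl Stream<Item = Result<Bytes>> {
        async_stream::try_stream! {
            for file_p in file_index {
                // Older chunks were compacted out of the database onto disk.
                let chunk = tokio::fs::read(format!("/tmp/windmill/{file_p}")).await?;
                yield Bytes::from(chunk);
            }
            // The most recent logs are still in the jobs table: emit them last.
            yield Bytes::from(db_tail);
        }
    }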

async fn get_job_logs(
Extension(db): Extension<DB>,
Path((w_id, id)): Path<(String, Uuid)>,
@@ -714,7 +750,11 @@ async fn get_job_logs(
if let Some(record) = record {
let logs = record.logs.unwrap_or_default();
#[cfg(all(feature = "enterprise", feature = "parquet"))]
if let Some(r) = get_logs_from_store(record.log_offset, &logs, record.log_file_index).await
if let Some(r) = get_logs_from_store(record.log_offset, &logs, &record.log_file_index).await
{
return r;
}
if let Some(r) = get_logs_from_disk(record.log_offset, &logs, &record.log_file_index).await
{
return r;
}
@@ -734,7 +774,10 @@

let logs = text.logs.unwrap_or_default();
#[cfg(all(feature = "enterprise", feature = "parquet"))]
if let Some(r) = get_logs_from_store(text.log_offset, &logs, text.log_file_index).await {
if let Some(r) = get_logs_from_store(text.log_offset, &logs, &text.log_file_index).await {
return r;
}
if let Some(r) = get_logs_from_disk(text.log_offset, &logs, &text.log_file_index).await {
return r;
}
Ok(Body::from(logs))
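
Taken together, the retrieval order in get_job_logs is: object store (only in enterprise + parquet builds), then shared disk, then the log tail kept in the database. A sketch of that chain, not part of the commit, reusing the helper names and signatures from this file:

    // Condensed restatement of the fallback chain. Each helper returns None
    // when it cannot serve the request, so the database tail is the final
    // fallback.
    async fn resolve_log_body(
        log_offset: i32,
        logs: &str,
        log_file_index: &Option<Vec<String>>,
    ) -> error::Result<Body> {
        #[cfg(all(feature = "enterprise", feature = "parquet"))]
        if let Some(r) = get_logs_from_store(log_offset, logs, log_file_index).await {
            return r;
        }
        if let Some(r) = get_logs_from_disk(log_offset, logs, log_file_index).await {
            return r;
        }
        Ok(Body::from(logs.to_string()))
    }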
Expand Down Expand Up @@ -3649,45 +3692,20 @@ pub struct JobUpdate {
pub flow_status: Option<serde_json::Value>,
}

// #[cfg(all(feature = "enterprise", feature = "parquet"))]
// async fn get_logs_from_store(
// log_offset: i32,
// logs: &str,
// log_file_index: Option<Vec<String>>,
// ) -> Option<error::Result<Body>> {
// if log_offset > 0 {
// if let Some(file_index) = log_file_index {
// tracing::debug!("Getting logs from store: {file_index:?}");
// if let Some(os) = OBJECT_STORE_CACHE_SETTINGS.read().await.clone() {
// tracing::debug!("object store client present, streaming from there");

// let logs = logs.to_string();
// let stream = async_stream::stream! {
// for file_p in file_index {
// let file_p_2 = file_p.clone();
// let file = os.get(&object_store::path::Path::from(file_p)).await;
// if let Ok(file) = file {
// if let Ok(bytes) = file.bytes().await {
// yield Ok(bytes::Bytes::from(bytes)) as object_store::Result<bytes::Bytes>;
// }
// } else {
// tracing::debug!("error getting file from store: {file_p_2}: {}", file.err().unwrap());
// }
// }

// yield Ok(bytes::Bytes::from(logs))
// };
// return Some(Ok(Body::from_stream(stream)));
// } else {
// tracing::debug!("object store client not present, cannot stream logs from store");
// }
// }
// }
// return None;
// }

#[cfg(all(feature = "enterprise", feature = "parquet"))]
async fn get_log_file(Path((_w_id, file_p)): Path<(String, String)>) -> error::Result<Response> {
let local_file = format!("{TMP_DIR}/logs/{file_p}");
if tokio::fs::metadata(&local_file).await.is_ok() {
let mut file = tokio::fs::File::open(local_file).await.map_err(to_anyhow)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await.map_err(to_anyhow)?;
let res = Response::builder()
.header(http::header::CONTENT_TYPE, "text/plain")
.body(Body::from(bytes::Bytes::from(buffer)))
.unwrap();
return Ok(res);
}

#[cfg(all(feature = "enterprise", feature = "parquet"))]
if let Some(os) = OBJECT_STORE_CACHE_SETTINGS.read().await.clone() {
let file = os
.get(&object_store::path::Path::from(format!("logs/{file_p}")))
@@ -3717,12 +3735,9 @@ async fn get_log_file(Path((_w_id, file_p)): Path<(String, String)>) -> error::Result<Response> {
"Object store client not present, cannot stream logs from store".to_string(),
));
}
}

#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
async fn get_log_file(Path((_w_id, file_p)): Path<(String, String)>) -> error::Result<Response> {
return Err(error::Error::NotFound(format!(
"Get log file is an EE feature: {}",
"File not found on server logs volume /tmp/windmill/logs and no distributed logs s3 storage for {}",
file_p
)));
}
2 changes: 2 additions & 0 deletions backend/windmill-common/src/worker.rs
@@ -70,6 +70,8 @@ lazy_static::lazy_static! {

}

pub const TMP_DIR: &str = "/tmp/windmill";

pub async fn reload_custom_tags_setting(db: &DB) -> error::Result<()> {
let q = sqlx::query!(
"SELECT value FROM global_settings WHERE name = $1",
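Hoisting TMP_DIR into windmill-common is the piece that lets windmill-api read the same /tmp/windmill root the workers write to; worker.rs below now derives its logs and cache directories from this constant with concatcp!.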
8 changes: 4 additions & 4 deletions backend/windmill-worker/src/common.rs
@@ -26,7 +26,7 @@ use windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS;
use windmill_common::s3_helpers::{
get_etag_or_empty, LargeFileStorage, ObjectStoreResource, S3Object,
};
use windmill_common::worker::{CLOUD_HOSTED, WORKER_CONFIG};
use windmill_common::worker::{CLOUD_HOSTED, TMP_DIR, WORKER_CONFIG};
use windmill_common::{
error::{self, Error},
jobs::QueuedJob,
@@ -70,7 +70,7 @@ use futures::{

use crate::{
AuthedClient, AuthedClientBackgroundTask, JOB_DEFAULT_TIMEOUT, MAX_RESULT_SIZE,
MAX_TIMEOUT_DURATION, MAX_WAIT_FOR_SIGINT, MAX_WAIT_FOR_SIGTERM, ROOT_CACHE_DIR, TMP_DIR,
MAX_TIMEOUT_DURATION, MAX_WAIT_FOR_SIGINT, MAX_WAIT_FOR_SIGTERM, ROOT_CACHE_DIR,
};

pub async fn build_args_map<'a>(
@@ -717,9 +717,9 @@ async fn compact_logs(
);

let mut new_current_logs = match compact_kind {
CompactLogs::NoS3 => format!("[windmill] worker {worker_name}: Logs length has exceeded a threshold\n[windmill] Previous logs have been saved to disk at {path}, add object storage in the instance settings to save it on distributed storage and allow direct download from Windmill\n"),
CompactLogs::NoS3 => format!("[windmill] No object storage set in instance settings. Previous logs have been saved to disk at {path}\n"),
CompactLogs::S3 => format!("[windmill] Previous logs have been saved to object storage at {path}\n"),
CompactLogs::NotEE => format!("[windmill] worker {worker_name}: Logs length has exceeded a threshold\n[windmill] Previous logs have been saved to disk at {path}\n[windmill] Upgrade to EE and add object storage to save it persistentely on distributed storage and allow direct download from Windmill\n"),
CompactLogs::NotEE => format!("[windmill] Previous logs have been saved to disk at {path}\n"),
};
new_current_logs.push_str(&current_logs);

7 changes: 4 additions & 3 deletions backend/windmill-worker/src/worker.rs
@@ -6,6 +6,8 @@
* LICENSE-AGPL for a copy of the license.
*/

use windmill_common::worker::TMP_DIR;

use anyhow::Result;
use const_format::concatcp;
#[cfg(feature = "prometheus")]
@@ -185,10 +187,9 @@ pub async fn create_token_for_owner(
Ok(token)
}

pub const TMP_DIR: &str = "/tmp/windmill";
pub const TMP_LOGS_DIR: &str = "/tmp/windmill/logs";
pub const TMP_LOGS_DIR: &str = concatcp!(TMP_DIR, "/logs");

pub const ROOT_CACHE_DIR: &str = "/tmp/windmill/cache/";
pub const ROOT_CACHE_DIR: &str = concatcp!(TMP_DIR, "/cache/");
pub const LOCK_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "lock");
pub const PIP_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "pip");
pub const TAR_PIP_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "tar/pip");
7 changes: 6 additions & 1 deletion docker-compose.yml
@@ -37,6 +37,8 @@ services:
depends_on:
db:
condition: service_healthy
volumes:
- worker_logs:/tmp/windmill/logs

windmill_worker:
image: ${WM_IMAGE}
@@ -60,6 +62,7 @@
# mount the docker socket to allow to run docker containers from within the workers
- /var/run/docker.sock:/var/run/docker.sock
- worker_dependency_cache:/tmp/windmill/cache
- worker_logs:/tmp/windmill/logs

## This worker is specialized for "native" jobs. Native jobs run in-process and thus are much more lightweight than other jobs
windmill_worker_native:
@@ -80,7 +83,8 @@
depends_on:
db:
condition: service_healthy

volumes:
- worker_logs:/tmp/windmill/logs
## This worker is specialized for reports or scraping jobs. It is assigned the "reports" worker group which has an init script that installs chromium and can be targeted by using the "chromium" worker tag.
# windmill_worker_reports:
# image: ${WM_IMAGE}
@@ -142,4 +146,5 @@ services:
volumes:
db_data: null
worker_dependency_cache: null
worker_logs: null
lsp_cache: null
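
The named volume is the crux of the feature: the server and every worker must mount the same worker_logs volume at /tmp/windmill/logs. Without it, get_logs_from_disk on the server will not find the chunk files the workers wrote, and only the truncated tail kept in the database is served.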
45 changes: 39 additions & 6 deletions frontend/src/lib/components/LogViewer.svelte
@@ -1,5 +1,9 @@
<script lang="ts" context="module">
const s3LogPrefix = '[windmill] Previous logs have been saved to object storage at logs/'
const s3LogPrefixes = [
'[windmill] Previous logs have been saved to object storage at logs/',
'[windmill] Previous logs have been saved to disk at logs/',
'[windmill] No object storage set in instance settings. Previous logs have been saved to disk at '
]
</script>
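
The order of this array matters: the index of the matching prefix (0 = object store, 1 and 2 = shared disk) is what tooltipText below keys off to explain the download behavior.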

<script lang="ts">
@@ -10,6 +14,7 @@
import AnsiUp from 'ansi_up'
import NoWorkerWithTagWarning from './runs/NoWorkerWithTagWarning.svelte'
import { JobService } from '$lib/gen'
import Tooltip from './Tooltip.svelte'
export let content: string | undefined
export let isLoading: boolean
Expand Down Expand Up @@ -41,12 +46,39 @@
LOG_LIMIT = LOG_INC
}
$: downloadStartUrl = truncatedContent.startsWith(s3LogPrefix)
? truncatedContent.substring(s3LogPrefix.length, truncatedContent.indexOf('\n'))
: undefined
function findPrefixIndex(truncateContent: string): number | undefined {
let index = s3LogPrefixes.findIndex((x) => truncateContent.startsWith(x))
if (index == -1) {
return undefined
}
return index
}
function findStartUrl(truncateContent: string, prefixIndex: number | undefined = undefined) {
if (prefixIndex == undefined) {
return undefined
}
return truncateContent
? truncateContent.substring(s3LogPrefixes[prefixIndex].length, truncateContent.indexOf('\n'))
: undefined
}
function tooltipText(prefixIndex: number | undefined) {
if (prefixIndex == undefined) {
return 'No path/file detected to download from'
} else if (prefixIndex == 0) {
return 'Download the previous logs from the instance configured object store'
} else if (prefixIndex == 1) {
return 'Attempt to download the logs from disk. This assumes a shared volume between the workers and the server at /tmp/windmill/logs. Upgrade to EE to use an object store such as S3 instead of a shared volume.'
} else if (prefixIndex == 2) {
return 'Attempt to download the logs from disk. This assumes a shared volume between the workers and the server at /tmp/windmill/logs. Since you are on EE, you can alternatively configure an object store such as S3 in the instance settings instead of a shared volume.'
}
}
$: truncatedContent = truncateContent(content, loadedFromObjectStore, LOG_LIMIT)
$: prefixIndex = findPrefixIndex(truncatedContent)
$: downloadStartUrl = findStartUrl(truncatedContent, prefixIndex)
function truncateContent(
jobContent: string | undefined,
loadedFromObjectStore: string,
@@ -134,7 +166,8 @@
>{#if content}{@const len =
(content?.length ?? 0) +
(loadedFromObjectStore?.length ?? 0)}{#if downloadStartUrl}<button
on:click={getStoreLogs}>Show more...</button
on:click={getStoreLogs}
>Show more... <Tooltip>{tooltipText(prefixIndex)}</Tooltip></button
><br
/>{:else if len > LOG_LIMIT}(truncated to the last {LOG_LIMIT} characters)... <button
on:click={() => showMoreTruncate(len)}>Show more</button
@@ -200,7 +233,7 @@
>{#if content}{@const len =
(content?.length ?? 0) +
(loadedFromObjectStore?.length ?? 0)}{#if downloadStartUrl}<button on:click={getStoreLogs}
>Show more...</button
>Show more... &nbsp;<Tooltip>{tooltipText(prefixIndex)}</Tooltip></button
><br />{:else if len > LOG_LIMIT}(truncated to the last {LOG_LIMIT} characters)<br
/><button on:click={() => showMoreTruncate(len)}>Show more..</button><br />{/if}<span
>{@html html}</span
21 changes: 16 additions & 5 deletions frontend/src/lib/components/TestJobLoader.svelte
@@ -97,7 +97,11 @@
running: `running` in job && job.running,
logOffset: job.logs?.length ?? 0
})
job.logs = (job.logs ?? '').concat(getUpdate.new_logs ?? '')
if ((job.logs ?? '').length == 0) {
job.logs = getUpdate.new_logs ?? ''
logOffset = getUpdate.log_offset ?? 0
}
}
}
@@ -167,19 +171,26 @@
if (currentId === id) {
try {
if (job && `running` in job) {
const offset = logOffset == 0 ? (job.logs?.length ? job.logs?.length + 1 : 0) : logOffset
let previewJobUpdates = await JobService.getJobUpdates({
workspace: workspace!,
id,
running: job.running,
logOffset: logOffset == 0 ? (job.logs?.length ? job.logs?.length + 1 : 0) : logOffset
logOffset: offset
})
if (previewJobUpdates.new_logs) {
if (offset == 0) {
job.logs = previewJobUpdates.new_logs ?? ''
} else {
job.logs = (job?.logs ?? '').concat(previewJobUpdates.new_logs)
}
}
if (previewJobUpdates.log_offset) {
logOffset = previewJobUpdates.log_offset ?? 0
}
if (previewJobUpdates.new_logs) {
job.logs = (job?.logs ?? '').concat(previewJobUpdates.new_logs)
}
if (previewJobUpdates.flow_status) {
job.flow_status = previewJobUpdates.flow_status as FlowStatus
}
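
The common thread in both hunks: the client now tracks the log_offset reported by the server and only concatenates new_logs when it already holds a prefix of the log. When it holds nothing, or requested from offset 0, it replaces the buffer outright, which avoids duplicating logs that the server re-sent in full.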
