
Commit

Merge branch 'multi-stream-integration-v5' of https://github.com/openobserve/openobserve into multi-stream-integration-v5
bjp232004 committed May 7, 2024
2 parents a407ebb + dd86db7 commit 3b1088f
Showing 21 changed files with 620 additions and 113 deletions.
6 changes: 3 additions & 3 deletions src/handler/http/router/ui.rs
@@ -21,8 +21,8 @@ use rust_embed_for_web::RustEmbed;

 #[derive(RustEmbed)]
 #[folder = "web/dist/"]
-#[gzip]
-#[br]
+#[gzip = false]
+#[br = false]
 struct WebAssets;
 
 #[route("/{path:.*}", method = "GET", method = "HEAD")]
@@ -39,7 +39,7 @@ pub async fn serve(path: web::Path<String>) -> EmbedResponse<EmbedableFileRespon

     WebAssets::get(path)
         .into_response()
-        .use_compression(Compress::IfPrecompressed)
+        .use_compression(Compress::Never)
 }
 
 #[cfg(test)]
7 changes: 5 additions & 2 deletions src/infra/src/db/etcd.rs
@@ -215,9 +215,12 @@ impl super::Db for Etcd
         key: &str,
         with_prefix: bool,
         _need_watch: bool,
-        _start_dt: Option<i64>,
+        start_dt: Option<i64>,
     ) -> Result<()> {
-        let key = format!("{}{}", self.prefix, key);
+        let mut key = format!("{}{}", self.prefix, key);
+        if start_dt.is_some() {
+            key = format!("{}/{}", key, start_dt.unwrap());
+        }
         let mut client = get_etcd_client().await.clone();
         let opt = with_prefix.then(|| DeleteOptions::new().with_prefix());
         let _ = client.delete(key.as_str(), opt).await?.deleted();
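To make the new delete path concrete, here is a minimal, self-contained sketch of the key composition introduced above; the prefix and key values are hypothetical examples, not values taken from this commit.

// Sketch of how the etcd delete key is built when a start_dt is supplied.
// The prefix "/openobserve" and the key below are made-up examples.
fn compose_delete_key(prefix: &str, key: &str, start_dt: Option<i64>) -> String {
    let mut key = format!("{}{}", prefix, key);
    if let Some(dt) = start_dt {
        // A schema-version delete targets "<prefix><key>/<start_dt>".
        key = format!("{}/{}", key, dt);
    }
    key
}

fn main() {
    assert_eq!(
        compose_delete_key("/openobserve", "/meta/schema/logs/default", Some(1715040000000000)),
        "/openobserve/meta/schema/logs/default/1715040000000000"
    );
    assert_eq!(
        compose_delete_key("/openobserve", "/meta/schema/logs/default", None),
        "/openobserve/meta/schema/logs/default"
    );
}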
1 change: 1 addition & 0 deletions src/infra/src/db/mod.rs
@@ -52,6 +52,7 @@ pub async fn get_super_cluster() -> &'static Box<dyn Db> {

 pub async fn init() -> Result<()> {
     etcd::init().await;
+    create_table().await?;
     Ok(())
 }

3 changes: 2 additions & 1 deletion src/infra/src/db/mysql.rs
@@ -504,7 +504,8 @@ impl super::Db for MysqlDb {

         let (min_dt, max_dt) = start_dt.unwrap();
         let (module, key1, key2) = super::parse_key(prefix);
-        let mut sql = "SELECT id, module, key1, key2, start_dt, '' AS value FROM meta".to_string();
+        let mut sql =
+            "SELECT id, module, key1, key2, start_dt, value AS value FROM meta".to_string();
         if !module.is_empty() {
             sql = format!("{} WHERE module = '{}'", sql, module);
         }
16 changes: 13 additions & 3 deletions src/infra/src/db/nats.rs
@@ -268,19 +268,29 @@ impl super::Db for NatsDb {
         key: &str,
         with_prefix: bool,
         _need_watch: bool,
-        _start_dt: Option<i64>,
+        start_dt: Option<i64>,
     ) -> Result<()> {
         let (bucket, new_key) = get_bucket_by_key(&self.prefix, key).await?;
+        let with_prefix = if start_dt.is_some() {
+            false
+        } else {
+            with_prefix
+        };
+        let new_key = if start_dt.is_some() {
+            format!("{}/{}", new_key, start_dt.unwrap())
+        } else {
+            new_key.to_string()
+        };
         if !with_prefix {
-            let key = key_encode(new_key);
+            let key = key_encode(&new_key);
             bucket.purge(key).await?;
             return Ok(());
         }
         let mut del_keys = Vec::new();
         let mut keys = bucket.keys().await?.boxed();
         while let Some(key) = keys.try_next().await? {
             let decoded_key = key_decode(&key);
-            if decoded_key.starts_with(new_key) {
+            if decoded_key.starts_with(&new_key) {
                 del_keys.push(key);
             }
         }
12 changes: 11 additions & 1 deletion src/infra/src/db/sqlite.rs
@@ -445,12 +445,22 @@ impl super::Db for SqliteDb {
     ) -> Result<()> {
         // event watch
         if need_watch {
+            let with_prefix = if start_dt.is_some() {
+                false
+            } else {
+                with_prefix
+            };
+            let new_key = if start_dt.is_some() {
+                format!("{}/{}", key, start_dt.unwrap())
+            } else {
+                key.to_string()
+            };
             // find all keys then send event
             let items = if with_prefix {
                 let db = super::get_db().await;
                 db.list_keys(key).await?
             } else {
-                vec![key.to_string()]
+                vec![new_key.to_string()]
             };
             let tx = CHANNEL.watch_tx.clone();
             tokio::task::spawn(async move {
2 changes: 1 addition & 1 deletion src/infra/src/file_list/mod.rs
@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
2 changes: 1 addition & 1 deletion src/infra/src/file_list/mysql.rs
@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
2 changes: 1 addition & 1 deletion src/infra/src/file_list/postgres.rs
@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
2 changes: 1 addition & 1 deletion src/infra/src/file_list/sqlite.rs
@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
2 changes: 1 addition & 1 deletion src/infra/src/lib.rs
@@ -25,11 +25,11 @@ pub mod storage;

 pub async fn init() -> Result<(), anyhow::Error> {
     db::init().await?;
-    db::create_table().await?;
     cache::init().await?;
     file_list::create_table().await?;
     queue::init().await?;
     scheduler::init().await?;
+    schema::init().await?;
     // because of asynchronous, we need to wait for a while
     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
     Ok(())
13 changes: 2 additions & 11 deletions src/infra/src/scheduler/mod.rs
@@ -118,20 +118,11 @@ pub struct Trigger {

 /// Initializes the scheduler - creates table and index
 pub async fn init() -> Result<()> {
-    create_table().await?;
-    create_table_index().await?;
+    CLIENT.create_table().await?;
+    CLIENT.create_table_index().await?;
     Ok(())
 }
 
-/// Creates the Scheduled Jobs table
-pub async fn create_table() -> Result<()> {
-    CLIENT.create_table().await
-}
-
-pub async fn create_table_index() -> Result<()> {
-    CLIENT.create_table_index().await
-}
-
 /// Pushes a Trigger job into the queue
 #[inline]
 pub async fn push(trigger: Trigger) -> Result<()> {
73 changes: 73 additions & 0 deletions src/infra/src/schema/history/mod.rs
@@ -0,0 +1,73 @@
// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use config::{
    meta::{meta_store::MetaStore, stream::StreamType},
    CONFIG,
};
use datafusion::arrow::datatypes::Schema;
use once_cell::sync::Lazy;

use crate::errors::Result;

pub mod mysql;
pub mod postgres;
pub mod sqlite;

static CLIENT: Lazy<Box<dyn SchemaHistory>> = Lazy::new(connect);

pub fn connect() -> Box<dyn SchemaHistory> {
    match CONFIG.common.meta_store.as_str().into() {
        MetaStore::Sqlite => Box::<sqlite::SqliteSchemaHistory>::default(),
        MetaStore::Etcd => Box::<sqlite::SqliteSchemaHistory>::default(),
        MetaStore::Nats => Box::<sqlite::SqliteSchemaHistory>::default(),
        MetaStore::MySQL => Box::<mysql::MysqlSchemaHistory>::default(),
        MetaStore::PostgreSQL => Box::<postgres::PostgresSchemaHistory>::default(),
    }
}

#[async_trait]
pub trait SchemaHistory: Sync + Send + 'static {
    async fn create_table(&self) -> Result<()>;
    async fn create_table_index(&self) -> Result<()>;
    async fn create(
        &self,
        org_id: &str,
        stream_type: StreamType,
        stream_name: &str,
        start_dt: i64,
        schema: Schema,
    ) -> Result<()>;
}

pub async fn init() -> Result<()> {
    CLIENT.create_table().await?;
    CLIENT.create_table_index().await?;
    Ok(())
}

#[inline]
pub async fn create(
    org_id: &str,
    stream_type: StreamType,
    stream_name: &str,
    start_dt: i64,
    schema: Schema,
) -> Result<()> {
    CLIENT
        .create(org_id, stream_type, stream_name, start_dt, schema)
        .await
}
135 changes: 135 additions & 0 deletions src/infra/src/schema/history/mysql.rs
@@ -0,0 +1,135 @@
// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use config::{meta::stream::StreamType, utils::json};
use datafusion::arrow::datatypes::Schema;

use crate::{
    db::mysql::CLIENT,
    errors::{Error, Result},
};

pub struct MysqlSchemaHistory {}

impl MysqlSchemaHistory {
    pub fn new() -> Self {
        Self {}
    }
}

impl Default for MysqlSchemaHistory {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl super::SchemaHistory for MysqlSchemaHistory {
    async fn create_table(&self) -> Result<()> {
        create_table().await
    }

    async fn create_table_index(&self) -> Result<()> {
        create_table_index().await
    }

    async fn create(
        &self,
        org_id: &str,
        stream_type: StreamType,
        stream_name: &str,
        start_dt: i64,
        schema: Schema,
    ) -> Result<()> {
        let value = json::to_string(&schema)?;
        let pool = CLIENT.clone();
        match sqlx::query(
            r#"
INSERT IGNORE INTO schema_history (org, stream_type, stream_name, start_dt, value)
VALUES (?, ?, ?, ?, ?);
"#,
        )
        .bind(org_id)
        .bind(stream_type.to_string())
        .bind(stream_name)
        .bind(start_dt)
        .bind(value)
        .execute(&pool)
        .await
        {
            Err(sqlx::Error::Database(e)) => {
                if e.is_unique_violation() {
                    Ok(())
                } else {
                    Err(Error::Message(e.to_string()))
                }
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(()),
        }
    }
}

pub async fn create_table() -> Result<()> {
    let pool = CLIENT.clone();
    sqlx::query(
        r#"
CREATE TABLE IF NOT EXISTS schema_history
(
id BIGINT not null primary key AUTO_INCREMENT,
org VARCHAR(100) not null,
stream_type VARCHAR(32) not null,
stream_name VARCHAR(256) not null,
start_dt BIGINT not null,
value LONGTEXT not null
);
"#,
    )
    .execute(&pool)
    .await?;

    Ok(())
}

pub async fn create_table_index() -> Result<()> {
    let pool = CLIENT.clone();
    let sqls = vec![
        (
            "schema_history",
            "CREATE INDEX schema_history_org_idx on schema_history (org);",
        ),
        (
            "schema_history",
            "CREATE INDEX schema_history_stream_idx on schema_history (org, stream_type, stream_name);",
        ),
        (
            "schema_history",
            "CREATE UNIQUE INDEX schema_history_stream_version_idx on schema_history (org, stream_type, stream_name, start_dt);",
        ),
    ];
    for (table, sql) in sqls {
        if let Err(e) = sqlx::query(sql).execute(&pool).await {
            if e.to_string().contains("Duplicate key") {
                // index already exists
                continue;
            }
            log::error!("[MYSQL] create table {} index error: {}", table, e);
            return Err(e.into());
        }
    }

    Ok(())
}
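Because of the INSERT IGNORE above and the unique (org, stream_type, stream_name, start_dt) index, recording the same schema version twice is a no-op. As a rough illustration of a read path against this table (not part of this commit; the pool, stream type, and stream name are placeholders), one might fetch the latest recorded version like this:

use sqlx::{MySqlPool, Row};

// Hypothetical helper: return the newest (start_dt, schema JSON) row for a stream.
async fn latest_schema_version(
    pool: &MySqlPool,
    org: &str,
    stream_name: &str,
) -> Result<Option<(i64, String)>, sqlx::Error> {
    let row = sqlx::query(
        "SELECT start_dt, value FROM schema_history \
         WHERE org = ? AND stream_type = 'logs' AND stream_name = ? \
         ORDER BY start_dt DESC LIMIT 1;",
    )
    .bind(org)
    .bind(stream_name)
    .fetch_optional(pool)
    .await?;
    Ok(row.map(|r| (r.get::<i64, _>("start_dt"), r.get::<String, _>("value"))))
}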
