
Remove in memory ingestion and query #413


Merged: 1 commit, May 18, 2023
101 changes: 10 additions & 91 deletions server/src/event/writer.rs
@@ -18,63 +18,21 @@
  */
 
 mod file_writer;
-mod mem_writer;
 
 use std::{
     collections::HashMap,
     sync::{Mutex, RwLock},
 };
 
-use crate::{
-    option::CONFIG,
-    storage::staging::{self, ReadBuf},
-};
-
 use self::{errors::StreamWriterError, file_writer::FileWriter};
 use arrow_array::RecordBatch;
-use chrono::{NaiveDateTime, Utc};
 use derive_more::{Deref, DerefMut};
-use mem_writer::MemWriter;
 use once_cell::sync::Lazy;
 
-type InMemWriter = MemWriter<8192>;
-
 pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
 
-pub enum StreamWriter {
-    Mem(InMemWriter),
-    Disk(FileWriter, InMemWriter),
-}
-
-impl StreamWriter {
-    pub fn push(
-        &mut self,
-        stream_name: &str,
-        schema_key: &str,
-        rb: RecordBatch,
-    ) -> Result<(), StreamWriterError> {
-        match self {
-            StreamWriter::Mem(mem) => {
-                mem.push(schema_key, rb);
-            }
-            StreamWriter::Disk(disk, mem) => {
-                disk.push(stream_name, schema_key, &rb)?;
-                mem.push(schema_key, rb);
-            }
-        }
-        Ok(())
-    }
-}
-
-// Each entry in writer table is initialized with some context
-// This is helpful for generating prefix when writer is finalized
-pub struct WriterContext {
-    stream_name: String,
-    time: NaiveDateTime,
-}
-
 #[derive(Deref, DerefMut, Default)]
-pub struct WriterTable(RwLock<HashMap<String, (Mutex<StreamWriter>, WriterContext)>>);
+pub struct WriterTable(RwLock<HashMap<String, Mutex<FileWriter>>>);
 
 impl WriterTable {
     // append to a existing stream
@@ -87,36 +45,26 @@ impl WriterTable {
         let hashmap_guard = self.read().unwrap();
 
         match hashmap_guard.get(stream_name) {
-            Some((stream_writer, _)) => {
+            Some(stream_writer) => {
                 stream_writer
                     .lock()
                     .unwrap()
-                    .push(stream_name, schema_key, record)?;
+                    .push(stream_name, schema_key, &record)?;
             }
             None => {
                 drop(hashmap_guard);
                 let mut map = self.write().unwrap();
                 // check for race condition
                 // if map contains entry then just
-                if let Some((writer, _)) = map.get(stream_name) {
+                if let Some(writer) = map.get(stream_name) {
                     writer
                         .lock()
                         .unwrap()
-                        .push(stream_name, schema_key, record)?;
+                        .push(stream_name, schema_key, &record)?;
                 } else {
                     // there is no entry so this can be inserted safely
-                    let context = WriterContext {
-                        stream_name: stream_name.to_owned(),
-                        time: Utc::now().naive_utc(),
-                    };
-                    let mut writer = if CONFIG.parseable.in_mem_ingestion {
-                        StreamWriter::Mem(InMemWriter::default())
-                    } else {
-                        StreamWriter::Disk(FileWriter::default(), InMemWriter::default())
-                    };
-
-                    writer.push(stream_name, schema_key, record)?;
-                    map.insert(stream_name.to_owned(), (Mutex::new(writer), context));
+                    let mut writer = FileWriter::default();
+                    writer.push(stream_name, schema_key, &record)?;
+                    map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
         };
@@ -131,40 +79,11 @@ impl WriterTable {
         let mut table = self.write().unwrap();
         let map = std::mem::take(&mut *table);
         drop(table);
-        for (writer, context) in map.into_values() {
+        for writer in map.into_values() {
             let writer = writer.into_inner().unwrap();
-            match writer {
-                StreamWriter::Mem(mem) => {
-                    let rb = mem.finalize();
-                    let mut read_bufs = staging::MEMORY_READ_BUFFERS.write().unwrap();
-
-                    read_bufs
-                        .entry(context.stream_name)
-                        .or_insert(Vec::default())
-                        .push(ReadBuf {
-                            time: context.time,
-                            buf: rb,
-                        });
-                }
-                StreamWriter::Disk(disk, _) => disk.close_all(),
-            }
+            writer.close_all();
         }
     }
-
-    pub fn clone_read_buf(&self, stream_name: &str) -> Option<ReadBuf> {
-        let hashmap_guard = self.read().unwrap();
-        let (writer, context) = hashmap_guard.get(stream_name)?;
-        let writer = writer.lock().unwrap();
-        let mem = match &*writer {
-            StreamWriter::Mem(mem) => mem,
-            StreamWriter::Disk(_, mem) => mem,
-        };
-
-        Some(ReadBuf {
-            time: context.time,
-            buf: mem.recordbatch_cloned(),
-        })
-    }
 }
 
 pub mod errors {
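
The append path that survives is a double-checked locking scheme: the common case takes only the shared read lock plus a per-stream Mutex, and a miss upgrades to the write lock and re-checks before inserting, because another thread may have created the entry between the two lock acquisitions. Below is a minimal, self-contained sketch of that pattern with toy types (a String payload instead of RecordBatch, a vector-backed Writer instead of FileWriter); it illustrates the concurrency scheme, not Parseable's actual code.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Toy stand-in for FileWriter: just collects pushed rows.
#[derive(Default)]
struct Writer {
    rows: Vec<String>,
}

impl Writer {
    fn push(&mut self, row: &str) {
        self.rows.push(row.to_owned());
    }

    fn close_all(self) {
        // A real writer would flush and close its on-disk files here.
        println!("closed writer holding {} rows", self.rows.len());
    }
}

#[derive(Default)]
struct WriterTable(RwLock<HashMap<String, Mutex<Writer>>>);

impl WriterTable {
    fn append(&self, stream: &str, row: &str) {
        // Fast path: shared read lock on the map, exclusive lock only on
        // the one stream's writer.
        let guard = self.0.read().unwrap();
        if let Some(writer) = guard.get(stream) {
            writer.lock().unwrap().push(row);
            return;
        }
        drop(guard);

        // Slow path: take the write lock and re-check, since another thread
        // may have inserted this stream between the two lock acquisitions.
        let mut map = self.0.write().unwrap();
        if let Some(writer) = map.get(stream) {
            writer.lock().unwrap().push(row);
        } else {
            let mut writer = Writer::default();
            writer.push(row);
            map.insert(stream.to_owned(), Mutex::new(writer));
        }
    }

    fn unset(&self) {
        // Companion idiom from the diff above: std::mem::take swaps the map
        // out while the write lock is held, then the (potentially slow)
        // close calls run after the lock is released.
        let mut table = self.0.write().unwrap();
        let map = std::mem::take(&mut *table);
        drop(table);
        for writer in map.into_values() {
            writer.into_inner().unwrap().close_all();
        }
    }
}

fn main() {
    let table = WriterTable::default();
    table.append("app-logs", "hello");
    table.append("app-logs", "world");
    table.unset(); // prints: closed writer holding 2 rows
}
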
118 changes: 0 additions & 118 deletions server/src/event/writer/mem_writer.rs

This file was deleted.
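
The deleted mem_writer.rs is not shown, but the call sites removed from writer.rs above outline its interface: push(schema_key, rb) buffered incoming batches, finalize() consumed the writer into the buffer that unset() moved into MEMORY_READ_BUFFERS, and recordbatch_cloned() snapshotted the live buffer for queries. What follows is a hypothetical reconstruction of that interface only: the names and signatures mirror the call sites, while the field layout, the unused capacity parameter N, and any per-schema grouping are assumptions.

use arrow_array::RecordBatch;

// Hypothetical reconstruction; only the names and signatures come from the
// call sites in writer.rs, the body is guesswork. The writer was
// instantiated as MemWriter<8192>, so N presumably bounded buffering.
#[derive(Default)]
pub struct MemWriter<const N: usize> {
    read_buffer: Vec<RecordBatch>,
}

impl<const N: usize> MemWriter<N> {
    // Called once per ingested batch; the real version was keyed by the
    // event's schema hash.
    pub fn push(&mut self, _schema_key: &str, rb: RecordBatch) {
        self.read_buffer.push(rb);
    }

    // Consumes the writer, yielding the batches that unset() used to move
    // into MEMORY_READ_BUFFERS.
    pub fn finalize(self) -> Vec<RecordBatch> {
        self.read_buffer
    }

    // Cheap snapshot for clone_read_buf: RecordBatch columns are Arc'd,
    // so cloning copies pointers, not data.
    pub fn recordbatch_cloned(&self) -> Vec<RecordBatch> {
        self.read_buffer.clone()
    }
}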

44 changes: 3 additions & 41 deletions server/src/query.rs
@@ -16,29 +16,23 @@
  *
  */
 
-pub mod table_provider;
-
 use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
 use itertools::Itertools;
 use serde_json::Value;
 use std::path::Path;
 use std::sync::Arc;
 
-use crate::event::STREAM_WRITERS;
 use crate::option::CONFIG;
-use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS};
 use crate::storage::ObjectStorageError;
 use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
 use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
-use self::table_provider::QueryTableProvider;
@@ -90,18 +84,10 @@ impl Query {
         );
 
         let prefixes = self.get_prefixes();
-        let table = QueryTableProvider::new(
-            prefixes,
-            storage,
-            get_all_read_buf(&self.stream_name, self.start, self.end),
-            Arc::clone(&self.schema),
-        );
+        let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) };
 
-        ctx.register_table(
-            &*self.stream_name,
-            Arc::new(table) as Arc<dyn TableProvider>,
-        )
-        .map_err(ObjectStorageError::DataFusionError)?;
+        ctx.register_table(&*self.stream_name, Arc::new(table))
+            .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;
         // dataframe qualifies name by adding table name before columns. \
@@ -181,30 +167,6 @@ pub mod error {
     }
 }
 
-fn get_all_read_buf(stream_name: &str, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<ReadBuf> {
-    let now = Utc::now();
-    let include_mutable = start <= now && now <= end;
-    // copy from mutable buffer
-    let mut queryable_read_buffer = Vec::new();
-
-    if let Some(mem) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) {
-        for read_buffer in mem {
-            let time = read_buffer.time;
-            if start.naive_utc() <= time && time <= end.naive_utc() {
-                queryable_read_buffer.push(read_buffer.clone())
-            }
-        }
-    }
-
-    if include_mutable {
-        if let Some(x) = STREAM_WRITERS.clone_read_buf(stream_name) {
-            queryable_read_buffer.push(x);
-        }
-    }
-
-    queryable_read_buffer
-}
-
 #[cfg(test)]
 mod tests {
     use super::time_from_path;
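
On the query side, register-then-SQL is all that remains: Query::execute now registers one storage-backed table under the stream name and lets DataFusion do the rest. Here is a sketch of that flow using DataFusion's MemTable as a stand-in for whatever storage.query_table returns; the stream name, schema, and rows are illustrative, not from the PR.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Illustrative schema and rows; in the PR the provider is built from
    // the object-store prefixes computed by Query::get_prefixes.
    let schema = Arc::new(Schema::new(vec![
        Field::new("level", DataType::Utf8, false),
        Field::new("status", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["info", "error"])),
        Arc::new(Int64Array::from(vec![200_i64, 500])),
    ];
    let batch = RecordBatch::try_new(schema.clone(), columns)?;

    // Register the provider under the stream name, then run plain SQL
    // against it, as Query::execute does.
    let ctx = SessionContext::new();
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("app_logs", Arc::new(table))?;

    let df = ctx.sql("SELECT level, status FROM app_logs WHERE status >= 500").await?;
    df.show().await?;
    Ok(())
}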