diff --git a/server/src/event.rs b/server/src/event.rs index b8b4672b6..3b06ca4e4 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -230,7 +230,7 @@ impl Event { // schema is then enforced on rest of the events sent to this log stream. fn process_first_event( &self, - mut event: json::Reader, + event: json::Reader, schema: Schema, ) -> Result<(), EventError> { // note for functions _schema_with_map and _set_schema_with_map, @@ -256,8 +256,7 @@ impl Event { // Store record batch on local cache log::info!("creating local writer for this first event"); - let rb = event.next()?.ok_or(EventError::MissingRecord)?; - STREAM_WRITERS::append_to_local(stream_name, &rb)?; + self.process_event(event)?; log::info!("schema is set in memory map for logstream {}", stream_name); _set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata); @@ -298,10 +297,7 @@ impl Event { mut event: json::Reader, ) -> Result<(), EventError> { let rb = event.next()?.ok_or(EventError::MissingRecord)?; - let stream_name = &self.stream_name; - - STREAM_WRITERS::append_to_local(stream_name, &rb)?; - + STREAM_WRITERS::append_to_local(&self.stream_name, &rb)?; Ok(()) }