Skip to content

Commit 34e8059

Browse files
refactor
1 parent ba7c3a7 commit 34e8059

File tree

1 file changed

+49
-39
lines changed

1 file changed

+49
-39
lines changed

src/migration/stream_metadata_migration.rs

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ pub fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value {
179179
}
180180

181181
pub fn v5_v6(mut stream_metadata: Value) -> Value {
182-
let stream_metadata_map: &mut serde_json::Map<String, Value> =
183-
stream_metadata.as_object_mut().unwrap();
182+
let stream_metadata_map = stream_metadata.as_object_mut().unwrap();
183+
184184
stream_metadata_map.insert(
185185
"objectstore-format".to_owned(),
186186
Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
@@ -189,54 +189,52 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value {
189189
"version".to_owned(),
190190
Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
191191
);
192-
if let Some(log_source) = stream_metadata_map.remove("log_source") {
193-
if let Some(format_str) = log_source.as_str() {
194-
let transformed_format = match format_str {
195-
"Kinesis" => "kinesis",
196-
"OtelLogs" => "otel-logs",
197-
"OtelTraces" => "otel-traces",
198-
"OtelMetrics" => "otel-metrics",
199-
"Pmeta" => "pmeta",
200-
"Json" => "json",
201-
_ => "json",
202-
};
203192

204-
let log_source_entry = json!({
205-
"log_source_format": transformed_format,
206-
"fields": []
207-
});
193+
// Transform or add log_source
194+
let log_source_entry = match stream_metadata_map.remove("log_source") {
195+
Some(log_source) => transform_log_source(log_source),
196+
None => default_log_source_entry(),
197+
};
208198

209-
stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry]));
210-
} else {
211-
let default_entry = json!({
212-
"log_source_format": "json",
213-
"fields": []
214-
});
199+
stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry]));
215200

216-
stream_metadata_map.insert("log_source".to_owned(), json!([default_entry]));
217-
}
218-
} else {
219-
let default_entry = json!({
220-
"log_source_format": "json",
201+
stream_metadata
202+
}
203+
204+
fn transform_log_source(log_source: Value) -> Value {
205+
if let Some(format_str) = log_source.as_str() {
206+
let transformed_format = map_log_source_format(format_str);
207+
json!({
208+
"log_source_format": transformed_format,
221209
"fields": []
222-
});
210+
})
211+
} else {
212+
default_log_source_entry()
213+
}
214+
}
223215

224-
stream_metadata_map.insert("log_source".to_owned(), json!([default_entry]));
216+
fn map_log_source_format(format_str: &str) -> &str {
217+
match format_str {
218+
"Kinesis" => "kinesis",
219+
"OtelLogs" => "otel-logs",
220+
"OtelTraces" => "otel-traces",
221+
"OtelMetrics" => "otel-metrics",
222+
"Pmeta" => "pmeta",
223+
"Json" => "json",
224+
_ => "json",
225225
}
226+
}
226227

227-
stream_metadata
228+
fn default_log_source_entry() -> Value {
229+
json!({
230+
"log_source_format": "json",
231+
"fields": []
232+
})
228233
}
229234

230235
pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value {
231-
let mut format_mapping = HashMap::new();
232-
format_mapping.insert("Kinesis", "kinesis");
233-
format_mapping.insert("OtelLogs", "otel-logs");
234-
format_mapping.insert("OtelTraces", "otel-traces");
235-
format_mapping.insert("OtelMetrics", "otel-metrics");
236-
format_mapping.insert("Pmeta", "pmeta");
237-
format_mapping.insert("Json", "json");
236+
let format_mapping = create_format_mapping();
238237

239-
// Transform log_source_format in each log_source entry if it exists
240238
if let Some(log_sources) = stream_metadata
241239
.get_mut("log_source")
242240
.and_then(|v| v.as_array_mut())
@@ -251,9 +249,21 @@ pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value {
251249
}
252250
}
253251
}
252+
254253
stream_metadata
255254
}
256255

256+
fn create_format_mapping() -> HashMap<&'static str, &'static str> {
257+
HashMap::from([
258+
("Kinesis", "kinesis"),
259+
("OtelLogs", "otel-logs"),
260+
("OtelTraces", "otel-traces"),
261+
("OtelMetrics", "otel-metrics"),
262+
("Pmeta", "pmeta"),
263+
("Json", "json"),
264+
])
265+
}
266+
257267
fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
258268
let manifest_list = snapshot.get("manifest_list").unwrap();
259269
let mut new_manifest_list = Vec::new();

0 commit comments

Comments
 (0)