/
write_records_to_mongodb_database.rs
83 lines (67 loc) · 1.96 KB
/
write_records_to_mongodb_database.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use anyhow::Result;
use mongodb::{
bson::{doc, oid::ObjectId},
sync::Client,
};
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
core::{
item::{ItemProcessor, ItemProcessorResult},
step::{Step, StepBuilder, StepInstance},
},
item::csv::csv_reader::CsvItemReaderBuilder,
item::mongodb::{mongodb_reader::WithObjectId, mongodb_writer::MongodbItemWriterBuilder},
};
/// A book record that carries a MongoDB `ObjectId` alongside its title and author.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Book {
    /// Identifier of the record; (de)serialized under the field name `oid`.
    #[serde(rename = "oid")]
    id: ObjectId,
    /// Title of the book.
    title: String,
    /// Author of the book.
    author: String,
}
impl WithObjectId for Book {
fn get_id(&self) -> ObjectId {
self.id
}
}
/// A book reduced to its title and author, with no identifier field.
///
/// This is the item type that `main` reads from CSV and writes to MongoDB.
#[derive(Clone, Deserialize, Serialize)]
struct FormattedBook {
    /// Title of the book.
    title: String,
    /// Author of the book.
    author: String,
}
/// Stateless processor that turns a `Book` into a `FormattedBook`
/// (see its `ItemProcessor` implementation).
#[derive(Default)]
struct FormatBookProcessor {}
impl ItemProcessor<Book, FormattedBook> for FormatBookProcessor {
    /// Builds a `FormattedBook` from `item`, replacing every space in the
    /// title and author with an underscore and upper-casing the result.
    ///
    /// Always returns `Ok`.
    fn process(&self, item: &Book) -> ItemProcessorResult<FormattedBook> {
        // Same normalization for both fields: spaces -> underscores, then upper-case.
        let normalize = |text: &str| text.replace(" ", "_").to_uppercase();

        Ok(FormattedBook {
            title: normalize(&item.title),
            author: normalize(&item.author),
        })
    }
}
/// Reads two book records from an inline CSV document and writes them to the
/// `books` collection of the `test` database on a MongoDB instance at
/// `127.0.0.1:27017`.
///
/// No processor is attached to the step: items are read and written as
/// `FormattedBook` values unchanged.
///
/// # Errors
///
/// Returns an error if the MongoDB client cannot be created from the
/// connection string.
fn main() -> Result<()> {
    // A plain literal is enough here; there is nothing to interpolate.
    let url = "mongodb://127.0.0.1:27017/";
    // Propagate a client-construction failure to the caller instead of panicking.
    let client: Client = Client::with_uri_str(url)?;
    let db = client.database("test");
    let book_collection = db.collection::<FormattedBook>("books");

    // Prepare reader
    // The continuation lines of the literal start at column 0 on purpose:
    // any leading whitespace would become part of the CSV data.
    let csv = "title,author
Shining,Stephen King
UN SAC DE BILLES,JOSEPH JOFFO";
    let reader = CsvItemReaderBuilder::new()
        .has_headers(true)
        .from_reader(csv.as_bytes());

    // Prepare writer
    let writer = MongodbItemWriterBuilder::new()
        .collection(&book_collection)
        .build();

    // Execute process
    let step: StepInstance<FormattedBook, FormattedBook> = StepBuilder::new()
        .reader(&reader)
        .writer(&writer)
        .chunk(3)
        .build();

    // The step outcome is bound but not inspected.
    let _result = step.execute();

    Ok(())
}