1818import reactor .core .publisher .Flux ;
1919import reactor .core .publisher .Mono ;
2020
21- import java .util .List ;
22-
2321import org .reactivestreams .Publisher ;
2422
25- import org .springframework .dao .TransientDataAccessResourceException ;
2623import org .springframework .data .r2dbc .convert .R2dbcConverter ;
2724import org .springframework .data .r2dbc .core .DatabaseClient ;
28- import org .springframework .data .r2dbc .core .PreparedOperation ;
25+ import org .springframework .data .r2dbc .core .R2dbcEntityOperations ;
26+ import org .springframework .data .r2dbc .core .R2dbcEntityTemplate ;
2927import org .springframework .data .r2dbc .core .ReactiveDataAccessStrategy ;
30- import org .springframework .data .r2dbc .core .StatementMapper ;
3128import org .springframework .data .r2dbc .query .Criteria ;
29+ import org .springframework .data .r2dbc .query .Query ;
3230import org .springframework .data .relational .core .mapping .RelationalPersistentProperty ;
3331import org .springframework .data .relational .core .sql .Functions ;
3432import org .springframework .data .relational .core .sql .Select ;
3836import org .springframework .data .relational .core .sql .render .SqlRenderer ;
3937import org .springframework .data .relational .repository .query .RelationalEntityInformation ;
4038import org .springframework .data .repository .reactive .ReactiveCrudRepository ;
39+ import org .springframework .data .util .Lazy ;
4140import org .springframework .transaction .annotation .Transactional ;
4241import org .springframework .util .Assert ;
4342
5150public class SimpleR2dbcRepository <T , ID > implements ReactiveCrudRepository <T , ID > {
5251
5352 private final RelationalEntityInformation <T , ID > entity ;
54- private final DatabaseClient databaseClient ;
55- private final R2dbcConverter converter ;
56- private final ReactiveDataAccessStrategy accessStrategy ;
53+ private final R2dbcEntityOperations entityOperations ;
54+ private final Lazy <RelationalPersistentProperty > idProperty ;
55+
56+ /**
57+ * Create a new {@link SimpleR2dbcRepository}.
58+ *
59+ * @param entity
60+ * @param entityOperations
61+ * @param converter
62+ * @since 1.1
63+ */
64+ SimpleR2dbcRepository (RelationalEntityInformation <T , ID > entity , R2dbcEntityOperations entityOperations ,
65+ R2dbcConverter converter ) {
66+
67+ this .entity = entity ;
68+ this .entityOperations = entityOperations ;
69+ this .idProperty = Lazy .of (() -> converter //
70+ .getMappingContext () //
71+ .getRequiredPersistentEntity (this .entity .getJavaType ()) //
72+ .getRequiredIdProperty ());
73+ }
5774
75+ /**
76+ * Create a new {@link SimpleR2dbcRepository}.
77+ *
78+ * @param entity
79+ * @param databaseClient
80+ * @param converter
81+ * @param accessStrategy
82+ */
5883 public SimpleR2dbcRepository (RelationalEntityInformation <T , ID > entity , DatabaseClient databaseClient ,
5984 R2dbcConverter converter , ReactiveDataAccessStrategy accessStrategy ) {
85+
6086 this .entity = entity ;
61- this .databaseClient = databaseClient ;
62- this .converter = converter ;
63- this .accessStrategy = accessStrategy ;
87+ this .entityOperations = new R2dbcEntityTemplate (databaseClient );
88+ this .idProperty = Lazy .of (() -> converter //
89+ .getMappingContext () //
90+ .getRequiredPersistentEntity (this .entity .getJavaType ()) //
91+ .getRequiredIdProperty ());
6492 }
6593
6694 /* (non-Javadoc)
@@ -73,28 +101,10 @@ public <S extends T> Mono<S> save(S objectToSave) {
73101 Assert .notNull (objectToSave , "Object to save must not be null!" );
74102
75103 if (this .entity .isNew (objectToSave )) {
76-
77- return this .databaseClient .insert () //
78- .into (this .entity .getJavaType ()) //
79- .table (this .entity .getTableName ()).using (objectToSave ) //
80- .map (this .converter .populateIdIfNecessary (objectToSave )) //
81- .first () //
82- .defaultIfEmpty (objectToSave );
104+ return this .entityOperations .insert (objectToSave );
83105 }
84106
85- return this .databaseClient .update () //
86- .table (this .entity .getJavaType ()) //
87- .table (this .entity .getTableName ()).using (objectToSave ) //
88- .fetch ().rowsUpdated ().handle ((rowsUpdated , sink ) -> {
89-
90- if (rowsUpdated == 0 ) {
91- sink .error (new TransientDataAccessResourceException (
92- String .format ("Failed to update table [%s]. Row with Id [%s] does not exist." ,
93- this .entity .getTableName (), this .entity .getId (objectToSave ))));
94- } else {
95- sink .next (objectToSave );
96- }
97- });
107+ return this .entityOperations .update (objectToSave );
98108 }
99109
100110 /* (non-Javadoc)
@@ -129,20 +139,7 @@ public Mono<T> findById(ID id) {
129139
130140 Assert .notNull (id , "Id must not be null!" );
131141
132- List <SqlIdentifier > columns = this .accessStrategy .getAllColumns (this .entity .getJavaType ());
133- String idProperty = getIdProperty ().getName ();
134-
135- StatementMapper mapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
136- StatementMapper .SelectSpec selectSpec = mapper .createSelect (this .entity .getTableName ()) //
137- .withProjection (columns ) //
138- .withCriteria (Criteria .where (idProperty ).is (id ));
139-
140- PreparedOperation <?> operation = mapper .getMappedObject (selectSpec );
141-
142- return this .databaseClient .execute (operation ) //
143- .as (this .entity .getJavaType ()) //
144- .fetch () //
145- .one ();
142+ return this .entityOperations .selectOne (getIdQuery (id ), this .entity .getJavaType ());
146143 }
147144
148145 /* (non-Javadoc)
@@ -161,18 +158,7 @@ public Mono<Boolean> existsById(ID id) {
161158
162159 Assert .notNull (id , "Id must not be null!" );
163160
164- String idProperty = getIdProperty ().getName ();
165-
166- StatementMapper mapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
167- StatementMapper .SelectSpec selectSpec = mapper .createSelect (this .entity .getTableName ()).withProjection (idProperty ) //
168- .withCriteria (Criteria .where (idProperty ).is (id ));
169-
170- PreparedOperation <?> operation = mapper .getMappedObject (selectSpec );
171-
172- return this .databaseClient .execute (operation ) //
173- .map ((r , md ) -> r ) //
174- .first () //
175- .hasElement ();
161+ return this .entityOperations .exists (getIdQuery (id ), this .entity .getJavaType ());
176162 }
177163
178164 /* (non-Javadoc)
@@ -188,7 +174,7 @@ public Mono<Boolean> existsById(Publisher<ID> publisher) {
188174 */
189175 @ Override
190176 public Flux <T > findAll () {
191- return this .databaseClient .select (). from ( this .entity .getJavaType ()). fetch (). all ( );
177+ return this .entityOperations .select (Query . empty (), this .entity .getJavaType ());
192178 }
193179
194180 /* (non-Javadoc)
@@ -216,17 +202,9 @@ public Flux<T> findAllById(Publisher<ID> idPublisher) {
216202 return Flux .empty ();
217203 }
218204
219- List <SqlIdentifier > columns = this .accessStrategy .getAllColumns (this .entity .getJavaType ());
220205 String idProperty = getIdProperty ().getName ();
221206
222- StatementMapper mapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
223- StatementMapper .SelectSpec selectSpec = mapper .createSelect (this .entity .getTableName ()) //
224- .withProjection (columns ) //
225- .withCriteria (Criteria .where (idProperty ).in (ids ));
226-
227- PreparedOperation <?> operation = mapper .getMappedObject (selectSpec );
228-
229- return this .databaseClient .execute (operation ).as (this .entity .getJavaType ()).fetch ().all ();
207+ return this .entityOperations .select (Query .query (Criteria .where (idProperty ).in (ids )), this .entity .getJavaType ());
230208 });
231209 }
232210
@@ -235,17 +213,7 @@ public Flux<T> findAllById(Publisher<ID> idPublisher) {
235213 */
236214 @ Override
237215 public Mono <Long > count () {
238-
239- Table table = Table .create (this .accessStrategy .toSql (this .entity .getTableName ()));
240- Select select = StatementBuilder //
241- .select (Functions .count (table .column (this .accessStrategy .toSql (getIdProperty ().getColumnName ())))) //
242- .from (table ) //
243- .build ();
244-
245- return this .databaseClient .execute (SqlRenderer .toString (select )) //
246- .map ((r , md ) -> r .get (0 , Long .class )) //
247- .first () //
248- .defaultIfEmpty (0L );
216+ return this .entityOperations .count (Query .empty (), this .entity .getJavaType ());
249217 }
250218
251219 /* (non-Javadoc)
@@ -257,13 +225,7 @@ public Mono<Void> deleteById(ID id) {
257225
258226 Assert .notNull (id , "Id must not be null!" );
259227
260- return this .databaseClient .delete () //
261- .from (this .entity .getJavaType ()) //
262- .table (this .entity .getTableName ()) //
263- .matching (Criteria .where (getIdProperty ().getName ()).is (id )) //
264- .fetch () //
265- .rowsUpdated () //
266- .then ();
228+ return this .entityOperations .delete (getIdQuery (id ), this .entity .getJavaType ()).then ();
267229 }
268230
269231 /* (non-Javadoc)
@@ -274,20 +236,16 @@ public Mono<Void> deleteById(ID id) {
274236 public Mono <Void > deleteById (Publisher <ID > idPublisher ) {
275237
276238 Assert .notNull (idPublisher , "The Id Publisher must not be null!" );
277- StatementMapper statementMapper = this .accessStrategy .getStatementMapper ().forType (this .entity .getJavaType ());
278239
279240 return Flux .from (idPublisher ).buffer ().filter (ids -> !ids .isEmpty ()).concatMap (ids -> {
280241
281242 if (ids .isEmpty ()) {
282243 return Flux .empty ();
283244 }
284245
285- return this .databaseClient .delete () //
286- .from (this .entity .getJavaType ()) //
287- .table (this .entity .getTableName ()) //
288- .matching (Criteria .where (getIdProperty ().getName ()).in (ids )) //
289- .fetch () //
290- .rowsUpdated ();
246+ String idProperty = getIdProperty ().getName ();
247+
248+ return this .entityOperations .delete (Query .query (Criteria .where (idProperty ).in (ids )), this .entity .getJavaType ());
291249 }).then ();
292250 }
293251
@@ -336,14 +294,14 @@ public Mono<Void> deleteAll(Publisher<? extends T> objectPublisher) {
336294 @ Override
337295 @ Transactional
338296 public Mono <Void > deleteAll () {
339- return this .databaseClient .delete (). from ( this .entity .getTableName ()).then ();
297+ return this .entityOperations .delete (Query . empty (), this .entity .getJavaType ()).then ();
340298 }
341299
342300 private RelationalPersistentProperty getIdProperty () {
301+ return this .idProperty .get ();
302+ }
343303
344- return this .converter //
345- .getMappingContext () //
346- .getRequiredPersistentEntity (this .entity .getJavaType ()) //
347- .getRequiredIdProperty ();
304+ private Query getIdQuery (Object id ) {
305+ return Query .query (Criteria .where (getIdProperty ().getName ()).is (id ));
348306 }
349307}
0 commit comments