/
AbstractSqlManager.java
381 lines (335 loc) · 14.8 KB
/
AbstractSqlManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
/**
*
* Copyright (c) 2006-2016, Speedment, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); You may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.internal.core.manager.sql;
import com.speedment.Speedment;
import com.speedment.config.db.Column;
import com.speedment.config.db.Dbms;
import com.speedment.config.db.PrimaryKeyColumn;
import com.speedment.config.db.Table;
import com.speedment.config.db.parameters.DbmsType;
import com.speedment.internal.core.manager.AbstractManager;
import com.speedment.db.MetaResult;
import com.speedment.internal.core.manager.metaresult.SqlMetaResultImpl;
import com.speedment.db.AsynchronousQueryResult;
import com.speedment.db.DbmsHandler;
import com.speedment.db.SqlFunction;
import com.speedment.exception.SpeedmentException;
import com.speedment.config.db.mapper.TypeMapper;
import com.speedment.db.DatabaseNamingConvention;
import com.speedment.internal.core.stream.builder.ReferenceStreamBuilder;
import com.speedment.internal.core.stream.builder.pipeline.PipelineImpl;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Optional;
import com.speedment.internal.util.LazyString;
import com.speedment.stream.StreamDecorator;
import static com.speedment.internal.util.document.DocumentDbUtil.dbmsTypeOf;
import static com.speedment.internal.util.document.DocumentUtil.ancestor;
import static com.speedment.internal.core.stream.OptionalUtil.unwrap;
import static com.speedment.util.NullUtil.requireNonNulls;
import static java.util.Objects.requireNonNull;
/**
*
* @author pemi
*
* @param <ENTITY> Entity type for this Manager
*/
public abstract class AbstractSqlManager<ENTITY> extends AbstractManager<ENTITY> implements SqlManager<ENTITY> {
private final LazyString sqlColumnList;
private final LazyString sqlColumnListQuestionMarks;
private SqlFunction<ResultSet, ENTITY> entityMapper;
protected AbstractSqlManager(Speedment speedment) {
super(speedment);
this.sqlColumnList = LazyString.create();
this.sqlColumnListQuestionMarks = LazyString.create();
}
@Override
public Stream<ENTITY> nativeStream(StreamDecorator decorator) {
final AsynchronousQueryResult<ENTITY> asynchronousQueryResult = decorator.apply(dbmsHandler().executeQueryAsync(sqlSelect(), Collections.emptyList(), entityMapper.unWrap()));
final SqlStreamTerminator<ENTITY> terminator = new SqlStreamTerminator<>(this, asynchronousQueryResult, decorator);
final Supplier<BaseStream<?, ?>> initialSupplier = () -> decorator.applyOnInitial(asynchronousQueryResult.stream());
final Stream<ENTITY> result = decorator.applyOnFinal(new ReferenceStreamBuilder<>(new PipelineImpl<>(initialSupplier), terminator));
// Make sure we are closing the ResultSet, Statement and Connection later
result.onClose(asynchronousQueryResult::close);
return result;
}
public <T> Stream<T> synchronousStreamOf(String sql, List<Object> values, SqlFunction<ResultSet, T> rsMapper) {
requireNonNulls(sql, values, rsMapper);
return dbmsHandler().executeQuery(sql, values, rsMapper);
}
/**
* Counts the number of elements in the current table by querying the
* database.
*
* @return the number of elements in the table
*/
public long count() {
return synchronousStreamOf(
"select count(*) from " + sqlTableReference(),
Collections.emptyList(),
rs -> rs.getLong(1)
).findAny().get();
}
/**
* Returns a {@code SELECT/FROM} SQL statement with the full column list and
* the current table specified in accordance to the current {@link DbmsType}.
* The specified statement will not have any trailing spaces or semicolons.
* <p>
* <b>Example:</b>
* <code>SELECT `id`, `name` FROM `myschema`.`users`</code>
*
* @return the SQL statement
*/
public String sqlSelect() {
return "select " + sqlColumnList() + " from " + sqlTableReference();
}
@Override
public SqlFunction<ResultSet, ENTITY> getEntityMapper() {
return entityMapper;
}
@Override
public void setEntityMapper(SqlFunction<ResultSet, ENTITY> entityMapper) {
this.entityMapper = requireNonNull(entityMapper);
}
@Override
public ENTITY persist(ENTITY entity) throws SpeedmentException {
return persistHelp(entity, Optional.empty());
}
@Override
public ENTITY persist(ENTITY entity, Consumer<MetaResult<ENTITY>> listener) throws SpeedmentException {
requireNonNulls(entity, listener);
return persistHelp(entity, Optional.of(listener));
}
@Override
public ENTITY update(ENTITY entity) {
requireNonNull(entity);
return updateHelper(entity, Optional.empty());
}
@Override
public ENTITY update(ENTITY entity, Consumer<MetaResult<ENTITY>> listener) throws SpeedmentException {
requireNonNulls(entity, listener);
return updateHelper(entity, Optional.of(listener));
}
@Override
public ENTITY remove(ENTITY entity) {
requireNonNull(entity);
return removeHelper(entity, Optional.empty());
}
@Override
public ENTITY remove(ENTITY entity, Consumer<MetaResult<ENTITY>> listener) throws SpeedmentException {
requireNonNulls(entity, listener);
return removeHelper(entity, Optional.of(listener));
}
/**
* Short-cut for retrieving the current {@link Dbms}.
*
* @return the current dbms
*/
protected final Dbms getDbms() {
return ancestor(getTable(), Dbms.class).get();
}
/**
* Short-cut for retrieving the current {@link DbmsType}.
*
* @return the current dbms type
*/
protected final DbmsType getDbmsType() {
return dbmsTypeOf(speedment, getDbms());
}
/**
* Short-cut for retrieving the current {@link DbmsHandler}.
*
* @return the current dbms handler
*/
protected final DbmsHandler dbmsHandler() {
return speedment.getDbmsHandlerComponent().get(getDbms());
}
/**
* Short-cut for retrieving the current {@link DatabaseNamingConvention}.
*
* @return the current naming convention
*/
protected final DatabaseNamingConvention naming() {
return getDbmsType().getDatabaseNamingConvention();
}
/**
* Returns a comma separated list of column names, fully formatted in
* accordance to the current {@link DbmsType}.
*
* @return the comma separated column list
*/
private String sqlColumnList() {
return sqlColumnList.getOrCompute(() -> sqlColumnList(c -> c));
}
/**
* Returns a comma separated list of question marks (?), fully formatted in
* accordance to the current {@link DbmsType} to represent column value
* wildcards.
*
* @return the comma separated question mark list
*/
private String sqlColumnListWithQuestionMarks() {
return sqlColumnListQuestionMarks.getOrCompute(() -> sqlColumnList(c -> "?"));
}
/**
* Returns a {@code AND} separated list of {@link PrimaryKeyColumn} database
* names, formatted in accordance to the current {@link DbmsType}.
*
* @return list of fully quoted primary key column names
*/
protected String sqlColumnList(Function<String, String> postMapper) {
requireNonNull(postMapper);
return getTable().columns()
.map(naming()::fullNameOf)
.map(postMapper)
.collect(Collectors.joining(","));
}
/**
* Returns a {@code AND} separated list of {@link PrimaryKeyColumn} database
* names, formatted in accordance to the current {@link DbmsType}.
*
* @return list of fully quoted primary key column names
*/
private String sqlPrimaryKeyColumnList(Function<String, String> postMapper) {
requireNonNull(postMapper);
return getTable().primaryKeyColumns()
.map(naming()::fullNameOf)
.map(postMapper)
.collect(Collectors.joining(" AND "));
}
/**
* Returns the full name of a table formatted in accordance to the current
* {@link DbmsType}. The returned value will be within quotes if that is
* what the database expects.
*
* @return the full quoted table name
*/
protected String sqlTableReference() {
return naming().fullNameOf(getTable());
}
private final Function<ENTITY, Consumer<List<Long>>> NOTHING =
b -> l -> { /* Nothing to do for updates... */ };
private Object toDatabaseType(Column column, ENTITY entity) {
final Object javaValue = unwrap(get(entity, column));
@SuppressWarnings("unchecked")
final Object dbValue = ((TypeMapper<Object, Object>) column.findTypeMapper()).toDatabaseType(javaValue);
return dbValue;
}
private ENTITY persistHelp(ENTITY entity, Optional<Consumer<MetaResult<ENTITY>>> listener) throws SpeedmentException {
final Table table = getTable();
final StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(sqlTableReference());
sb.append(" (").append(sqlColumnList()).append(")");
sb.append(" values ");
sb.append("(").append(sqlColumnListWithQuestionMarks()).append(")");
final List<Object> values = table.columns()
.map(c -> toDatabaseType(c, entity))
.collect(Collectors.toList());
final Function<ENTITY, Consumer<List<Long>>> generatedKeyconsumer = builder -> {
return l -> {
if (!l.isEmpty()) {
final AtomicInteger cnt = new AtomicInteger();
// Just assume that they are in order, what else is there to do?
table.columns()
.filter(Column::isAutoIncrement)
.forEachOrdered(column -> {
// Cast from Long to the column target type
final Object val = speedment
.getJavaTypeMapperComponent()
.apply(column.findTypeMapper().getJavaType())
.parse(
l.get(cnt.getAndIncrement())
);
//final Object val = StandardJavaTypeMappingOld.parse(column.getMapping(), l.get(cnt.getAndIncrement()));
@SuppressWarnings("unchecked")
final Object javaValue = ((TypeMapper<Object, Object>) column.findTypeMapper()).toJavaType(val);
set(builder, column, javaValue);
});
}
};
};
executeUpdate(entity, sb.toString(), values, generatedKeyconsumer, listener);
return entity;
}
private ENTITY updateHelper(ENTITY entity, Optional<Consumer<MetaResult<ENTITY>>> listener) throws SpeedmentException {
final Table table = getTable();
final StringBuilder sb = new StringBuilder();
sb.append("update ").append(sqlTableReference()).append(" set ");
sb.append(sqlColumnList(n -> n + " = ?"));
sb.append(" where ");
sb.append(sqlPrimaryKeyColumnList(pk -> pk + " = ?"));
final List<Object> values = table.columns()
.map(c -> toDatabaseType(c, entity))
.collect(Collectors.toList());
table.primaryKeyColumns().map(pkc -> pkc.findColumn().get()).forEachOrdered(c -> values.add(get(entity, c)));
executeUpdate(entity, sb.toString(), values, NOTHING, listener);
return entity;
}
private ENTITY removeHelper(ENTITY entity, Optional<Consumer<MetaResult<ENTITY>>> listener) throws SpeedmentException {
final Table table = getTable();
final StringBuilder sb = new StringBuilder();
sb.append("delete from ").append(sqlTableReference());
sb.append(" where ");
sb.append(sqlPrimaryKeyColumnList(pk -> pk + " = ?"));
final List<Object> values = table.primaryKeyColumns()
.map(pk -> toDatabaseType(pk.findColumn().get(), entity))
.collect(Collectors.toList());
executeUpdate(entity, sb.toString(), values, NOTHING, listener);
return entity;
}
private void executeUpdate(
final ENTITY entity,
final String sql,
final List<Object> values,
final Function<ENTITY, Consumer<List<Long>>> generatedKeyconsumer,
final Optional<Consumer<MetaResult<ENTITY>>> listener
) throws SpeedmentException {
requireNonNulls(entity, sql, values, generatedKeyconsumer, listener);
final SqlMetaResultImpl<ENTITY> meta = listener.isPresent()
? new SqlMetaResultImpl<ENTITY>()
.setQuery(sql)
.setParameters(values)
: null;
try {
executeUpdate(entity, sql, values, generatedKeyconsumer);
} catch (final SQLException sqle) {
if (meta != null) {
meta.setThrowable(sqle);
}
throw new SpeedmentException(sqle);
} finally {
listener.ifPresent(c -> c.accept(meta));
}
}
private void executeUpdate(
final ENTITY entity,
final String sql,
final List<Object> values,
final Function<ENTITY, Consumer<List<Long>>> generatedKeyconsumer
) throws SQLException {
dbmsHandler().executeUpdate(sql, values, generatedKeyconsumer.apply(entity));
}
}