-
-
Notifications
You must be signed in to change notification settings - Fork 495
/
ChangeSetPersister.ts
174 lines (142 loc) · 7.21 KB
/
ChangeSetPersister.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import { MetadataStorage } from '../metadata';
import { AnyEntity, Dictionary, EntityData, EntityMetadata, EntityProperty, FilterQuery, IPrimaryKey } from '../typings';
import { EntityIdentifier } from '../entity';
import { ChangeSet, ChangeSetType } from './ChangeSet';
import { QueryResult, Transaction } from '../connections';
import { Configuration, OptimisticLockError, Utils } from '../utils';
import { IDatabaseDriver } from '../drivers';
import { Hydrator } from '../hydration';
/**
 * Writes computed change sets to the database through the configured driver.
 *
 * Handles the three kinds of work (inserts, updates, deletes) and keeps the in-memory
 * entities in sync with what the driver reports back: generated primary keys, values
 * returned by the statement, and the version column used for optimistic locking.
 */
export class ChangeSetPersister {

  constructor(private readonly driver: IDatabaseDriver,
              private readonly identifierMap: Map<string, EntityIdentifier>,
              private readonly metadata: MetadataStorage,
              private readonly hydrator: Hydrator,
              private readonly config: Configuration) { }

  /**
   * Inserts the given change sets one by one, in the order they were passed.
   * Property processing (identifier resolution, `onCreate` hooks) is done for all
   * change sets before the first insert is issued.
   */
  async executeInserts<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
    changeSets.forEach(changeSet => this.processProperties(changeSet));

    for (const changeSet of changeSets) {
      await this.persistNewEntity(changeSet, ctx);
    }
  }

  /**
   * Updates the given change sets one by one, in the order they were passed.
   * Property processing (identifier resolution, `onUpdate` hooks) is done for all
   * change sets before the first update is issued.
   */
  async executeUpdates<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
    changeSets.forEach(changeSet => this.processProperties(changeSet));

    for (const changeSet of changeSets) {
      await this.persistManagedEntity(changeSet, ctx);
    }
  }

  /**
   * Deletes all given change sets with a single `nativeDelete` call using an `$in` condition
   * on the primary key. Does nothing when `changeSets` is empty.
   *
   * NOTE(review): metadata and entity name are taken from the first change set only, so this
   * assumes every change set in the array targets the same entity - confirm against the caller.
   */
  async executeDeletes<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
    // nothing to delete; also avoids dereferencing `changeSets[0]` on an empty array
    if (changeSets.length === 0) {
      return;
    }

    const meta = changeSets[0].entity.__helper!.__meta;
    const pk = Utils.getPrimaryKeyHash(meta.primaryKeys);

    if (meta.compositePK) {
      const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKeys);
      await this.driver.nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
    } else {
      const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary);
      await this.driver.nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
    }
  }

  /**
   * Runs `processProperty` for every property declared in the metadata of the change set's entity.
   */
  private processProperties<T extends AnyEntity<T>>(changeSet: ChangeSet<T>): void {
    const meta = this.metadata.find(changeSet.name)!;

    for (const prop of Object.values(meta.properties)) {
      this.processProperty(changeSet, prop);
    }
  }

  /**
   * Inserts a single new entity and syncs the entity with the insert result: returned values
   * are mapped, the primary key is taken from `insertId` when the entity had none, and the
   * entity is flagged as initialized and managed.
   */
  private async persistNewEntity<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, ctx?: Transaction): Promise<void> {
    const meta = this.metadata.find(changeSet.name)!;
    const wrapped = changeSet.entity.__helper!;
    const res = await this.driver.nativeInsert(changeSet.name, changeSet.payload, ctx);
    // evaluated before mapping returned values, so it reflects what the entity had on its own
    const hasPrimaryKey = Utils.isDefined(wrapped.__primaryKey, true);
    this.mapReturnedValues(changeSet.entity, res, meta);

    if (!hasPrimaryKey) {
      this.mapPrimaryKey(meta, res.insertId, changeSet);
    }

    this.markAsPopulated(changeSet, meta);
    wrapped.__initialized = true;
    wrapped.__managed = true;
    await this.processOptimisticLock(meta, changeSet, res, ctx);
    changeSet.persisted = true;
  }

  /**
   * Updates a single managed entity, maps values returned by the statement and
   * refreshes/validates the version column when the entity is versioned.
   */
  private async persistManagedEntity<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, ctx?: Transaction): Promise<void> {
    const meta = this.metadata.find(changeSet.name)!;
    const res = await this.updateEntity(meta, changeSet, ctx);
    this.mapReturnedValues(changeSet.entity, res, meta);
    await this.processOptimisticLock(meta, changeSet, res, ctx);
    changeSet.persisted = true;
  }

  /**
   * Assigns the given primary key value to the entity (unless it already has one) and
   * propagates the entity's primary key to its `EntityIdentifier` in the identifier map.
   * Only the first primary key property is considered; the value is converted
   * via the property's custom type when one is defined.
   */
  private mapPrimaryKey<T extends AnyEntity<T>>(meta: EntityMetadata<T>, value: IPrimaryKey, changeSet: ChangeSet<T>): void {
    const prop = meta.properties[meta.primaryKeys[0]];
    const insertId = prop.customType ? prop.customType.convertToJSValue(value, this.driver.getPlatform()) : value;
    const wrapped = changeSet.entity.__helper!;
    // an already defined primary key always wins over the value reported by the driver
    wrapped.__primaryKey = Utils.isDefined(wrapped.__primaryKey, true) ? wrapped.__primaryKey : insertId;
    this.identifierMap.get(wrapped.__uuid)!.setValue(changeSet.entity[prop.name] as unknown as IPrimaryKey);
  }

  /**
   * Sets populate flag to new entities so they are serialized like if they were loaded from the db
   */
  private markAsPopulated<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, meta: EntityMetadata<T>) {
    if (!this.config.get('populateAfterFlush')) {
      return;
    }

    changeSet.entity.__helper!.populated();
    Object.values(meta.properties).forEach(prop => {
      const value = changeSet.entity[prop.name];

      if (Utils.isEntity(value, true)) {
        (value as AnyEntity).__helper!.populated();
      } else if (Utils.isCollection(value)) {
        value.populated();
      }
    });
  }

  /**
   * Issues the update for a single change set. When the entity has a version property with
   * a truthy current value, that value is added to the condition so the update only matches
   * the row if its version is unchanged; otherwise the update is conditioned on the primary key only.
   */
  private async updateEntity<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSet: ChangeSet<T>, ctx?: Transaction): Promise<QueryResult> {
    if (!meta.versionProperty || !changeSet.entity[meta.versionProperty]) {
      return this.driver.nativeUpdate(changeSet.name, changeSet.entity.__helper!.__primaryKey as Dictionary, changeSet.payload, ctx);
    }

    const cond = {
      ...Utils.getPrimaryKeyCond<T>(changeSet.entity, meta.primaryKeys),
      [meta.versionProperty]: changeSet.entity[meta.versionProperty],
    } as FilterQuery<T>;

    return this.driver.nativeUpdate(changeSet.name, cond, changeSet.payload, ctx);
  }

  /**
   * For versioned entities, verifies that an update actually matched a row and reloads
   * the current version value from the database into the entity. No-op for entities
   * without a version property.
   *
   * @throws OptimisticLockError when an update affected no rows, or when the row
   * cannot be read back to refresh its version.
   */
  private async processOptimisticLock<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSet: ChangeSet<T>, res: QueryResult | undefined, ctx?: Transaction) {
    if (!meta.versionProperty) {
      return;
    }

    if (changeSet.type === ChangeSetType.UPDATE && res && !res.affectedRows) {
      throw OptimisticLockError.lockFailed(changeSet.entity);
    }

    const e = await this.driver.findOne<T>(meta.name!, changeSet.entity.__helper!.__primaryKey, {
      fields: [meta.versionProperty],
    }, ctx);

    // the row we have just written is gone; report it as a lock failure instead of a TypeError
    if (!e) {
      throw OptimisticLockError.lockFailed(changeSet.entity);
    }

    changeSet.entity[meta.versionProperty] = e[meta.versionProperty];
  }

  /**
   * Prepares a single property of the change set before it is sent to the driver:
   * an `EntityIdentifier` in the payload is replaced with its current value, and the
   * `onCreate`/`onUpdate` hook (matching the change set type) is evaluated and written
   * to both the entity and the payload.
   */
  private processProperty<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, prop: EntityProperty<T>): void {
    const value = changeSet.payload[prop.name];

    if (value as unknown instanceof EntityIdentifier) {
      changeSet.payload[prop.name] = value.getValue();
    }

    if (prop.onCreate && changeSet.type === ChangeSetType.CREATE) {
      changeSet.entity[prop.name] = changeSet.payload[prop.name] = prop.onCreate(changeSet.entity);

      // a primary key produced by `onCreate` needs to be propagated to the identifier map as well
      if (prop.primary) {
        this.mapPrimaryKey(changeSet.entity.__helper!.__meta, changeSet.entity[prop.name] as unknown as IPrimaryKey, changeSet);
      }
    }

    if (prop.onUpdate && changeSet.type === ChangeSetType.UPDATE) {
      changeSet.entity[prop.name] = changeSet.payload[prop.name] = prop.onUpdate(changeSet.entity);
    }
  }

  /**
   * Maps values returned via `returning` statement (postgres) or the inserted id (other sql drivers).
   * No need to handle composite keys here as they need to be set upfront.
   *
   * Only properties that are not yet defined on the entity are filled in. Returned values are
   * checked for being defined (not `undefined`/`null`) rather than for truthiness, so values
   * like `0`, `false` or an empty string are mapped too.
   */
  private mapReturnedValues<T extends AnyEntity<T>>(entity: T, res: QueryResult, meta: EntityMetadata<T>): void {
    if (!res.row || Object.keys(res.row).length === 0) {
      return;
    }

    const row = res.row;
    const data = Object.values<EntityProperty>(meta.properties).reduce((ret, prop) => {
      // only the first field name is considered (see the note about composite keys above)
      const returned = prop.fieldNames ? row[prop.fieldNames[0]] : undefined;

      if (Utils.isDefined(returned, true) && !Utils.isDefined(entity[prop.name], true)) {
        ret[prop.name] = returned;
      }

      return ret;
    }, {} as Dictionary);
    this.hydrator.hydrate<T>(entity, meta, data as EntityData<T>, false, true);
  }

}