-
Notifications
You must be signed in to change notification settings - Fork 10
/
TranslationCorporaStore.php
334 lines (301 loc) · 11 KB
/
TranslationCorporaStore.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
<?php
/**
* @copyright See AUTHORS.txt
* @license GPL-2.0-or-later
*/
declare( strict_types = 1 );
namespace ContentTranslation\Store;
use ContentTranslation\Entity\TranslationUnit;
use ContentTranslation\LoadBalancer;
use LogicException;
use stdClass;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IExpression;
use Wikimedia\Rdbms\LBFactory;
use Wikimedia\Rdbms\LikeValue;
use Wikimedia\Rdbms\Platform\ISQLPlatform;
use Wikimedia\Rdbms\SelectQueryBuilder;
/**
* The TranslationCorporaStore service represents the Data Access Layer for the parallel corpora. More
* specifically, this service relies on the database load balancer to interact with the "cx_corpora"
* table, to insert, update, delete or fetch data from this table. This class exposes methods that
* usually accept TranslationUnit entity objects as arguments (when it's an insert/update query),
* or return TranslationUnit entity objects (for read operations). This service is mostly used inside
* the TranslationCorporaManager service to interact with the database.
*/
class TranslationCorporaStore {
/** Name of the database table every query of this store operates on. */
public const TABLE_NAME = 'cx_corpora';

/** @var LoadBalancer Source of the primary/replica connections used by the queries below */
private $lb;

/** @var LBFactory Used to wait for replication between batches of deletions */
private $lbFactory;

/**
 * @param LoadBalancer $lb
 * @param LBFactory $lbFactory
 */
public function __construct( LoadBalancer $lb, LBFactory $lbFactory ) {
	$this->lbFactory = $lbFactory;
	$this->lb = $lb;
}
/**
 * Overwrite the stored row of a translation unit with the unit's current state.
 *
 * The row is located by translation id, section id, origin and the timestamp it
 * carried when it was read; its sequence id, content and timestamp are rewritten.
 *
 * @param TranslationUnit $translationUnit unit carrying the new values
 * @param string $timestamp timestamp of the existing row that is to be replaced
 * @throws LogicException when the query did not change any row
 */
private function updateTranslationUnit( TranslationUnit $translationUnit, string $timestamp ): void {
	$dbw = $this->lb->getConnection( DB_PRIMARY );

	$newValues = [
		'cxc_sequence_id' => $translationUnit->getSequenceId(),
		'cxc_timestamp' => $dbw->timestamp(),
		'cxc_content' => $translationUnit->getContent()
	];
	$rowSelector = [
		'cxc_translation_id' => $translationUnit->getTranslationId(),
		'cxc_section_id' => $translationUnit->getSectionId(),
		'cxc_origin' => $translationUnit->getOrigin(),
		// "Duplicate" entries that differ only in their timestamp show up occasionally.
		// Pinning the timestamp restricts the update to a single one of them; touching
		// all of them at once would fail with a duplicate key on the unique index.
		'cxc_timestamp' => $dbw->timestamp( $timestamp ),
	];

	$dbw->newUpdateQueryBuilder()
		->update( self::TABLE_NAME )
		->set( $newValues )
		->where( $rowSelector )
		->caller( __METHOD__ )
		->execute();

	if ( $dbw->affectedRows() >= 1 ) {
		return;
	}

	// Nothing was changed. Possible reasons:
	// * a concurrent request already updated the row and gave it a new timestamp
	// * no change (the same thing was saved twice within the same second)
	// * the translation has been deleted
	throw new LogicException( 'Failed to update a translation section' );
}
/**
 * Store a translation unit as a new row, stamped with the current time.
 *
 * @param TranslationUnit $translationUnit unit to persist
 */
private function insertTranslationUnit( TranslationUnit $translationUnit ): void {
	$dbw = $this->lb->getConnection( DB_PRIMARY );

	$newRow = [
		'cxc_translation_id' => $translationUnit->getTranslationId(),
		'cxc_section_id' => $translationUnit->getSectionId(),
		'cxc_origin' => $translationUnit->getOrigin(),
		'cxc_sequence_id' => $translationUnit->getSequenceId(),
		'cxc_timestamp' => $dbw->timestamp(),
		'cxc_content' => $translationUnit->getContent()
	];

	$insert = $dbw->newInsertQueryBuilder();
	$insert->insertInto( self::TABLE_NAME )
		->row( $newRow )
		->caller( __METHOD__ )
		->execute();
}
/**
 * Delete all rows of the "cx_corpora" table that belong to the given translation(s).
 *
 * Passing an empty array is a no-op: there is nothing to delete, and an empty
 * value list cannot be turned into a valid SQL condition.
 *
 * @param int|int[] $translationId a single translation id, or a list of them
 */
public function deleteTranslationData( $translationId ): void {
	if ( $translationId === [] ) {
		return;
	}

	$dbw = $this->lb->getConnection( DB_PRIMARY );
	$dbw->newDeleteQueryBuilder()
		->deleteFrom( self::TABLE_NAME )
		->where( [ 'cxc_translation_id' => $translationId ] )
		->caller( __METHOD__ )
		->execute();
}
/**
 * Remove from the "cx_corpora" table every translation unit that belongs to one section
 * translation, identified by its "parent" translation id and its base section id (which has
 * the "${revision}_${sectionNumber}" form).
 *
 * NOTE: For section translation parallel corpora units, the "cxc_section_id" field of
 * "cx_corpora" has the form "${baseSectionId}_${subSectionId}", where "subSectionId" is
 * the section HTML element id given by the cxserver (e.g. "cxSourceSection4"). That is why
 * the rows are matched with a "LIKE" condition of the form "${baseSectionId}_%".
 *
 * @param int $translationId the id of the "parent" translation inside "cx_translations" table
 * @param string $baseSectionId the "cxsx_section_id" as stored inside "cx_section_translations" table
 * @return void
 */
public function deleteTranslationDataBySectionId( int $translationId, string $baseSectionId ): void {
	$dbw = $this->lb->getConnection( DB_PRIMARY );

	// Base section id, then an underscore, then any sub-section suffix.
	$sectionIdPattern = new LikeValue( $baseSectionId, '_', $dbw->anyString() );
	$conditions = [
		'cxc_translation_id' => $translationId,
		$dbw->expr( 'cxc_section_id', IExpression::LIKE, $sectionIdPattern ),
	];

	$dbw->newDeleteQueryBuilder()
		->deleteFrom( self::TABLE_NAME )
		->where( $conditions )
		->caller( __METHOD__ )
		->execute();
}
/**
 * Count the parallel corpora translation units stored for a translation.
 *
 * @param int $translationId the id of the translation inside "cx_translations" table
 * @return int number of "cx_corpora" rows with that translation id
 */
public function countByTranslationId( int $translationId ): int {
	$dbr = $this->lb->getConnection( DB_REPLICA );

	$query = $dbr->newSelectQueryBuilder()
		->select( ISQLPlatform::ALL_ROWS )
		->from( self::TABLE_NAME )
		->where( [ 'cxc_translation_id' => $translationId ] )
		->caller( __METHOD__ );

	return $query->fetchRowCount();
}
/**
 * Delete all rows of the "cx_corpora" table that belong to the given translation(s),
 * in a manner that avoids creating excessive database lag.
 *
 * Rows are removed in batches of at most $batchSize; after every batch the method
 * waits for replication before it continues with the next one.
 *
 * Passing an empty array is a no-op: there is nothing to delete, and an empty
 * value list cannot be turned into a valid SQL condition.
 *
 * @param int|int[] $ids a single translation id, or a list of them
 * @param int $batchSize maximum number of rows deleted per query
 */
public function deleteTranslationDataGently( $ids, int $batchSize = 1000 ): void {
	if ( $ids === [] ) {
		return;
	}

	$dbw = $this->lb->getConnection( DB_PRIMARY );

	while ( true ) {
		// Collect the primary keys of the next batch ...
		$rowsToDelete = $dbw->newSelectQueryBuilder()
			->select( 'cxc_id' )
			->from( self::TABLE_NAME )
			->where( [ 'cxc_translation_id' => $ids ] )
			->limit( $batchSize )
			->caller( __METHOD__ )
			->fetchFieldValues();

		if ( !$rowsToDelete ) {
			// No matching rows are left.
			break;
		}

		// ... and delete exactly those rows.
		$dbw->newDeleteQueryBuilder()
			->deleteFrom( self::TABLE_NAME )
			->where( [ 'cxc_id' => $rowsToDelete ] )
			->caller( __METHOD__ )
			->execute();

		// Give the replicas time to catch up before the next batch.
		$this->lbFactory->waitForReplication();
	}
}
/**
 * Fetch every translation unit stored for a translation.
 *
 * @param int $translationId value of "cxc_translation_id" to look up
 * @return TranslationUnit[] one entity per matching row; empty when there is none
 */
public function findByTranslationId( int $translationId ): array {
	$fields = [
		'cxc_translation_id',
		'cxc_origin',
		'cxc_section_id',
		'cxc_timestamp',
		'cxc_sequence_id',
		'cxc_content',
	];

	$rows = $this->lb->getConnection( DB_REPLICA )
		->newSelectQueryBuilder()
		->select( $fields )
		->from( self::TABLE_NAME )
		->where( [ 'cxc_translation_id' => $translationId ] )
		->caller( __METHOD__ )
		->fetchResultSet();

	// Convert the rows to entities, keeping their order and using list keys.
	return array_map(
		function ( stdClass $row ): TranslationUnit {
			return $this->createTranslationUnitFromRow( $row );
		},
		iterator_to_array( $rows, false )
	);
}
/**
 * Given a translation id, this method returns an integer, indicating
 * the count of the translated subsections (paragraphs) for that translation.
 *
 * The count is the number of distinct "cxc_section_id" values among the rows of
 * the translation, so a translation without any rows yields 0.
 *
 * @param int $translationId
 * @return int count of translated subsections
 */
public function countTranslatedSubSectionsByTranslationId( int $translationId ): int {
	$dbr = $this->lb->getConnection( DB_REPLICA );

	// A lone aggregate without GROUP BY always produces exactly one row, even when
	// no row matches the condition. That avoids reading a property of the "false"
	// that fetchRow() returns for an empty result, and it no longer selects a
	// column that is neither aggregated nor grouped.
	$count = $dbr->newSelectQueryBuilder()
		->select( 'COUNT(DISTINCT cxc_section_id)' )
		->from( self::TABLE_NAME )
		->where( [ 'cxc_translation_id' => $translationId ] )
		->caller( __METHOD__ )
		->fetchField();

	return (int)$count;
}
/**
 * Saves the translation unit. If the record exists, updates it, otherwise creates it.
 *
 * A record counts as existing when a row with the same translation id, section id
 * and origin is found; if there are several, the one with the latest timestamp is
 * the one that gets updated.
 *
 * @param TranslationUnit $translationUnit
 * @param bool $isNewTranslation Whether these are for a brand-new Translation record
 */
public function save( TranslationUnit $translationUnit, bool $isNewTranslation ): void {
	$fname = __METHOD__;
	// Update the latest row if there is one instead of making a new one
	$conditions = [
		'cxc_translation_id' => $translationUnit->getTranslationId(),
		'cxc_section_id' => $translationUnit->getSectionId(),
		'cxc_origin' => $translationUnit->getOrigin()
	];

	if ( $isNewTranslation ) {
		// T134245: brand new translations can also insert corpora data in the same
		// request. The doFind() query uses only a subset of a unique cx_corpora index,
		// causing SH gap locks. Worse, the leftmost value comes from the
		// auto-incrementing translation_id. This puts gap locks on the range of
		// (MAX(cxc_translation_id),+infinity), which could make the whole API prone
		// to deadlocks and timeouts. Bypass this problem by remembering if the parent
		// translation row is brand new and skipping doFind() in such cases.
		$existing = null;
	} else {
		// Note that the only caller of this method will have already locked the
		// parent Translation row, serializing simultaneous duplicate submissions at
		// this point. Without that row lock, the two transactions might both acquire
		// SH gap locks in doFind() and then deadlock in create() trying to get IX gap
		// locks (if no duplicate rows were found).
		$dbr = $this->lb->getConnection( DB_REPLICA );
		$existing = $this->doFind( $dbr, $conditions, [], $fname );
	}

	if ( !$existing ) {
		$this->insertTranslationUnit( $translationUnit );
		return;
	}

	$dbw = $this->lb->getConnection( DB_PRIMARY );
	$dbw->doAtomicSection(
		__METHOD__,
		function ( IDatabase $dbw ) use ( $translationUnit, $conditions, $fname ) {
			// Lock the record for updating it. This time we use $dbw - primary db.
			// This is to avoid the unnecessary gap locking with 'for update' query
			// when the record does not exist.
			$locked = $this->doFind( $dbw, $conditions, [ 'FOR UPDATE' ], $fname );

			if ( $locked === null ) {
				// The replica reported a row that the primary does not have (any
				// more), e.g. because it was deleted in the meantime. There is
				// nothing to update, so create the record instead of calling a
				// method on null.
				$this->insertTranslationUnit( $translationUnit );
				return;
			}

			$this->updateTranslationUnit( $translationUnit, $locked->getTimestamp() );
		}
	);
}
/**
 * Fetch the most recent translation unit that matches the given conditions.
 *
 * @param IDatabase $db connection to run the query on
 * @param array $conditions WHERE conditions, as accepted by the select query builder
 * @param array $options query options, e.g. [ 'FOR UPDATE' ]
 * @param string $method name of the calling method, reported as the query's caller
 * @return TranslationUnit|null the matching unit with the latest timestamp, or null if none matches
 */
private function doFind( IDatabase $db, array $conditions, array $options, string $method ): ?TranslationUnit {
	$row = $db->newSelectQueryBuilder()
		->select( [
			'cxc_translation_id',
			'cxc_section_id',
			'cxc_origin',
			'cxc_timestamp',
			'cxc_sequence_id',
			'cxc_content'
		] )
		->from( self::TABLE_NAME )
		->where( $conditions )
		// Newest first, so that the single fetched row is the latest one.
		->orderBy( 'cxc_timestamp', SelectQueryBuilder::SORT_DESC )
		->options( $options )
		->caller( $method )
		->fetchRow();

	return $row ? $this->createTranslationUnitFromRow( $row ) : null;
}
/**
 * Build a TranslationUnit entity from a "cx_corpora" result row.
 *
 * @param stdClass $row row holding the cxc_section_id, cxc_origin, cxc_sequence_id,
 *  cxc_content, cxc_translation_id and cxc_timestamp fields
 * @return TranslationUnit
 */
private function createTranslationUnitFromRow( stdClass $row ): TranslationUnit {
	// cxc_sequence_id and cxc_content can be null; the casts normalise them.
	$sequenceId = (int)$row->cxc_sequence_id;
	$content = (string)$row->cxc_content;
	$translationId = (int)$row->cxc_translation_id;

	return new TranslationUnit(
		$row->cxc_section_id,
		$row->cxc_origin,
		$sequenceId,
		$content,
		$translationId,
		$row->cxc_timestamp
	);
}
}