-
Notifications
You must be signed in to change notification settings - Fork 1
/
dbToDbTask.go
327 lines (278 loc) · 9.5 KB
/
dbToDbTask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
// Copyright (c) 2016 OpenM++
// This code is licensed under the MIT license (see LICENSE.txt for details)
package main
import (
"database/sql"
"errors"
"strconv"
"github.com/openmpp/go/ompp/config"
"github.com/openmpp/go/ompp/db"
"github.com/openmpp/go/ompp/omppLog"
)
// copy modeling task metadata and run history from source database to destination database
func dbToDbTask(modelName string, modelDigest string, runOpts *config.RunOptions) error {
// get task name and id
taskName := runOpts.String(taskNameArgKey)
taskId := runOpts.Int(taskIdArgKey, 0)
// conflicting options: use task id if positive else use task name
if runOpts.IsExist(taskNameArgKey) && runOpts.IsExist(taskIdArgKey) {
if taskId > 0 {
omppLog.Log("dbcopy options conflict. Using task id: ", taskId, " ignore task name: ", taskName)
taskName = ""
} else {
omppLog.Log("dbcopy options conflict. Using task name: ", taskName, " ignore task id: ", taskId)
taskId = 0
}
}
if taskId < 0 || taskId == 0 && taskName == "" {
return errors.New("dbcopy invalid argument(s) for task id: " + runOpts.String(taskIdArgKey) + " and/or task name: " + runOpts.String(taskNameArgKey))
}
// validate source and destination
csInp, dnInp := db.IfEmptyMakeDefaultReadOnly(modelName, runOpts.String(fromSqliteArgKey), runOpts.String(dbConnStrArgKey), runOpts.String(dbDriverArgKey))
csOut, dnOut := db.IfEmptyMakeDefault(modelName, runOpts.String(toSqliteArgKey), runOpts.String(toDbConnStrArgKey), runOpts.String(toDbDriverArgKey))
if csInp == csOut && dnInp == dnOut {
return errors.New("source same as destination: cannot overwrite model in database")
}
// open source database connection and check is it valid
srcDb, _, err := db.Open(csInp, dnInp, false)
if err != nil {
return err
}
defer srcDb.Close()
if err := db.CheckOpenmppSchemaVersion(srcDb); err != nil {
return err
}
// open destination database and check is it valid
dstDb, dbFacet, err := db.Open(csOut, dnOut, true)
if err != nil {
return err
}
defer dstDb.Close()
if err := db.CheckOpenmppSchemaVersion(dstDb); err != nil {
return err
}
// source: get model metadata
srcModel, err := db.GetModel(srcDb, modelName, modelDigest)
if err != nil {
return err
}
modelName = srcModel.Model.Name // set model name: it can be empty and only model digest specified
// get task metadata by id or name
var taskRow *db.TaskRow
if taskId > 0 {
if taskRow, err = db.GetTask(srcDb, taskId); err != nil {
return err
}
if taskRow == nil {
return errors.New("modeling task not found, task id: " + strconv.Itoa(taskId))
}
} else {
if taskRow, err = db.GetTaskByName(srcDb, srcModel.Model.ModelId, taskName); err != nil {
return err
}
if taskRow == nil {
return errors.New("modeling task not found: " + taskName)
}
}
meta, err := db.GetTaskFull(srcDb, taskRow, true, "") // get task full metadata, including task run history
if err != nil {
return err
}
// destination: get model metadata
dstModel, err := db.GetModel(dstDb, modelName, modelDigest)
if err != nil {
return err
}
// destination: get list of languages
dstLang, err := db.GetLanguages(dstDb)
if err != nil {
return err
}
// copy to destiantion model runs from task run history
var runIdLst []int
var isRunNotFound, isRunNotCompleted bool
for j := range meta.TaskRun {
nextRun:
for k := range meta.TaskRun[j].TaskRunSet {
// check is this run id already processed
runId := meta.TaskRun[j].TaskRunSet[k].RunId
for i := range runIdLst {
if runId == runIdLst[i] {
continue nextRun
}
}
runIdLst = append(runIdLst, runId)
// find model run metadata by id
runRow, err := db.GetRun(srcDb, runId)
if err != nil {
return err
}
if runRow == nil {
isRunNotFound = true
continue // skip: run not found
}
// run must be completed: status success, error or exit
if !db.IsRunCompleted(runRow.Status) {
isRunNotCompleted = true
continue // skip: run not completed
}
rm, err := db.GetRunFullText(srcDb, runRow, true, "") // get full model run metadata
if err != nil {
return err
}
// convert model run db rows into "public" format
runPub, err := rm.ToPublic(srcDb, srcModel)
if err != nil {
return err
}
if theCfg.isNoDigestCheck {
runPub.ModelDigest = "" // model digest validation disabled
}
// copy source model run metadata, parameter values, output results into destination database
_, err = copyRunDbToDb(srcDb, dstDb, dbFacet, srcModel, dstModel, rm.Run.RunId, runPub, dstLang)
if err != nil {
return err
}
}
}
// find workset by set id and save it's metadata to json and workset parameters to csv
var wsIdLst []int
var isSetNotFound, isSetNotReadOnly bool
var fws = func(dbConn *sql.DB, setId int) error {
// check is workset already processed
for i := range wsIdLst {
if setId == wsIdLst[i] {
return nil
}
}
wsIdLst = append(wsIdLst, setId)
// get workset by id
wsRow, err := db.GetWorkset(dbConn, setId)
if err != nil {
return err
}
if wsRow == nil { // exit: workset not found
isSetNotFound = true
return nil
}
if !wsRow.IsReadonly { // exit: workset not readonly
isSetNotReadOnly = true
return nil
}
wm, err := db.GetWorksetFull(dbConn, wsRow, "") // get full workset metadata
if err != nil {
return err
}
// convert workset db rows into "public" format
setPub, err := wm.ToPublic(srcDb, srcModel)
if err != nil {
return err
}
if theCfg.isNoDigestCheck {
setPub.ModelDigest = "" // model digest validation disabled
}
// copy source workset metadata and parameters into destination database
_, err = copyWorksetDbToDb(srcDb, dstDb, srcModel, dstModel, wm.Set.SetId, setPub, dstLang)
if err != nil {
return err
}
return nil
}
// save task body worksets
for k := range meta.Set {
if err = fws(srcDb, meta.Set[k]); err != nil {
return err
}
}
// save worksets from model run history
for j := range meta.TaskRun {
for k := range meta.TaskRun[j].TaskRunSet {
if err = fws(srcDb, meta.TaskRun[j].TaskRunSet[k].SetId); err != nil {
return err
}
}
}
// display warnings if any worksets not found or not readonly
// display warnings if any model runs not exists or not completed
if isSetNotFound {
omppLog.Log("Warning: task ", meta.Task.Name, " workset(s) not found, copy of task incomplete")
}
if isSetNotReadOnly {
omppLog.Log("Warning: task ", meta.Task.Name, " workset(s) not readonly, copy of task incomplete")
}
if isRunNotFound {
omppLog.Log("Warning: task ", meta.Task.Name, " model run(s) not found, copy of task run history incomplete")
}
if isRunNotCompleted {
omppLog.Log("Warning: task ", meta.Task.Name, " model run(s) not completed, copy of task run history incomplete")
}
// convert task db rows into "public" format
pub, err := meta.ToPublic(srcDb, srcModel)
if err != nil {
return err
}
// rename destination task
if runOpts.IsExist(taskNewNameArgKey) {
pub.Name = runOpts.String(taskNewNameArgKey)
}
// copy source task metadata into destination database
_, err = copyTaskDbToDb(srcDb, dstDb, srcModel, dstModel, meta.Task.TaskId, pub, dstLang)
if err != nil {
return err
}
return nil
}
// copyTaskListDbToDb do copy all modeling tasks from source to destination database
func copyTaskListDbToDb(
srcDb *sql.DB, dstDb *sql.DB, srcModel *db.ModelMeta, dstModel *db.ModelMeta, dstLang *db.LangMeta) error {
// source: get all modeling tasks metadata in all languages
srcTl, err := db.GetTaskFullList(srcDb, srcModel.Model.ModelId, true, "")
if err != nil {
return err
}
if len(srcTl) <= 0 {
return nil
}
// copy task metadata from source to destination database by using "public" format
for k := range srcTl {
// convert task metadata db rows into "public"" format
pub, err := srcTl[k].ToPublic(srcDb, srcModel)
if err != nil {
return err
}
// save into destination database
_, err = copyTaskDbToDb(srcDb, dstDb, srcModel, dstModel, srcTl[k].Task.TaskId, pub, dstLang)
if err != nil {
return err
}
}
return nil
}
// copyTaskDbToDb do copy modeling task metadata and task run history from source to destination database
// it return destination task id (task id in destination database)
func copyTaskDbToDb(
srcDb *sql.DB, dstDb *sql.DB, srcModel *db.ModelMeta, dstModel *db.ModelMeta, srcId int, pub *db.TaskPub, dstLang *db.LangMeta) (int, error) {
// validate parameters
if pub == nil {
return 0, errors.New("invalid (empty) source modeling task metadata, source task not found or not exists")
}
// destination: convert from "public" format into destination db rows
dstTask, isSetNotFound, isTaskRunNotFound, err := pub.FromPublic(dstDb, dstModel, true)
if err != nil {
return 0, err
}
if isSetNotFound {
omppLog.Log("Warning: task ", dstTask.Task.Name, " worksets not found, copy of task incomplete")
}
if isTaskRunNotFound {
omppLog.Log("Warning: task ", dstTask.Task.Name, " worksets or model runs not found, copy of task run history incomplete")
}
// destination: save modeling task metadata
err = dstTask.UpdateTaskFull(dstDb, dstModel, dstLang)
if err != nil {
return 0, err
}
dstId := dstTask.Task.TaskId
omppLog.Log("Modeling task from ", srcId, " ", pub.Name, " to ", dstId)
return dstId, nil
}