/
readParameter.go
405 lines (347 loc) · 10.4 KB
/
readParameter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
// Copyright (c) 2016 OpenM++
// This code is licensed under the MIT license (see LICENSE.txt for details)
package db
import (
"database/sql"
"errors"
"strconv"
)
// ReadParameterTo read input parameter rows (sub id, dimensions, value) from workset or model run results and process each row by cvtTo().
func ReadParameterTo(dbConn *sql.DB, modelDef *ModelMeta, layout *ReadParamLayout, cvtTo func(src interface{}) (bool, error)) (*ReadPageLayout, error) {
// validate parameters
if modelDef == nil {
return nil, errors.New("invalid (empty) model metadata, look like model not found")
}
if layout == nil {
return nil, errors.New("invalid (empty) parameter read layout")
}
if layout.Name == "" {
return nil, errors.New("invalid (empty) parameter name")
}
// find parameter id by name
var param *ParamMeta
if k, ok := modelDef.ParamByName(layout.Name); ok {
param = &modelDef.Param[k]
} else {
return nil, errors.New("parameter not found: " + layout.Name)
}
// if this is workset parameter then:
// if source workset exist
// check readonly status: if layout.IsEditSet then read-only must be false
// if parameter not in workset then select base run id, it must be >0
var srcRunId int
var isWsParam bool
if !layout.IsFromSet {
srcRunId = layout.FromId // this is parameter from existing run
} else {
// validate workset: it must exist
setRow, err := GetWorkset(dbConn, layout.FromId)
if err != nil {
return nil, err
}
if setRow == nil {
return nil, errors.New("workset not found, id: " + strconv.Itoa(layout.FromId))
}
// workset readonly status must be compatible with (oposite to) "edit workset" status
if layout.IsEditSet && setRow.IsReadonly {
return nil, errors.New("cannot edit parameter " + param.Name + " from read-only workset, id: " + strconv.Itoa(layout.FromId))
}
// check is this workset contain the parameter
err = SelectFirst(dbConn,
"SELECT COUNT(*) FROM workset_parameter"+
" WHERE set_id = "+strconv.Itoa(layout.FromId)+
" AND parameter_hid = "+strconv.Itoa(param.ParamHid),
func(row *sql.Row) error {
var n int
if err := row.Scan(&n); err != nil {
return err
}
isWsParam = n != 0
return nil
})
switch {
case err == sql.ErrNoRows: // unknown error: should never be there
return nil, errors.New("cannot count parameter " + param.Name + " in workset, id: " + strconv.Itoa(layout.FromId))
case err != nil:
return nil, err
}
// if parameter not in that workset then workset must have base run
if !isWsParam {
if setRow.BaseRunId <= 0 {
return nil, errors.New("workset does not contain parameter " + param.Name + " and not run-based, workset id: " + strconv.Itoa(layout.FromId))
}
srcRunId = setRow.BaseRunId
}
}
// if parameter from run (or from workset base run) then:
// check if model run exist and model run completed is completed or in progress
if !isWsParam {
runRow, err := GetRun(dbConn, srcRunId)
if err != nil {
return nil, err
}
if runRow == nil {
return nil, errors.New("model run not found, id: " + strconv.Itoa(srcRunId))
}
if !IsRunCompleted(runRow.Status) && runRow.Status != ProgressRunStatus {
return nil, errors.New("model run not completed, id: " + strconv.Itoa(srcRunId))
}
}
// make sql to select parameter from model run or workset:
// SELECT sub_id, dim0, dim1, param_value
// FROM ageSex_p2012_817
// WHERE run_id = (SELECT base_run_id FROM run_parameter WHERE run_id = 1234 AND parameter_hid = 1)
// AND sub_id = 7
// AND dim1 IN (1, 2, 3, 4)
// ORDER BY 1, 2, 3
// or:
// SELECT sub_id, dim0, dim1, param_value
// FROM ageSex_w2012_817
// AND sub_id = 7
// WHERE set_id = 9876
// AND dim1 IN (1, 2, 3, 4)
// ORDER BY 1, 2, 3
q := "SELECT sub_id, "
for k := range param.Dim {
q += param.Dim[k].colName + ", "
}
q += "param_value FROM "
if isWsParam {
q += param.DbSetTable +
" WHERE set_id = " + strconv.Itoa(layout.FromId)
} else {
q += param.DbRunTable +
" WHERE run_id =" +
" (SELECT base_run_id FROM run_parameter" +
" WHERE run_id = " + strconv.Itoa(srcRunId) +
" AND parameter_hid = " + strconv.Itoa(param.ParamHid) + ")"
}
// append sub-value id filter
if layout.IsSubId {
q += " AND sub_id = " + strconv.Itoa(layout.SubId)
}
// append dimension enum code filters, if specified
for k := range layout.Filter {
// find dimension index by name
dix := -1
for j := range param.Dim {
if param.Dim[j].Name == layout.Filter[k].Name {
dix = j
break
}
}
if dix < 0 {
return nil, errors.New("parameter " + param.Name + " does not have dimension " + layout.Filter[k].Name)
}
f, err := makeWhereFilter(
&layout.Filter[k], "", param.Dim[dix].colName, param.Dim[dix].typeOf, false, param.Dim[dix].Name, "parameter "+param.Name)
if err != nil {
return nil, err
}
q += " AND " + f
}
// append dimension enum id filters, if specified
for k := range layout.FilterById {
// find dimension index by name
dix := -1
for j := range param.Dim {
if param.Dim[j].Name == layout.FilterById[k].Name {
dix = j
break
}
}
if dix < 0 {
return nil, errors.New("parameter " + param.Name + " does not have dimension " + layout.FilterById[k].Name)
}
f, err := makeWhereIdFilter(
&layout.FilterById[k], "", param.Dim[dix].colName, param.Dim[dix].typeOf, param.Dim[dix].Name, "parameter "+param.Name)
if err != nil {
return nil, err
}
q += " AND " + f
}
// append order by
q += makeOrderBy(param.Rank, layout.OrderBy, 1)
// prepare db-row scan conversion buffer: sub_id, dimensions, value
// and define conversion function to make new cell from scan buffer
scanBuf, fc := scanSqlRowToCellParam(param)
// if full page requested:
// select rows into the list buffer and write rows from the list into output stream
if layout.IsFullPage {
// make a list of output cells
cLst, lt, e := SelectToList(dbConn, q, layout.ReadPageLayout,
func(rows *sql.Rows) (interface{}, error) {
if e := rows.Scan(scanBuf...); e != nil {
return nil, e
}
// make new cell from conversion buffer
c := CellParam{cellIdValue: cellIdValue{DimIds: make([]int, param.Rank)}}
if e := fc(&c); e != nil {
return nil, e
}
return c, nil
})
if e != nil {
return nil, e
}
// write page into output stream
for c := cLst.Front(); c != nil; c = c.Next() {
if _, e := cvtTo(c.Value); e != nil {
return nil, e
}
}
return lt, nil // done: return output page layout
}
// else: select rows and write it into output stream without buffering
// adjust page layout: starting offset and page size
nStart := layout.Offset
if nStart < 0 {
nStart = 0
}
nSize := layout.Size
if nSize < 0 {
nSize = 0
}
var nRow int64
lt := ReadPageLayout{
Offset: nStart,
Size: 0,
IsLastPage: false,
}
// select parameter cells: (sub id, dimension(s) enum ids, parameter value)
err := SelectRowsTo(dbConn, q,
func(rows *sql.Rows) (bool, error) {
// if page size is limited then select only a page of rows
nRow++
if nSize > 0 && nRow > nStart+nSize {
return false, nil
}
if nRow <= nStart {
return true, nil
}
// select next row
if e := rows.Scan(scanBuf...); e != nil {
return false, e
}
lt.Size++
// make new cell from conversion buffer
c := CellParam{cellIdValue: cellIdValue{DimIds: make([]int, param.Rank)}}
if e := fc(&c); e != nil {
return false, e
}
return cvtTo(c) // process cell
})
if err != nil {
return nil, err
}
// check for the empty result page or last page
if lt.Size <= 0 {
lt.Offset = nRow
}
lt.IsLastPage = nSize <= 0 || nSize > 0 && nRow <= nStart+nSize
return <, nil
}
// trxReadParameterTo runs the query inside of the transaction and, for each row selected,
// makes a parameter cell (sub id, dimension(s) enum ids, parameter value) and passes it to cvtTo().
// Selection stops with an error if row scan, row to cell conversion or cvtTo() returns an error.
func trxReadParameterTo(trx *sql.Tx, param *ParamMeta, query string, cvtTo func(src interface{}) error) error {

	// scan targets and converter from the scan targets into the parameter cell
	rowBuf, toCell := scanSqlRowToCellParam(param)

	// row handler: scan the row, build the cell and process it
	onRow := func(rows *sql.Rows) error {

		if err := rows.Scan(rowBuf...); err != nil {
			return err
		}

		// each row gets a new cell with its own dimension ids slice
		cell := CellParam{cellIdValue: cellIdValue{DimIds: make([]int, param.Rank)}}

		if err := toCell(&cell); err != nil {
			return err
		}
		return cvtTo(cell)
	}

	return TrxSelectRows(trx, query, onRow)
}
// scanSqlRowToCellParam prepares to scan sql rows and to convert each row into CellParam.
// It returns a scan buffer to be populated by rows.Scan() and a closure to copy that buffer content into CellParam.
// Scan buffer layout is: sub id, dimension ids (param.Rank items), parameter value.
// The closure copies dimension ids into c.DimIds, which is expected to be allocated by the caller.
func scanSqlRowToCellParam(param *ParamMeta) ([]interface{}, func(*CellParam) error) {

	// scan targets: shared between the scan buffer and the conversion closure
	var (
		subId  int
		anyVal interface{}
		strVal string
		fltVal sql.NullFloat64
	)
	dimIds := make([]int, param.Rank)

	// first part of scan buffer: sub id and dimension ids
	buf := make([]interface{}, 0, param.Rank+2)
	buf = append(buf, &subId)
	for k := range dimIds {
		buf = append(buf, &dimIds[k])
	}

	// copy cell key: sub id and dimension ids
	setKey := func(c *CellParam) {
		c.SubId = subId
		copy(c.DimIds, dimIds)
	}

	// logical parameter: scan value as is and convert it to boolean, non-zero number is true
	if param.typeOf.IsBool() {
		return append(buf, &anyVal), func(c *CellParam) error {
			setKey(c)
			c.IsNull = false // logical parameter expected to be NOT NULL

			flag := false
			switch x := anyVal.(type) {
			case nil: // 2018: unexpected today, may be in the future
				c.IsNull = true
			case bool:
				flag = x
			case int:
				flag = x != 0
			case int8:
				flag = x != 0
			case int16:
				flag = x != 0
			case int32:
				flag = x != 0
			case int64:
				flag = x != 0
			case uint:
				flag = x != 0
			case uint8:
				flag = x != 0
			case uint16:
				flag = x != 0
			case uint32:
				flag = x != 0
			case uint64:
				flag = x != 0
			case float32: // oracle (very unlikely)
				flag = x != 0.0
			case float64: // oracle (often)
				flag = x != 0.0
			default:
				return errors.New("invalid parameter value type, expected: integer")
			}
			c.Value = flag
			return nil
		}
	}

	// string parameter: scan directly into string, cell is never marked as NULL
	if param.typeOf.IsString() {
		return append(buf, &strVal), func(c *CellParam) error {
			setKey(c)
			c.IsNull = false
			c.Value = strVal
			return nil
		}
	}

	// float parameter: value can be NULL, in that case cell value is zero
	if param.typeOf.IsFloat() {
		return append(buf, &fltVal), func(c *CellParam) error {
			setKey(c)
			c.IsNull = !fltVal.Valid
			if fltVal.Valid {
				c.Value = fltVal.Float64
			} else {
				c.Value = 0.0
			}
			return nil
		}
	}

	// any other type: value stored as it was scanned, cell is never marked as NULL
	return append(buf, &anyVal), func(c *CellParam) error {
		setKey(c)
		c.IsNull = false
		c.Value = anyVal
		return nil
	}
}