/
column.go
130 lines (124 loc) · 3.39 KB
/
column.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package sqlavro
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/khezen/avro"
)
// sqlColumn2AVRO converts a SQL column description into an Avro record field.
//
// The field type comes from sqlColumn2AVROType(dataType, numPrecision,
// numScale, charBytesLen). The field name is columnName passed through
// formatString; when that changes the name, the original columnName is kept
// as the field's only alias.
//
// defaultValue is the column's SQL default as raw text:
//   - nil means no default is known.
//   - A case-insensitive "null" is treated as SQL NULL. Nullable columns then
//     get the JSON default null; non-nullable columns get no default.
//   - Any other non-empty value is converted with sqlDefault2AVRODefault.
//
// Nullable columns become a union with null. null is the first branch when
// there is no non-null default, and the second branch otherwise. This matters
// because Avro checks a union field's default against the union's first
// branch.
//
// The error from sqlColumn2AVROType is returned unchanged.
func sqlColumn2AVRO(columnName string, dataType SQLType, isNullable bool, defaultValue []byte, numPrecision, numScale, charBytesLen int) (*avro.RecordFieldSchema, error) {
	fieldType, err := sqlColumn2AVROType(columnName, dataType, isNullable, numPrecision, numScale, charBytesLen)
	if err != nil {
		return nil, err
	}

	// Recognize a SQL NULL default on the raw text, before type conversion.
	// Converting first would quote it into the string "NULL" for character
	// types and pass the invalid JSON token NULL through for numeric types.
	isNullDefault := strings.EqualFold(string(defaultValue), "null")
	switch {
	case isNullDefault && isNullable:
		// Valid because null is made the union's first branch below.
		defaultValue = []byte("null")
	case isNullDefault:
		// A non-nullable field cannot default to null.
		defaultValue = nil
	case len(defaultValue) > 0:
		// May return nil (e.g. an unparseable date), which means "no default".
		defaultValue = sqlDefault2AVRODefault(dataType, defaultValue)
	}

	if isNullable {
		if defaultValue == nil || isNullDefault {
			fieldType = avro.UnionSchema([]avro.Schema{avro.TypeNull, fieldType})
		} else {
			fieldType = avro.UnionSchema([]avro.Schema{fieldType, avro.TypeNull})
		}
	}

	var rawDefault *json.RawMessage
	if len(defaultValue) > 0 {
		raw := json.RawMessage(defaultValue)
		rawDefault = &raw
	}

	formattedName := formatString(columnName)
	var aliases []string
	if formattedName != columnName {
		// Keep the original column name resolvable after sanitizing.
		aliases = []string{columnName}
	}
	return &avro.RecordFieldSchema{
		Name:    formattedName,
		Aliases: aliases,
		Type:    fieldType,
		Default: rawDefault,
	}, nil
}
// sqlColumn2AVROType returns the Avro schema describing the values of a SQL
// column of type dataType.
//
//   - TINYINT, SMALLINT, MEDIUMINT, INT and YEAR map to avro.TypeInt32; BIGINT
//     maps to avro.TypeInt64.
//   - FLOAT maps to avro.TypeFloat32 and DOUBLE to avro.TypeFloat64.
//   - Character, text, ENUM and SET types map to avro.TypeString.
//   - BLOB, MEDIUMBLOB and LONGBLOB map to avro.TypeBytes.
//   - BIT maps to a fixed schema named columnName, with size numPrecision.
//   - DECIMAL maps to bytes with the decimal logical type, carrying
//     numPrecision and numScale.
//   - DATE and TIME map to int32 with the date and time logical types.
//   - DATETIME and TIMESTAMP map to int32 with the timestamp logical type; the
//     SQL type name is recorded as the schema's documentation.
//
// Any other dataType yields avro.ErrUnsupportedType. isNullable and
// charBytesLen are currently unused here. sqlColumn2AVRO builds the null
// union itself.
//
// NOTE(review): the Avro spec annotates timestamp-millis on long, not int.
// Confirm DATETIME/TIMESTAMP against how values are encoded elsewhere in the
// package.
//
// NOTE(review): a fixed Size is in bytes. If callers pass BIT(M)'s
// NUMERIC_PRECISION (M bits), the size should probably be (M+7)/8. Confirm
// with the caller.
func sqlColumn2AVROType(columnName string, dataType SQLType, isNullable bool, numPrecision, numScale, charBytesLen int) (fieldType avro.Schema, err error) {
	switch dataType {
	case TinyInt, SmallInt, MediumInt, Int, Year:
		fieldType = avro.TypeInt32
	case BigInt:
		fieldType = avro.TypeInt64
	case Float:
		fieldType = avro.TypeFloat32
	case Double:
		fieldType = avro.TypeFloat64
	case Char, NChar, VarChar, NVarChar,
		Text, TinyText, MediumText, LongText,
		Enum, Set:
		fieldType = avro.TypeString
	case Blob, MediumBlob, LongBlob:
		fieldType = avro.TypeBytes
	case Bit:
		fieldType = &avro.FixedSchema{
			Name: columnName,
			Type: avro.TypeFixed,
			Size: numPrecision,
		}
	case Decimal:
		fieldType = &avro.DerivedPrimitiveSchema{
			Type:        avro.TypeBytes,
			LogicalType: avro.LogicalTypeDecimal,
			Precision:   &numPrecision,
			Scale:       &numScale,
		}
	case Date:
		fieldType = &avro.DerivedPrimitiveSchema{
			Type:        avro.TypeInt32,
			LogicalType: avro.LogicalTypeDate,
		}
	case Time:
		fieldType = &avro.DerivedPrimitiveSchema{
			Type:        avro.TypeInt32,
			LogicalType: avro.LogicalTypeTime,
		}
	case DateTime, Timestamp:
		// dataType is exactly DateTime or Timestamp here, so this records the
		// matching SQL type name.
		fieldType = &avro.DerivedPrimitiveSchema{
			Type:          avro.TypeInt32,
			Documentation: string(dataType),
			LogicalType:   avro.LogicalTypeTimestamp,
		}
	default:
		return nil, avro.ErrUnsupportedType
	}
	return fieldType, nil
}
// sqlDefault2AVRODefault converts a column's SQL default, given as raw text,
// into the JSON text used as the Avro field default.
//
//   - String-like types (CHAR, NCHAR, VARCHAR, NVARCHAR, the TEXT variants,
//     ENUM, SET) become JSON string literals. `"`, `\` and control characters
//     are escaped, so any default yields valid JSON.
//   - DATE ("2006-01-02"), TIME ("15:04:05"), DATETIME and TIMESTAMP
//     ("2006-01-02 15:04:05") are parsed as UTC and rendered as a decimal
//     count of seconds since the Unix epoch. TIME values are counted from
//     midnight. When parsing fails (e.g. CURRENT_TIMESTAMP), nil is returned,
//     meaning "no default".
//   - Every other type is returned unchanged.
//
// NOTE(review): the Avro spec measures date in days and time-millis /
// timestamp-millis in milliseconds. The seconds produced here presumably match
// how this package encodes values elsewhere; confirm before changing units.
func sqlDefault2AVRODefault(dataType SQLType, sqlDefaultValue []byte) (avroDefault []byte) {
	switch dataType {
	case Char, NChar, VarChar, NVarChar,
		Text, TinyText, MediumText, LongText,
		Enum, Set:
		// Escape by hand rather than with json.Marshal, which would also
		// rewrite <, > and & as \u003c, \u003e and \u0026 in the schema.
		var b strings.Builder
		b.Grow(len(sqlDefaultValue) + 2)
		b.WriteByte('"')
		for _, r := range string(sqlDefaultValue) {
			switch {
			case r == '"' || r == '\\':
				b.WriteByte('\\')
				b.WriteRune(r)
			case r < 0x20:
				// JSON forbids raw control characters inside strings.
				fmt.Fprintf(&b, `\u%04x`, r)
			default:
				// Invalid UTF-8 bytes arrive as utf8.RuneError and are written
				// as U+FFFD, which keeps the output valid JSON.
				b.WriteRune(r)
			}
		}
		b.WriteByte('"')
		return []byte(b.String())
	case Date, Time, DateTime, Timestamp:
		var layout string
		switch dataType {
		case Date:
			layout = "2006-01-02"
		case Time:
			layout = "15:04:05"
		case DateTime, Timestamp:
			layout = "2006-01-02 15:04:05"
		}
		t, err := time.Parse(layout, string(sqlDefaultValue))
		if err != nil {
			// Non-literal defaults such as CURRENT_TIMESTAMP have no fixed
			// value, so the field gets no default.
			return nil
		}
		if dataType == Time {
			// time.Parse puts a bare time of day on Jan 1, year 0. Shift it to
			// 1970-01-01 so Unix() yields seconds since midnight.
			t = t.AddDate(1970, 0, 0)
		}
		// Format the int64 directly; int() would truncate on 32-bit platforms.
		return []byte(strconv.FormatInt(t.Unix(), 10))
	default:
		return sqlDefaultValue
	}
}