/
createTable.go
250 lines (227 loc) · 7.81 KB
/
createTable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package cmd
import (
"errors"
"fmt"
"os"
"strings"
"github.com/jackc/pgx"
"github.com/spf13/cobra"
parser "github.com/stephane-martin/w3c-extendedlog-parser"
)
var tableName string
var partitionKey string
var parentPartitionKey string
var rangeStart string
var rangeEnd string
var noIndex bool
var noFullTextIndex bool
func pgKey(key string) string {
key = strings.Replace(key, "-", "_", -1)
key = strings.Replace(key, "(", "$", -1)
key = strings.Replace(key, ")", "$", -1)
return key
}
var createTableCmd = &cobra.Command{
Use: "create-table",
Short: "Create a table in postgres with an adequate schema to store access logs",
Run: func(cmd *cobra.Command, args []string) {
var fieldsNames []string
fieldsLine = strings.TrimSpace(fieldsLine)
fname := strings.TrimSpace(filename)
if len(fieldsLine) == 0 && len(fname) == 0 {
fatal(errors.New("Specify fields with --filename or --fields"))
}
if len(fieldsLine) != 0 && len(fname) != 0 {
fatal(errors.New("--fields and --filename options are exclusive"))
}
if len(parentPartitionKey) > 0 && len(partitionKey) > 0 {
fatal(errors.New("--partition and --parent are exclusive"))
}
if len(parentPartitionKey) > 0 && (len(rangeStart) == 0 || len(rangeEnd) == 0) {
fatal(errors.New("if --parent is set, --start and --end must be specified too"))
}
if len(fieldsLine) > 0 {
fieldsNames = strings.Split(fieldsLine, " ")
} else {
f, err := os.Open(fname)
fatal(err)
p := parser.NewFileParser(f)
err = p.ParseHeader()
f.Close()
fatal(err)
fieldsNames = p.FieldNames()
}
if len(fieldsNames) == 0 {
fatal(errors.New("field names not found"))
}
hasGMT := false
for _, name := range fieldsNames {
if name == "gmttime" {
hasGMT = true
break
}
}
if !hasGMT {
fieldsNames = append([]string{"gmttime"}, fieldsNames...)
}
fieldsNames = append([]string{"id"}, fieldsNames...)
excludes := make(map[string]bool)
for _, fName := range excludedFields {
excludes[strings.ToLower(fName)] = true
}
excludes["date"] = true
excludes["time"] = true
// todo: parse user agent
createStmt := ""
if len(parentPartitionKey) == 0 {
createStmt = buildCreateStmt(tableName, fieldsNames, excludes, partitionKey)
} else {
createStmt = buildCreateChildStmt(tableName, parentPartitionKey, rangeStart, rangeEnd)
}
dbURI = strings.TrimSpace(dbURI)
tableName = strings.TrimSpace(tableName)
if len(dbURI) == 0 || len(tableName) == 0 {
fatal(errors.New("Empty uri or tablename"))
}
if !validName(tableName) {
fatal(errors.New("invalid table name"))
}
config, err := pgx.ParseConnectionString(dbURI)
fatal(err)
conn, err := pgx.Connect(config)
fatal(err)
defer conn.Close()
fmt.Fprintln(os.Stderr, createStmt)
_, err = conn.Exec(createStmt)
fatal(err)
fmt.Fprintf(os.Stderr, "table '%s' has been created\n", tableName)
if noIndex || len(partitionKey) > 0 {
return
}
createIndexStmt := ""
for _, fieldName := range fieldsNames {
createIndexStmt = buildIndexStmt(tableName, fieldName, excludes, len(parentPartitionKey) > 0, noFullTextIndex)
if len(createIndexStmt) == 0 {
continue
}
fmt.Fprintln(os.Stderr, createIndexStmt)
_, err = conn.Exec(createIndexStmt)
fatal(err)
fmt.Fprintf(os.Stderr, "Index has been created on %s\n", fieldName)
}
},
}
func buildCreateChildStmt(tName string, parent string, start string, end string) string {
return fmt.Sprintf(
"CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');",
tName, parent, start, end,
)
}
func buildCreateStmt(tName string, fNames []string, excludes map[string]bool, pKey string) string {
columns := make(map[string]string, len(fNames)+1)
for _, fName := range fNames {
if fName == "id" && len(pKey) == 0 {
columns["id"] = "UUID PRIMARY KEY"
continue
}
if fName == "id" {
columns["id"] = "UUID"
continue
}
if fName == "gmttime" {
columns["gmttime"] = "TIMESTAMP WITH TIME ZONE NULL"
continue
}
switch parser.GuessType(fName) {
case parser.MyDate:
columns[pgKey(fName)] = "DATE NULL"
case parser.MyIP:
columns[pgKey(fName)] = "INET NULL"
case parser.MyTime:
columns[pgKey(fName)] = "TIME NULL"
case parser.MyTimestamp:
columns[pgKey(fName)] = "TIMESTAMP WITH TIME ZONE NULL"
case parser.MyURI:
columns[pgKey(fName)] = "TEXT DEFAULT '' NOT NULL"
case parser.Float64:
columns[pgKey(fName)] = "DOUBLE PRECISION NULL"
case parser.Int64:
columns[pgKey(fName)] = "BIGINT NULL"
case parser.Bool:
columns[pgKey(fName)] = "BOOLEAN NULL"
case parser.String:
columns[pgKey(fName)] = "TEXT DEFAULT '' NOT NULL"
default:
columns[pgKey(fName)] = "TEXT DEFAULT '' NOT NULL"
}
}
createStmt := "CREATE TABLE %s (\n"
for _, fName := range fNames {
if excludes[strings.ToLower(fName)] {
continue
}
createStmt += fmt.Sprintf(" %s %s,\n", pgKey(fName), columns[pgKey(fName)])
}
// remove last ,
createStmt = strings.Trim(createStmt, ",\n")
// add a PARTITION if requested
if len(pKey) > 0 {
createStmt += fmt.Sprintf(") PARTITION BY RANGE (%s);", pKey)
} else {
createStmt += ");"
}
return fmt.Sprintf(createStmt, tName)
}
func buildIndexStmt(tName string, fName string, excludes map[string]bool, isChild bool, nofulltext bool) string {
if excludes[strings.ToLower(fName)] {
return ""
}
if fName == "id" {
// primary key
if isChild {
return fmt.Sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", tName, pgKey(fName), tName, pgKey(fName))
}
return ""
}
if fName == "cs-uri-query" || fName == "cs(referer)" || fName == "cs-uri-path" {
// fields too large for a BTREE index
return ""
}
if fName == "cs(user-agent)" {
if nofulltext {
return fmt.Sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", tName, pgKey(fName), tName, pgKey(fName))
}
return fmt.Sprintf(
"CREATE INDEX %s_full_useragent_idx ON %s USING GIN (to_tsvector('english', %s));",
tName,
tName,
pgKey(fName),
)
}
switch parser.GuessType(fName) {
case parser.MyDate, parser.MyTime, parser.MyTimestamp:
return fmt.Sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", tName, pgKey(fName), tName, pgKey(fName))
case parser.MyIP:
return fmt.Sprintf("CREATE INDEX %s_%s_idx ON %s USING GIST (%s inet_ops);", tName, pgKey(fName), tName, pgKey(fName))
case parser.Float64, parser.Int64, parser.Bool:
return fmt.Sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", tName, pgKey(fName), tName, pgKey(fName))
case parser.String, parser.MyURI:
return fmt.Sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", tName, pgKey(fName), tName, pgKey(fName))
default:
return ""
}
}
func init() {
rootCmd.AddCommand(createTableCmd)
createTableCmd.Flags().StringVar(&tableName, "tablename", "accesslogs", "name of table to be created in pgsql")
createTableCmd.Flags().StringVar(&fieldsLine, "fields", "", "specify the fields that will be present in the access logs")
createTableCmd.Flags().StringVar(&filename, "filename", "", "specify the log file from which to extract the fields")
createTableCmd.Flags().StringVar(&dbURI, "uri", "", "the URI of the postgresql server to connect to")
createTableCmd.Flags().BoolVar(&noIndex, "noindex", false, "if set, do not create indices in pgsql")
createTableCmd.Flags().BoolVar(&noFullTextIndex, "nofulltext", false, "if set, do not create a full text search index on user-agent field")
createTableCmd.Flags().StringVar(&partitionKey, "partition", "", "if set, create a partitioned table using the given column name")
createTableCmd.Flags().StringVar(&parentPartitionKey, "parent", "", "if set, create the table as a child partition of that parent")
createTableCmd.Flags().StringVar(&rangeStart, "start", "", "range start for the child partition")
createTableCmd.Flags().StringVar(&rangeEnd, "end", "", "range end for the child partition")
createTableCmd.Flags().StringArrayVar(&excludedFields, "exclude", []string{}, "exclude that field from collection (can be repeated)")
}