-
Notifications
You must be signed in to change notification settings - Fork 0
/
pg_server.go
148 lines (136 loc) · 4.49 KB
/
pg_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main
import (
"context"
"database/sql"
"database/sql/driver"
"github.com/marcboeker/go-duckdb"
"github.com/sirupsen/logrus"
"github.com/supercaracal/scram-sha-256/pkg/pgpasswd"
"net"
"net/http"
"sync"
)
// ClickhouseOptions configures the optional ClickHouse HTTP endpoint that
// PgServer.Start can launch next to the PostgreSQL listener.
type ClickhouseOptions struct {
// Enabled makes PgServer.Start run StartClickhouseHttp in its own goroutine.
Enabled bool
// Listen is the address handed to http.ListenAndServe for that endpoint.
Listen string
}
// serverOptions collects the settings consumed by PgServer.Start.
type serverOptions struct {
// DbPath is the DuckDB data source string passed to duckdb.NewConnector.
DbPath string
// Listen is the TCP address on which the PostgreSQL wire protocol is served.
Listen string
// ClickhouseOptions controls the optional ClickHouse HTTP endpoint.
ClickhouseOptions ClickhouseOptions
// UseHack passes duckdbInit to the connector as its init callback, which
// creates the compatibility views and functions defined there.
UseHack bool
// Auth turns on enableAuth and makes Start create the duckserver.users table.
Auth bool
}
// PgServer holds the DuckDB handles and per-connection bookkeeping shared by
// the PostgreSQL listener and the optional ClickHouse HTTP endpoint.
type PgServer struct {
// Connector is the DuckDB connector created in Start; StartClickhouseHttp
// opens its own sql.DB from it.
Connector *duckdb.Connector
// conn is the sql.DB opened from Connector in Start; the user-table queries
// (CreateUser, GetPassword) run on it.
conn *sql.DB
// backends is looked up by [8]byte key; CancelRequest expects the values to
// be *PgConn. Entries are stored outside this file's visible code.
backends sync.Map
// enableAuth is set by Start when serverOptions.Auth is true.
enableAuth bool
}
// duckdbInit executes a fixed list of DDL statements on execer, in order.
//
// The statements create views and functions whose names mirror PostgreSQL
// catalog objects (pg_type, pg_matviews, ...) and a "system" schema with
// databases/tables/columns/functions views. Every statement uses
// "if not exists", and no bind arguments are passed.
//
// It stops at the first failing statement and returns that error unchanged;
// it returns nil when every statement succeeds.
func duckdbInit(execer driver.ExecerContext) error {
	ctx := context.Background()

	ddl := []string{
		// Catalog objects named after their PostgreSQL counterparts.
		`create view if not exists pg_type as select type_oid as oid,case when logical_type like '%TIMESTAMP_%' then 'TIMESTAMP' when logical_type = 'DECIMAL' then 'NUMERIC' when logical_type='BOOLEAN' then 'bool' else logical_type end as typname from duckdb_types where oid is not null;`,
		`create view if not exists pg_matviews as select '' as matviewname , '' as schemaname limit 0;`,
		`create view if not exists information_schema.constraint_column_usage as select '' constraint_name limit 0;`,

		// Stub functions.
		`create function if not exists array_positions(a,b) as 0;`,
		`create function if not exists timezone() as 'utc';`,
		`create function if not exists currentDatabase() as current_schema();`,

		// The "system" schema and its views over information_schema / pg_proc.
		`create schema if not exists system;`,
		`create view if not exists system.databases as
select schema_name as name
from information_schema.schemata
where catalog_name not in ('system', 'temp');`,
		`create view if not exists system.tables as
select table_name as name,
table_schema as database,
'uuid' as uuid,
'duckdb' as engine,
0 as is_temporary,
table_comment as comment
from information_schema.tables
where table_type = 'BASE TABLE';`,
		`create view if not exists system.columns as
select table_schema as database,
table_name as table,
column_name as name,
data_type as type,
column_comment as comment,
data_type default_kind,
column_default as default_expression
from information_schema.columns;`,
		`create view if not exists system.functions as
select proname as name, prokind = 'a' as is_aggregate
from pg_proc;`,
	}

	for i := range ddl {
		_, execErr := execer.ExecContext(ctx, ddl[i], nil)
		if execErr != nil {
			return execErr
		}
	}
	return nil
}
// Start opens the DuckDB database named by options.DbPath, performs the
// optional bootstrap steps and then serves the PostgreSQL wire protocol on
// options.Listen.
//
// Steps, in order:
//   - create the DuckDB connector (with duckdbInit as init callback when
//     options.UseHack is set) and open s.conn from it;
//   - when options.Auth is set, enable authentication and create the
//     duckserver schema and duckserver.users table;
//   - when the ClickHouse endpoint is enabled, start it in its own goroutine;
//   - listen on options.Listen and hand every accepted connection to a PgConn.
//
// Start returns an error when the connector cannot be created, when the auth
// bootstrap fails, or when the TCP listener cannot be opened. Once the accept
// loop is running it does not return.
func (s *PgServer) Start(options serverOptions) error {
	var duckConnector *duckdb.Connector
	var err error
	if options.UseHack {
		duckConnector, err = duckdb.NewConnector(options.DbPath, duckdbInit)
	} else {
		duckConnector, err = duckdb.NewConnector(options.DbPath, nil)
	}
	if err != nil {
		return err
	}
	logrus.Infof("Open DuckDB database at %s", options.DbPath)
	s.Connector = duckConnector
	s.conn = sql.OpenDB(s.Connector)

	if options.Auth {
		s.enableAuth = true
		// GetPassword and CreateUser depend on duckserver.users, so a failure
		// to create it aborts startup instead of being silently dropped.
		bootstrap := []string{
			"create schema if not exists duckserver;",
			"create table if not exists duckserver.users (username text primary key, password text);",
		}
		for _, stmt := range bootstrap {
			if _, err = s.conn.ExecContext(context.Background(), stmt); err != nil {
				return err
			}
		}
	}

	if options.ClickhouseOptions.Enabled {
		// Runs concurrently with the PostgreSQL listener; StartClickhouseHttp
		// reports its own listen failure through logrus.Fatal.
		go s.StartClickhouseHttp(options.ClickhouseOptions)
	}

	lis, err := net.Listen("tcp", options.Listen)
	if err != nil {
		return err
	}
	logrus.Infof("Listening postgresql wire protocol on %s", options.Listen)
	for {
		conn, err := lis.Accept()
		if err != nil {
			// Accept failures are skipped so that one bad connection attempt
			// does not stop the server.
			// NOTE(review): a persistent Accept error would make this loop
			// spin without any log output — consider logging with a backoff.
			continue
		}
		pgConn := newPgConn(conn, s)
		// NOTE(review): Run is invoked inline; unless it hands the connection
		// off to its own goroutine, the next Accept waits for it — confirm.
		pgConn.Run()
	}
}
// CreateUser inserts a row for user into duckserver.users, storing the output
// of pgpasswd.Encrypt instead of the plain-text password.
//
// It returns the error from pgpasswd.Encrypt if encryption fails, otherwise
// the result of the insert (nil on success). The table itself is created by
// Start when serverOptions.Auth is set.
func (s *PgServer) CreateUser(user, password string) error {
	encrypted, encErr := pgpasswd.Encrypt([]byte(password))
	if encErr != nil {
		return encErr
	}

	const insertUser = "insert into duckserver.users (username, password) values ($1, $2)"
	if _, execErr := s.conn.ExecContext(context.Background(), insertUser, user, encrypted); execErr != nil {
		return execErr
	}
	return nil
}
// GetPassword looks up the password column stored for user in
// duckserver.users.
//
// It returns the stored value and a nil error on success. When the query or
// the scan fails (for example because no row matches), it returns the empty
// string together with that error.
func (s *PgServer) GetPassword(user string) (string, error) {
	const selectPassword = "select password from duckserver.users where username = $1"

	row := s.conn.QueryRowContext(context.Background(), selectPassword, user)

	var stored string
	scanErr := row.Scan(&stored)
	return stored, scanErr
}
// StartClickhouseHttp serves the ClickHouse HTTP protocol on options.Listen
// with a ChServer that gets its own sql.DB opened from s.Connector.
//
// The call blocks inside http.ListenAndServe; whatever error that returns is
// passed to logrus.Fatal, so this method is meant to be run in a goroutine
// (as Start does) and does not return normally.
func (s *PgServer) StartClickhouseHttp(options ClickhouseOptions) {
	handler := &ChServer{
		conn:      sql.OpenDB(s.Connector),
		connector: s.Connector,
		pgServer:  s,
	}

	logrus.Infof("Listening clickhouse http protocol on %s", options.Listen)

	serveErr := http.ListenAndServe(options.Listen, handler)
	logrus.Fatal(serveErr)
}
// Close removes the entry stored under key from s.backends, so a later
// CancelRequest with the same key finds nothing and does nothing.
// Deleting a key that is not present is harmless.
// NOTE(review): key is presumably the backend key data sent to the client
// (process id + secret) — confirm against PgConn.
func (s *PgServer) Close(key [8]byte) {
s.backends.Delete(key)
}
// CancelRequest calls the cancel function of the backend registered under key.
//
// Nothing happens when no backend is stored for key or when that backend has
// no cancel function set. Values in s.backends are expected to be *PgConn;
// any other type makes the type assertion panic.
func (s *PgServer) CancelRequest(key [8]byte) {
	entry, found := s.backends.Load(key)
	if !found {
		return
	}

	target := entry.(*PgConn)
	if target.cancel == nil {
		return
	}
	target.cancel()
}