Skip to content

Commit

Permalink
Merge 337733b into fb4335c
Browse files Browse the repository at this point in the history
  • Loading branch information
maciej committed Sep 24, 2018
2 parents fb4335c + 337733b commit 2031a53
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 32 deletions.
12 changes: 11 additions & 1 deletion clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,34 @@ type dbInit struct {

type chSuite struct {
suite.Suite
conn *sql.DB
conn *sql.DB
connWithCompression *sql.DB
}

func (s *chSuite) SetupSuite() {
dsn := os.Getenv("TEST_CLICKHOUSE_DSN")
if len(dsn) == 0 {
dsn = "http://localhost:8123/default"
}

conn, err := sql.Open("clickhouse", dsn)
s.Require().NoError(err)
s.Require().NoError(initialzer.Do(conn))
s.conn = conn

connWithCompression, err := sql.Open("clickhouse", dsn+"?enable_http_compression=1")
s.Require().NoError(err)
s.connWithCompression = connWithCompression
}

func (s *chSuite) TearDownSuite() {
s.conn.Close()
_, err := s.conn.Query("SELECT 1")
s.EqualError(err, "sql: database is closed")

s.connWithCompression.Close()
_, err = s.connWithCompression.Query("SELECT 1")
s.EqualError(err, "sql: database is closed")
}

func (d *dbInit) Do(conn *sql.DB) error {
Expand Down
38 changes: 21 additions & 17 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (

// Config is a configuration parsed from a DSN string
type Config struct {
User string
Password string
Scheme string
Host string
Database string
Timeout time.Duration
IdleTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Location *time.Location
Debug bool
UseDBLocation bool
Params map[string]string
User string
Password string
Scheme string
Host string
Database string
Timeout time.Duration
IdleTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Location *time.Location
Debug bool
UseDBLocation bool
GzipCompression bool
Params map[string]string
}

// NewConfig creates a new config with default values
Expand All @@ -32,6 +33,7 @@ func NewConfig() *Config {
Host: "localhost:8123",
IdleTimeout: time.Hour,
Location: time.UTC,
Params: make(map[string]string),
}
}

Expand All @@ -55,6 +57,9 @@ func (cfg *Config) FormatDSN() string {
if cfg.Location != time.UTC && cfg.Location != nil {
query.Set("location", cfg.Location.String())
}
if cfg.GzipCompression {
query.Set("enable_http_compression", "1")
}
if cfg.Debug {
query.Set("debug", "1")
}
Expand Down Expand Up @@ -147,11 +152,10 @@ func parseDSNParams(cfg *Config, params map[string][]string) (err error) {
cfg.Debug, err = strconv.ParseBool(v[0])
case "default_format", "query", "database":
err = fmt.Errorf("unknown option '%s'", k)
case "enable_http_compression":
cfg.GzipCompression, err = strconv.ParseBool(v[0])
cfg.Params[k] = v[0]
default:
// lazy init
if cfg.Params == nil {
cfg.Params = make(map[string]string)
}
cfg.Params[k] = v[0]
}
if err != nil {
Expand Down
43 changes: 29 additions & 14 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clickhouse

import (
"compress/gzip"
"context"
"database/sql"
"database/sql/driver"
Expand All @@ -17,16 +18,17 @@ import (

// conn implements an interface sql.Conn
type conn struct {
url *url.URL
user *url.Userinfo
location *time.Location
useDBLocation bool
transport *http.Transport
cancel context.CancelFunc
txCtx context.Context
stmts []*stmt
logger *log.Logger
closed int32
url *url.URL
user *url.Userinfo
location *time.Location
useDBLocation bool
useGzipCompression bool
transport *http.Transport
cancel context.CancelFunc
txCtx context.Context
stmts []*stmt
logger *log.Logger
closed int32
}

func newConn(cfg *Config) *conn {
Expand All @@ -35,9 +37,10 @@ func newConn(cfg *Config) *conn {
logger = log.New(os.Stderr, "clickhouse: ", log.LstdFlags)
}
c := &conn{
url: cfg.url(map[string]string{"default_format": "TabSeparatedWithNamesAndTypes"}, false),
location: cfg.Location,
useDBLocation: cfg.UseDBLocation,
url: cfg.url(map[string]string{"default_format": "TabSeparatedWithNamesAndTypes"}, false),
location: cfg.Location,
useDBLocation: cfg.UseDBLocation,
useGzipCompression: cfg.GzipCompression,
transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: cfg.Timeout,
Expand Down Expand Up @@ -210,7 +213,15 @@ func (c *conn) doRequest(ctx context.Context, req *http.Request) (io.ReadCloser,
return nil, err
}

return resp.Body, nil
respReadCloser := resp.Body
if resp.Header.Get("Content-Encoding") == "gzip" {
respReadCloser, err = gzip.NewReader(respReadCloser)
if err != nil {
return nil, err
}
}

return respReadCloser, nil
}

func (c *conn) buildRequest(query string, params []driver.Value, readonly bool) (*http.Request, error) {
Expand All @@ -235,6 +246,10 @@ func (c *conn) buildRequest(query string, params []driver.Value, readonly bool)
p, _ := c.user.Password()
req.SetBasicAuth(c.user.Username(), p)
}
if c.useGzipCompression {
req.Header.Set("Accept-Encoding", "gzip")
}

return req, err
}

Expand Down
19 changes: 19 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (s *connSuite) TestQuery() {
},
}

// Tests on regular connection
for _, tc := range testCases {
rows, err := s.conn.Query(tc.query, tc.args...)
if !s.NoError(err) {
Expand All @@ -60,6 +61,24 @@ func (s *connSuite) TestQuery() {
}
s.NoError(rows.Close())
}

// Tests on connections with enabled compression
for _, tc := range testCases {
rows, err := s.connWithCompression.Query(tc.query, tc.args...)
if !s.NoError(err) {
continue
}
if len(tc.expected) == 0 {
s.False(rows.Next())
s.NoError(rows.Err())
} else {
v, err := scanValues(rows, tc.expected[0])
if s.NoError(err) {
s.Equal(tc.expected, v)
}
}
s.NoError(rows.Close())
}
}

func (s *connSuite) TestExec() {
Expand Down

0 comments on commit 2031a53

Please sign in to comment.