Skip to content

Commit

Permalink
Merge branch 'master' into tsv_reader_skip_not_string
Browse files Browse the repository at this point in the history
  • Loading branch information
DoubleDi committed Apr 15, 2021
2 parents 78adbcb + 4434958 commit 7927f67
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 73 deletions.
12 changes: 6 additions & 6 deletions README.md
Expand Up @@ -40,14 +40,14 @@ http://user:password@host:8123/clicks?read_timeout=10&write_timeout=20
* LowCardinality(T)
* [Array(T) (one-dimensional)](https://clickhouse.yandex/reference_en.html#Array(T))
* [Nested(Name1 Type1, Name2 Type2, ...)](https://clickhouse.yandex/docs/en/data_types/nested_data_structures/nested/)
* IPv4, IPv6

Notes:
database/sql does not allow to use big uint64 values.
It is recommended use type `UInt64` which is provided by driver for such kind of values.
type `[]byte` are used as raw string (without quoting)
for passing value of type `[]uint8` to driver as array - please use the wrapper `clickhouse.Array`
for passing decimal value please use the wrappers `clickhouse.Decimal*`
for passing IPv4/IPv6 types use `clickhouse.IP`
* database/sql does not allow to use big uint64 values. It is recommended use type `UInt64` which is provided by driver for such kind of values.
* type `[]byte` are used as raw string (without quoting)
* for passing value of type `[]uint8` to driver as array - please use the wrapper `clickhouse.Array`
* for passing decimal value please use the wrappers `clickhouse.Decimal*`
* for passing IPv4/IPv6 types use `clickhouse.IP`

## Supported request params

Expand Down
65 changes: 21 additions & 44 deletions conn.go
Expand Up @@ -17,7 +17,7 @@ import (
"sync/atomic"
"time"

"github.com/gofrs/uuid"
"github.com/google/uuid"
)

type key int
Expand Down Expand Up @@ -203,7 +203,7 @@ func (c *conn) killQuery(req *http.Request, args []driver.Value) error {
}
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
defer cancelFunc()
req, err := c.buildRequest(ctx, query, args, false)
req, err := c.buildRequest(ctx, query, args)
if err != nil {
return err
}
Expand All @@ -223,7 +223,7 @@ func (c *conn) query(ctx context.Context, query string, args []driver.Value) (dr
if atomic.LoadInt32(&c.closed) != 0 {
return nil, driver.ErrBadConn
}
req, err := c.buildRequest(ctx, query, args, true)
req, err := c.buildRequest(ctx, query, args)
if err != nil {
return nil, err
}
Expand All @@ -245,7 +245,7 @@ func (c *conn) exec(ctx context.Context, query string, args []driver.Value) (dri
if atomic.LoadInt32(&c.closed) != 0 {
return nil, driver.ErrBadConn
}
req, err := c.buildRequest(ctx, query, args, false)
req, err := c.buildRequest(ctx, query, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -285,41 +285,29 @@ func (c *conn) doRequest(ctx context.Context, req *http.Request) (io.ReadCloser,
return resp.Body, nil
}

func (c *conn) buildRequest(ctx context.Context, query string, params []driver.Value, readonly bool) (*http.Request, error) {
var (
method string
err error
)
func (c *conn) buildRequest(ctx context.Context, query string, params []driver.Value) (*http.Request, error) {
var err error
if len(params) > 0 {
if query, err = interpolateParams(query, params); err != nil {
return nil, err
}
}

var (
bodyReader io.Reader
bodyWriter io.WriteCloser
)
if readonly {
method = http.MethodGet
} else {
method = http.MethodPost
bodyReader, bodyWriter = io.Pipe()
go func() {
if c.useGzipCompression {
gz := gzip.NewWriter(bodyWriter)
gz.Write([]byte(query))
gz.Close()
bodyWriter.Close()
} else {
bodyWriter.Write([]byte(query))
bodyWriter.Close()
}
}()
}
bodyReader, bodyWriter := io.Pipe()
go func() {
if c.useGzipCompression {
gz := gzip.NewWriter(bodyWriter)
gz.Write([]byte(query))
gz.Close()
bodyWriter.Close()
} else {
bodyWriter.Write([]byte(query))
bodyWriter.Close()
}
}()
c.log("query: ", query)

req, err := http.NewRequest(method, c.url.String(), bodyReader)
req, err := http.NewRequest(http.MethodPost, c.url.String(), bodyReader)
if err != nil {
return nil, err
}
Expand All @@ -341,12 +329,7 @@ func (c *conn) buildRequest(ctx context.Context, query string, params []driver.V
}
queryID, queryOk := ctx.Value(QueryID).(string)
if c.killQueryOnErr && (!queryOk || queryID == "") {
queryUUID, err := uuid.NewV4()
if err != nil {
c.log("can't generate query_id: ", err)
} else {
queryID = queryUUID.String()
}
queryID = uuid.New().String()
}
if queryID != "" {
if reqQuery == nil {
Expand All @@ -356,17 +339,11 @@ func (c *conn) buildRequest(ctx context.Context, query string, params []driver.V
}

}
if method == http.MethodGet {
if reqQuery == nil {
reqQuery = req.URL.Query()
}
reqQuery.Add("query", query)
}
if reqQuery != nil {
req.URL.RawQuery = reqQuery.Encode()
}

if method == http.MethodPost && c.useGzipCompression {
if c.useGzipCompression {
req.Header.Set("Content-Encoding", "gzip")
}

Expand Down
2 changes: 1 addition & 1 deletion conn_go18.go
Expand Up @@ -15,7 +15,7 @@ func (c *conn) Ping(ctx context.Context) error {
return ErrTransportNil
}

req, err := c.buildRequest(ctx, "select 1", nil, true)
req, err := c.buildRequest(ctx, "select 1", nil)
if err != nil {
return err
}
Expand Down
48 changes: 30 additions & 18 deletions conn_test.go
Expand Up @@ -7,11 +7,11 @@ import (
"database/sql/driver"
"io/ioutil"
"net/http"
"net/url"
"strings"
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/google/uuid"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -180,10 +180,8 @@ func (s *connSuite) TestServerError() {

func (s *connSuite) TestServerKillQuery() {
// kill query and check if it is cancelled
queryUUID, err := uuid.NewV4()
s.Require().NoError(err)
queryID := queryUUID.String()
_, err = s.connWithKillQuery.QueryContext(context.WithValue(context.Background(), QueryID, queryID), "SELECT sleep(3)")
queryID := uuid.New().String()
_, err := s.connWithKillQuery.QueryContext(context.WithValue(context.Background(), QueryID, queryID), "SELECT sleep(3)")
s.Error(err)
s.Contains(err.Error(), "net/http: timeout awaiting response headers")
rows := s.connWithKillQuery.QueryRow("SELECT count(query_id) FROM system.processes where query_id=? and is_cancelled=?", queryID, 1)
Expand All @@ -193,9 +191,7 @@ func (s *connSuite) TestServerKillQuery() {
s.Equal(1, amount)

// not kill query and check if it is not cancelled
queryUUID, err = uuid.NewV4()
s.Require().NoError(err)
queryID = queryUUID.String()
queryID = uuid.New().String()
_, err = s.connWithKillQuery.QueryContext(context.WithValue(context.Background(), QueryID, queryID), "SELECT sleep(0.5)")
s.NoError(err)
rows = s.connWithKillQuery.QueryRow("SELECT count(query_id) FROM system.processes where query_id=? and is_cancelled=?", queryID, 0)
Expand All @@ -212,21 +208,24 @@ func (s *connSuite) TestBuildRequestReadonlyWithAuth() {
cfg.User = "user"
cfg.Password = "password"
cn := newConn(cfg)
req, err := cn.buildRequest(context.Background(), "SELECT 1", nil, true)
req, err := cn.buildRequest(context.Background(), "SELECT 1", nil)
if s.NoError(err) {
user, password, ok := req.BasicAuth()
s.True(ok)
s.Equal("user", user)
s.Equal("password", password)
s.Equal(http.MethodGet, req.Method)
s.Equal(cn.url.String()+"&query="+url.QueryEscape("SELECT 1"), req.URL.String())
s.Equal(http.MethodPost, req.Method)
s.Equal(cn.url.String(), req.URL.String())
s.Nil(req.URL.User)
b, err := ioutil.ReadAll(req.Body)
s.Require().NoError(err)
s.Equal("SELECT 1", string(b))
}
}

func (s *connSuite) TestBuildRequestReadWriteWOAuth() {
cn := newConn(NewConfig())
req, err := cn.buildRequest(context.Background(), "INSERT 1 INTO num", nil, false)
req, err := cn.buildRequest(context.Background(), "INSERT 1 INTO num", nil)
if s.NoError(err) {
_, _, ok := req.BasicAuth()
s.False(ok)
Expand Down Expand Up @@ -271,7 +270,7 @@ func (s *connSuite) TestBuildRequestWithQueryId() {
},
}
for _, tc := range testCases {
req, err := cn.buildRequest(context.WithValue(context.Background(), QueryID, tc.queryID), "INSERT 1 INTO num", nil, false)
req, err := cn.buildRequest(context.WithValue(context.Background(), QueryID, tc.queryID), "INSERT 1 INTO num", nil)
if s.NoError(err) {
s.Equal(http.MethodPost, req.Method)
s.Equal(tc.expected, req.URL.String())
Expand Down Expand Up @@ -314,7 +313,7 @@ func (s *connSuite) TestBuildRequestWithQuotaKey() {
},
}
for _, tc := range testCases {
req, err := cn.buildRequest(context.WithValue(context.Background(), QuotaKey, tc.quotaKey), "SELECT 1", nil, false)
req, err := cn.buildRequest(context.WithValue(context.Background(), QuotaKey, tc.quotaKey), "SELECT 1", nil)
if s.NoError(err) {
s.Equal(http.MethodPost, req.Method)
s.Equal(tc.expected, req.URL.String())
Expand Down Expand Up @@ -363,7 +362,7 @@ func (s *connSuite) TestBuildRequestWithQueryIdAndQuotaKey() {
ctx := context.Background()
ctx = context.WithValue(ctx, QuotaKey, tc.quotaKey)
ctx = context.WithValue(ctx, QueryID, tc.queryID)
req, err := cn.buildRequest(ctx, "SELECT 1", nil, false)
req, err := cn.buildRequest(ctx, "SELECT 1", nil)
if s.NoError(err) {
s.Equal(http.MethodPost, req.Method)
s.Equal(tc.expected, req.URL.String())
Expand All @@ -373,7 +372,7 @@ func (s *connSuite) TestBuildRequestWithQueryIdAndQuotaKey() {
func (s *connSuite) TestBuildRequestParamsInterpolation() {
query := `INSERT INTO test (str) VALUES ("Question?")`
cn := newConn(NewConfig())
req, err := cn.buildRequest(context.Background(), query, make([]driver.Value, 0), false)
req, err := cn.buildRequest(context.Background(), query, make([]driver.Value, 0))
if s.NoError(err) {
body, e := ioutil.ReadAll(req.Body)
if s.NoError(e) {
Expand All @@ -386,7 +385,7 @@ func (s *connSuite) TestRequestBodyGzipCompression() {
query := `INSERT INTO test (str) VALUES ("Question?")`
cn := newConn(NewConfig())
cn.useGzipCompression = true
req, err := cn.buildRequest(context.Background(), query, make([]driver.Value, 0), false)
req, err := cn.buildRequest(context.Background(), query, make([]driver.Value, 0))
if s.NoError(err) {
s.Contains(req.Header, "Content-Encoding")
gz, err := gzip.NewReader(req.Body)
Expand All @@ -400,6 +399,19 @@ func (s *connSuite) TestRequestBodyGzipCompression() {
}
}

func (s *connSuite) TestLongRequest() {
expected := strings.Repeat("x", 100000)
rows, err := s.conn.Query("SELECT ?", expected)
if s.NoError(err) {
rows.Next()
var actual string
err = rows.Scan(&actual)
if s.NoError(err) {
s.Equal(expected, actual)
}
}
}

func TestConn(t *testing.T) {
suite.Run(t, new(connSuite))
}
2 changes: 1 addition & 1 deletion go.mod
@@ -1,7 +1,7 @@
module github.com/mailru/go-clickhouse

require (
github.com/gofrs/uuid v3.2.0+incompatible
github.com/google/uuid v1.2.0
github.com/stretchr/testify v1.3.0
)

Expand Down
5 changes: 2 additions & 3 deletions go.sum
@@ -1,10 +1,9 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=

0 comments on commit 7927f67

Please sign in to comment.