Skip to content

Commit

Permalink
exporter/clickhouse: use endpoint instead of raw dsn in configuration. (
Browse files Browse the repository at this point in the history
  • Loading branch information
hanjm committed Jan 27, 2023
1 parent 72d590e commit c3d68f6
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 93 deletions.
16 changes: 16 additions & 0 deletions .chloggen/clickhouseexporter-tidy-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: clickhouseexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: use endpoint instead of raw dsn in configuration.

# One or more tracking issues related to the change
issues: [8028]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
17 changes: 9 additions & 8 deletions exporter/clickhouseexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,17 @@ around 40k/s logs entry per CPU cores, add more collector node can increase line

The following settings are required:

- `dsn` (no default): The ClickHouse server DSN (Data Source Name), for
example `tcp://user:password@127.0.0.1:9000/default`
For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
For http protocol
reference: [ClickHouse/clickhouse-go#http-support-experimental](https://github.com/ClickHouse/clickhouse-go/tree/main#http-support-experimental)
.
- `endpoint` (no default): The ClickHouse server endpoint, support multi host, for example:
tcp protocol `tcp://ip1:port,ip2:port`
http protocol `http://ip:port,ip2:port`

The following settings can be optionally configured:

- `ttl_days` (default= 0): The data time-to-live in days, 0 means no ttl.
- `username` (default = ): The authentication username.
- `password` (default = ): The authentication password.
- `ttl_days` (default = 0): The data time-to-live in days, 0 means no ttl.
- `database` (default = otel): The database name.
- `connection_params` (default = {}). Params is the extra connection parameters with map format. for example compression/dial_timeout.
- `logs_table_name` (default = otel_logs): The table name for logs.
- `traces_table_name` (default = otel_traces): The table name for traces.
- `metrics_table_name` (default = otel_metrics): The table name for metrics.
Expand All @@ -297,7 +297,8 @@ processors:
send_batch_size: 100000
exporters:
clickhouse:
dsn: tcp://127.0.0.1:9000/otel
endpoint: tcp://127.0.0.1:9000
database: otel
ttl_days: 3
logs_table: otel_logs
traces_table: otel_traces
Expand Down
54 changes: 35 additions & 19 deletions exporter/clickhouseexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-co

import (
"errors"
"fmt"
"net/url"
"strings"

"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"
Expand All @@ -32,10 +30,18 @@ type Config struct {
// because only QueueSize is user-settable.
QueueSettings QueueSettings `mapstructure:"sending_queue"`

// DSN is the ClickHouse server Data Source Name.
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
DSN string `mapstructure:"dsn"`
// Endpoint is the clickhouse server endpoint.
// TCP endpoint: tcp://ip1:port,ip2:port
// HTTP endpoint: http://ip:port,ip2:port
Endpoint string `mapstructure:"endpoint"`
// Username is the authentication username.
Username string `mapstructure:"username"`
// Username is the authentication password.
Password string `mapstructure:"password"`
// Database is the database name to export.
Database string `mapstructure:"database"`
// ConnectionParams is the extra connection parameters with map format. for example compression/dial_timeout
ConnectionParams map[string]string `mapstructure:"connection_params"`
// LogsTableName is the table name for logs. default is `otel_logs`.
LogsTableName string `mapstructure:"logs_table_name"`
// TracesTableName is the table name for logs. default is `otel_traces`.
Expand All @@ -53,35 +59,45 @@ type QueueSettings struct {
}

var (
errConfigNoDSN = errors.New("dsn must be specified")
errConfigNoEndpoint = errors.New("endpoint must be specified")
errConfigInvalidEndpoint = errors.New("endpoint must be url format")
)

// Validate validates the clickhouse server configuration.
func (cfg *Config) Validate() (err error) {
if cfg.DSN == "" {
err = multierr.Append(err, errConfigNoDSN)
if cfg.Endpoint == "" {
err = multierr.Append(err, errConfigNoEndpoint)
}
_, e := parseDSNDatabase(cfg.DSN)
_, e := cfg.buildDSN(cfg.Database)
if e != nil {
err = multierr.Append(err, fmt.Errorf("invalid dsn format:%w", err))
err = multierr.Append(err, e)
}
return err
}

const defaultDatabase = "default"

func parseDSNDatabase(dsn string) (string, error) {
u, err := url.Parse(dsn)
if err != nil {
return defaultDatabase, err
}
return strings.TrimPrefix(u.Path, "/"), nil
}

func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}

func (cfg *Config) buildDSN(database string) (string, error) {
dsn, err := url.Parse(cfg.Endpoint)
if err != nil {
return "", errConfigInvalidEndpoint
}
if cfg.Username != "" {
dsn.User = url.UserPassword(cfg.Username, cfg.Password)
}
dsn.Path = "/" + database
params := url.Values{}
for k, v := range cfg.ConnectionParams {
params.Set(k, v)
}
dsn.RawQuery = params.Encode()
return dsn.String(), nil
}
71 changes: 68 additions & 3 deletions exporter/clickhouseexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const defaultDSN = "tcp://127.0.0.1:9000/otel"
const defaultEndpoint = "tcp://127.0.0.1:9000"

func TestLoadConfig(t *testing.T) {
t.Parallel()
Expand All @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)

defaultCfg := createDefaultConfig()
defaultCfg.(*Config).DSN = defaultDSN
defaultCfg.(*Config).Endpoint = defaultEndpoint

tests := []struct {
id component.ID
Expand All @@ -49,7 +49,14 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(typeStr, "full"),
expected: &Config{
DSN: defaultDSN,
Endpoint: defaultEndpoint,
Database: "otel",
Username: "foo",
Password: "bar",
ConnectionParams: map[string]string{
"compression": "zstd",
"dial_timeout": "5s",
},
TTLDays: 3,
LogsTableName: "otel_logs",
TracesTableName: "otel_traces",
Expand Down Expand Up @@ -92,3 +99,61 @@ func withDefaultConfig(fns ...func(*Config)) *Config {
}
return cfg
}

func TestConfig_buildDSN(t *testing.T) {
type fields struct {
Endpoint string
Username string
Password string
Database string
Params map[string]string
}
type args struct {
database string
}
tests := []struct {
name string
fields fields
args args
want string
wantErr error
}{
{
name: "valid config",
fields: fields{
Endpoint: defaultEndpoint,
Username: "foo",
Password: "bar",
Database: "otel",
Params: map[string]string{
"compression": "zstd",
},
},
args: args{
database: defaultDatabase,
},
want: "tcp://foo:bar@127.0.0.1:9000/default?compression=zstd",
},
{
name: "invalid config",
fields: fields{
Endpoint: "127.0.0.1:9000",
},
wantErr: errConfigInvalidEndpoint,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Endpoint: tt.fields.Endpoint,
Username: tt.fields.Username,
Password: tt.fields.Password,
Database: tt.fields.Database,
ConnectionParams: tt.fields.Params,
}
got, err := cfg.buildDSN(tt.args.database)
assert.Equalf(t, tt.wantErr, err, "buildDSN(%v)", tt.args.database)
assert.Equalf(t, tt.want, got, "buildDSN(%v)", tt.args.database)
})
}
}
3 changes: 2 additions & 1 deletion exporter/clickhouseexporter/example/datasource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ datasources:
defaultDatabase: otel
port: 9000
server: clickhouse
protocol: native
username:
tlsSkipVerify: true
secureJsonData:
secureJsonData:
password:
- name: ClickHouse-vertamedia
type: vertamedia-clickhouse-datasource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ processors:
action: upsert
exporters:
clickhouse:
dsn: tcp://clickhouse:9000/otel
endpoint: tcp://clickhouse:9000
database: otel
logs_table_name: otel_logs
traces_table_name: otel_traces
ttl_days: 3
Expand Down
28 changes: 12 additions & 16 deletions exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
Expand Down Expand Up @@ -183,16 +182,23 @@ var driverName = "clickhouse" // for testing

// newClickhouseClient create a clickhouse client.
func newClickhouseClient(cfg *Config) (*sql.DB, error) {
return sql.Open(driverName, cfg.DSN)
dsn, err := cfg.buildDSN(cfg.Database)
if err != nil {
return nil, err
}
db, err := sql.Open(driverName, dsn)
if err != nil {
return nil, err
}
return db, nil
}

func createDatabase(cfg *Config) error {
database, _ := parseDSNDatabase(cfg.DSN)
if database == defaultDatabase {
if cfg.Database == defaultDatabase {
return nil
}
// use default database to create new database
dsnUseDefaultDatabase, err := getDefaultDSN(cfg.DSN, database)
dsnUseDefaultDatabase, err := cfg.buildDSN(defaultDatabase)
if err != nil {
return err
}
Expand All @@ -203,24 +209,14 @@ func createDatabase(cfg *Config) error {
defer func() {
_ = db.Close()
}()
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)
_, err = db.Exec(query)
if err != nil {
return fmt.Errorf("create database:%w", err)
}
return nil
}

func getDefaultDSN(dsn string, database string) (string, error) {
if strings.LastIndex(dsn, database) == -1 {
return "", fmt.Errorf("database not present in dsn")
}
if dsn[strings.LastIndex(dsn, database):] == defaultDatabase {
return dsn, nil
}
return fmt.Sprintf("%s%s", dsn[0:strings.LastIndex(dsn, database)], defaultDatabase), nil
}

func createLogsTable(cfg *Config, db *sql.DB) error {
if _, err := db.Exec(renderCreateLogsTableSQL(cfg)); err != nil {
return fmt.Errorf("exec create logs table sql: %w", err)
Expand Down
32 changes: 3 additions & 29 deletions exporter/clickhouseexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,40 +93,14 @@ func TestExporter_pushLogsData(t *testing.T) {
return nil
})

exporter := newTestLogsExporter(t, defaultDSN)
exporter := newTestLogsExporter(t, defaultEndpoint)
mustPushLogsData(t, exporter, simpleLogs(1))
mustPushLogsData(t, exporter, simpleLogs(2))

require.Equal(t, 3, items)
})
}

func TestLogsExporter_getDefaultDns(t *testing.T) {
t.Run("database name is a substring of the DSN", func(t *testing.T) {
dsn := "tcp://mydatabase-clickhouse-headless:9000/mydatabase"
defaultDSN, err := getDefaultDSN(dsn, "mydatabase")
require.NoError(t, err)
require.Equal(t, defaultDSN, "tcp://mydatabase-clickhouse-headless:9000/default")
})
t.Run("database name isn't a substring of the DSN", func(t *testing.T) {
dsn := "tcp://newdatabase-clickhouse-headless:9000/otel"
defaultDSN, err := getDefaultDSN(dsn, "otel")
require.NoError(t, err)
require.Equal(t, defaultDSN, "tcp://newdatabase-clickhouse-headless:9000/default")
})
t.Run("error param for database", func(t *testing.T) {
dsn := "tcp://mydatabase-clickhouse-headless:9000/mydatabase"
_, err := getDefaultDSN(dsn, "otel")
require.Error(t, err)
})
t.Run("database name is same as default database", func(t *testing.T) {
dsn := "tcp://mydatabase-clickhouse-headless:9000/default"
defaultDSN, err := getDefaultDSN(dsn, "default")
require.NoError(t, err)
require.Equal(t, defaultDSN, "tcp://mydatabase-clickhouse-headless:9000/default")
})
}

func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
require.NoError(t, err)
Expand All @@ -136,10 +110,10 @@ func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsEx
}

func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
return func(dsn string) *Config {
return func(endpoint string) *Config {
var configMods []func(*Config)
configMods = append(configMods, func(cfg *Config) {
cfg.DSN = dsn
cfg.Endpoint = endpoint
})
configMods = append(configMods, fns...)
return withDefaultConfig(configMods...)
Expand Down
Loading

0 comments on commit c3d68f6

Please sign in to comment.