diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f15a4981b4..92ccb91ddb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#3519](https://github.com/influxdb/influxdb/pull/3519): **--BREAKING CHANGE--** Update line protocol to require trailing i for field values that are integers - [#3529](https://github.com/influxdb/influxdb/pull/3529): Add TLS support for OpenTSDB plugin. Thanks @nathanielc - [#3421](https://github.com/influxdb/influxdb/issues/3421): Should update metastore and cluster if IP or hostname changes +- [#3502](https://github.com/influxdb/influxdb/pull/3502): Importer for 0.8.9 data via the CLI ### Bugfixes - [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2 diff --git a/client/example_test.go b/client/example_test.go index 58805ceeaa3..5bb886edb0f 100644 --- a/client/example_test.go +++ b/client/example_test.go @@ -20,7 +20,7 @@ func ExampleNewClient() { // NOTE: this assumes you've setup a user and have setup shell env variables, // namely INFLUX_USER/INFLUX_PWD. If not just ommit Username/Password below. 
const (
	// DefaultHost is the default host used to connect to an InfluxDB instance
	DefaultHost = "localhost"

	// DefaultPort is the default port used to connect to an InfluxDB instance
	DefaultPort = 8086

	// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
	DefaultTimeout = 0
)

// Query is used to send a command to the server. Both Command and Database are required.
type Query struct {
	Command  string
	Database string
}

// ParseConnectionString turns a "host", "host:port", ":port" or
// "[ipv6]:port" string into a connection URL.
//
// A missing host falls back to DefaultHost and a missing port to DefaultPort.
// The scheme is "https" when ssl is true and "http" otherwise. An error is
// returned when the string has a malformed host/port split (for example more
// than one unbracketed colon) or when the port is not an integer in the range
// 0-65535.
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
	host, port := path, DefaultPort

	if strings.Contains(path, ":") {
		// net.SplitHostPort understands the bracketed IPv6 form that
		// net.JoinHostPort emits, so a joined address round-trips, and it
		// rejects trailing ":junk" instead of silently dropping it.
		h, p, err := net.SplitHostPort(path)
		if err != nil {
			return url.URL{}, fmt.Errorf("invalid connection string %q: %s", path, err)
		}
		n, err := strconv.Atoi(p)
		if err != nil {
			return url.URL{}, fmt.Errorf("invalid port number %q: %s", p, err)
		}
		if n < 0 || n > 65535 {
			return url.URL{}, fmt.Errorf("invalid port number %q: out of range", p)
		}
		host, port = h, n
	}

	// Covers both ":port" and an entirely empty connection string.
	if host == "" {
		host = DefaultHost
	}

	u := url.URL{
		Scheme: "http",
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
	}
	if ssl {
		u.Scheme = "https"
	}

	return u, nil
}
func (c *Client) Write(bp BatchPoints) (*Response, error) { - c.url.Path = "write" + u := c.url + u.Path = "write" var b bytes.Buffer for _, p := range bp.Points { @@ -146,7 +202,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) { } } - req, err := http.NewRequest("POST", c.url.String(), &b) + req, err := http.NewRequest("POST", u.String(), &b) if err != nil { return nil, err } @@ -156,10 +212,10 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) { req.SetBasicAuth(c.username, c.password) } params := req.URL.Query() - params.Add("db", bp.Database) - params.Add("rp", bp.RetentionPolicy) - params.Add("precision", bp.Precision) - params.Add("consistency", bp.WriteConsistency) + params.Set("db", bp.Database) + params.Set("rp", bp.RetentionPolicy) + params.Set("precision", bp.Precision) + params.Set("consistency", bp.WriteConsistency) req.URL.RawQuery = params.Encode() resp, err := c.httpClient.Do(req) @@ -170,7 +226,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) { var response Response body, err := ioutil.ReadAll(resp.Body) - if err != nil && err.Error() != "EOF" { + if err != nil { return nil, err } @@ -183,6 +239,52 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) { return nil, nil } +// WriteLineProtocol takes a string with line returns to delimit each write +// If successful, error is nil and Response is nil +// If an error occurs, Response may contain additional information if populated. 
+func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) { + u := c.url + u.Path = "write" + + r := strings.NewReader(data) + + req, err := http.NewRequest("POST", u.String(), r) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + params := req.URL.Query() + params.Set("db", database) + params.Set("rp", retentionPolicy) + params.Set("precision", precision) + params.Set("consistency", writeConsistency) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + err := fmt.Errorf(string(body)) + response.Err = err + return &response, err + } + + return nil, nil +} + // Ping will check to see if the server is up // Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. func (c *Client) Ping() (time.Duration, string, error) { diff --git a/client/influxdb_test.go b/client/influxdb_test.go index 0a6df042eea..d30a5ee7877 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -84,7 +84,7 @@ func BenchmarkUnmarshalJSON10Tags(b *testing.B) { } func TestNewClient(t *testing.T) { - config := client.Config{} + config := &client.Config{} _, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -96,7 +96,7 @@ func TestClient_Ping(t *testing.T) { defer ts.Close() u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u} + config := &client.Config{URL: *u} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. 
expected %v, actual %v", nil, err) @@ -122,7 +122,7 @@ func TestClient_Query(t *testing.T) { defer ts.Close() u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u} + config := &client.Config{URL: *u} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -154,7 +154,7 @@ func TestClient_BasicAuth(t *testing.T) { u, _ := url.Parse(ts.URL) u.User = url.UserPassword("username", "password") - config := client.Config{URL: *u, Username: "username", Password: "password"} + config := &client.Config{URL: *u, Username: "username", Password: "password"} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -175,7 +175,7 @@ func TestClient_Write(t *testing.T) { defer ts.Close() u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u} + config := &client.Config{URL: *u} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -226,7 +226,7 @@ func TestClient_UserAgent(t *testing.T) { for _, test := range tests { u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u, UserAgent: test.userAgent} + config := &client.Config{URL: *u, UserAgent: test.userAgent} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -507,7 +507,7 @@ func TestClient_Timeout(t *testing.T) { defer ts.Close() u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u, Timeout: 500 * time.Millisecond} + config := &client.Config{URL: *u, Timeout: 500 * time.Millisecond} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -521,7 +521,7 @@ func TestClient_Timeout(t *testing.T) { t.Fatalf("unexpected error. 
expected 'use of closed network connection' error, got %v", err) } - confignotimeout := client.Config{URL: *u} + confignotimeout := &client.Config{URL: *u} cnotimeout, err := client.NewClient(confignotimeout) _, err = cnotimeout.Query(query) if err != nil { diff --git a/cmd/influx/main.go b/cmd/influx/main.go index e480c6d7ca3..cef4761169a 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "io" + "net" "net/url" "os" "os/user" @@ -16,6 +17,7 @@ import ( "text/tabwriter" "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/importer/v8" "github.com/peterh/liner" ) @@ -25,8 +27,6 @@ var ( ) const ( - default_host = "localhost" - default_port = 8086 default_format = "column" ) @@ -45,14 +45,17 @@ type CommandLine struct { Format string // controls the output format. Valid values are json, csv, or column Execute string ShowVersion bool + Import bool + Path string + Compressed bool } func main() { c := CommandLine{} fs := flag.NewFlagSet("InfluxDB shell version "+version, flag.ExitOnError) - fs.StringVar(&c.Host, "host", default_host, "Influxdb host to connect to.") - fs.IntVar(&c.Port, "port", default_port, "Influxdb port to connect to.") + fs.StringVar(&c.Host, "host", client.DefaultHost, "Influxdb host to connect to.") + fs.IntVar(&c.Port, "port", client.DefaultPort, "Influxdb port to connect to.") fs.StringVar(&c.Username, "username", c.Username, "Username to connect to the server.") fs.StringVar(&c.Password, "password", c.Password, `Password to connect to the server. 
Leaving blank will prompt for password (--password="").`) fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.") @@ -61,6 +64,9 @@ func main() { fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.") fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.") fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.") + fs.BoolVar(&c.Import, "import", false, "Import a previous database.") + fs.StringVar(&c.Path, "path", "", "path to the file to import") + fs.BoolVar(&c.Compressed, "compressed", false, "set to true if the import file is compressed") // Define our own custom usage to print fs.Usage = func() { @@ -85,13 +91,19 @@ func main() { Format specifies the format of the server responses: json, csv, or column. -pretty Turns on pretty print for the json format. + -import + Import a previous database export from file + -path + Path to file to import + -compressed + Set to true if the import file is compressed Examples: # Use influx in a non-interactive mode to query the database "metrics" and pretty print json: $ influx -database 'metrics' -execute 'select * from cpu' -format 'json' -pretty - # Connect to a specific database on startup and set database context: + # Connect to a specific database on startup and set database context: $ influx -database 'metrics' -host 'localhost' -port '8086' `) } @@ -124,16 +136,48 @@ Examples: } } - c.connect("") + if err := c.connect(""); err != nil { + + } + if c.Execute == "" && !c.Import { + fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + } if c.Execute != "" { if err := c.ExecuteQuery(c.Execute); err != nil { c.Line.Close() os.Exit(1) - } else { + } + c.Line.Close() + os.Exit(0) + } + + if c.Import { + path := net.JoinHostPort(c.Host, strconv.Itoa(c.Port)) + u, e := client.ParseConnectionString(path, c.Ssl) + if e != nil { + fmt.Println(e) + return + } + + config := v8.NewConfig() + config.Username = 
c.Username + config.Password = c.Password + config.Precision = "ns" + config.WriteConsistency = "any" + config.Path = c.Path + config.Version = version + config.URL = u + config.Compressed = c.Compressed + + i := v8.NewImporter(config) + if err := i.Import(); err != nil { + fmt.Printf("ERROR: %s\n", err) c.Line.Close() - os.Exit(0) + os.Exit(1) } + c.Line.Close() + os.Exit(0) } showVersion() @@ -211,66 +255,40 @@ func (c *CommandLine) ParseCommand(cmd string) bool { return true } -func (c *CommandLine) connect(cmd string) { +func (c *CommandLine) connect(cmd string) error { var cl *client.Client + var u url.URL - if cmd != "" { - // Remove the "connect" keyword if it exists - cmd = strings.TrimSpace(strings.Replace(cmd, "connect", "", -1)) - if cmd == "" { - return - } - if strings.Contains(cmd, ":") { - h := strings.Split(cmd, ":") - i, e := strconv.Atoi(h[1]) - if e != nil { - fmt.Printf("Connect error: Invalid port number %q: %s\n", cmd, e) - return - } - c.Port = i - if h[0] == "" { - c.Host = default_host - } else { - c.Host = h[0] - } - } else { - c.Host = cmd - // If they didn't specify a port, always use the default port - c.Port = default_port - } - } + // Remove the "connect" keyword if it exists + path := strings.TrimSpace(strings.Replace(cmd, "connect", "", -1)) - u := url.URL{ - Scheme: "http", + // If they didn't provide a connection string, use the current settings + if path == "" { + path = net.JoinHostPort(c.Host, strconv.Itoa(c.Port)) } - if c.Ssl { - u.Scheme = "https" + + var e error + u, e = client.ParseConnectionString(path, c.Ssl) + if e != nil { + return e } - if c.Port > 0 { - u.Host = fmt.Sprintf("%s:%d", c.Host, c.Port) - } else { - u.Host = c.Host - } - cl, err := client.NewClient( - client.Config{ - URL: u, - Username: c.Username, - Password: c.Password, - UserAgent: "InfluxDBShell/" + version, - }) + + config := client.NewConfig() + config.URL = u + config.Username = c.Username + config.Password = c.Password + config.UserAgent = 
"InfluxDBShell/" + version + cl, err := client.NewClient(config) if err != nil { - fmt.Printf("Could not create client %s", err) - return + return fmt.Errorf("Could not create client %s", err) } c.Client = cl if _, v, e := c.Client.Ping(); e != nil { - fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) + return fmt.Errorf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - if c.Execute == "" { - fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) - } } + return nil } func (c *CommandLine) SetAuth(cmd string) { diff --git a/cmd/influx/main_test.go b/cmd/influx/main_test.go index eb58cb1e86a..2939509c200 100644 --- a/cmd/influx/main_test.go +++ b/cmd/influx/main_test.go @@ -101,7 +101,7 @@ func TestParseCommand_Insert(t *testing.T) { defer ts.Close() u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u} + config := &client.Config{URL: *u} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) @@ -138,7 +138,7 @@ func TestParseCommand_InsertInto(t *testing.T) { defer ts.Close() u, _ := url.Parse(ts.URL) - config := client.Config{URL: *u} + config := &client.Config{URL: *u} c, err := client.NewClient(config) if err != nil { t.Fatalf("unexpected error. 
expected %v, actual %v", nil, err) diff --git a/importer/v8/importer.go b/importer/v8/importer.go new file mode 100644 index 00000000000..277455e923e --- /dev/null +++ b/importer/v8/importer.go @@ -0,0 +1,187 @@ +package v8 + +import ( + "bufio" + "compress/gzip" + "fmt" + "io" + "log" + "net/url" + "os" + "strings" + + "github.com/influxdb/influxdb/client" +) + +const batchSize = 5000 + +// Config is the config used to initialize a Importer importer +type Config struct { + Username string + Password string + URL url.URL + Precision string + WriteConsistency string + Path string + Version string + Compressed bool +} + +// NewConfig returns an initialized *Config +func NewConfig() *Config { + return &Config{} +} + +// Importer is the importer used for importing 0.8 data +type Importer struct { + client *client.Client + database string + retentionPolicy string + config *Config + batch []string + totalInserts int + failedInserts int + totalCommands int +} + +// NewImporter will return an intialized Importer struct +func NewImporter(config *Config) *Importer { + return &Importer{ + config: config, + batch: make([]string, 0, batchSize), + } +} + +// Import processes the specified file in the Config and writes the data to the databases in chunks specified by batchSize +func (i *Importer) Import() error { + // Create a client and try to connect + config := client.NewConfig() + config.URL = i.config.URL + config.Username = i.config.Username + config.Password = i.config.Password + config.UserAgent = fmt.Sprintf("influxDB importer/%s", i.config.Version) + cl, err := client.NewClient(config) + if err != nil { + return fmt.Errorf("could not create client %s", err) + } + i.client = cl + if _, _, e := i.client.Ping(); e != nil { + return fmt.Errorf("failed to connect to %s\n", i.client.Addr()) + } + + // Validate args + if i.config.Path == "" { + return fmt.Errorf("file argument required") + } + + defer func() { + if i.totalInserts > 0 { + log.Printf("Processed %d commands\n", 
i.totalCommands) + log.Printf("Processed %d inserts\n", i.totalInserts) + log.Printf("Failed %d inserts\n", i.failedInserts) + } + }() + + // Open the file + f, err := os.Open(i.config.Path) + if err != nil { + return err + } + defer f.Close() + + var r io.Reader + + // If gzipped, wrap in a gzip reader + if i.config.Compressed { + gr, err := gzip.NewReader(f) + if err != nil { + return err + } + defer gr.Close() + // Set the reader to the gzip reader + r = gr + } else { + // Standard text file so our reader can just be the file + r = f + } + + // Get our reader + scanner := bufio.NewScanner(r) + + // Process the scanner + i.processDDL(scanner) + i.processDML(scanner) + + // Check if we had any errors scanning the file + if err := scanner.Err(); err != nil { + return fmt.Errorf("reading standard input: %s", err) + } + + return nil +} + +func (i *Importer) processDDL(scanner *bufio.Scanner) { + for scanner.Scan() { + line := scanner.Text() + // If we find the DML token, we are done with DDL + if strings.HasPrefix(line, "# DML") { + return + } + if strings.HasPrefix(line, "#") { + continue + } + i.queryExecutor(line) + } +} + +func (i *Importer) processDML(scanner *bufio.Scanner) { + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "# CONTEXT-DATABASE:") { + i.database = strings.TrimSpace(strings.Split(line, ":")[1]) + } + if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") { + i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1]) + } + if strings.HasPrefix(line, "#") { + continue + } + i.batchAccumulator(line) + } +} + +func (i *Importer) execute(command string) { + response, err := i.client.Query(client.Query{Command: command, Database: i.database}) + if err != nil { + log.Printf("error: %s\n", err) + return + } + if err := response.Error(); err != nil { + log.Printf("error: %s\n", response.Error()) + } +} + +func (i *Importer) queryExecutor(command string) { + i.totalCommands++ + i.execute(command) +} + +func (i 
*Importer) batchAccumulator(line string) { + i.batch = append(i.batch, line) + if len(i.batch) == batchSize { + if e := i.batchWrite(); e != nil { + log.Println("error writing batch: ", e) + // Output failed lines to STDOUT so users can capture lines that failed to import + fmt.Println(strings.Join(i.batch, "\n")) + i.failedInserts += len(i.batch) + } else { + i.totalInserts += len(i.batch) + } + i.batch = i.batch[:0] + } +} + +func (i *Importer) batchWrite() error { + _, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency) + return e +}