Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Importer for 0.8.9 data via the CLI #3502

Merged
merged 11 commits into from
Aug 6, 2015
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [#3519](https://github.com/influxdb/influxdb/pull/3519): **--BREAKING CHANGE--** Update line protocol to require trailing i for field values that are integers
- [#3529](https://github.com/influxdb/influxdb/pull/3529): Add TLS support for OpenTSDB plugin. Thanks @nathanielc
- [#3421](https://github.com/influxdb/influxdb/issues/3421): Should update metastore and cluster if IP or hostname changes
- [#3502](https://github.com/influxdb/influxdb/pull/3502): Importer for 0.8.9 data via the CLI

### Bugfixes
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
Expand Down
8 changes: 4 additions & 4 deletions client/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ExampleNewClient() {

// NOTE: this assumes you've setup a user and have setup shell env variables,
// namely INFLUX_USER/INFLUX_PWD. If not just ommit Username/Password below.
conf := client.Config{
conf := &client.Config{
URL: *host,
Username: os.Getenv("INFLUX_USER"),
Password: os.Getenv("INFLUX_PWD"),
Expand All @@ -37,7 +37,7 @@ func ExampleClient_Ping() {
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
con, err := client.NewClient(&client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
Expand All @@ -54,7 +54,7 @@ func ExampleClient_Query() {
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
con, err := client.NewClient(&client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
Expand All @@ -73,7 +73,7 @@ func ExampleClient_Write() {
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
con, err := client.NewClient(&client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
Expand Down
118 changes: 110 additions & 8 deletions client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,68 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)

const (
// DefaultHost is the default host used to connect to an InfluxDB instance
DefaultHost = "localhost"

// DefaultPort is the default port used to connect to an InfluxDB instance
DefaultPort = 8086

// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the names to camel case? Also, spaces between the definition and the next comment line would be nice. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, I went old school didn't I lol. Will fix!

// Query is used to send a command to the server. Both Command and Database are required.
type Query struct {
Command string
Database string
}

// ParseConnectionString will parse a string to create a valid connection URL
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the path just require http://... or https://... and then no need for ssl bool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could, but then you have to parse and validate, and if they don't give you either, you aren't sure what they wanted (you could default to http, but that might not be right). Making them explicitly choose makes it their responsibility to get right. Otherwise, if they don't provide either, it's now on us to try to make the right decision without enough information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that http or https would be required but no strong feeling here.

var host string
var port int

if strings.Contains(path, ":") {
h := strings.Split(path, ":")
i, e := strconv.Atoi(h[1])
if e != nil {
return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, e)
}
port = i
if h[0] == "" {
host = DefaultHost
} else {
host = h[0]
}
} else {
host = path
// If they didn't specify a port, always use the default port
port = DefaultPort
}

u := url.URL{
Scheme: "http",
}
if ssl {
u.Scheme = "https"
}
u.Host = net.JoinHostPort(host, strconv.Itoa(port))

return u, nil
}

// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
Expand All @@ -34,6 +82,13 @@ type Config struct {
Timeout time.Duration
}

// NewConfig will create a config to be used in connecting to the client
func NewConfig() *Config {
return &Config{
Timeout: DefaultTimeout,
}
}

// Client is used to make calls to the server.
type Client struct {
url url.URL
Expand All @@ -51,7 +106,7 @@ const (
)

// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
func NewClient(c *Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
Expand Down Expand Up @@ -120,7 +175,8 @@ func (c *Client) Query(q Query) (*Response, error) {
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
c.url.Path = "write"
u := c.url
u.Path = "write"

var b bytes.Buffer
for _, p := range bp.Points {
Expand All @@ -146,7 +202,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
}
}

req, err := http.NewRequest("POST", c.url.String(), &b)
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
Expand All @@ -156,10 +212,10 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Add("db", bp.Database)
params.Add("rp", bp.RetentionPolicy)
params.Add("precision", bp.Precision)
params.Add("consistency", bp.WriteConsistency)
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()

resp, err := c.httpClient.Do(req)
Expand All @@ -170,7 +226,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {

var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
if err != nil {
return nil, err
}

Expand All @@ -183,6 +239,52 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
return nil, nil
}

// WriteLineProtocol takes a string with line returns to delimit each write
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
u := c.url
u.Path = "write"

r := strings.NewReader(data)

req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", database)
params.Set("rp", retentionPolicy)
params.Set("precision", precision)
params.Set("consistency", writeConsistency)
req.URL.RawQuery = params.Encode()

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ioutil.ReadAll() doesn't return an EOF. Also, if you're checking for EOF you can compare by reference: err != io.EOF.


if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
err := fmt.Errorf(string(body))
response.Err = err
return &response, err
}

return nil, nil
}

// Ping will check to see if the server is up
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *Client) Ping() (time.Duration, string, error) {
Expand Down
16 changes: 8 additions & 8 deletions client/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func BenchmarkUnmarshalJSON10Tags(b *testing.B) {
}

func TestNewClient(t *testing.T) {
config := client.Config{}
config := &client.Config{}
_, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -96,7 +96,7 @@ func TestClient_Ping(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
config := &client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -122,7 +122,7 @@ func TestClient_Query(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
config := &client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestClient_BasicAuth(t *testing.T) {

u, _ := url.Parse(ts.URL)
u.User = url.UserPassword("username", "password")
config := client.Config{URL: *u, Username: "username", Password: "password"}
config := &client.Config{URL: *u, Username: "username", Password: "password"}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -175,7 +175,7 @@ func TestClient_Write(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
config := &client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestClient_UserAgent(t *testing.T) {

for _, test := range tests {
u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u, UserAgent: test.userAgent}
config := &client.Config{URL: *u, UserAgent: test.userAgent}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestClient_Timeout(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u, Timeout: 500 * time.Millisecond}
config := &client.Config{URL: *u, Timeout: 500 * time.Millisecond}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -521,7 +521,7 @@ func TestClient_Timeout(t *testing.T) {
t.Fatalf("unexpected error. expected 'use of closed network connection' error, got %v", err)
}

confignotimeout := client.Config{URL: *u}
confignotimeout := &client.Config{URL: *u}
cnotimeout, err := client.NewClient(confignotimeout)
_, err = cnotimeout.Query(query)
if err != nil {
Expand Down
Loading