Skip to content

Commit

Permalink
Merge pull request #3502 from influxdb/import
Browse files Browse the repository at this point in the history
Importer for 0.8.9 data via the CLI
  • Loading branch information
corylanou committed Aug 6, 2015
2 parents 872541b + 05fde32 commit 08f84a2
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [#3519](https://github.com/influxdb/influxdb/pull/3519): **--BREAKING CHANGE--** Update line protocol to require trailing i for field values that are integers
- [#3529](https://github.com/influxdb/influxdb/pull/3529): Add TLS support for OpenTSDB plugin. Thanks @nathanielc
- [#3421](https://github.com/influxdb/influxdb/issues/3421): Should update metastore and cluster if IP or hostname changes
- [#3502](https://github.com/influxdb/influxdb/pull/3502): Importer for 0.8.9 data via the CLI

### Bugfixes
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
Expand Down
8 changes: 4 additions & 4 deletions client/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ExampleNewClient() {

// NOTE: this assumes you've setup a user and have setup shell env variables,
// namely INFLUX_USER/INFLUX_PWD. If not just ommit Username/Password below.
conf := client.Config{
conf := &client.Config{
URL: *host,
Username: os.Getenv("INFLUX_USER"),
Password: os.Getenv("INFLUX_PWD"),
Expand All @@ -37,7 +37,7 @@ func ExampleClient_Ping() {
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
con, err := client.NewClient(&client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
Expand All @@ -54,7 +54,7 @@ func ExampleClient_Query() {
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
con, err := client.NewClient(&client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
Expand All @@ -73,7 +73,7 @@ func ExampleClient_Write() {
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
con, err := client.NewClient(&client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
Expand Down
118 changes: 110 additions & 8 deletions client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,68 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)

const (
// DefaultHost is the default host used to connect to an InfluxDB instance
DefaultHost = "localhost"

// DefaultPort is the default port used to connect to an InfluxDB instance
DefaultPort = 8086

// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)

// Query is used to send a command to the server. Both Command and Database are required.
type Query struct {
Command string
Database string
}

// ParseConnectionString will parse a string to create a valid connection URL
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
var host string
var port int

if strings.Contains(path, ":") {
h := strings.Split(path, ":")
i, e := strconv.Atoi(h[1])
if e != nil {
return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, e)
}
port = i
if h[0] == "" {
host = DefaultHost
} else {
host = h[0]
}
} else {
host = path
// If they didn't specify a port, always use the default port
port = DefaultPort
}

u := url.URL{
Scheme: "http",
}
if ssl {
u.Scheme = "https"
}
u.Host = net.JoinHostPort(host, strconv.Itoa(port))

return u, nil
}

// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
Expand All @@ -34,6 +82,13 @@ type Config struct {
Timeout time.Duration
}

// NewConfig will create a config to be used in connecting to the client
func NewConfig() *Config {
return &Config{
Timeout: DefaultTimeout,
}
}

// Client is used to make calls to the server.
type Client struct {
url url.URL
Expand All @@ -51,7 +106,7 @@ const (
)

// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
func NewClient(c *Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
Expand Down Expand Up @@ -120,7 +175,8 @@ func (c *Client) Query(q Query) (*Response, error) {
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
c.url.Path = "write"
u := c.url
u.Path = "write"

var b bytes.Buffer
for _, p := range bp.Points {
Expand All @@ -146,7 +202,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
}
}

req, err := http.NewRequest("POST", c.url.String(), &b)
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
Expand All @@ -156,10 +212,10 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Add("db", bp.Database)
params.Add("rp", bp.RetentionPolicy)
params.Add("precision", bp.Precision)
params.Add("consistency", bp.WriteConsistency)
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()

resp, err := c.httpClient.Do(req)
Expand All @@ -170,7 +226,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {

var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
if err != nil {
return nil, err
}

Expand All @@ -183,6 +239,52 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
return nil, nil
}

// WriteLineProtocol takes a string with line returns to delimit each write
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
u := c.url
u.Path = "write"

r := strings.NewReader(data)

req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", database)
params.Set("rp", retentionPolicy)
params.Set("precision", precision)
params.Set("consistency", writeConsistency)
req.URL.RawQuery = params.Encode()

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
err := fmt.Errorf(string(body))
response.Err = err
return &response, err
}

return nil, nil
}

// Ping will check to see if the server is up
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *Client) Ping() (time.Duration, string, error) {
Expand Down
16 changes: 8 additions & 8 deletions client/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func BenchmarkUnmarshalJSON10Tags(b *testing.B) {
}

func TestNewClient(t *testing.T) {
config := client.Config{}
config := &client.Config{}
_, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -96,7 +96,7 @@ func TestClient_Ping(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
config := &client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -122,7 +122,7 @@ func TestClient_Query(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
config := &client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestClient_BasicAuth(t *testing.T) {

u, _ := url.Parse(ts.URL)
u.User = url.UserPassword("username", "password")
config := client.Config{URL: *u, Username: "username", Password: "password"}
config := &client.Config{URL: *u, Username: "username", Password: "password"}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -175,7 +175,7 @@ func TestClient_Write(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
config := &client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestClient_UserAgent(t *testing.T) {

for _, test := range tests {
u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u, UserAgent: test.userAgent}
config := &client.Config{URL: *u, UserAgent: test.userAgent}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestClient_Timeout(t *testing.T) {
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u, Timeout: 500 * time.Millisecond}
config := &client.Config{URL: *u, Timeout: 500 * time.Millisecond}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Expand All @@ -521,7 +521,7 @@ func TestClient_Timeout(t *testing.T) {
t.Fatalf("unexpected error. expected 'use of closed network connection' error, got %v", err)
}

confignotimeout := client.Config{URL: *u}
confignotimeout := &client.Config{URL: *u}
cnotimeout, err := client.NewClient(confignotimeout)
_, err = cnotimeout.Query(query)
if err != nil {
Expand Down
Loading

0 comments on commit 08f84a2

Please sign in to comment.