Skip to content

Commit

Permalink
Merge pull request #3975 from benbjohnson/copy-shard
Browse files Browse the repository at this point in the history
Copier service
  • Loading branch information
benbjohnson committed Sep 3, 2015
2 parents 1c29e59 + deff06f commit 0163945
Show file tree
Hide file tree
Showing 11 changed files with 580 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service

### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
Expand Down
11 changes: 11 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdb/influxdb/services/admin"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/continuous_querier"
"github.com/influxdb/influxdb/services/copier"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/hh"
"github.com/influxdb/influxdb/services/httpd"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Server struct {
// These references are required for the tcp muxer.
ClusterService *cluster.Service
SnapshotterService *snapshotter.Service
CopierService *copier.Service

Monitor *monitor.Monitor

Expand Down Expand Up @@ -134,6 +136,7 @@ func NewServer(c *Config, version string) (*Server, error) {
s.appendClusterService(c.Cluster)
s.appendPrecreatorService(c.Precreator)
s.appendSnapshotterService()
s.appendCopierService()
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
Expand Down Expand Up @@ -170,6 +173,13 @@ func (s *Server) appendSnapshotterService() {
s.SnapshotterService = srv
}

func (s *Server) appendCopierService() {
srv := copier.NewService()
srv.TSDBStore = s.TSDBStore
s.Services = append(s.Services, srv)
s.CopierService = srv
}

func (s *Server) appendRetentionPolicyService(c retention.Config) {
if !c.Enabled {
return
Expand Down Expand Up @@ -324,6 +334,7 @@ func (s *Server) Open() error {

s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
go mux.Serve(ln)

// Open meta store.
Expand Down
57 changes: 57 additions & 0 deletions services/copier/internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions services/copier/internal/internal.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package internal;

message Request {
required uint64 ShardID = 1;
}

message Response {
optional string Error = 1;
}
261 changes: 261 additions & 0 deletions services/copier/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package copier

import (
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb/services/copier/internal"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
)

//go:generate protoc --gogo_out=. internal/internal.proto

// MuxHeader is the header byte used for the TCP muxer.
const MuxHeader = 6

// Service manages the listener for the endpoint.
type Service struct {
wg sync.WaitGroup
err chan error

TSDBStore interface {
Shard(id uint64) *tsdb.Shard
}

Listener net.Listener
Logger *log.Logger
}

// NewService returns a new instance of Service.
func NewService() *Service {
return &Service{
err: make(chan error),
Logger: log.New(os.Stderr, "[copier] ", log.LstdFlags),
}
}

// Open starts the service.
func (s *Service) Open() error {
s.Logger.Println("Starting copier service")

s.wg.Add(1)
go s.serve()
return nil
}

// Close implements the Service interface.
func (s *Service) Close() error {
if s.Listener != nil {
s.Listener.Close()
}
s.wg.Wait()
return nil
}

// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}

// Err returns a channel for fatal out-of-band errors.
func (s *Service) Err() <-chan error { return s.err }

// serve serves shard copy requests from the listener.
func (s *Service) serve() {
defer s.wg.Done()

for {
// Wait for next connection.
conn, err := s.Listener.Accept()
if err != nil && strings.Contains(err.Error(), "connection closed") {
s.Logger.Println("copier listener closed")
return
} else if err != nil {
s.Logger.Println("error accepting copier request: ", err.Error())
continue
}

// Handle connection in separate goroutine.
s.wg.Add(1)
go func(conn net.Conn) {
defer s.wg.Done()
defer conn.Close()
if err := s.handleConn(conn); err != nil {
s.Logger.Println(err)
}
}(conn)
}
}

// handleConn processes conn. This is run in a separate goroutine.
func (s *Service) handleConn(conn net.Conn) error {
// Read request from connection.
req, err := s.readRequest(conn)
if err != nil {
return fmt.Errorf("read request: %s", err)
}

// Retrieve shard.
sh := s.TSDBStore.Shard(req.GetShardID())

// Return error response if the shard doesn't exist.
if sh == nil {
if err := s.writeResponse(conn, &internal.Response{
Error: proto.String(fmt.Sprintf("shard not found: id=%d", req.GetShardID())),
}); err != nil {
return fmt.Errorf("write error response: %s", err)
}
return nil
}

// Write successful response.
if err := s.writeResponse(conn, &internal.Response{}); err != nil {
return fmt.Errorf("write response: %s", err)
}

// Write shard to response.
if _, err := sh.WriteTo(conn); err != nil {
return fmt.Errorf("write shard: %s", err)
}

return nil
}

// readRequest reads and unmarshals a Request from r.
func (s *Service) readRequest(r io.Reader) (*internal.Request, error) {
// Read request length.
var n uint32
if err := binary.Read(r, binary.BigEndian, &n); err != nil {
return nil, fmt.Errorf("read request length: %s", err)
}

// Read body.
buf := make([]byte, n)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("read request: %s", err)
}

// Unmarshal request.
req := &internal.Request{}
if err := proto.Unmarshal(buf, req); err != nil {
return nil, fmt.Errorf("unmarshal request: %s", err)
}

return req, nil
}

// writeResponse marshals and writes a Response to w.
func (s *Service) writeResponse(w io.Writer, resp *internal.Response) error {
// Marshal the response to a byte slice.
buf, err := proto.Marshal(resp)
if err != nil {
return fmt.Errorf("marshal error: %s", err)
}

// Write response length to writer.
if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil {
return fmt.Errorf("write response length error: %s", err)
}

// Write body to writer.
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write body error: %s", err)
}

return nil
}

// Client represents a client for connecting remotely to a copier service.
type Client struct {
host string
}

// NewClient return a new instance of Client.
func NewClient(host string) *Client {
return &Client{
host: host,
}
}

// ShardReader returns a reader for streaming shard data.
// Returned ReadCloser must be closed by the caller.
func (c *Client) ShardReader(id uint64) (io.ReadCloser, error) {
// Connect to remote server.
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
if err != nil {
return nil, err
}

// Send request to server.
if err := c.writeRequest(conn, &internal.Request{ShardID: proto.Uint64(id)}); err != nil {
return nil, fmt.Errorf("write request: %s", err)
}

// Read response from the server.
resp, err := c.readResponse(conn)
if err != nil {
return nil, fmt.Errorf("read response: %s", err)
}

// If there was an error then return it and close connection.
if resp.GetError() != "" {
conn.Close()
return nil, errors.New(resp.GetError())
}

// Returning remaining stream for caller to consume.
return conn, nil
}

// writeRequest marshals and writes req to w.
func (c *Client) writeRequest(w io.Writer, req *internal.Request) error {
// Marshal request.
buf, err := proto.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %s", err)
}

// Write request length.
if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil {
return fmt.Errorf("write request length: %s", err)
}

// Send request to server.
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write request body: %s", err)
}

return nil
}

// readResponse reads and unmarshals a Response from r.
func (c *Client) readResponse(r io.Reader) (*internal.Response, error) {
// Read response length.
var n uint32
if err := binary.Read(r, binary.BigEndian, &n); err != nil {
return nil, fmt.Errorf("read response length: %s", err)
}

// Read response.
buf := make([]byte, n)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("read response: %s", err)
}

// Unmarshal response.
resp := &internal.Response{}
if err := proto.Unmarshal(buf, resp); err != nil {
return nil, fmt.Errorf("unmarshal response: %s", err)
}

return resp, nil
}
Loading

0 comments on commit 0163945

Please sign in to comment.