Skip to content

Commit

Permalink
Changes to make the cluster sub package golint-able
Browse files Browse the repository at this point in the history
Issue: Enable golint on the code base influxdata#4098
  • Loading branch information
Mint committed Sep 29, 2015
1 parent b079d20 commit 3cbc193
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 23 deletions.
7 changes: 5 additions & 2 deletions cluster/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// ConsistencyLevelOne requires at least one data node acknowledged a write
ConsistencyLevelOne

// ConsistencyLevelOne requires a quorum of data nodes to acknowledge a write
// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write
ConsistencyLevelQuorum

// ConsistencyLevelAll requires all data nodes to acknowledge a write
Expand All @@ -63,6 +63,7 @@ var (
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
)

// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
switch strings.ToLower(level) {
case "any":
Expand Down Expand Up @@ -144,6 +145,7 @@ func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
s.Shards[shardInfo.ID] = shardInfo
}

// Open opens the communication channel with the point writer
func (w *PointsWriter) Open() error {
w.mu.Lock()
defer w.mu.Unlock()
Expand All @@ -153,6 +155,7 @@ func (w *PointsWriter) Open() error {
return nil
}

// Close closes the communication channel with the point writer
func (w *PointsWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down Expand Up @@ -327,7 +330,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
continue
}

wrote += 1
wrote++

// We wrote the required consistency level
if wrote >= required {
Expand Down
71 changes: 56 additions & 15 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@ type MapShardRequest struct {
pb internal.MapShardRequest
}

func (m *MapShardRequest) ShardID() uint64 { return m.pb.GetShardID() }
func (m *MapShardRequest) Query() string { return m.pb.GetQuery() }
// ShardID of the storage
func (m *MapShardRequest) ShardID() uint64 { return m.pb.GetShardID() }

// Query returns the protocol buffers' query string
func (m *MapShardRequest) Query() string { return m.pb.GetQuery() }

// ChunkSize - proto buff's chunk size
func (m *MapShardRequest) ChunkSize() int32 { return m.pb.GetChunkSize() }

func (m *MapShardRequest) SetShardID(id uint64) { m.pb.ShardID = &id }
func (m *MapShardRequest) SetQuery(query string) { m.pb.Query = &query }
// SetShardID sets the shard id
func (m *MapShardRequest) SetShardID(id uint64) { m.pb.ShardID = &id }

// SetQuery sets the proto buffs query
func (m *MapShardRequest) SetQuery(query string) { m.pb.Query = &query }

// SetChunkSize sets the proto buffs chunk size
func (m *MapShardRequest) SetChunkSize(chunkSize int32) { m.pb.ChunkSize = &chunkSize }

// MarshalBinary encodes the object to a binary format.
Expand All @@ -42,24 +52,43 @@ type MapShardResponse struct {
pb internal.MapShardResponse
}

// NewMapShardResponse returns the response returned from a remote MapShardRequest call
func NewMapShardResponse(code int, message string) *MapShardResponse {
m := &MapShardResponse{}
m.SetCode(code)
m.SetMessage(message)
return m
}

func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) }
func (r *MapShardResponse) Message() string { return r.pb.GetMessage() }
// Code returns the proto buff code
func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) }

// Message returns the proto buff Message
func (r *MapShardResponse) Message() string { return r.pb.GetMessage() }

// TagSets returns the proto buff tag sets
func (r *MapShardResponse) TagSets() []string { return r.pb.GetTagSets() }
func (r *MapShardResponse) Fields() []string { return r.pb.GetFields() }
func (r *MapShardResponse) Data() []byte { return r.pb.GetData() }

func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) }
func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message }
// Fields returns the proto buff Fields
func (r *MapShardResponse) Fields() []string { return r.pb.GetFields() }

// Data returns the proto buff Data
func (r *MapShardResponse) Data() []byte { return r.pb.GetData() }

// SetCode sets the proto buff code
func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) }

// SetMessage sets the proto buff message
func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message }

// SetTagSets sets the proto buff tagsets
func (r *MapShardResponse) SetTagSets(tagsets []string) { r.pb.TagSets = tagsets }
func (r *MapShardResponse) SetFields(fields []string) { r.pb.Fields = fields }
func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data }

// SetFields sets the proto buff Fields
func (r *MapShardResponse) SetFields(fields []string) { r.pb.Fields = fields }

// SetData sets the proto buff Data
func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data }

// MarshalBinary encodes the object to a binary format.
func (r *MapShardResponse) MarshalBinary() ([]byte, error) {
Expand Down Expand Up @@ -99,17 +128,23 @@ type WriteShardResponse struct {
pb internal.WriteShardResponse
}

// SetShardID sets the ShardID
func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id }
func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }

// ShardID gets the ShardID
func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }

// Points returns the time series Points
func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }

// AddPoint adds a new time series point
func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
w.AddPoints([]models.Point{models.NewPoint(
name, tags, map[string]interface{}{"value": value}, timestamp,
)})
}

// AddPoints adds a new time series point
func (w *WriteShardRequest) AddPoints(points []models.Point) {
for _, p := range points {
w.pb.Points = append(w.pb.Points, []byte(p.String()))
Expand Down Expand Up @@ -144,10 +179,16 @@ func (w *WriteShardRequest) unmarshalPoints() []models.Point {
return points
}

func (w *WriteShardResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) }
// SetCode sets the Code
func (w *WriteShardResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) }

// SetMessage sets the Message
func (w *WriteShardResponse) SetMessage(message string) { w.pb.Message = &message }

func (w *WriteShardResponse) Code() int { return int(w.pb.GetCode()) }
// Code returns the Code
func (w *WriteShardResponse) Code() int { return int(w.pb.GetCode()) }

// Message returns the Message
func (w *WriteShardResponse) Message() string { return w.pb.GetMessage() }

// MarshalBinary encodes the object to a binary format.
Expand Down
2 changes: 2 additions & 0 deletions cluster/shard_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,12 @@ func (r *RemoteMapper) Open() (err error) {
return nil
}

// TagSets returns the TagSets
func (r *RemoteMapper) TagSets() []string {
return r.tagsets
}

// Fields returns RemoteMapper's Fields
func (r *RemoteMapper) Fields() []string {
return r.fields
}
Expand Down
14 changes: 8 additions & 6 deletions cluster/shard_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewShardWriter(timeout time.Duration) *ShardWriter {
}
}

// WriteShard writes time series points to a shard
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
c, err := w.dial(ownerID)
if err != nil {
Expand Down Expand Up @@ -88,22 +89,23 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point)
return nil
}

func (c *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
func (w *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
// If we don't have a connection pool for that addr yet, create one
_, ok := c.pool.getPool(nodeID)
_, ok := w.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, clientPool: c.pool, timeout: c.timeout}
factory.metaStore = c.MetaStore
factory := &connFactory{nodeID: nodeID, clientPool: w.pool, timeout: w.timeout}
factory.metaStore = w.MetaStore

p, err := pool.NewChannelPool(1, 3, factory.dial)
if err != nil {
return nil, err
}
c.pool.setPool(nodeID, p)
w.pool.setPool(nodeID, p)
}
return c.pool.conn(nodeID)
return w.pool.conn(nodeID)
}

// Close closes ShardWriter's pool
func (w *ShardWriter) Close() error {
if w.pool == nil {
return fmt.Errorf("client already closed")
Expand Down

0 comments on commit 3cbc193

Please sign in to comment.