Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial PR for simple admin client api and small admin client app #118

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ RUN apk update && apk upgrade && \
ADD . /go/src/github.com/travisjeffery/jocko
WORKDIR /go/src/github.com/travisjeffery/jocko
RUN GOOS=linux GOARCH=amd64 make build
RUN GOOS=linux GOARCH=amd64 make build-kadm

FROM alpine:latest
COPY --from=build-base /go/src/github.com/travisjeffery/jocko/cmd/kadm/kadm /usr/local/bin/kadm
COPY --from=build-base /go/src/github.com/travisjeffery/jocko/cmd/jocko/jocko /usr/local/bin/jocko
EXPOSE 9092 9093 9094 9095
VOLUME "/tmp/jocko"
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ vet:
@go list ./... | grep -v vendor | xargs go vet

build: deps
@go build -o $(BUILD_PATH) cmd/jocko/main.go
@go build -o $(BUILD_PATH) cmd/jocko/*.go

build-kadm:
@go build -o cmd/kadm/kadm cmd/kadm/*.go

release:
@which goreleaser 2>/dev/null || go get -u github.com/goreleaser/goreleaser
Expand Down
113 changes: 113 additions & 0 deletions client/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package client

import (
"time"
"errors"
"fmt"

"github.com/travisjeffery/jocko/protocol"
)

//simple admin client (derived from KIP-117)
type AdminClient struct {
*Client
}

func NewAdminClient(bootBrokers []string, cfg *ClientConfig, state ...*ClusterState) (adminCli *AdminClient, err error) {
adminCli = &AdminClient{}
adminCli.Client, err = NewClient(bootBrokers,cfg,state...)
return adminCli, err
}

type Options struct {
Timeout time.Duration
ValidateOnly bool
}

var (
DefaultOptions = &Options{10*time.Second,false}
)


type TopicInfo struct {
Topic string
Partitions int32
ReplicationFactor int32
}

func (adm *AdminClient) CreateTopics(topics []*TopicInfo, opt *Options) (err error) {
fmt.Println("create topic")
var reqs []*protocol.CreateTopicRequest
for _, ti := range topics {
reqs = append(reqs, &protocol.CreateTopicRequest{
Topic: ti.Topic,
NumPartitions: ti.Partitions,
ReplicationFactor: int16(ti.ReplicationFactor),
ReplicaAssignment: nil,
Configs: nil,
})
}
resp, err := adm.ctrlConn().CreateTopics(&protocol.CreateTopicRequests{
Requests: reqs,
})
if err != nil {
return err
}
for _, topicErrCode := range resp.TopicErrorCodes {
if topicErrCode.ErrorCode != protocol.ErrNone.Code() &&
topicErrCode.ErrorCode != protocol.ErrRequestTimedOut.Code() {
err = protocol.Errs[topicErrCode.ErrorCode]
return err
}
}
return nil
}

func (adm *AdminClient) DeleteTopics(topics []string, opt *Options) (err error) {
resp, err := adm.ctrlConn().DeleteTopics(&protocol.DeleteTopicsRequest{
Topics: topics,
})
if err != nil { return err }
for _, topicErrCode := range resp.TopicErrorCodes {
if topicErrCode.ErrorCode != protocol.ErrNone.Code() &&
topicErrCode.ErrorCode != protocol.ErrRequestTimedOut.Code() {
err = protocol.Errs[topicErrCode.ErrorCode]
return err
}
}
return nil
}

type APIVersion struct {
APIKey int16 `json:"key"`
MinVersion int16 `json:"minVersion"`
MaxVersion int16 `json:"maxVersion"`
}

func (adm *AdminClient) APIVersions(nodes []string, opt *Options) (versionInfo map[string][]APIVersion, err error) {
if len(nodes) == 0 {
return nil, errors.New("Invalid broker nodes info")
}
versionInfo = make(map[string][]APIVersion)
for _, nname := range nodes {
nconn, err := adm.connect(nname)
if err != nil {
continue
}

resp, err := nconn.APIVersions(&protocol.APIVersionsRequest{
APIVersion: 0,
})

if err != nil {
fmt.Printf("failed to retrieve api version info %v\n",err)
continue
}
fmt.Printf("%s verinfo: %v error: %v\n",nname,resp.APIVersions,resp.ErrorCode)
for _, v := range resp.APIVersions {
versionInfo[nname] = append(versionInfo[nname], APIVersion{v.APIKey,v.MinVersion,v.MaxVersion})
}
}
return versionInfo, nil
}

Loading