Skip to content

Commit

Permalink
Add support for minio deploy.
Browse files Browse the repository at this point in the history
This adds support for Minio and all other S3 compatible servers.
This patch also uses `minio-go`, which has the added benefit that
it can be used with S3 transparently as well.
  • Loading branch information
harshavardhana committed Jan 27, 2017
1 parent a7fbfba commit ee99982
Show file tree
Hide file tree
Showing 51 changed files with 9,720 additions and 8 deletions.
12 changes: 10 additions & 2 deletions src/server/pfs/server/obj_block_api_server.go
Expand Up @@ -9,8 +9,8 @@ import (
"io/ioutil"
"time"

"go.pedge.io/lion/proto"
"go.pedge.io/proto/rpclog"
protolion "go.pedge.io/lion"
protorpclog "go.pedge.io/proto/rpclog"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -73,6 +73,14 @@ func newObjBlockAPIServer(dir string, cacheBytes int64, objClient obj.Client) (*
}, nil
}

// newMinioBlockAPIServer builds an objBlockAPIServer whose object client is
// a Minio client created by obj.NewMinioClientFromSecret. An empty string is
// passed to that constructor (presumably selecting a default secret
// location; confirm against obj.NewMinioClientFromSecret).
//
// dir and cacheBytes are forwarded unchanged to newObjBlockAPIServer. Any
// error from creating the Minio client is returned as-is.
func newMinioBlockAPIServer(dir string, cacheBytes int64) (*objBlockAPIServer, error) {
	minioObjClient, clientErr := obj.NewMinioClientFromSecret("")
	if clientErr != nil {
		return nil, clientErr
	}
	server, serverErr := newObjBlockAPIServer(dir, cacheBytes, minioObjClient)
	return server, serverErr
}

func newAmazonBlockAPIServer(dir string, cacheBytes int64) (*objBlockAPIServer, error) {
objClient, err := obj.NewAmazonClientFromSecret("")
if err != nil {
Expand Down
17 changes: 14 additions & 3 deletions src/server/pfs/server/server.go
Expand Up @@ -13,8 +13,9 @@ var (
MaxMsgSize = 3 * blockSize
)

// Valid backends
// Valid object storage backends
const (
MinioBackendEnvVar = "MINIO"
AmazonBackendEnvVar = "AMAZON"
GoogleBackendEnvVar = "GOOGLE"
MicrosoftBackendEnvVar = "MICROSOFT"
Expand All @@ -40,10 +41,20 @@ func NewObjBlockAPIServer(dir string, cacheBytes int64, objClient obj.Client) (p
return newObjBlockAPIServer(dir, cacheBytes, objClient)
}

// NewBlockAPIServer creates a BlockAPIServer using the credentials it finds in
// the environment
// NewBlockAPIServer creates a BlockAPIServer using the credentials
// it finds in the environment
func NewBlockAPIServer(dir string, cacheBytes int64, backend string) (pfsclient.BlockAPIServer, error) {
switch backend {
case MinioBackendEnvVar:
// S3 compatible doesn't like leading slashes
if len(dir) > 0 && dir[0] == '/' {
dir = dir[1:]
}
blockAPIServer, err := newMinioBlockAPIServer(dir, cacheBytes)
if err != nil {
return nil, err
}
return blockAPIServer, nil
case AmazonBackendEnvVar:
// amazon doesn't like leading slashes
if len(dir) > 0 && dir[0] == '/' {
Expand Down
69 changes: 66 additions & 3 deletions src/server/pkg/deploy/assets/assets.go
Expand Up @@ -31,6 +31,7 @@ var (
rethinkHeadlessName = "rethink-headless" // headless service; give Rethink pods consistent DNS addresses
rethinkVolumeName = "rethink-volume"
rethinkVolumeClaimName = "rethink-volume-claim"
minioSecretName = "minio-secret"
amazonSecretName = "amazon-secret"
googleSecretName = "google-secret"
microsoftSecretName = "microsoft-secret"
Expand All @@ -51,6 +52,7 @@ const (
amazonBackend
googleBackend
microsoftBackend
minioBackend
)

// ServiceAccount returns a kubernetes service account for use with Pachyderm.
Expand Down Expand Up @@ -107,6 +109,23 @@ func PachdRc(shards uint64, backend backend, hostPath string, logLevel string, v
volumes[0].HostPath = &api.HostPathVolumeSource{
Path: filepath.Join(hostPath, "pachd"),
}
case minioBackend:
backendEnvVar = server.MinioBackendEnvVar
volumes[0].HostPath = &api.HostPathVolumeSource{
Path: filepath.Join(hostPath, "pachd"),
}
volumes = append(volumes, api.Volume{
Name: minioSecretName,
VolumeSource: api.VolumeSource{
Secret: &api.SecretVolumeSource{
SecretName: minioSecretName,
},
},
})
volumeMounts = append(volumeMounts, api.VolumeMount{
Name: minioSecretName,
MountPath: "/" + minioSecretName,
})
case amazonBackend:
backendEnvVar = server.AmazonBackendEnvVar
volumes = append(volumes, api.Volume{
Expand Down Expand Up @@ -662,6 +681,36 @@ func InitJob(version string) *extensions.Job {
}
}

// MinioSecret creates a Minio (S3 compatible) secret with the following parameters:
//   bucket   - S3 bucket name
//   id       - S3 access key id
//   secret   - S3 secret access key
//   endpoint - S3 compatible endpoint
//   secure   - set to true for a secure connection; stored in the secret
//              as "1" when true and "0" when false.
func MinioSecret(bucket string, id string, secret string, endpoint string, secure bool) *api.Secret {
	// Secret data is raw bytes, so the boolean is flattened to a one-character flag.
	secureFlag := "0"
	if secure {
		secureFlag = "1"
	}

	data := map[string][]byte{
		"bucket":   []byte(bucket),
		"id":       []byte(id),
		"secret":   []byte(secret),
		"endpoint": []byte(endpoint),
		"secure":   []byte(secureFlag),
	}
	meta := api.ObjectMeta{
		Name:   minioSecretName,
		Labels: labels(minioSecretName),
	}

	return &api.Secret{
		TypeMeta: unversioned.TypeMeta{
			Kind:       "Secret",
			APIVersion: "v1",
		},
		ObjectMeta: meta,
		Data:       data,
	}
}

// AmazonSecret creates an amazon secret with the following parameters:
// bucket - S3 bucket name
// id - AWS access key id
Expand Down Expand Up @@ -730,7 +779,7 @@ func MicrosoftSecret(container string, id string, secret string) *api.Secret {
// WriteRethinkVolumes creates 'shards' persistent volumes, either backed by IAAS persistent volumes (EBS volumes for amazon, GCP volumes for Google, etc)
// or local volumes (if 'backend' == 'local'). All volumes are created with size 'size'.
func WriteRethinkVolumes(w io.Writer, backend backend, shards int, hostPath string, names []string, size int) error {
if backend != localBackend && len(names) < shards {
if backend != localBackend && backend != minioBackend && len(names) < shards {
return fmt.Errorf("could not create non-local rethink cluster with %d shards, as there are only %d external volumes", shards, len(names))
}
encoder := codec.NewEncoder(w, jsonEncoderHandle)
Expand Down Expand Up @@ -779,6 +828,8 @@ func WriteRethinkVolumes(w io.Writer, backend backend, shards int, hostPath stri
DataDiskURI: dataDiskURI,
},
}
case minioBackend:
fallthrough
case localBackend:
spec.Spec.PersistentVolumeSource = api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Expand Down Expand Up @@ -860,12 +911,12 @@ func WriteAssets(w io.Writer, opts *AssetOpts, backend backend,
fmt.Fprintf(w, "\n")
RethinkHeadlessService().CodecEncodeSelf(encoder)
} else {
if backend != localBackend && len(volumeNames) != 1 {
if backend != localBackend && backend != minioBackend && len(volumeNames) != 1 {
return fmt.Errorf("RethinkDB can only be managed by a ReplicationController as a single instance, but recieved %d volumes", len(volumeNames))
}
RethinkVolumeClaim(volumeSize).CodecEncodeSelf(encoder)
volumeName := ""
if backend != localBackend {
if backend != localBackend && backend != minioBackend {
volumeName = volumeNames[0]
}
RethinkRc(volumeName, opts.RethinkdbCacheSize).CodecEncodeSelf(encoder)
Expand All @@ -887,6 +938,18 @@ func WriteLocalAssets(w io.Writer, opts *AssetOpts, hostPath string) error {
return WriteAssets(w, opts, localBackend, nil, 1 /* = volume size (gb) */, hostPath)
}

// WriteMinioAssets writes the deployment assets for a cluster that uses the
// Minio backend to w, followed by the Minio secret built from bucket, id,
// secret, endpoint and secure.
//
// The common assets are produced by WriteAssets with no volume names and a
// volume size of 1 (GB); hostPath is forwarded to it. If WriteAssets fails,
// its error is returned and the secret is not written.
func WriteMinioAssets(w io.Writer, opts *AssetOpts, hostPath string, bucket string, id string,
	secret string, endpoint string, secure bool) error {
	assetsErr := WriteAssets(w, opts, minioBackend, nil, 1 /* = volume size (gb) */, hostPath)
	if assetsErr != nil {
		return assetsErr
	}

	// Append the secret after the common assets, terminated by a newline.
	secretEncoder := codec.NewEncoder(w, jsonEncoderHandle)
	minioSecret := MinioSecret(bucket, id, secret, endpoint, secure)
	minioSecret.CodecEncodeSelf(secretEncoder)
	fmt.Fprintf(w, "\n")
	return nil
}

// WriteAmazonAssets writes assets to an amazon backend.
func WriteAmazonAssets(w io.Writer, opts *AssetOpts, bucket string, id string, secret string,
token string, region string, volumeNames []string, volumeSize int) error {
Expand Down
25 changes: 25 additions & 0 deletions src/server/pkg/deploy/cmds/cmds.go
Expand Up @@ -17,6 +17,7 @@ import (
_metrics "github.com/pachyderm/pachyderm/src/server/pkg/metrics"

"github.com/spf13/cobra"
"go.pedge.io/pkg/cobra"
)

func maybeKcCreate(dryRun bool, manifest *bytes.Buffer) error {
Expand All @@ -40,6 +41,7 @@ func DeployCmd(noMetrics *bool) *cobra.Command {
var hostPath string
var dev bool
var dryRun bool
var secure bool
var deployRethinkAsRc bool
var deployRethinkAsStatefulSet bool
var rethinkdbCacheSize string
Expand Down Expand Up @@ -94,6 +96,28 @@ func DeployCmd(noMetrics *bool) *cobra.Command {
}),
}

// deployMinio deploys a cluster whose PFS data lives in a Minio (or any
// other S3 compatible) server. It takes exactly four positional arguments:
// bucket, access key id, secret access key and endpoint. Whether to use a
// secure connection is controlled by the -s/--secure flag, not by a
// positional argument.
deployMinio := &cobra.Command{
	Use:   "minio [-s] <S3 bucket> <id> <secret> <endpoint>",
	Short: "Deploy a Pachyderm cluster that stores PFS data in Minio or another S3 compatible server.",
	Long: "Deploy a Pachyderm cluster that stores PFS data in Minio or another S3 compatible server. Arguments are:\n" +
		"  <S3 bucket>: A Minio (or S3 compatible) bucket where Pachyderm will store PFS data.\n" +
		"  <id>, <secret>: Access credentials.\n" +
		"  <endpoint>: Address of the Minio (or S3 compatible) server.\n" +
		"Pass -s/--secure to use a secure connection to the server.\n",
	Run: pkgcobra.RunBoundedArgs(pkgcobra.Bounds{Min: 4, Max: 4}, func(args []string) (retErr error) {
		if metrics && !dev {
			metricsFn := _metrics.ReportAndFlushUserAction("Deploy")
			// retErr is a named result so the deferred report sees the final error.
			defer func(start time.Time) { metricsFn(start, retErr) }(time.Now())
		}
		manifest := &bytes.Buffer{}
		// args: 0 = bucket, 1 = id, 2 = secret, 3 = endpoint.
		if err := assets.WriteMinioAssets(manifest, opts, hostPath, args[0], args[1], args[2], args[3], secure); err != nil {
			return err
		}
		return maybeKcCreate(dryRun, manifest)
	}),
}
deployMinio.Flags().BoolVarP(&secure, "secure", "s", false, "Enable secure access to Minio server.")

deployAmazon := &cobra.Command{
Use: "amazon <S3 bucket> <id> <secret> <token> <region> <EBS volume names> <size of volumes (in GB)>",
Short: "Deploy a Pachyderm cluster running on AWS.",
Expand Down Expand Up @@ -206,6 +230,7 @@ func DeployCmd(noMetrics *bool) *cobra.Command {
deploy.AddCommand(deployAmazon)
deploy.AddCommand(deployGoogle)
deploy.AddCommand(deployMicrosoft)
deploy.AddCommand(deployMinio)
return deploy
}

Expand Down
107 changes: 107 additions & 0 deletions src/server/pkg/obj/minio_client.go
@@ -0,0 +1,107 @@
package obj

import (
"io"

minio "github.com/minio/minio-go"
)

// minioClient is an object-storage client for Minio or any other S3
// compatible server. It embeds *minio.Client, so the minio-go methods are
// callable directly on it, and it pins every operation to a single bucket.
type minioClient struct {
	// Client is the underlying minio-go client.
	*minio.Client
	// bucket is the bucket name passed to every minio-go call made by this type.
	bucket string
}

// newMinioClient creates a minioClient bound to bucket.
//
// endpoint, id, secret and secure are handed to minio.New unchanged; if
// minio.New fails, its error is returned and no client is created.
func newMinioClient(endpoint, bucket, id, secret string, secure bool) (*minioClient, error) {
	underlying, err := minio.New(endpoint, id, secret, secure)
	if err != nil {
		return nil, err
	}

	client := new(minioClient)
	client.Client = underlying
	client.bucket = bucket
	return client, nil
}

// minioWriter is the io.WriteCloser returned by (*minioClient).Writer.
// Bytes written to it are streamed through an in-memory pipe to a
// background PutObject call; Close reports the outcome of that call.
type minioWriter struct {
	// pipe is the write half of the pipe that PutObject reads from.
	pipe *io.PipeWriter
	// result receives the PutObject error (nil on success) exactly once and
	// is then closed.
	result chan error
}

// Write forwards p to the upload. If the upload has already failed, the
// upload error is returned.
func (w *minioWriter) Write(p []byte) (int, error) {
	return w.pipe.Write(p)
}

// Close signals end-of-data to the upload and blocks until PutObject has
// returned, so that a nil result means the object was actually stored.
// The first call returns the upload error, if any; later calls return nil.
func (w *minioWriter) Close() error {
	if err := w.pipe.Close(); err != nil {
		return err
	}
	// After the single result has been consumed the channel is closed, so a
	// repeated Close receives the zero value instead of blocking forever.
	return <-w.result
}

// Writer returns a writer that uploads everything written to it as the
// object called name in the client's bucket, using the content type
// "application/octet-stream".
//
// The upload runs in a background goroutine that ends when PutObject
// returns. The caller must Close the returned writer: Close waits for the
// upload to finish and returns its error. Without that wait, an upload
// failure after the last Write would be lost and the caller could observe
// success before the object exists.
//
// The error result is always nil; it is kept for the existing signature.
func (c *minioClient) Writer(name string) (io.WriteCloser, error) {
	reader, writer := io.Pipe()
	w := &minioWriter{
		pipe: writer,
		// Buffered so the goroutine can finish even if Close is never called.
		result: make(chan error, 1),
	}
	go func() {
		_, err := c.PutObject(c.bucket, name, reader, "application/octet-stream")
		// Closing the read half unblocks any in-flight or later Write: it
		// fails with err, or with io.ErrClosedPipe when err is nil.
		reader.CloseWithError(err)
		w.result <- err
		close(w.result)
	}()
	return w, nil
}

// Walk calls fn with the key of every object that ListObjectsV2 reports for
// the prefix name in the client's bucket. The listing is always recursive.
//
// Walking stops at the first listing entry that carries an error, or at the
// first non-nil error returned by fn; that error is returned. The done
// channel handed to ListObjectsV2 is closed when Walk returns (presumably
// this tells minio-go to stop listing; confirm against the minio-go docs).
func (c *minioClient) Walk(name string, fn func(name string) error) error {
	stop := make(chan struct{})
	defer close(stop)

	const recursive = true // Recursively walk by default.
	listing := c.ListObjectsV2(c.bucket, name, recursive, stop)
	for entry := range listing {
		if listErr := entry.Err; listErr != nil {
			return listErr
		}
		if fnErr := fn(entry.Key); fnErr != nil {
			return fnErr
		}
	}
	return nil
}

// sectionReadCloser adds a Close method to *io.SectionReader. Reads are
// served by the embedded section reader, while Close is delegated to the
// minio object the section was cut from.
type sectionReadCloser struct {
	*io.SectionReader
	// mObj is the object to close when the reader is no longer needed.
	mObj *minio.Object
}

// Close closes the underlying minio object and returns its error.
func (s *sectionReadCloser) Close() (err error) {
	err = s.mObj.Close()
	return err
}

// Reader returns a reader over the byte range [offset, offset+size) of the
// object called name in the client's bucket. Closing the returned reader
// closes the underlying minio object.
//
// NOTE(review): size == 0 produces a reader that yields no data. Confirm
// whether callers expect 0 to mean "read to the end of the object".
// NOTE(review): offset and size are converted to int64 without a range check.
func (c *minioClient) Reader(name string, offset uint64, size uint64) (io.ReadCloser, error) {
	object, err := c.Client.GetObject(c.bucket, name)
	if err != nil {
		return nil, err
	}

	section := &sectionReadCloser{
		SectionReader: io.NewSectionReader(object, int64(offset), int64(size)),
		mObj:          object,
	}
	return section, nil
}

// Delete removes the object called name from the client's bucket and
// returns whatever error RemoveObject reports.
func (c *minioClient) Delete(name string) error {
	removeErr := c.Client.RemoveObject(c.bucket, name)
	return removeErr
}

// Exists reports whether StatObject succeeds for the object called name in
// the client's bucket. Any StatObject error, whatever its cause, is
// reported as false.
func (c *minioClient) Exists(name string) bool {
	if _, statErr := c.Client.StatObject(c.bucket, name); statErr != nil {
		return false
	}
	return true
}

// IsRetryable always reports false, regardless of err: the Minio client
// already retries internally, so callers should not retry on top of it.
func (c *minioClient) IsRetryable(err error) bool {
	const callerShouldRetry = false
	return callerShouldRetry
}

// IsIgnorable always reports false: no error from this client is treated as
// safe to ignore. The err argument is not inspected.
func (c *minioClient) IsIgnorable(err error) bool {
	const ignorable = false
	return ignorable
}

// sentinelErrResp is the zero-valued error response. IsNotExist compares
// against it to recognise errors that carry no minio error details.
var sentinelErrResp = minio.ErrorResponse{}

// IsNotExist reports whether err says that the requested object or its
// bucket does not exist. err is converted with minio.ToErrorResponse; a
// zero-valued response is never treated as "not exist".
func (c *minioClient) IsNotExist(err error) bool {
	resp := minio.ToErrorResponse(err)
	if resp == sentinelErrResp {
		return false
	}
	// Treat both object not found and bucket not found as IsNotExist().
	switch resp.Code {
	case "NoSuchKey", "NoSuchBucket":
		return true
	default:
		return false
	}
}

0 comments on commit ee99982

Please sign in to comment.