Skip to content

Commit

Permalink
feat: support snapshot max count and size
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <poan.yang@suse.com>
  • Loading branch information
FrankYang0529 authored and David Ko committed Jan 3, 2024
1 parent 29ee44d commit cc796ff
Show file tree
Hide file tree
Showing 16 changed files with 492 additions and 68 deletions.
21 changes: 20 additions & 1 deletion app/cmd/controller.go
Expand Up @@ -83,6 +83,15 @@ func ControllerCmd() cli.Command {
Value: 5,
Usage: "HTTP client timeout for replica file sync server",
},
cli.IntFlag{
Name: "snapshot-max-count",
Value: 250,
Usage: "Maximum number of snapshots to keep",
},
cli.StringFlag{
Name: "snapshot-max-size",
Usage: "Maximum total snapshot size in bytes or human readable 42kb, 42mb, 42gb",
},
},
Action: func(c *cli.Context) {
if err := startController(c); err != nil {
Expand Down Expand Up @@ -138,6 +147,16 @@ func startController(c *cli.Context) error {
engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout)
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout)

snapshotMaxCount := c.Int("snapshot-max-count")
snapshotMaxSize := int64(0)
snapshotMaxSizeString := c.String("snapshot-max-size")
if snapshotMaxSizeString != "" {
snapshotMaxSize, err = units.RAMInBytes(snapshotMaxSizeString)
if err != nil {
return err
}
}

factories := map[string]types.BackendFactory{}
for _, backend := range backends {
switch backend {
Expand All @@ -163,7 +182,7 @@ func startController(c *cli.Context) error {
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested,
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout)
fileSyncHTTPClientTimeout, snapshotMaxCount, snapshotMaxSize)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
23 changes: 21 additions & 2 deletions app/cmd/replica.go
Expand Up @@ -24,7 +24,7 @@ import (
func ReplicaCmd() cli.Command {
return cli.Command{
Name: "replica",
UsageText: "longhorn controller DIRECTORY SIZE",
UsageText: "longhorn replica DIRECTORY",
Flags: []cli.Flag{
cli.StringFlag{
Name: "listen",
Expand Down Expand Up @@ -73,6 +73,15 @@ func ReplicaCmd() cli.Command {
Value: "",
Usage: "Name of the replica instance (for validation purposes)",
},
cli.IntFlag{
Name: "snapshot-max-count",
Value: 250,
Usage: "Maximum number of snapshots to keep",
},
cli.StringFlag{
Name: "snapshot-max-size",
Usage: "Maximum total snapshot size in bytes or human readable 42kb, 42mb, 42gb",
},
},
Action: func(c *cli.Context) {
if err := startReplica(c); err != nil {
Expand All @@ -96,7 +105,17 @@ func startReplica(c *cli.Context) error {
disableRevCounter := c.Bool("disableRevCounter")
unmapMarkDiskChainRemoved := c.Bool("unmap-mark-disk-chain-removed")

s := replica.NewServer(dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved)
snapshotMaxCount := c.Int("snapshot-max-count")
snapshotMaxSize := int64(0)
snapshotMaxSizeString := c.String("snapshot-max-size")
if snapshotMaxSizeString != "" {
snapshotMaxSize, err = units.RAMInBytes(snapshotMaxSizeString)
if err != nil {
return err
}
}

s := replica.NewServer(dir, backingFile, diskutil.ReplicaSectorSize, disableRevCounter, unmapMarkDiskChainRemoved, snapshotMaxCount, snapshotMaxSize)

address := c.String("listen")

Expand Down
12 changes: 10 additions & 2 deletions pkg/backend/file/file.go
Expand Up @@ -100,8 +100,8 @@ func (f *Wrapper) SectorSize() (int64, error) {
return 4096, nil
}

func (f *Wrapper) RemainSnapshots() (int, error) {
return 1, nil
func (f *Wrapper) GetSnapshotCountAndSizeUsage() (int, int64, error) {
return 1, 0, nil
}

func (f *Wrapper) GetRevisionCounter() (int64, error) {
Expand All @@ -124,6 +124,14 @@ func (f *Wrapper) ResetRebuild() error {
return nil
}

func (f *Wrapper) SetSnapshotMaxCount(count int) error {
return nil
}

func (f *Wrapper) SetSnapshotMaxSize(size int64) error {
return nil
}

func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
logrus.Infof("Creating file: %s", address)
file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600)
Expand Down
56 changes: 52 additions & 4 deletions pkg/backend/remote/remote.go
Expand Up @@ -211,16 +211,16 @@ func (r *Remote) SectorSize() (int64, error) {
return replicaInfo.SectorSize, nil
}

func (r *Remote) RemainSnapshots() (int, error) {
func (r *Remote) GetSnapshotCountAndSizeUsage() (int, int64, error) {
replicaInfo, err := r.info()
if err != nil {
return 0, err
return 0, 0, err
}
switch replicaInfo.State {
case "open", "dirty", "rebuilding":
return replicaInfo.RemainSnapshots, nil
return replicaInfo.SnapshotCountUsage, replicaInfo.SnapshotSizeUsage, nil
}
return 0, fmt.Errorf("invalid state %v for counting snapshots", replicaInfo.State)
return 0, 0, fmt.Errorf("invalid state %v for counting snapshots", replicaInfo.State)
}

func (r *Remote) GetRevisionCounter() (int64, error) {
Expand Down Expand Up @@ -310,6 +310,54 @@ func (r *Remote) ResetRebuild() error {
return nil
}

func (r *Remote) SetSnapshotMaxCount(count int) error {
logrus.Infof("Setting SnapshotMaxCount of %s to : %d", r.name, count)

conn, err := grpc.Dial(r.replicaServiceURL,
grpc.WithTransportCredentials(insecure.NewCredentials()),
ptypes.WithIdentityValidationClientInterceptor(r.volumeName, ""))
if err != nil {
return errors.Wrapf(err, "cannot connect to ReplicaService %s", r.replicaServiceURL)
}
defer conn.Close()
replicaServiceClient := ptypes.NewReplicaServiceClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), replicaClient.GRPCServiceCommonTimeout)
defer cancel()

if _, err := replicaServiceClient.SnapshotMaxCountSet(ctx, &ptypes.SnapshotMaxCountSetRequest{
Count: int32(count),
}); err != nil {
return errors.Wrapf(err, "failed to set SnapshotMaxCount to %d for replica %s from remote", count, r.replicaServiceURL)
}

return nil
}

func (r *Remote) SetSnapshotMaxSize(size int64) error {
logrus.Infof("Setting SnapshotMaxSize of %s to : %d", r.name, size)

conn, err := grpc.Dial(r.replicaServiceURL,
grpc.WithTransportCredentials(insecure.NewCredentials()),
ptypes.WithIdentityValidationClientInterceptor(r.volumeName, ""))
if err != nil {
return errors.Wrapf(err, "cannot connect to ReplicaService %s", r.replicaServiceURL)
}
defer conn.Close()
replicaServiceClient := ptypes.NewReplicaServiceClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), replicaClient.GRPCServiceCommonTimeout)
defer cancel()

if _, err := replicaServiceClient.SnapshotMaxSizeSet(ctx, &ptypes.SnapshotMaxSizeSetRequest{
Size: size,
}); err != nil {
return errors.Wrapf(err, "failed to set SnapshotMaxSize to %d for replica %s from remote", size, r.replicaServiceURL)
}

return nil
}

func (r *Remote) info() (*types.ReplicaInfo, error) {
conn, err := grpc.Dial(r.replicaServiceURL, grpc.WithTransportCredentials(insecure.NewCredentials()),
ptypes.WithIdentityValidationClientInterceptor(r.volumeName, ""))
Expand Down
30 changes: 30 additions & 0 deletions pkg/controller/client/controller_client.go
Expand Up @@ -83,6 +83,8 @@ func GetVolumeInfo(v *ptypes.Volume) *types.VolumeInfo {
LastExpansionError: v.LastExpansionError,
LastExpansionFailedAt: v.LastExpansionFailedAt,
UnmapMarkSnapChainRemoved: v.UnmapMarkSnapChainRemoved,
SnapshotMaxCount: int(v.SnapshotMaxCount),
SnapshotMaxSize: v.SnapshotMaxSize,
}
}

Expand Down Expand Up @@ -231,6 +233,34 @@ func (c *ControllerClient) VolumeUnmapMarkSnapChainRemovedSet(enabled bool) erro
return nil
}

func (c *ControllerClient) VolumeSnapshotMaxCountSet(count int) error {
controllerServiceClient := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout)
defer cancel()

if _, err := controllerServiceClient.VolumeSnapshotMaxCountSet(ctx, &ptypes.VolumeSnapshotMaxCountSetRequest{
Count: int32(count),
}); err != nil {
return errors.Wrapf(err, "failed to set SnapshotMaxCount to %d for volume %s", count, c.serviceURL)
}

return nil
}

func (c *ControllerClient) VolumeSnapshotMaxSizeSet(size int64) error {
controllerServiceClient := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout)
defer cancel()

if _, err := controllerServiceClient.VolumeSnapshotMaxSizeSet(ctx, &ptypes.VolumeSnapshotMaxSizeSetRequest{
Size: size,
}); err != nil {
return errors.Wrapf(err, "failed to set SnapshotMaxSize to %d for volume %s", size, c.serviceURL)
}

return nil
}

func (c *ControllerClient) ReplicaList() ([]*types.ControllerReplicaInfo, error) {
controllerServiceClient := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout)
Expand Down

0 comments on commit cc796ff

Please sign in to comment.