Skip to content

Commit

Permalink
kvscheduler: keep node in the graph during recreate (#1423)
Browse files Browse the repository at this point in the history
This patch fixes issues reported in #1418.
During recreate (Delete+Create) the node would be completely
removed from the graph by Delete and all the associated flags
(metadata) were therefore lost.
When the subsequent Create fails, the scheduler needs to access
the flags, which in this case are undefined, causing the scheduler
to panic by dereferencing nil pointer.
This patch ensures that the node and its flags are preserved
during re-creation.

An in-progress refactor of the scheduling algorithm
will approach value recreation in a much cleaner
way...

Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
  • Loading branch information
Milan Lenčo authored and ondrej-fabry committed Jul 29, 2019
1 parent c55eb6e commit 431fc32
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 34 deletions.
2 changes: 1 addition & 1 deletion plugins/kvscheduler/api/txn_record.go
Expand Up @@ -244,7 +244,7 @@ func (op *RecordedTxnOp) StringWithOpts(index int, verbose bool, indent int) str
flags = append(flags, "WAS-UNIMPLEMENTED")
}
// -> REMOVED / MISSING
if op.PrevState == ValueState_REMOVED && !op.IsRecreate {
if op.PrevState == ValueState_REMOVED && op.Operation == TxnOperation_DELETE {
flags = append(flags, "ALREADY-REMOVED")
}
if op.PrevState == ValueState_MISSING {
Expand Down
160 changes: 160 additions & 0 deletions plugins/kvscheduler/datachange_test.go
Expand Up @@ -2040,3 +2040,163 @@ func TestFailedDeleteOfDerivedValue(t *testing.T) {
err = scheduler.Close()
Expect(err).To(BeNil())
}

func TestFailedRecreateOfDerivedValue(t *testing.T) {
RegisterTestingT(t)

// prepare KV Scheduler
scheduler := NewPlugin(UseDeps(func(deps *Deps) {
deps.HTTPHandlers = nil
}))
err := scheduler.Init()
Expect(err).To(BeNil())

// prepare mocks
mockSB := test.NewMockSouthbound()
// descriptor:
descriptor := test.NewMockDescriptor(&KVDescriptor{
Name: descriptor1Name,
NBKeyPrefix: prefixA,
KeySelector: prefixSelector(prefixA),
ValueTypeName: proto.MessageName(test.NewArrayValue()),
DerivedValues: test.ArrayValueDerBuilder,
WithMetadata: true,
UpdateWithRecreate: func(key string, oldValue, newValue proto.Message, metadata Metadata) bool {
return key == prefixA+baseValue1+"/item1"
},
}, mockSB, 0)
scheduler.RegisterKVDescriptor(descriptor)

// run non-resync transaction against empty SB
arrayVal1 := test.NewArrayValueWithSuffix("-v1", "item1")
schedulerTxn := scheduler.StartNBTransaction()
schedulerTxn.SetValue(prefixA+baseValue1, arrayVal1)
seqNum, err := schedulerTxn.Commit(testCtx)
Expect(seqNum).To(BeEquivalentTo(0))
Expect(err).ShouldNot(HaveOccurred())

// check the state of SB
Expect(mockSB.GetKeysWithInvalidData()).To(BeEmpty())
// -> base value 1
value := mockSB.GetValue(prefixA + baseValue1)
Expect(value).ToNot(BeNil())
Expect(proto.Equal(value.Value, arrayVal1)).To(BeTrue())
Expect(value.Metadata).ToNot(BeNil())
Expect(value.Metadata.(test.MetaWithInteger).GetInteger()).To(BeEquivalentTo(0))
Expect(value.Origin).To(BeEquivalentTo(FromNB))
// -> item1 derived from base value 1
value = mockSB.GetValue(prefixA + baseValue1 + "/item1")
Expect(value).ToNot(BeNil())
Expect(proto.Equal(value.Value, test.NewStringValue("item1-v1"))).To(BeTrue())
Expect(value.Metadata).To(BeNil())
Expect(value.Origin).To(BeEquivalentTo(FromNB))

// plan error before 2nd txn
failedCreateClb := func() {
mockSB.SetValue(prefixA+baseValue1, test.NewArrayValue(),
&test.OnlyInteger{Integer: 0}, FromNB, false)
}
mockSB.PlanError(prefixA+baseValue1+"/item1", nil, nil) // Delete
mockSB.PlanError(prefixA+baseValue1+"/item1", errors.New("failed to create value"), failedCreateClb) // (Re)Create

// run 2nd non-resync transaction that will have errors
startTime := time.Now()
schedulerTxn2 := scheduler.StartNBTransaction()
arrayVal2 := test.NewArrayValueWithSuffix("-v2", "item1")
schedulerTxn2.SetValue(prefixA+baseValue1, arrayVal2)
seqNum, err = schedulerTxn2.Commit(testCtx)
stopTime := time.Now()
Expect(seqNum).To(BeEquivalentTo(1))
Expect(err).ToNot(BeNil())
txnErr := err.(*TransactionError)
Expect(txnErr.GetTxnInitError()).ShouldNot(HaveOccurred())
kvErrors := txnErr.GetKVErrors()
Expect(kvErrors).To(HaveLen(1))
Expect(kvErrors[0].Key).To(BeEquivalentTo(prefixA + baseValue1 + "/item1"))
Expect(kvErrors[0].TxnOperation).To(BeEquivalentTo(TxnOperation_CREATE))
Expect(kvErrors[0].Error.Error()).To(BeEquivalentTo("failed to create value"))

// check transaction operations
txnHistory := scheduler.GetTransactionHistory(time.Time{}, time.Now())
Expect(txnHistory).To(HaveLen(2))
txn := txnHistory[1]
Expect(txn.PreRecord).To(BeFalse())
Expect(txn.Start.After(startTime)).To(BeTrue())
Expect(txn.Start.Before(txn.Stop)).To(BeTrue())
Expect(txn.Stop.Before(stopTime)).To(BeTrue())
Expect(txn.SeqNum).To(BeEquivalentTo(1))
Expect(txn.TxnType).To(BeEquivalentTo(NBTransaction))
Expect(txn.ResyncType).To(BeEquivalentTo(NotResync))
Expect(txn.Description).To(BeEmpty())
checkRecordedValues(txn.Values, []RecordedKVPair{
{Key: prefixA + baseValue1, Value: utils.RecordProtoMessage(arrayVal2), Origin: FromNB},
})

// -> planned
txnOps := RecordedTxnOps{
{
Operation: TxnOperation_UPDATE,
Key: prefixA + baseValue1,
PrevValue: utils.RecordProtoMessage(arrayVal1),
NewValue: utils.RecordProtoMessage(arrayVal2),
PrevState: ValueState_CONFIGURED,
NewState: ValueState_CONFIGURED,
},
{
Operation: TxnOperation_DELETE,
Key: prefixA + baseValue1 + "/item1",
IsDerived: true,
PrevValue: utils.RecordProtoMessage(test.NewStringValue("item1-v1")),
PrevState: ValueState_CONFIGURED,
NewState: ValueState_REMOVED,
IsRecreate: true,
},
{
Operation: TxnOperation_CREATE,
Key: prefixA + baseValue1 + "/item1",
IsDerived: true,
NewValue: utils.RecordProtoMessage(test.NewStringValue("item1-v2")),
PrevState: ValueState_REMOVED,
NewState: ValueState_CONFIGURED,
IsRecreate: true,
},
}
checkTxnOperations(txn.Planned, txnOps)

// -> executed
txnOps = RecordedTxnOps{
{
Operation: TxnOperation_UPDATE,
Key: prefixA + baseValue1,
PrevValue: utils.RecordProtoMessage(arrayVal1),
NewValue: utils.RecordProtoMessage(arrayVal2),
PrevState: ValueState_CONFIGURED,
NewState: ValueState_CONFIGURED,
},
{
Operation: TxnOperation_DELETE,
Key: prefixA + baseValue1 + "/item1",
IsDerived: true,
PrevValue: utils.RecordProtoMessage(test.NewStringValue("item1-v1")),
PrevState: ValueState_CONFIGURED,
NewState: ValueState_REMOVED,
IsRecreate: true,
},
{
Operation: TxnOperation_CREATE,
Key: prefixA + baseValue1 + "/item1",
IsDerived: true,
NewValue: utils.RecordProtoMessage(test.NewStringValue("item1-v2")),
PrevState: ValueState_REMOVED,
NewState: ValueState_FAILED,
NewErr: errors.New("failed to create value"),
IsRecreate: true,
},
}
checkTxnOperations(txn.Executed, txnOps)

// close scheduler
err = scheduler.Close()
Expect(err).To(BeNil())
}

47 changes: 29 additions & 18 deletions plugins/kvscheduler/internal/test/model/values.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugins/kvscheduler/internal/test/model/values.proto
Expand Up @@ -4,6 +4,7 @@ package model;

message ArrayValue {
repeated string items = 1;
string item_suffix = 2;
}

message StringValue {
Expand Down
16 changes: 9 additions & 7 deletions plugins/kvscheduler/internal/test/southbound.go
Expand Up @@ -210,15 +210,17 @@ func (ms *MockSouthbound) executeChange(descriptor string, opType MockOpType, ke
}
err := plannedErrors[0].err
clb := plannedErrors[0].afterErrClb
operation.Err = err
ms.opHistory = append(ms.opHistory, operation)
ms.Unlock()
if err != nil {
operation.Err = err
ms.opHistory = append(ms.opHistory, operation)
ms.Unlock()

if clb != nil {
clb()
}
if clb != nil {
clb()
}

return err
return err
}
}

// the simulated operation has succeeded
Expand Down
8 changes: 7 additions & 1 deletion plugins/kvscheduler/internal/test/values.go
Expand Up @@ -33,6 +33,12 @@ func NewArrayValue(items ...string) proto.Message {
return &ArrayValue{Items: items}
}

// NewArrayValue creates a new instance of ArrayValue with a suffix
// appended into each item value (but not key).
func NewArrayValueWithSuffix(suffix string, items ...string) proto.Message {
return &ArrayValue{Items: items, ItemSuffix: suffix}
}

// StringValueComparator is (a custom) KVDescriptor.ValueComparator for string values.
func StringValueComparator(key string, v1, v2 proto.Message) bool {
sv1, isStringVal1 := v1.(*StringValue)
Expand All @@ -52,7 +58,7 @@ func ArrayValueDerBuilder(key string, value proto.Message) []KeyValuePair {
for _, item := range arrayVal.Items {
derivedVals = append(derivedVals, KeyValuePair{
Key: key + "/" + item,
Value: NewStringValue(item),
Value: NewStringValue(item + arrayVal.ItemSuffix),
})
}
}
Expand Down
14 changes: 8 additions & 6 deletions plugins/kvscheduler/txn_exec.go
Expand Up @@ -235,7 +235,7 @@ func (s *Scheduler) applyValue(args *applyValueArgs) (executed kvs.RecordedTxnOp
// run selected operation
switch txnOp.Operation {
case kvs.TxnOperation_DELETE:
executed, err = s.applyDelete(node, txnOp, args, args.isDepUpdate)
executed, err = s.applyDelete(node, txnOp, args, args.isDepUpdate, false)
case kvs.TxnOperation_CREATE:
executed, err = s.applyCreate(node, txnOp, args)
case kvs.TxnOperation_UPDATE:
Expand All @@ -255,7 +255,9 @@ func (s *Scheduler) applyValue(args *applyValueArgs) (executed kvs.RecordedTxnOp
}

// applyDelete removes value.
func (s *Scheduler) applyDelete(node graph.NodeRW, txnOp *kvs.RecordedTxnOp, args *applyValueArgs, pending bool) (executed kvs.RecordedTxnOps, err error) {
func (s *Scheduler) applyDelete(node graph.NodeRW, txnOp *kvs.RecordedTxnOp, args *applyValueArgs,
pending, recreate bool) (executed kvs.RecordedTxnOps, err error) {

if s.logGraphWalk {
endLog := s.logNodeVisit("applyDelete", args)
defer endLog()
Expand Down Expand Up @@ -289,7 +291,7 @@ func (s *Scheduler) applyDelete(node graph.NodeRW, txnOp *kvs.RecordedTxnOp, arg
} else {
// removed by request
txnOp.NewState = kvs.ValueState_REMOVED
if args.isDerived {
if args.isDerived && !recreate {
args.graphW.DeleteNode(args.kv.key)
} else {
s.updateNodeState(node, txnOp.NewState, args)
Expand Down Expand Up @@ -517,7 +519,7 @@ func (s *Scheduler) applyUpdate(node graph.NodeRW, txnOp *kvs.RecordedTxnOp, arg
descriptor := s.registry.GetDescriptorForKey(args.kv.key)
handler := newDescriptorHandler(descriptor)
if !args.dryRun && args.kv.origin == kvs.FromNB {
err = handler.validate(node.GetKey(), node.GetValue())
err = handler.validate(node.GetKey(), args.kv.value)
if err != nil {
node.SetValue(args.kv.value) // save the invalid value
node.SetFlags(&UnavailValueFlag{})
Expand Down Expand Up @@ -551,7 +553,7 @@ func (s *Scheduler) applyUpdate(node graph.NodeRW, txnOp *kvs.RecordedTxnOp, arg
delOp := s.preRecordTxnOp(args, node)
delOp.Operation = kvs.TxnOperation_DELETE
delOp.NewValue = nil
delExec, inheritedErr := s.applyDelete(node, delOp, args, false)
delExec, inheritedErr := s.applyDelete(node, delOp, args, false, true)
executed = append(executed, delExec...)
if inheritedErr != nil {
err = inheritedErr
Expand Down Expand Up @@ -584,7 +586,7 @@ func (s *Scheduler) applyUpdate(node graph.NodeRW, txnOp *kvs.RecordedTxnOp, arg
// if the new dependencies are not satisfied => delete and set as pending with the new value
if !equivalent && !isNodeReady(node) {
node.SetValue(prevValue) // apply delete on the original value
delExec, inheritedErr := s.applyDelete(node, txnOp, args, true)
delExec, inheritedErr := s.applyDelete(node, txnOp, args, true, false)
executed = append(executed, delExec...)
if inheritedErr != nil {
err = inheritedErr
Expand Down

0 comments on commit 431fc32

Please sign in to comment.