Skip to content

Commit

Permalink
payments+switch: sender-side check to not pay same preimage twice
Browse files Browse the repository at this point in the history
Create a PaymentStatus enum encapsulating the cases: Grounded,
InFlight, and Completed.
Add a db bucket channeldb/payments mapping PaymentHash -> PaymentStatus.
Add abstract the functionality behind an interface.
Implement methods performing checks/state transitions, and unit test
the behavior.
Integrate checks/transitions into switch.
Add an integration test to test the fault tolerance of when failing in
any of the states outlined above.
Add migration to retroactively Complete all previous payments.
  • Loading branch information
Vadym Popov authored and vapopov committed Jun 8, 2018
1 parent 4bde4c1 commit 9b20c9f
Show file tree
Hide file tree
Showing 14 changed files with 774 additions and 105 deletions.
6 changes: 6 additions & 0 deletions channeldb/db.go
Expand Up @@ -47,6 +47,12 @@ var (
number: 1,
migration: migrateNodeAndEdgeUpdateIndex,
},
{
// The version with added payment statuses
// for each existing payment
number: 2,
migration: paymentStatusesMigration,
},
}

// Big endian is the preferred byte order, due to cursor scans over
Expand Down
55 changes: 55 additions & 0 deletions channeldb/db_test.go
Expand Up @@ -5,6 +5,8 @@ import (
"os"
"path/filepath"
"testing"

"github.com/go-errors/errors"
)

func TestOpenWithCreate(t *testing.T) {
Expand Down Expand Up @@ -33,3 +35,56 @@ func TestOpenWithCreate(t *testing.T) {
t.Fatalf("channeldb failed to create data directory")
}
}

// applyMigration is a helper test function that encapsulates the general steps
// which are needed to properly check the result of applying migration function.
func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
migrationFunc migration, shouldFail bool) {

cdb, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatal(err)
}

// beforeMigration usually used for populating the database
// with test data.
beforeMigration(cdb)

// Create test meta info with zero database version and put it on disk.
// Than creating the version list pretending that new version was added.
meta := &Meta{DbVersionNumber: 0}
if err := cdb.PutMeta(meta); err != nil {
t.Fatalf("unable to store meta data: %v", err)
}

versions := []version{
{
number: 0,
migration: nil,
},
{
number: 1,
migration: migrationFunc,
},
}

defer func() {
if r := recover(); r != nil {
err = errors.New(r)
}

if err == nil && shouldFail {
t.Fatal("error wasn't received on migration stage")
} else if err != nil && !shouldFail {
t.Fatal("error was received on migration stage")
}

// afterMigration usually used for checking the database state and
// throwing the error if something went wrong.
afterMigration(cdb)
}()

// Sync with the latest version - applying migration function.
err = cdb.syncVersions(versions)
}
53 changes: 0 additions & 53 deletions channeldb/meta_test.go
Expand Up @@ -117,59 +117,6 @@ func TestGlobalVersionList(t *testing.T) {
}
}

// applyMigration is a helper test function that encapsulates the general steps
// which are needed to properly check the result of applying migration function.
func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
migrationFunc migration, shouldFail bool) {

cdb, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatal(err)
}

// beforeMigration usually used for populating the database
// with test data.
beforeMigration(cdb)

// Create test meta info with zero database version and put it on disk.
// Than creating the version list pretending that new version was added.
meta := &Meta{DbVersionNumber: 0}
if err := cdb.PutMeta(meta); err != nil {
t.Fatalf("unable to store meta data: %v", err)
}

versions := []version{
{
number: 0,
migration: nil,
},
{
number: 1,
migration: migrationFunc,
},
}

defer func() {
if r := recover(); r != nil {
err = errors.New(r)
}

if err == nil && shouldFail {
t.Fatal("error wasn't received on migration stage")
} else if err != nil && !shouldFail {
t.Fatal("error was received on migration stage")
}

// afterMigration usually used for checking the database state and
// throwing the error if something went wrong.
afterMigration(cdb)
}()

// Sync with the latest version - applying migration function.
err = cdb.syncVersions(versions)
}

func TestMigrationWithPanic(t *testing.T) {
t.Parallel()

Expand Down
44 changes: 44 additions & 0 deletions channeldb/migrations.go
Expand Up @@ -2,6 +2,7 @@ package channeldb

import (
"bytes"
"crypto/sha256"
"fmt"

"github.com/coreos/bbolt"
Expand Down Expand Up @@ -112,3 +113,46 @@ func migrateNodeAndEdgeUpdateIndex(tx *bolt.Tx) error {

return nil
}

// paymentStatusesMigration is a database migration intended for adding payment
// statuses for each existing payment entity in bucket to be able control
// transitions of statuses and prevent cases such as double payment
func paymentStatusesMigration(tx *bolt.Tx) error {
// Get the bucket dedicated to storing payments
bucket := tx.Bucket(paymentBucket)
if bucket == nil {
return ErrNoPaymentsCreated
}

// Get the bucket dedicated to storing statuses of payments,
// where a key is payment hash, value is payment status
paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket)
if err != nil {
return err
}

log.Infof("Migration database adds to all existing payments " +
"statuses as Completed")

// For each payment in the bucket, fetch all data.
return bucket.ForEach(func(k, v []byte) error {
// ignores if it is sub-bucket
if v == nil {
return nil
}

r := bytes.NewReader(v)
payment, err := deserializeOutgoingPayment(r)
if err != nil {
return err
}

// calculate payment hash for current payment
paymentHash := sha256.Sum256(payment.PaymentPreimage[:])

// tries to update status for current payment to completed
// if it fails - migration abort transaction and return payment bucket
// to previous state
return paymentStatuses.Put(paymentHash[:], StatusCompleted.Bytes())
})
}
71 changes: 71 additions & 0 deletions channeldb/migrations_test.go
@@ -0,0 +1,71 @@
package channeldb

import (
"crypto/sha256"
"testing"
)

func TestPaymentStatusesMigration(t *testing.T) {
t.Parallel()

fakePayment := makeFakePayment()
paymentHash := sha256.Sum256(fakePayment.PaymentPreimage[:])

// Add fake payment to the test database and verifies that it was created
// and there is only one payment and its status is not "Completed".
beforeMigrationFunc := func(d *DB) {
if err := d.AddPayment(fakePayment); err != nil {
t.Fatalf("unable to add payment: %v", err)
}

payments, err := d.FetchAllPayments()
if err != nil {
t.Fatalf("unable to fetch payments: %v", err)
}

if len(payments) != 1 {
t.Fatalf("wrong qty of paymets: expected 1, got %v",
len(payments))
}

paymentStatus, err := d.FetchPaymentStatus(paymentHash)
if err != nil {
t.Fatalf("unable to fetch payment status: %v", err)
}

// we should receive default status if we have any in database
if paymentStatus != StatusGrounded {
t.Fatalf("wrong payment status: expected %v, got %v",
StatusGrounded.String(), paymentStatus.String())
}
}

// Verify that was created payment status "Completed" for our one fake
// payment.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
if err != nil {
t.Fatal(err)
}

if meta.DbVersionNumber != 1 {
t.Fatal("migration 'paymentStatusesMigration' wasn't applied")
}

paymentStatus, err := d.FetchPaymentStatus(paymentHash)
if err != nil {
t.Fatalf("unable to fetch payment status: %v", err)
}

if paymentStatus != StatusCompleted {
t.Fatalf("wrong payment status: expected %v, got %v",
StatusCompleted.String(), paymentStatus.String())
}
}

applyMigration(t,
beforeMigrationFunc,
afterMigrationFunc,
paymentStatusesMigration,
false)
}
96 changes: 96 additions & 0 deletions channeldb/payments.go
Expand Up @@ -3,6 +3,7 @@ package channeldb
import (
"bytes"
"encoding/binary"
"errors"
"io"

"github.com/coreos/bbolt"
Expand All @@ -17,8 +18,64 @@ var (
// which is a monotonically increasing uint64. BoltDB's sequence
// feature is used for generating monotonically increasing id.
paymentBucket = []byte("payments")

// paymentStatusBucket is the name of the bucket within the database that
// stores the status of a payment indexed by the payment's preimage.
paymentStatusBucket = []byte("payment-status")
)

// PaymentStatus represent current status of payment
type PaymentStatus byte

const (
// StatusGrounded is status where payment is initiated and received
// an intermittent failure
StatusGrounded PaymentStatus = 0

// StatusInFlight is status where payment is initiated, but a response
// has not been received
StatusInFlight PaymentStatus = 1

// StatusCompleted is status where payment is initiated and complete
// a payment successfully
StatusCompleted PaymentStatus = 2
)

// Bytes returns status as slice of bytes
func (ps PaymentStatus) Bytes() []byte {
return []byte{byte(ps)}
}

// FromBytes sets status from slice of bytes
func (ps *PaymentStatus) FromBytes(status []byte) error {
if len(status) != 1 {
return errors.New("payment status is empty")
}

switch PaymentStatus(status[0]) {
case StatusGrounded, StatusInFlight, StatusCompleted:
*ps = PaymentStatus(status[0])
default:
return errors.New("unknown payment status")
}

return nil
}

// String returns readable representation of payment status
func (ps PaymentStatus) String() string {
switch ps {
case StatusGrounded:
return "Grounded"
case StatusInFlight:
return "In Flight"
case StatusCompleted:
return "Completed"
default:
return "Unknown"
}
}

// OutgoingPayment represents a successful payment between the daemon and a
// remote node. Details such as the total fee paid, and the time of the payment
// are stored.
Expand Down Expand Up @@ -129,6 +186,45 @@ func (db *DB) DeleteAllPayments() error {
})
}

// UpdatePaymentStatus sets status for outgoing/finished payment to store status in
// local database.
func (db *DB) UpdatePaymentStatus(paymentHash [32]byte, status PaymentStatus) error {
return db.Batch(func(tx *bolt.Tx) error {
paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket)
if err != nil {
return err
}

return paymentStatuses.Put(paymentHash[:], status.Bytes())
})
}

// FetchPaymentStatus returns payment status for outgoing payment
// if status of the payment isn't found it set to default status "StatusGrounded".
func (db *DB) FetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) {
// default status for all payments that wasn't recorded in database
paymentStatus := StatusGrounded

err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(paymentStatusBucket)
if bucket == nil {
return nil
}

paymentStatusBytes := bucket.Get(paymentHash[:])
if paymentStatusBytes == nil {
return nil
}

return paymentStatus.FromBytes(paymentStatusBytes)
})
if err != nil {
return StatusGrounded, err
}

return paymentStatus, nil
}

func serializeOutgoingPayment(w io.Writer, p *OutgoingPayment) error {
var scratch [8]byte

Expand Down

0 comments on commit 9b20c9f

Please sign in to comment.