Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Enforce nanosecond precision on fields containing enqueue-times (#95)
Browse files Browse the repository at this point in the history
* Enforce nanosecond precision on fields containing enqueue-times

* tests for UpdateStoreExtentReplicaStats

* do not change created-time units
  • Loading branch information
Kiran RG committed Mar 15, 2017
1 parent a8c5a95 commit 9e38cca
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 27 deletions.
60 changes: 36 additions & 24 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -2448,6 +2448,18 @@ func timeToMilliseconds(i interface{}) int64 {
return t.UnixNano() / int64(time.Millisecond)
}

func timeToUnixNano(i interface{}) int64 {
// if the interface is nil, bail immediately
if i == nil {
return 0
}
t := i.(time.Time)
if t.IsZero() {
return 0
}
return t.UnixNano()
}

func cqlTimestampToUnixNano(milliseconds int64) common.UnixNanoTime {
return common.UnixNanoTime(milliseconds * 1000 * 1000) // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-3) - (-9) = 6, so multiply by 10⁶
}
Expand Down Expand Up @@ -2506,15 +2518,15 @@ func convertReplicaStatsMap(r map[string]interface{}) *shared.ExtentReplicaStats
AvailableSequence: common.Int64Ptr(toInt64(r[columnAvailableSequence])),
AvailableSequenceRate: common.Float64Ptr(toFloat64(r[columnAvailableSequenceRate])),
BeginAddress: common.Int64Ptr(toInt64(r[columnBeginAddress])),
BeginEnqueueTimeUtc: common.Int64Ptr(timeToMilliseconds(r[columnBeginEnqueueTime])),
BeginEnqueueTimeUtc: common.Int64Ptr(timeToUnixNano(r[columnBeginEnqueueTime])),
BeginSequence: common.Int64Ptr(toInt64(r[columnBeginSequence])),
BeginTime: common.Int64Ptr(timeToMilliseconds(r[columnBeginTime])),
BeginTime: common.Int64Ptr(timeToUnixNano(r[columnBeginTime])),
CreatedAt: common.Int64Ptr(timeToMilliseconds(r[columnCreatedTime])),
/* DestinationUUID is in the CQL but not the thrift */
EndTime: common.Int64Ptr(timeToMilliseconds(r[columnEndTime])),
EndTime: common.Int64Ptr(timeToUnixNano(r[columnEndTime])),
ExtentUUID: common.StringPtr(toUUIDString(r[columnExtentUUID])),
LastAddress: common.Int64Ptr(toInt64(r[columnLastAddress])),
LastEnqueueTimeUtc: common.Int64Ptr(timeToMilliseconds(r[columnLastEnqueueTime])),
LastEnqueueTimeUtc: common.Int64Ptr(timeToUnixNano(r[columnLastEnqueueTime])),
LastSequence: common.Int64Ptr(toInt64(r[columnLastSequence])),
LastSequenceRate: common.Float64Ptr(toFloat64(r[columnLastSequenceRate])),
SizeInBytes: common.Int64Ptr(toInt64(r[columnSizeInBytes])),
Expand All @@ -2540,14 +2552,14 @@ func makeReplicaStatsMap(rs []*shared.ExtentReplicaStats, destUUID string) map[s
columnBeginSequence: r.GetBeginSequence(),
columnLastSequence: r.GetLastSequence(),
columnLastSequenceRate: r.GetLastSequenceRate(),
columnBeginEnqueueTime: r.GetBeginEnqueueTimeUtc(),
columnLastEnqueueTime: r.GetLastEnqueueTimeUtc(),
columnBeginEnqueueTime: time.Unix(0, r.GetBeginEnqueueTimeUtc()),
columnLastEnqueueTime: time.Unix(0, r.GetLastEnqueueTimeUtc()),
columnSizeInBytes: r.GetSizeInBytes(),
columnSizeInBytesRate: r.GetSizeInBytesRate(),
columnStatus: r.GetStatus(),
columnBeginTime: r.GetBeginTime(),
columnBeginTime: time.Unix(0, r.GetBeginTime()),
columnCreatedTime: r.GetCreatedAt(),
columnEndTime: r.GetEndTime(),
columnEndTime: time.Unix(0, r.GetEndTime()),
columnStore: "ManyRocks", // FIXME: hardcoded for now
columnStoreVersion: "0.2", // FIXME: hardcoded for now
}
Expand Down Expand Up @@ -3099,12 +3111,12 @@ func (s *CassandraMetadataService) UpdateStoreExtentReplicaStats(ctx thrift.Cont
stats.GetLastAddress(),
stats.GetBeginSequence(),
stats.GetLastSequence(),
stats.GetBeginEnqueueTimeUtc(),
stats.GetLastEnqueueTimeUtc(),
time.Unix(0, stats.GetBeginEnqueueTimeUtc()),
time.Unix(0, stats.GetLastEnqueueTimeUtc()),
stats.GetSizeInBytes(),
stats.GetStatus(),
stats.GetBeginTime(),
stats.GetEndTime(),
time.Unix(0, stats.GetBeginTime()),
time.Unix(0, stats.GetEndTime()),
stats.GetAvailableSequence(),
stats.GetAvailableSequenceRate(),
stats.GetLastSequenceRate(),
Expand Down Expand Up @@ -3176,12 +3188,12 @@ func (s *CassandraMetadataService) UpdateExtentReplicaStats(ctx thrift.Context,
stats.GetLastAddress(),
stats.GetBeginSequence(),
stats.GetLastSequence(),
stats.GetBeginEnqueueTimeUtc(),
stats.GetLastEnqueueTimeUtc(),
time.Unix(0, stats.GetBeginEnqueueTimeUtc()),
time.Unix(0, stats.GetLastEnqueueTimeUtc()),
stats.GetSizeInBytes(),
stats.GetStatus(),
stats.GetBeginTime(),
stats.GetEndTime(),
time.Unix(0, stats.GetBeginTime()),
time.Unix(0, stats.GetEndTime()),
request.GetDestinationUUID(),
request.GetExtentUUID(),
)
Expand Down Expand Up @@ -3213,12 +3225,12 @@ func (s *CassandraMetadataService) UpdateExtentReplicaStats(ctx thrift.Context,
stats.GetLastAddress(),
stats.GetBeginSequence(),
stats.GetLastSequence(),
stats.GetBeginEnqueueTimeUtc(),
stats.GetLastEnqueueTimeUtc(),
time.Unix(0, stats.GetBeginEnqueueTimeUtc()),
time.Unix(0, stats.GetLastEnqueueTimeUtc()),
stats.GetSizeInBytes(),
stats.GetStatus(),
stats.GetBeginTime(),
stats.GetEndTime(),
time.Unix(0, stats.GetBeginTime()),
time.Unix(0, stats.GetEndTime()),
request.GetDestinationUUID(),
request.GetInputHostUUID(),
request.GetExtentUUID(),
Expand Down Expand Up @@ -3253,12 +3265,12 @@ func (s *CassandraMetadataService) UpdateExtentReplicaStats(ctx thrift.Context,
stats.GetLastAddress(),
stats.GetBeginSequence(),
stats.GetLastSequence(),
stats.GetBeginEnqueueTimeUtc(),
stats.GetLastEnqueueTimeUtc(),
time.Unix(0, stats.GetBeginEnqueueTimeUtc()),
time.Unix(0, stats.GetLastEnqueueTimeUtc()),
stats.GetSizeInBytes(),
stats.GetStatus(),
stats.GetBeginTime(),
stats.GetEndTime(),
time.Unix(0, stats.GetBeginTime()),
time.Unix(0, stats.GetEndTime()),
stats.GetAvailableSequence(),
stats.GetAvailableSequenceRate(),
stats.GetLastSequenceRate(),
Expand Down
136 changes: 133 additions & 3 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func (s *CassandraSuite) TestListDestinationsByUUID() {

// ListDestinationsByUUID
listDestinations := &shared.ListDestinationsByUUIDRequest{
Limit: common.Int64Ptr(testPageSize),
Limit: common.Int64Ptr(testPageSize),
ValidateAgainstPathTable: common.BoolPtr(true),
}
var result []*shared.DestinationDescription
Expand Down Expand Up @@ -607,9 +607,9 @@ func (s *CassandraSuite) TestListDestinationsByUUID() {

// list multi zone only
listDestinations = &shared.ListDestinationsByUUIDRequest{
MultiZoneOnly: common.BoolPtr(true),
MultiZoneOnly: common.BoolPtr(true),
ValidateAgainstPathTable: common.BoolPtr(true),
Limit: common.Int64Ptr(testPageSize),
Limit: common.Int64Ptr(testPageSize),
}
result = nil
for {
Expand Down Expand Up @@ -752,6 +752,98 @@ func (s *CassandraSuite) TestExtentCRU() {
}
}

func (s *CassandraSuite) TestUpdateStoreExtentReplicaStats() {
extentUUID := uuid.New()
destUUID := uuid.New()
storeIds := []string{uuid.New(), uuid.New(), uuid.New()}
extent := &shared.Extent{
ExtentUUID: common.StringPtr(extentUUID),
DestinationUUID: common.StringPtr(destUUID),
StoreUUIDs: storeIds,
InputHostUUID: common.StringPtr(uuid.New()),
}
createRequest := &shared.CreateExtentRequest{Extent: extent}
_, err := s.client.CreateExtent(nil, createRequest)
s.Nil(err)

beginEnqTime := time.Now().Add(-time.Hour)
lastEnqTime := time.Now().Add(-time.Second)

beginTime := time.Now().Add(-time.Minute)
endTime := time.Now().Add(time.Millisecond)

var beginSeq, lastSeq, availSeq int64 = 0x123, 0x456789, 0x345678
var lastSeqRate, availSeqRate float64 = 23.45, 67.89
var beginAddr, lastAddr int64 = 0x123456789ABCDE, 0xABCDEF012345678
var sizeInBytes int64 = 0x456123789
var sizeInBytesRate float64 = 987.67

stats1 := &shared.ExtentReplicaStats{
ExtentUUID: common.StringPtr(extentUUID),
StoreUUID: common.StringPtr(storeIds[1]),
BeginAddress: common.Int64Ptr(beginAddr),
LastAddress: common.Int64Ptr(lastAddr),
BeginSequence: common.Int64Ptr(beginSeq),
LastSequence: common.Int64Ptr(lastSeq),
LastSequenceRate: common.Float64Ptr(lastSeqRate),
AvailableSequence: common.Int64Ptr(availSeq),
AvailableSequenceRate: common.Float64Ptr(availSeqRate),
BeginEnqueueTimeUtc: common.Int64Ptr(beginEnqTime.UnixNano()),
LastEnqueueTimeUtc: common.Int64Ptr(lastEnqTime.UnixNano()),
SizeInBytes: common.Int64Ptr(sizeInBytes),
SizeInBytesRate: common.Float64Ptr(sizeInBytesRate),
Status: common.MetadataExtentReplicaStatusPtr(shared.ExtentReplicaStatus_SEALED),
BeginTime: common.Int64Ptr(beginTime.UnixNano()),
EndTime: common.Int64Ptr(endTime.UnixNano()),
}

// update replica stats
updateRequest := &m.UpdateStoreExtentReplicaStatsRequest{
StoreUUID: common.StringPtr(storeIds[1]),
ExtentUUID: common.StringPtr(extentUUID),
ReplicaStats: []*shared.ExtentReplicaStats{stats1},
}
err = s.client.UpdateStoreExtentReplicaStats(nil, updateRequest)
s.Nil(err)

readReq := &m.ReadStoreExtentReplicaStatsRequest{
StoreUUID: common.StringPtr(storeIds[1]),
ExtentUUID: common.StringPtr(extent.GetExtentUUID()),
}

result, err := s.client.ReadStoreExtentReplicaStats(nil, readReq)
s.Nil(err, "Reading store extent stats failed")
s.NotNil(result, "ReadStoreExtentReplicaStats returned nil")

stats := result.GetExtent().GetReplicaStats()

s.Equal(stats1.GetExtentUUID(), stats[0].GetExtentUUID())
s.Equal(stats1.GetStoreUUID(), stats[0].GetStoreUUID())
s.Equal(stats1.GetBeginAddress(), stats[0].GetBeginAddress())
s.Equal(stats1.GetLastAddress(), stats[0].GetLastAddress())
s.Equal(stats1.GetBeginSequence(), stats[0].GetBeginSequence())
s.Equal(stats1.GetLastSequence(), stats[0].GetLastSequence())
s.Equal(stats1.GetLastSequenceRate(), stats[0].GetLastSequenceRate())
s.Equal(stats1.GetAvailableSequence(), stats[0].GetAvailableSequence())
s.Equal(stats1.GetAvailableSequenceRate(), stats[0].GetAvailableSequenceRate())
s.Equal(stats1.GetSizeInBytes(), stats[0].GetSizeInBytes())
s.Equal(stats1.GetSizeInBytesRate(), stats[0].GetSizeInBytesRate())
s.Equal(stats1.GetStatus(), stats[0].GetStatus())

// after gocql marshals and unmarshals the timestamps, the values are off by a little, but not much!
timeDifference := func(t0, t1 int64) time.Duration {
if t0 > t1 {
return time.Unix(0, t0).Sub(time.Unix(0, t1))
} else {
return time.Unix(0, t1).Sub(time.Unix(0, t0))
}
}
s.True(timeDifference(stats1.GetBeginEnqueueTimeUtc(), stats[0].GetBeginEnqueueTimeUtc()) < time.Millisecond)
s.True(timeDifference(stats1.GetLastEnqueueTimeUtc(), stats[0].GetLastEnqueueTimeUtc()) < time.Millisecond)
s.True(timeDifference(stats1.GetBeginTime(), stats[0].GetBeginTime()) < time.Millisecond)
s.True(timeDifference(stats1.GetEndTime(), stats[0].GetEndTime()) < time.Millisecond)
}

func (s *CassandraSuite) TestReplicationStatus() {
extentUUID := uuid.New()
destUUID := uuid.New()
Expand Down Expand Up @@ -790,6 +882,44 @@ func (s *CassandraSuite) TestReplicationStatus() {
s.Equal(0, len(readResult.GetExtentStatsList()))
}

func (s *CassandraSuite) TestStoreExtentTimestamps() {
extentUUID := uuid.New()
destUUID := uuid.New()
storeIds := []string{uuid.New(), uuid.New(), uuid.New()}
extent := &shared.Extent{
ExtentUUID: common.StringPtr(extentUUID),
DestinationUUID: common.StringPtr(destUUID),
StoreUUIDs: storeIds,
InputHostUUID: common.StringPtr(uuid.New()),
}
createRequest := &shared.CreateExtentRequest{Extent: extent}
_, err := s.client.CreateExtent(nil, createRequest)
s.Nil(err)

// read store extent with filtering
readRequest := &m.ListStoreExtentsStatsRequest{
StoreUUID: common.StringPtr(storeIds[0]),
ReplicationStatus: common.InternalExtentReplicaReplicationStatusTypePtr(shared.ExtentReplicaReplicationStatus_INVALID),
}
readResult, err := s.client.ListStoreExtentsStats(nil, readRequest)
s.Nil(err)
s.Equal(1, len(readResult.GetExtentStatsList()))

// update replication status
updateRequest := &m.UpdateStoreExtentReplicaStatsRequest{
StoreUUID: common.StringPtr(storeIds[0]),
ExtentUUID: common.StringPtr(extentUUID),
ReplicationStatus: common.InternalExtentReplicaReplicationStatusTypePtr(shared.ExtentReplicaReplicationStatus_DONE),
}
err = s.client.UpdateStoreExtentReplicaStats(nil, updateRequest)
s.Nil(err)

// read again with filtering, expect no result is returned
readResult, err = s.client.ListStoreExtentsStats(nil, readRequest)
s.Nil(err)
s.Equal(0, len(readResult.GetExtentStatsList()))
}

func (s *CassandraSuite) TestReplicationStatus_Remote() {
extentUUID := uuid.New()
destUUID := uuid.New()
Expand Down

0 comments on commit 9e38cca

Please sign in to comment.