Skip to content

Commit

Permalink
Merge pull request #98 from lbryio/takeovers
Browse files Browse the repository at this point in the history
-Added logic to sync claims that have passed their valid at height
  • Loading branch information
tiger5226 committed Feb 23, 2019
2 parents de3ae25 + d127196 commit b82bc8b
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 40 deletions.
120 changes: 85 additions & 35 deletions daemon/jobs/claimtriesync.go
@@ -1,6 +1,7 @@
package jobs

import (
"encoding/json"
"runtime"
"strconv"
"sync"
Expand All @@ -17,6 +18,7 @@ import (
)

const claimTrieSyncJob = "claimtriesyncjob"
const debugClaimTrieSync = true

var expirationHardForkHeight uint = 400155 // https://github.com/lbryio/lbrycrd/pull/137
var hardForkBlocksToExpiration uint = 2102400 // https://github.com/lbryio/lbrycrd/pull/137
Expand All @@ -25,6 +27,14 @@ var blocksToExpiration uint = 262974 //Hardcoded! https://lbry.io/faq/claimtrie-
// ClaimTrieSyncRunning is a variable used to show whether or not the job is running already.
var claimTrieSyncRunning = false

var lastSync *claimTrieSyncStatus

type claimTrieSyncStatus struct {
JobStatus *model.JobStatus `json:"-"`
PreviousSyncTime time.Time `json:"previous_sync"`
LastHeight int64 `json:"last_height"`
}

// ClaimTrieSync synchronizes claimtrie information that is calculated and enforced by lbrycrd.
func ClaimTrieSync() {
if !claimTrieSyncRunning {
Expand All @@ -36,28 +46,23 @@ func ClaimTrieSync() {

func claimTrieSync() {
//defer util.TimeTrack(time.Now(), "ClaimTrieSync", "always")
logrus.Debug("ClaimTrieSync: started... ")
printDebug("ClaimTrieSync: started... ")
if lastSync == nil {
lastSync = &claimTrieSyncStatus{}
}
jobStatus, err := getClaimTrieSyncJobStatus()
if err != nil {
logrus.Error(err)
}

printDebug("ClaimTrieSync: updating spent claims")
//For Updating claims that are spent ( no longer in claimtrie )
if err := updateSpentClaims(); err != nil {
logrus.Error("ClaimTrieSync:", err)
saveJobError(jobStatus, err)
}

started := time.Now()

updatedClaims, err := getUpdatedClaims(jobStatus)
if err != nil {
logrus.Error("ClaimTrieSync:", err)
saveJobError(jobStatus, err)
panic(err)
}
logrus.Debug("ClaimTrieSync: Claims to update " + strconv.Itoa(len(updatedClaims)))

printDebug("ClaimTrieSync: getting block height")
//Get blockheight for calculating expired status
count, err := lbrycrd.GetBlockCount()
if err != nil {
Expand All @@ -66,6 +71,18 @@ func claimTrieSync() {
}
blockHeight = *count

lastSync.PreviousSyncTime = jobStatus.LastSync
lastSync.LastHeight = int64(blockHeight)

printDebug("ClaimTrieSync: getting updated claims...")
updatedClaims, err := getUpdatedClaims(jobStatus)
if err != nil {
logrus.Error("ClaimTrieSync:", err)
saveJobError(jobStatus, err)
panic(err)
}
printDebug("ClaimTrieSync: Claims to update " + strconv.Itoa(len(updatedClaims)))

//For syncing the claims
if err := syncClaims(updatedClaims); err != nil {
logrus.Error("ClaimTrieSync:", err)
Expand All @@ -80,10 +97,15 @@ func claimTrieSync() {

jobStatus.LastSync = started
jobStatus.IsSuccess = true
bytes, err := json.Marshal(&lastSync)
if err != nil {
logrus.Error(err)
}
jobStatus.State.SetValid(bytes)
if err := jobStatus.UpdateG(boil.Infer()); err != nil {
logrus.Panic(err)
}
logrus.Debug("ClaimTrieSync: Processed " + strconv.Itoa(len(updatedClaims)) + " claims.")
printDebug("ClaimTrieSync: Processed " + strconv.Itoa(len(updatedClaims)) + " claims.")
claimTrieSyncRunning = false
}

Expand Down Expand Up @@ -118,16 +140,22 @@ func controllingProcessor(names <-chan string, wg *sync.WaitGroup) {
}

func setControllingClaimForNames(claims model.ClaimSlice) error {
logrus.Debug("ClaimTrieSync: controlling claim status update started... ")
printDebug("ClaimTrieSync: controlling claim status update started... ")
controlwg := sync.WaitGroup{}
names := make(map[string]string)
printDebug("ClaimTrieSync: Making name map...")
for _, claim := range claims {
names[claim.Name] = claim.Name
}
printDebug("ClaimTrieSync: Finished making name map...")
setControllingQueue := make(chan string, 100)
initControllingWorkers(runtime.NumCPU()-1, setControllingQueue, &controlwg)
for _, claim := range claims {
setControllingQueue <- claim.Name
for _, name := range names {
setControllingQueue <- name
}
close(setControllingQueue)
controlwg.Wait()
logrus.Debug("ClaimTrieSync: controlling claim status update complete... ")
printDebug("ClaimTrieSync: controlling claim status update complete... ")

return nil
}
Expand Down Expand Up @@ -165,13 +193,13 @@ func setBidStateOfClaimsForName(name string) {

func syncClaims(claims model.ClaimSlice) error {

logrus.Debug("ClaimTrieSync: claim update started... ")
printDebug("ClaimTrieSync: claim update started... ")
syncwg := sync.WaitGroup{}
processingQueue := make(chan lbrycrd.Claim, 100)
initSyncWorkers(runtime.NumCPU()-1, processingQueue, &syncwg)
for i, claim := range claims {
if i%1000 == 0 {
logrus.Debug("ClaimTrieSync: syncing ", i, " of ", len(claims), " queued - ", len(processingQueue))
printDebug("ClaimTrieSync: syncing ", i, " of ", len(claims), " queued - ", len(processingQueue))
}
claims, err := lbrycrd.GetClaimsForName(claim.Name)
if err != nil {
Expand All @@ -184,7 +212,7 @@ func syncClaims(claims model.ClaimSlice) error {
close(processingQueue)
syncwg.Wait()

logrus.Debug("ClaimTrieSync: claim update complete... ")
printDebug("ClaimTrieSync: claim update complete... ")

return nil
}
Expand All @@ -195,7 +223,7 @@ func syncClaim(claimJSON *lbrycrd.Claim) {
if claim == nil {
unknown, _ := model.AbnormalClaims(qm.Where(model.AbnormalClaimColumns.ClaimID+"=?", claimJSON.ClaimID)).OneG()
if unknown == nil {
logrus.Debug("ClaimTrieSync: Missing Claim ", claimJSON.ClaimID, " ", claimJSON.TxID, " ", claimJSON.N)
printDebug("ClaimTrieSync: Missing Claim ", claimJSON.ClaimID, " ", claimJSON.TxID, " ", claimJSON.N)
}
return
}
Expand All @@ -210,7 +238,7 @@ func syncClaim(claimJSON *lbrycrd.Claim) {
if hasChanges {
if err := datastore.PutClaim(claim); err != nil {
logrus.Error("ClaimTrieSync: unable to sync claim ", claim.ClaimID, ". JSON-", claimJSON)
logrus.Debug("Error: ", err)
printDebug("Error: ", err)
}
}
}
Expand Down Expand Up @@ -272,6 +300,7 @@ func getUpdatedClaims(jobStatus *model.JobStatus) (model.ClaimSlice, error) {
supportedIDCol := model.TableNames.Support + "." + model.SupportColumns.SupportedClaimID
supportModifiedCol := model.TableNames.Support + "." + model.SupportColumns.ModifiedAt
claimModifiedCol := model.TableNames.Claim + "." + model.ClaimColumns.ModifiedAt
claimValidAtHeight := model.TableNames.Claim + "." + model.ClaimColumns.ValidAtHeight
sqlFormat := "2006-01-02 15:04:05"
lastsync := jobStatus.LastSync.Format(sqlFormat)
lastSyncStr := "'" + lastsync + "'"
Expand All @@ -285,12 +314,14 @@ func getUpdatedClaims(jobStatus *model.JobStatus) (model.ClaimSlice, error) {
FROM ` + model.TableNames.Claim + `
LEFT JOIN ` + model.TableNames.Support + `
ON ( ` + supportedIDCol + ` = ` + claimIDCol + ` AND ` + supportModifiedCol + ` >= ` + lastSyncStr + ` )
WHERE ` + claimModifiedCol + ` >= ` + lastSyncStr + ` OR ` + supportedIDCol + ` IS NOT NULL
WHERE ` + claimModifiedCol + ` >= ` + lastSyncStr + `
OR ` + supportedIDCol + ` IS NOT NULL
OR ` + claimValidAtHeight + ` >= ?
GROUP BY ` + claimNameCol + `
)
`
logrus.Debug("Query: ", query)
return model.Claims(qm.SQL(query)).AllG()
printDebug(query)
return model.Claims(qm.SQL(query, lastSync.LastHeight)).AllG()

}

Expand All @@ -307,18 +338,21 @@ func getSpentClaimsToUpdate() (model.ClaimSlice, error) {
output := model.TableNames.Output
outputTxHash := output + "." + model.OutputColumns.TransactionHash
outputVout := output + "." + model.OutputColumns.Vout
outputClaimID := output + "." + model.OutputColumns.ClaimID
outputIsSpent := output + "." + model.OutputColumns.IsSpent
outputModifiedAt := output + "." + model.OutputColumns.ModifiedAt

clause := qm.SQL(`
SELECT `+claimClaimID+`,`+claimID+`
FROM `+claim+`
INNER JOIN `+output+`
ON `+outputTxHash+` = `+claimTxByHash+`
AND `+outputVout+` = `+claimVout+`
AND `+outputIsSpent+` = ?
WHERE `+claimBidState+` != ? `, 1, "Spent")

return model.Claims(clause).AllG()
query := `
SELECT ` + claimClaimID + `,` + claimID + `
FROM ` + output + `
INNER JOIN ` + claim + ` ON ` + claimID + ` = ` + outputClaimID + `
AND ` + claimTxByHash + ` = ` + outputTxHash + `
AND ` + claimVout + ` = ` + outputVout + `
WHERE ` + outputModifiedAt + ` > ?
AND ` + outputIsSpent + ` = ?
AND ` + claimBidState + ` != ?`
printDebug(query)
return model.Claims(qm.SQL(query, lastSync.PreviousSyncTime, 1, "Spent")).AllG()
}

func updateSpentClaims() error {
Expand All @@ -337,14 +371,22 @@ func updateSpentClaims() error {
}

func getClaimTrieSyncJobStatus() (*model.JobStatus, error) {
jobStatus, _ := model.FindJobStatusG(claimTrieSyncJob)
jobStatus, err := model.FindJobStatusG(claimTrieSyncJob)
if err != nil {
return nil, errors.Err(err)
}
if jobStatus == nil {
jobStatus = &model.JobStatus{JobName: claimTrieSyncJob, LastSync: time.Time{}}
if err := jobStatus.InsertG(boil.Infer()); err != nil {
logrus.Panic("Cannot Retrieve/Create JobStatus for " + claimTrieSyncJob)
}
}

err = json.Unmarshal(jobStatus.State.JSON, lastSync)
if err != nil {
return nil, errors.Err(err)
}

return jobStatus, nil
}

Expand All @@ -355,3 +397,11 @@ func saveJobError(jobStatus *model.JobStatus, error error) {
logrus.Error(errors.Prefix("Saving Job Error Message "+error.Error(), err))
}
}

func printDebug(args ...interface{}) {
if debugClaimTrieSync {
logrus.Info(args...)
} else {
logrus.Debug(args...)
}
}
24 changes: 21 additions & 3 deletions daemon/upgrademanager/upgrade.go
Expand Up @@ -13,9 +13,9 @@ import (
)

const (
appVersion = 12
apiVersion = 12
dataVersion = 12
appVersion = 13
apiVersion = 13
dataVersion = 13
)

// RunUpgradesForVersion - Migrations are for structure of the data. Upgrade Manager scripts are for the data itself.
Expand Down Expand Up @@ -50,6 +50,7 @@ func RunUpgradesForVersion() {
upgradeFrom9(appStatus.AppVersion)
upgradeFrom10(appStatus.AppVersion)
upgradeFrom11(appStatus.AppVersion)
upgradeFrom12(appStatus.AppVersion)
////Increment and save
//
logrus.Debug("Upgrading app status version to App-", appVersion, " Data-", dataVersion, " Api-", apiVersion)
Expand Down Expand Up @@ -188,3 +189,20 @@ func upgradeFrom11(version int) {
go reProcessAllClaims()
}
}

func upgradeFrom12(version int) {
if version < 13 {
jobStatus := model.TableNames.JobStatus
state := model.JobStatusColumns.State
jobName := model.JobStatusColumns.JobName
startingJSON := `{\"JobStatus\": null, \"previous_sync\": \"2019-02-13T16:35:14Z\", \"last_height\": 530000}`
_, err := boil.GetDB().Exec(`
UPDATE `+jobStatus+`
SET `+state+`= '`+startingJSON+`'
WHERE `+jobName+`= ?
`, "claimtriesyncjob")
if err != nil {
logrus.Error("Upgrading to version 13:", err)
}
}
}
5 changes: 5 additions & 0 deletions migration/012_store_last_height.sql
@@ -0,0 +1,5 @@
-- +migrate Up

-- +migrate StatementBegin
ALTER TABLE job_status ADD COLUMN state JSON;
-- +migrate StatementEnd
23 changes: 23 additions & 0 deletions migration/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b82bc8b

Please sign in to comment.