Skip to content

Commit

Permalink
Wrap checkpoints; add -start and -end flags
Browse files Browse the repository at this point in the history
Checkpoints should only be resumed if the parameters of the command are the same; otherwise some providers return errors when trying to get "next page" using different parameters (Google Photos).

Also add -start and -end flags for get-all (and -end for get-latest) so that you can customize the date range of items to get, either by duration (relative) or date (absolute). This is useful, for example, when you want to only download items that are at least 10 days old (`-end "-240h"`).
  • Loading branch information
mholt committed Dec 3, 2020
1 parent 2da15ad commit a8c45a8
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 12 deletions.
7 changes: 7 additions & 0 deletions account.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Account struct {

t *Timeline
ds DataSource
cp *checkpointWrapper
}

// NewHTTPClient returns an HTTP client that is suitable for use
Expand Down Expand Up @@ -153,6 +154,12 @@ func (t *Timeline) getAccount(dsID, userID string) (Account, error) {
if err != nil {
return acc, fmt.Errorf("querying account %s/%s from DB: %v", dsID, userID, err)
}
if acc.checkpoint != nil {
err = UnmarshalGob(acc.checkpoint, &acc.cp)
if err != nil {
return acc, fmt.Errorf("decoding checkpoint wrapper: %v", err)
}
}
return acc, nil
}

Expand Down
67 changes: 63 additions & 4 deletions cmd/timeliner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// plug in data sources
_ "github.com/mholt/timeliner/datasources/facebook"
_ "github.com/mholt/timeliner/datasources/gmail"
_ "github.com/mholt/timeliner/datasources/googlelocation"
_ "github.com/mholt/timeliner/datasources/googlephotos"
_ "github.com/mholt/timeliner/datasources/instagram"
Expand All @@ -34,6 +35,9 @@ func init() {
flag.BoolVar(&integrity, "integrity", integrity, "Perform integrity check on existing items and reprocess if needed (download-all or import only)")
flag.BoolVar(&reprocess, "reprocess", reprocess, "Reprocess every item that has not been modified locally (download-all or import only)")

flag.StringVar(&tfStartInput, "start", "", "Timeframe start (relative=duration, absolute=YYYY/MM/DD)")
flag.StringVar(&tfEndInput, "end", "", "Timeframe end (relative=duration, absolute=YYYY/MM/DD)")

flag.BoolVar(&twitterRetweets, "twitter-retweets", twitterRetweets, "Twitter: include retweets")
flag.BoolVar(&twitterReplies, "twitter-replies", twitterReplies, "Twitter: include replies that are not just replies to self")

Expand Down Expand Up @@ -129,8 +133,13 @@ func main() {

switch subcmd {
case "get-latest":
if reprocess || prune || integrity {
log.Fatalf("[FATAL] The get-latest subcommand does not support -reprocess, -prune, or -integrity")
if reprocess || prune || integrity || tfStartInput != "" {
log.Fatalf("[FATAL] The get-latest subcommand does not support -reprocess, -prune, -integrity, or -start")
}

_, tfEnd, err := parseTimeframe()
if err != nil {
log.Fatalf("[FATAL] %v", err)
}

var wg sync.WaitGroup
Expand All @@ -143,7 +152,7 @@ func main() {
if retryNum > 0 {
log.Println("[INFO] Retrying command")
}
err := wc.GetLatest(ctx)
err := wc.GetLatest(ctx, tfEnd)
if err != nil {
log.Printf("[ERROR][%s/%s] Getting latest: %v",
wc.DataSourceID(), wc.UserID(), err)
Expand All @@ -160,6 +169,11 @@ func main() {
wg.Wait()

case "get-all":
tfStart, tfEnd, err := parseTimeframe()
if err != nil {
log.Fatalf("[FATAL] %v", err)
}

var wg sync.WaitGroup
for _, wc := range clients {
wg.Add(1)
Expand All @@ -170,7 +184,7 @@ func main() {
if retryNum > 0 {
log.Println("[INFO] Retrying command")
}
err := wc.GetAll(ctx, reprocess, prune, integrity)
err := wc.GetAll(ctx, reprocess, prune, integrity, timeliner.Timeframe{Since: tfStart, Until: tfEnd})
if err != nil {
log.Printf("[ERROR][%s/%s] Downloading all: %v",
wc.DataSourceID(), wc.UserID(), err)
Expand Down Expand Up @@ -203,6 +217,47 @@ func main() {
}
}

// parseTimeframe parses the tfStartInput and tfEndInput flag values and
// returns the resulting start and end times. A result is nil when the
// corresponding input is empty. Each input may be either a duration
// relative to the current time (for example "-240h") or an absolute
// date in dateFormat. Relative values for start and end are both
// resolved against the same instant so that they stay consistent with
// each other. If an input cannot be parsed, or if end is before start,
// both results are nil and a non-nil error is returned.
func parseTimeframe() (start, end *time.Time, err error) {
	// capture "now" once so both relative bounds share a reference point
	now := time.Now()

	start, err = parseTimeframeBound("start", tfStartInput, now)
	if err != nil {
		return nil, nil, err
	}

	end, err = parseTimeframeBound("end", tfEndInput, now)
	if err != nil {
		return nil, nil, err
	}

	if start != nil && end != nil && end.Before(*start) {
		return nil, nil, fmt.Errorf("end time must be after start time (start=%s end=%s)", start, end)
	}

	return start, end, nil
}

// parseTimeframeBound parses one timeframe bound. The label ("start" or
// "end") is used only in the error message. An empty input yields a nil
// time and no error. The input is first tried as a duration, which is
// added to now; if that fails it is parsed as a date in dateFormat.
func parseTimeframeBound(label, input string, now time.Time) (*time.Time, error) {
	if input == "" {
		return nil, nil
	}

	// relative form: a duration offset from now
	if rel, err := time.ParseDuration(input); err == nil {
		t := now.Add(rel)
		return &t, nil
	}

	// absolute form: a calendar date
	t, err := time.Parse(dateFormat, input)
	if err != nil {
		return nil, fmt.Errorf("bad timeframe %s value '%s': %v", label, input, err)
	}
	return &t, nil
}

func loadConfig() error {
// no config file is allowed, but that might be useless
_, err := os.Stat(configFile)
Expand Down Expand Up @@ -299,8 +354,12 @@ var (
prune bool
reprocess bool

tfStartInput, tfEndInput string

twitterRetweets bool
twitterReplies bool

phoneDefaultRegion string = "US"
)

const dateFormat = "2006/01/02" // YYYY/MM/DD
12 changes: 12 additions & 0 deletions datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,16 @@ type Timeframe struct {
SinceItemID, UntilItemID *string
}

// String renders tf as a single-line, struct-like string containing
// its Since and Until times and its SinceItemID and UntilItemID values.
// An unset (nil) item ID is rendered as the empty string; Since and
// Until are passed to fmt as-is, so a nil time is printed as "<nil>".
func (tf Timeframe) String() string {
	// item IDs are optional pointers; substitute "" when one is absent
	idOrEmpty := func(id *string) string {
		if id == nil {
			return ""
		}
		return *id
	}

	return fmt.Sprintf("{Since:%s Until:%s SinceItemID:%s UntilItemID:%s}",
		tf.Since, tf.Until, idOrEmpty(tf.SinceItemID), idOrEmpty(tf.UntilItemID))
}

var dataSources = make(map[string]DataSource)
21 changes: 19 additions & 2 deletions timeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,27 @@ func Checkpoint(ctx context.Context, checkpoint []byte) {
return
}

_, err := wc.tl.db.Exec(`UPDATE accounts SET checkpoint=? WHERE id=?`, // TODO: LIMIT 1 (see https://github.com/mattn/go-sqlite3/pull/564)
checkpoint, wc.acc.ID)
chkpt, err := MarshalGob(checkpointWrapper{wc.commandParams, checkpoint})
if err != nil {
log.Printf("[ERROR][%s/%s] Encoding checkpoint wrapper: %v", wc.ds.ID, wc.acc.UserID, err)
return
}

_, err = wc.tl.db.Exec(`UPDATE accounts SET checkpoint=? WHERE id=?`, // TODO: LIMIT 1 (see https://github.com/mattn/go-sqlite3/pull/564)
chkpt, wc.acc.ID)
if err != nil {
log.Printf("[ERROR][%s/%s] Checkpoint: %v", wc.ds.ID, wc.acc.UserID, err)
return
}
}

// checkpointWrapper stores a provider's checkpoint along with the
// parameters of the command that initiated the process; the checkpoint
// will only be loaded and restored to the provider on next run if
// the parameters match, because it doesn't make sense to restore a
// process that has different, potentially conflicting, parameters,
// such as timeframe.
//
// Values of this type are gob-encoded before being written to the
// accounts table, so the exported field names are part of the stored
// format.
type checkpointWrapper struct {
	// Params is the string form of the command parameters that were
	// in effect when the checkpoint was saved.
	Params string

	// Data is the opaque checkpoint payload handed in by the provider.
	Data []byte
}
41 changes: 35 additions & 6 deletions wrappedclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@ type WrappedClient struct {
lastItemRowID int64
lastItemTimestamp time.Time
lastItemMu *sync.Mutex

// used with checkpoints; it only makes sense to resume a checkpoint
// if the process has the same operational parameters as before;
// some providers (like Google Photos) even return errors if you
// query a "next page" with different parameters
commandParams string
}

// GetLatest gets the most recent items from wc. It does not prune or
// reprocess; only meant for a quick pull. If there are no items pulled
// yet, all items will be pulled.
func (wc *WrappedClient) GetLatest(ctx context.Context) error {
// yet, all items will be pulled. If until is not nil, only items up
// to that timestamp will be pulled, and if until is before the latest
// item, no items will be pulled.
func (wc *WrappedClient) GetLatest(ctx context.Context, until *time.Time) error {
if ctx == nil {
ctx = context.Background()
}
Expand All @@ -49,20 +57,26 @@ func (wc *WrappedClient) GetLatest(ctx context.Context) error {
}

// constrain the pull to the recent timeframe
timeframe := Timeframe{}
timeframe := Timeframe{Until: until}
if mostRecentTimestamp > 0 {
ts := time.Unix(mostRecentTimestamp, 0)
timeframe.Since = &ts
if timeframe.Until.Before(ts) {
// most recent item is already after "until"/end date; nothing to do
return nil
}
}
if mostRecentOriginalID != "" {
timeframe.SinceItemID = &mostRecentOriginalID
}

checkpoint := wc.prepareCheckpoint(timeframe)

wg, ch := wc.beginProcessing(concurrentCuckoo{}, false, false)

err := wc.Client.ListItems(ctx, ch, Options{
Timeframe: timeframe,
Checkpoint: wc.acc.checkpoint,
Checkpoint: checkpoint,
})
if err != nil {
return fmt.Errorf("getting items from service: %v", err)
Expand All @@ -86,7 +100,7 @@ func (wc *WrappedClient) GetLatest(ctx context.Context) error {
// all items that are listed by wc that exist in the timeline and which
// consist of a data file will be opened and checked for integrity; if
// the file has changed, it will be reprocessed.
func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity bool) error {
func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity bool, tf Timeframe) error {
if wc.Client == nil {
return fmt.Errorf("no client")
}
Expand All @@ -101,9 +115,11 @@ func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity
cc.Mutex = new(sync.Mutex)
}

checkpoint := wc.prepareCheckpoint(tf)

wg, ch := wc.beginProcessing(cc, reprocess, integrity)

err := wc.Client.ListItems(ctx, ch, Options{Checkpoint: wc.acc.checkpoint})
err := wc.Client.ListItems(ctx, ch, Options{Checkpoint: checkpoint, Timeframe: tf})
if err != nil {
return fmt.Errorf("getting items from service: %v", err)
}
Expand All @@ -127,6 +143,19 @@ func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity
return nil
}

// prepareCheckpoint records the string form of tf on wc as the current
// command parameters, so that checkpoints saved later carry them, and
// then decides whether the previously stored checkpoint may be resumed.
// The stored checkpoint data is returned only when a checkpoint exists
// and its parameters equal the current ones; otherwise nil is returned,
// which avoids resuming a process under different parameters.
func (wc *WrappedClient) prepareCheckpoint(tf Timeframe) []byte {
	current := tf.String()
	wc.commandParams = current

	// resume only when the saved parameters match exactly
	if saved := wc.acc.cp; saved != nil && saved.Params == current {
		return saved.Data
	}
	return nil
}

func (wc *WrappedClient) successCleanup() error {
// clear checkpoint
_, err := wc.tl.db.Exec(`UPDATE accounts SET checkpoint=NULL WHERE id=?`, wc.acc.ID) // TODO: limit 1
Expand Down

0 comments on commit a8c45a8

Please sign in to comment.