Skip to content

Commit

Permalink
refactor(dl): use tRun to start session
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Dec 10, 2023
1 parent 9526aee commit dfbb368
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 67 deletions.
126 changes: 60 additions & 66 deletions app/dl/dl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/AlecAivazis/survey/v2"
"github.com/fatih/color"
"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/peers"
"github.com/spf13/viper"
"go.uber.org/multierr"
Expand Down Expand Up @@ -51,88 +52,81 @@ type parser struct {
Parser tmessage.ParseSource
}

func Run(ctx context.Context, opts Options) error {
c, kvd, err := tgc.NoLogin(ctx)
func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) {
middlewares, err := tgc.NewDefaultMiddlewares(ctx)
if err != nil {
return err
return errors.Wrap(err, "create middlewares")
}

return tgc.RunWithAuth(ctx, c, func(ctx context.Context) (rerr error) {
middlewares, err := tgc.NewDefaultMiddlewares(ctx)
if err != nil {
return errors.Wrap(err, "create middlewares")
}
pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), middlewares...)
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))

pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), middlewares...)
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
parsers := []parser{
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files, true)},
}
dialogs, err := collectDialogs(parsers)
if err != nil {
return err
}
logger.From(ctx).Debug("Collect dialogs",
zap.Any("dialogs", dialogs))

parsers := []parser{
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files, true)},
}
dialogs, err := collectDialogs(parsers)
if err != nil {
return err
}
logger.From(ctx).Debug("Collect dialogs",
zap.Any("dialogs", dialogs))
if opts.Serve {
return serve(ctx, kvd, pool, dialogs, opts.Port, opts.Takeout)
}

if opts.Serve {
return serve(ctx, kvd, pool, dialogs, opts.Port, opts.Takeout)
}
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))

manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
it, err := newIter(pool, manager, dialogs, opts)
if err != nil {
return err
}

it, err := newIter(pool, manager, dialogs, opts)
if err != nil {
if !opts.Restart {
// resume download and ask user to continue
if err = resume(ctx, kvd, it, !opts.Continue); err != nil {
return err
}
} else {
color.Yellow("Restart download by 'restart' flag")
}

if !opts.Restart {
// resume download and ask user to continue
if err = resume(ctx, kvd, it, !opts.Continue); err != nil {
return err
}
} else {
color.Yellow("Restart download by 'restart' flag")
}

defer func() { // save progress
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(key.Resume(it.Fingerprint())))
}
}()

dlProgress := prog.New(utils.Byte.FormatBinaryBytes)
dlProgress.SetNumTrackersExpected(it.Total())
prog.EnablePS(ctx, dlProgress)

options := downloader.Options{
Pool: pool,
PartSize: viper.GetInt(consts.FlagPartSize),
Threads: viper.GetInt(consts.FlagThreads),
Iter: it,
Progress: newProgress(dlProgress, it, opts),
defer func() { // save progress
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(key.Resume(it.Fingerprint())))
}
limit := viper.GetInt(consts.FlagLimit)
}()

dlProgress := prog.New(utils.Byte.FormatBinaryBytes)
dlProgress.SetNumTrackersExpected(it.Total())
prog.EnablePS(ctx, dlProgress)

options := downloader.Options{
Pool: pool,
PartSize: viper.GetInt(consts.FlagPartSize),
Threads: viper.GetInt(consts.FlagThreads),
Iter: it,
Progress: newProgress(dlProgress, it, opts),
}
limit := viper.GetInt(consts.FlagLimit)

logger.From(ctx).Info("Start download",
zap.String("dir", opts.Dir),
zap.Bool("rewrite_ext", opts.RewriteExt),
zap.Bool("skip_same", opts.SkipSame),
zap.Int("part_size", options.PartSize),
zap.Int("threads", options.Threads),
zap.Int("limit", limit))
logger.From(ctx).Info("Start download",
zap.String("dir", opts.Dir),
zap.Bool("rewrite_ext", opts.RewriteExt),
zap.Bool("skip_same", opts.SkipSame),
zap.Int("part_size", options.PartSize),
zap.Int("threads", options.Threads),
zap.Int("limit", limit))

color.Green("All files will be downloaded to '%s' dir", opts.Dir)
color.Green("All files will be downloaded to '%s' dir", opts.Dir)

go dlProgress.Render()
defer prog.Wait(ctx, dlProgress)
go dlProgress.Render()
defer prog.Wait(ctx, dlProgress)

return downloader.New(options).Download(ctx, limit)
})
return downloader.New(options).Download(ctx, limit)
}

func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) {
Expand Down
8 changes: 7 additions & 1 deletion cmd/dl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package cmd

import (
"context"
"fmt"
"strings"

"github.com/gotd/td/telegram"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/iyear/tdl/app/dl"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
)

Expand All @@ -25,7 +28,10 @@ func NewDownload() *cobra.Command {
}

opts.Template = viper.GetString(consts.FlagDlTemplate)
return dl.Run(logger.Named(cmd.Context(), "dl"), opts)

return tRun(cmd.Context(), false, func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return dl.Run(logger.Named(ctx, "dl"), c, kvd, opts)
})
},
}

Expand Down
27 changes: 27 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package cmd

import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/multierr"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
"github.com/iyear/tdl/pkg/tclient"
)

func New() *cobra.Command {
Expand Down Expand Up @@ -126,3 +129,27 @@ func completeExtFiles(ext ...string) completeFunc {
return files, cobra.ShellCompDirectiveFilterDirs
}
}

func tRun(ctx context.Context, login bool, f func(ctx context.Context, c *telegram.Client, kvd kv.KV) error, middlewares ...telegram.Middleware) error {
// init tclient kv
kvd, err := kv.From(ctx).Open(viper.GetString(consts.FlagNamespace))
if err != nil {
return errors.Wrap(err, "open kv storage")
}
o := tclient.Options{
KV: kvd,
Proxy: viper.GetString(consts.FlagProxy),
NTP: viper.GetString(consts.FlagNTP),
ReconnectTimeout: viper.GetDuration(consts.FlagReconnectTimeout),
Test: viper.GetString(consts.FlagTest) != "",
}

client, err := tclient.New(ctx, o, login, middlewares...)
if err != nil {
return errors.Wrap(err, "create client")
}

return tclient.Run(ctx, client, func(ctx context.Context) error {
return f(ctx, client, kvd)
})
}

0 comments on commit dfbb368

Please sign in to comment.