Skip to content

Commit

Permalink
* add start/stop/remove/delete commands
Browse files Browse the repository at this point in the history
* fix up sftp backend to handle different fs types
  • Loading branch information
majestrate committed Sep 21, 2017
1 parent bed9e5d commit 0253542
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 23 deletions.
81 changes: 79 additions & 2 deletions src/xd/cmd/rpc/rpc.go
Expand Up @@ -57,6 +57,30 @@ func Run() {
addTorrents(c, args...)
count++
}
case "start":
for count < swarms {
c := rpc.NewClient(u.String(), count)
startTorrents(c, args...)
count++
}
case "stop":
for count < swarms {
c := rpc.NewClient(u.String(), count)
stopTorrents(c, args...)
count++
}
case "remove":
for count < swarms {
c := rpc.NewClient(u.String(), count)
removeTorrents(c, args...)
count++
}
case "delete":
for count < swarms {
c := rpc.NewClient(u.String(), count)
deleteTorrents(c, args...)
count++
}
case "set-piece-window":
for count < swarms {
c := rpc.NewClient(u.String(), count)
Expand All @@ -83,8 +107,61 @@ func setPieceWindow(c *rpc.Client, str string) {

func addTorrents(c *rpc.Client, urls ...string) {
for idx := range urls {
fmt.Printf("fetch %s", urls[idx])
c.AddTorrent(urls[idx])
fmt.Printf("fetch %s ... ", urls[idx])
err := c.AddTorrent(urls[idx])
if err == nil {
fmt.Println("OK")
} else {
fmt.Println(err.Error())
}
}
}

func startTorrents(c *rpc.Client, ih ...string) {
for idx := range ih {
fmt.Printf("start %s ... ", ih[idx])
err := c.AddTorrent(ih[idx])
if err == nil {
fmt.Println("OK")
} else {
fmt.Println(err.Error())
}
}
}

func stopTorrents(c *rpc.Client, ih ...string) {
for idx := range ih {
fmt.Printf("stop %s ... ", ih[idx])
err := c.StopTorrent(ih[idx])
if err == nil {
fmt.Println("OK")
} else {
fmt.Println(err.Error())
}
}
}

func removeTorrents(c *rpc.Client, ih ...string) {
for idx := range ih {
fmt.Printf("remove %s ... ", ih[idx])
err := c.RemoveTorrent(ih[idx])
if err == nil {
fmt.Println("OK")
} else {
fmt.Println(err.Error())
}
}
}

func deleteTorrents(c *rpc.Client, ih ...string) {
for idx := range ih {
fmt.Printf("delete %s ... ", ih[idx])
err := c.DeleteTorrent(ih[idx])
if err == nil {
fmt.Println("OK")
} else {
fmt.Println(err.Error())
}
}
}

Expand Down
1 change: 0 additions & 1 deletion src/xd/cmd/xd/xd.go
Expand Up @@ -138,7 +138,6 @@ func Run() {
for idx := range swarms {
net := conf.I2P.CreateSession()
go runFunc(net, swarms[idx])
closers = append(closers, net)
}
sigchnl := make(chan os.Signal)
signal.Notify(sigchnl, os.Interrupt)
Expand Down
23 changes: 19 additions & 4 deletions src/xd/lib/bittorrent/swarm/holder.go
Expand Up @@ -26,6 +26,19 @@ func (h *Holder) addTorrent(t storage.Torrent) {
h.access.Unlock()
}

func (h *Holder) removeTorrent(ih common.Infohash) {
if h.closing {
return
}
h.access.Lock()
ihh := ih.Hex()
_, ok := h.torrents[ihh]
if ok {
delete(h.torrents, ihh)
}
h.access.Unlock()
}

func (h *Holder) forEachTorrent(visit func(*Torrent), fork bool) {
if h.torrents == nil {
return
Expand Down Expand Up @@ -76,21 +89,23 @@ func (h *Holder) Close() (err error) {
if h.closing {
return
}
var wg sync.WaitGroup
var torrents []string
h.access.Lock()
h.closing = true
h.access.Lock()
for n := range h.torrents {
torrents = append(torrents, n)
}
h.access.Unlock()
for _, n := range torrents {
wg.Add(1)
go func(name string) {
h.access.Lock()
t := h.torrents[name]
delete(h.torrents, name)
h.access.Unlock()
t.Close()
wg.Done()
}(n)
}
wg.Wait()
h.access.Unlock()
return
}
8 changes: 6 additions & 2 deletions src/xd/lib/bittorrent/swarm/swarm.go
Expand Up @@ -38,6 +38,9 @@ func (sw *Swarm) WaitForNetwork() {
}

func (sw *Swarm) startTorrent(t *Torrent) {
t.RemoveSelf = func() {
sw.Torrents.removeTorrent(t.st.Infohash())
}
sw.WaitForNetwork()
t.ObtainedNetwork(sw.net)
t.xdht = &sw.xdht
Expand All @@ -63,7 +66,7 @@ func (sw *Swarm) startTorrent(t *Torrent) {
// start annoucing
go t.StartAnnouncing()
// handle messages
go t.Run()
t.Start()
}

// got inbound connection
Expand Down Expand Up @@ -187,7 +190,8 @@ func (sw *Swarm) AddOpenTracker(url string) {
func (sw *Swarm) Close() (err error) {
if !sw.closing {
sw.closing = true
err = sw.Torrents.Close()
sw.Torrents.Close()
err = sw.net.Close()
}
return
}
Expand Down
47 changes: 45 additions & 2 deletions src/xd/lib/bittorrent/swarm/torrent.go
Expand Up @@ -2,6 +2,7 @@ package swarm

import (
"bytes"
"errors"
"net"
"sync"
"time"
Expand All @@ -21,6 +22,7 @@ type Torrent struct {
Completed func()
Started func()
Stopped func()
RemoveSelf func()
netacces sync.Mutex
suspended bool
netContext network.Network
Expand All @@ -36,6 +38,7 @@ type Torrent struct {
pt *pieceTracker
defaultOpts *extensions.Message
closing bool
started bool
MaxRequests int
pexState *PEXSwarmState
xdht *dht.XDHT
Expand Down Expand Up @@ -84,7 +87,7 @@ func (t *Torrent) Close() error {
return nil
}
t.closing = true
t.StopAnnouncing()
t.started = false
t.VisitPeers(func(c *PeerConn) {
c.Close()
})
Expand Down Expand Up @@ -483,13 +486,14 @@ func (t *Torrent) onNewPeer(c *PeerConn) {
c.Send(t.Bitfield().ToWireMessage())
}

func (t *Torrent) Run() {
func (t *Torrent) run() {
if !t.MetaInfo().IsPrivate() {
go t.pexBroadcastLoop()
}
if t.Started != nil {
go t.Started()
}
t.started = true
for !t.Done() {
time.Sleep(time.Second * 5)
}
Expand Down Expand Up @@ -539,3 +543,42 @@ func (t *Torrent) Done() bool {
}
return bf.Completed()
}

var ErrAlreadyStopped = errors.New("torrent already stopped")
var ErrAlreadyStarted = errors.New("torrent already started")

func (t *Torrent) Stop() error {
if t.closing {
return ErrAlreadyStopped
}
t.StopAnnouncing()
return t.Close()
}

func (t *Torrent) Delete() error {
t.StopAnnouncing()
t.Close()
err := t.st.Delete()
if err == nil {
t.RemoveSelf()
}
return err
}

func (t *Torrent) Remove() error {
err := t.Stop()
if err != nil {
return err
}
t.RemoveSelf()
return nil
}

func (t *Torrent) Start() error {
if t.started {
return ErrAlreadyStarted
}
t.closing = false
go t.run()
return nil
}
6 changes: 6 additions & 0 deletions src/xd/lib/fs/fs.go
Expand Up @@ -30,4 +30,10 @@ type Driver interface {
EnsureFile(fpath string, sz uint64) error
// filepath.Glob lookalike
Glob(str string) ([]string, error)
// remove single file
Remove(fpath string) error
// Remove all in filepath
RemoveAll(fpath string) error
// Join path
Join(parts ...string) string
}
51 changes: 51 additions & 0 deletions src/xd/lib/fs/sftp.go
Expand Up @@ -135,6 +135,8 @@ func (fs *sftpFS) ensureConn(visit func(*sftp.Client) error) error {
s, err := fs.ensureSFTP()
if err == nil {
err = visit(s)
} else {
err = fs.Close()
}
return err
}
Expand Down Expand Up @@ -249,6 +251,55 @@ func (fs *sftpFS) EnsureFile(fname string, sz uint64) error {
})
}

func (fs *sftpFS) removeAllDir(root string, c *sftp.Client) error {
dirs, err := c.ReadDir(root)
if err != nil {
return err
}
for idx := range dirs {
if dirs[idx].IsDir() {
err = fs.removeAllDir(dirs[idx].Name(), c)
} else {
err = c.Remove(fs.Join(root, dirs[idx].Name()))
}
if err != nil {
return err
}
}
return c.RemoveDirectory(root)
}

func (fs *sftpFS) Join(paths ...string) string {
p := ""
err := fs.ensureConn(func(c *sftp.Client) error {
p = c.Join(paths...)
return nil
})
if err != nil {
panic(err.Error())
}
return p
}

func (fs *sftpFS) Remove(fpath string) error {
return fs.ensureConn(func(c *sftp.Client) error {
return c.Remove(fpath)
})
}

func (fs *sftpFS) RemoveAll(fpath string) error {
return fs.ensureConn(func(c *sftp.Client) error {
st, err := c.Stat(fpath)
if err != nil {
return err
}
if st.IsDir() {
return fs.removeAllDir(fpath, c)
}
return c.Remove(fpath)
})
}

func SFTP(username, hostname, keyfile, remotekey string, port int) Driver {
return &sftpFS{
username: username,
Expand Down
12 changes: 12 additions & 0 deletions src/xd/lib/fs/std.go
Expand Up @@ -41,3 +41,15 @@ func (f stdFs) OpenFileReadOnly(fname string) (ReadFile, error) {
func (f stdFs) OpenFileWriteOnly(fname string) (WriteFile, error) {
return os.OpenFile(fname, os.O_CREATE|os.O_WRONLY, 0600)
}

func (f stdFs) RemoveAll(fname string) error {
return os.RemoveAll(fname)
}

func (f stdFs) Remove(fname string) error {
return os.Remove(fname)
}

func (f stdFs) Join(parts ...string) string {
return filepath.Join(parts...)
}

0 comments on commit 0253542

Please sign in to comment.