Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

smb: add support for SMB #6448

Merged
merged 19 commits into from Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -74,6 +74,7 @@ Rclone *("rsync for cloud storage")* is a command-line program to sync files and
* Seafile [:page_facing_up:](https://rclone.org/seafile/)
* SeaweedFS [:page_facing_up:](https://rclone.org/s3/#seaweedfs)
* SFTP [:page_facing_up:](https://rclone.org/sftp/)
* SMB / CIFS [:page_facing_up:](https://rclone.org/smb/)
* StackPath [:page_facing_up:](https://rclone.org/s3/#stackpath)
* Storj [:page_facing_up:](https://rclone.org/storj/)
* SugarSync [:page_facing_up:](https://rclone.org/sugarsync/)
Expand Down
1 change: 1 addition & 0 deletions backend/all/all.go
Expand Up @@ -44,6 +44,7 @@ import (
_ "github.com/rclone/rclone/backend/sftp"
_ "github.com/rclone/rclone/backend/sharefile"
_ "github.com/rclone/rclone/backend/sia"
_ "github.com/rclone/rclone/backend/smb"
_ "github.com/rclone/rclone/backend/storj"
_ "github.com/rclone/rclone/backend/sugarsync"
_ "github.com/rclone/rclone/backend/swift"
Expand Down
228 changes: 228 additions & 0 deletions backend/smb/connpool.go
@@ -0,0 +1,228 @@
package smb

import (
"context"
"fmt"
"net"
"sync/atomic"
"time"

smb2 "github.com/hirochachacha/go-smb2"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fshttp"
)

// dial starts a client connection to the given SMB server. It is a
// convenience function that connects to the given network address,
// initiates the SMB handshake, and then sets up a Client.
func (f *Fs) dial(ctx context.Context, network, addr string) (*conn, error) {
dialer := fshttp.NewDialer(ctx)
tconn, err := dialer.Dial(network, addr)
if err != nil {
return nil, err
}

pass := ""
if f.opt.Pass != "" {
pass, err = obscure.Reveal(f.opt.Pass)
if err != nil {
return nil, err
}
}

d := &smb2.Dialer{
Initiator: &smb2.NTLMInitiator{
User: f.opt.User,
Password: pass,
Domain: f.opt.Domain,
},
}

session, err := d.DialContext(ctx, tconn)
if err != nil {
return nil, err
}

return &conn{
smbSession: session,
conn: &tconn,
}, nil
}

// conn encapsulates a SMB client and corresponding SMB client
type conn struct {
conn *net.Conn
smbSession *smb2.Session
smbShare *smb2.Share
shareName string
}

// Closes the connection
func (c *conn) close() (err error) {
if c.smbShare != nil {
err = c.smbShare.Umount()
}
sessionLogoffErr := c.smbSession.Logoff()
if err != nil {
return err
}
return sessionLogoffErr
}

// True if it's closed
func (c *conn) closed() bool {
var nopErr error
if c.smbShare != nil {
// stat the current directory
_, nopErr = c.smbShare.Stat(".")
} else {
// list the shares
_, nopErr = c.smbSession.ListSharenames()
}
return nopErr == nil
}

// Show that we are using a SMB session
//
// Call removeSession() when done
func (f *Fs) addSession() {
atomic.AddInt32(&f.sessions, 1)
}

// Show the SMB session is no longer in use
func (f *Fs) removeSession() {
atomic.AddInt32(&f.sessions, -1)
}

// getSessions shows whether there are any sessions in use
func (f *Fs) getSessions() int32 {
return atomic.LoadInt32(&f.sessions)
}

// Open a new connection to the SMB server.
func (f *Fs) newConnection(ctx context.Context, share string) (c *conn, err error) {
c, err = f.dial(ctx, "tcp", f.opt.Host+":"+f.opt.Port)
if err != nil {
return nil, fmt.Errorf("couldn't connect SMB: %w", err)
}
if share != "" {
// mount the specified share as well if user requested
c.smbShare, err = c.smbSession.Mount(share)
if err != nil {
_ = c.smbSession.Logoff()
return nil, fmt.Errorf("couldn't initialize SMB: %w", err)
}
c.smbShare = c.smbShare.WithContext(ctx)
}
return c, nil
}

// Ensure the specified share is mounted or the session is unmounted
func (c *conn) mountShare(share string) (err error) {
if c.shareName == share {
return nil
}
if c.smbShare != nil {
err = c.smbShare.Umount()
Lesmiscore marked this conversation as resolved.
Show resolved Hide resolved
}
if err != nil {
return
}
if share != "" {
c.smbShare, err = c.smbSession.Mount(share)
if err != nil {
return
}
}
c.shareName = share
return nil
}

// Get a SMB connection from the pool, or open a new one
func (f *Fs) getConnection(ctx context.Context, share string) (c *conn, err error) {
accounting.LimitTPS(ctx)
f.poolMu.Lock()
for len(f.pool) > 0 {
c = f.pool[0]
f.pool = f.pool[1:]
err = c.mountShare(share)
if err == nil {
break
}
fs.Debugf(f, "Discarding unusable SMB connection: %v", err)
c = nil
}
f.poolMu.Unlock()
if c != nil {
return c, nil
}
err = f.pacer.Call(func() (bool, error) {
c, err = f.newConnection(ctx, share)
if err != nil {
return true, err
}
return false, nil
})
return c, err
}

// Return a SMB connection to the pool
//
// It nils the pointed to connection out so it can't be reused
func (f *Fs) putConnection(pc **conn) {
c := *pc
*pc = nil

var nopErr error
if c.smbShare != nil {
// stat the current directory
_, nopErr = c.smbShare.Stat(".")
} else {
// list the shares
_, nopErr = c.smbSession.ListSharenames()
}
if nopErr != nil {
fs.Debugf(f, "Connection failed, closing: %v", nopErr)
_ = c.close()
return
}

f.poolMu.Lock()
f.pool = append(f.pool, c)
if f.opt.IdleTimeout > 0 {
f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer
}
f.poolMu.Unlock()
}

// Drain the pool of any connections
func (f *Fs) drainPool(ctx context.Context) (err error) {
f.poolMu.Lock()
defer f.poolMu.Unlock()
if sessions := f.getSessions(); sessions != 0 {
fs.Debugf(f, "Not closing %d unused connections as %d sessions active", len(f.pool), sessions)
if f.opt.IdleTimeout > 0 {
f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer
}
return nil
}
if f.opt.IdleTimeout > 0 {
f.drain.Stop()
}
if len(f.pool) != 0 {
fs.Debugf(f, "Closing %d unused connections", len(f.pool))
}
for i, c := range f.pool {
if !c.closed() {
cErr := c.close()
if cErr != nil {
err = cErr
}
}
f.pool[i] = nil
}
f.pool = nil
return err
}