From e3d2df723303fecfb9fb93614b6089f5e2c9952f Mon Sep 17 00:00:00 2001 From: Har01d Date: Wed, 27 Aug 2025 10:49:04 +0800 Subject: [PATCH] add new drivers --- drivers/all.go | 2 + drivers/degoo/driver.go | 203 ++++++++++++++++ drivers/degoo/meta.go | 27 +++ drivers/degoo/types.go | 110 +++++++++ drivers/degoo/upload.go | 198 ++++++++++++++++ drivers/degoo/util.go | 462 +++++++++++++++++++++++++++++++++++++ drivers/teldrive/copy.go | 137 +++++++++++ drivers/teldrive/driver.go | 217 +++++++++++++++++ drivers/teldrive/meta.go | 26 +++ drivers/teldrive/types.go | 77 +++++++ drivers/teldrive/upload.go | 373 ++++++++++++++++++++++++++++++ drivers/teldrive/util.go | 109 +++++++++ 12 files changed, 1941 insertions(+) create mode 100644 drivers/degoo/driver.go create mode 100644 drivers/degoo/meta.go create mode 100644 drivers/degoo/types.go create mode 100644 drivers/degoo/upload.go create mode 100644 drivers/degoo/util.go create mode 100644 drivers/teldrive/copy.go create mode 100644 drivers/teldrive/driver.go create mode 100644 drivers/teldrive/meta.go create mode 100644 drivers/teldrive/types.go create mode 100644 drivers/teldrive/upload.go create mode 100644 drivers/teldrive/util.go diff --git a/drivers/all.go b/drivers/all.go index ca00a90d..514eff34 100644 --- a/drivers/all.go +++ b/drivers/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/OpenListTeam/OpenList/v4/drivers/cloudreve" _ "github.com/OpenListTeam/OpenList/v4/drivers/cloudreve_v4" _ "github.com/OpenListTeam/OpenList/v4/drivers/crypt" + _ "github.com/OpenListTeam/OpenList/v4/drivers/degoo" _ "github.com/OpenListTeam/OpenList/v4/drivers/doubao" _ "github.com/OpenListTeam/OpenList/v4/drivers/doubao_share" _ "github.com/OpenListTeam/OpenList/v4/drivers/dropbox" @@ -66,6 +67,7 @@ import ( _ "github.com/OpenListTeam/OpenList/v4/drivers/smb" _ "github.com/OpenListTeam/OpenList/v4/drivers/strm" _ "github.com/OpenListTeam/OpenList/v4/drivers/teambition" + _ "github.com/OpenListTeam/OpenList/v4/drivers/teldrive" _ "github.com/OpenListTeam/OpenList/v4/drivers/terabox" _ "github.com/OpenListTeam/OpenList/v4/drivers/thunder" _ "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser" diff --git a/drivers/degoo/driver.go b/drivers/degoo/driver.go new file mode 100644 index 00000000..648a1367 --- /dev/null +++ b/drivers/degoo/driver.go @@ -0,0 +1,203 @@ +package degoo + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" +) + +type Degoo struct { + model.Storage + Addition + client *http.Client +} + +func (d *Degoo) Config() driver.Config { + return config +} + +func (d *Degoo) GetAddition() driver.Additional { + return &d.Addition +} + +func (d *Degoo) Init(ctx context.Context) error { + + d.client = base.HttpClient + + // Ensure we have a valid token (will login if needed or refresh if expired) + if err := d.ensureValidToken(ctx); err != nil { + return fmt.Errorf("failed to initialize token: %w", err) + } + + return d.getDevices(ctx) +} + +func (d *Degoo) Drop(ctx context.Context) error { + return nil +} + +func (d *Degoo) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) { + items, err := d.getAllFileChildren5(ctx, dir.GetID()) + if err != nil { + return nil, err + } + return utils.MustSliceConvert(items, func(s DegooFileItem) model.Obj { + isFolder := s.Category == 2 || s.Category == 1 || s.Category == 10 + + createTime, modTime, _ := humanReadableTimes(s.CreationTime, s.LastModificationTime, s.LastUploadTime) + + size, err := strconv.ParseInt(s.Size, 10, 64) + if err != nil { + size = 0 // Default to 0 if size parsing fails + } + + return &model.Object{ + ID: s.ID, + Path: s.FilePath, + Name: s.Name, + Size: size, + Modified: modTime, + Ctime: createTime, + IsFolder: isFolder, + } + }), nil +} + +func (d *Degoo) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { + item, err := d.getOverlay4(ctx, file.GetID()) + if err != nil { + return nil, err + } + + return &model.Link{URL: item.URL}, nil +} + +func (d *Degoo) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { + // This is done by calling the setUploadFile3 API with a special checksum and size. + const query = `mutation SetUploadFile3($Token: String!, $FileInfos: [FileInfoUpload3]!) { setUploadFile3(Token: $Token, FileInfos: $FileInfos) }` + + variables := map[string]interface{}{ + "Token": d.AccessToken, + "FileInfos": []map[string]interface{}{ + { + "Checksum": folderChecksum, + "Name": dirName, + "CreationTime": time.Now().UnixMilli(), + "ParentID": parentDir.GetID(), + "Size": 0, + }, + }, + } + + _, err := d.apiCall(ctx, "SetUploadFile3", query, variables) + if err != nil { + return err + } + + return nil +} + +func (d *Degoo) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + const query = `mutation SetMoveFile($Token: String!, $Copy: Boolean, $NewParentID: String!, $FileIDs: [String]!) { setMoveFile(Token: $Token, Copy: $Copy, NewParentID: $NewParentID, FileIDs: $FileIDs) }` + + variables := map[string]interface{}{ + "Token": d.AccessToken, + "Copy": false, + "NewParentID": dstDir.GetID(), + "FileIDs": []string{srcObj.GetID()}, + } + + _, err := d.apiCall(ctx, "SetMoveFile", query, variables) + if err != nil { + return nil, err + } + + return srcObj, nil +} + +func (d *Degoo) Rename(ctx context.Context, srcObj model.Obj, newName string) error { + const query = `mutation SetRenameFile($Token: String!, $FileRenames: [FileRenameInfo]!) { setRenameFile(Token: $Token, FileRenames: $FileRenames) }` + + variables := map[string]interface{}{ + "Token": d.AccessToken, + "FileRenames": []DegooFileRenameInfo{ + { + ID: srcObj.GetID(), + NewName: newName, + }, + }, + } + + _, err := d.apiCall(ctx, "SetRenameFile", query, variables) + if err != nil { + return err + } + return nil +} + +func (d *Degoo) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + // Copy is not implemented, Degoo API does not support direct copy. + return nil, errs.NotImplement +} + +func (d *Degoo) Remove(ctx context.Context, obj model.Obj) error { + // Remove deletes a file or folder (moves to trash). + const query = `mutation SetDeleteFile5($Token: String!, $IsInRecycleBin: Boolean!, $IDs: [IDType]!) { setDeleteFile5(Token: $Token, IsInRecycleBin: $IsInRecycleBin, IDs: $IDs) }` + + variables := map[string]interface{}{ + "Token": d.AccessToken, + "IsInRecycleBin": false, + "IDs": []map[string]string{{"FileID": obj.GetID()}}, + } + + _, err := d.apiCall(ctx, "SetDeleteFile5", query, variables) + return err +} + +func (d *Degoo) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error { + tmpF, err := file.CacheFullAndWriter(&up, nil) + if err != nil { + return err + } + + parentID := dstDir.GetID() + + // Calculate the checksum for the file. + checksum, err := d.checkSum(tmpF) + if err != nil { + return err + } + + // 1. Get upload authorization via getBucketWriteAuth4. + auths, err := d.getBucketWriteAuth4(ctx, file, parentID, checksum) + if err != nil { + return err + } + + // 2. Upload file. + // support rapid upload + if auths.GetBucketWriteAuth4[0].Error != "Already exist!" { + err = d.uploadS3(ctx, auths, tmpF, file, checksum) + if err != nil { + return err + } + } + + // 3. Register metadata with setUploadFile3. + data, err := d.SetUploadFile3(ctx, file, parentID, checksum) + if err != nil { + return err + } + if !data.SetUploadFile3 { + return fmt.Errorf("setUploadFile3 failed: %v", data) + } + return nil +} diff --git a/drivers/degoo/meta.go b/drivers/degoo/meta.go new file mode 100644 index 00000000..113d2082 --- /dev/null +++ b/drivers/degoo/meta.go @@ -0,0 +1,27 @@ +package degoo + +import ( + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/op" +) + +type Addition struct { + driver.RootID + Username string `json:"username" help:"Your Degoo account email"` + Password string `json:"password" help:"Your Degoo account password"` + RefreshToken string `json:"refresh_token" help:"Refresh token for automatic token renewal, obtained automatically"` + AccessToken string `json:"access_token" help:"Access token for Degoo API, obtained automatically"` +} + +var config = driver.Config{ + Name: "Degoo", + LocalSort: true, + DefaultRoot: "0", + NoOverwriteUpload: true, +} + +func init() { + op.RegisterDriver(func() driver.Driver { + return &Degoo{} + }) +} diff --git a/drivers/degoo/types.go b/drivers/degoo/types.go new file mode 100644 index 00000000..9d793b76 --- /dev/null +++ b/drivers/degoo/types.go @@ -0,0 +1,110 @@ +package degoo + +import ( + "encoding/json" +) + +// DegooLoginRequest represents the login request body. +type DegooLoginRequest struct { + GenerateToken bool `json:"GenerateToken"` + Username string `json:"Username"` + Password string `json:"Password"` +} + +// DegooLoginResponse represents a successful login response. +type DegooLoginResponse struct { + Token string `json:"Token"` + RefreshToken string `json:"RefreshToken"` +} + +// DegooAccessTokenRequest represents the token refresh request body. +type DegooAccessTokenRequest struct { + RefreshToken string `json:"RefreshToken"` +} + +// DegooAccessTokenResponse represents the token refresh response. +type DegooAccessTokenResponse struct { + AccessToken string `json:"AccessToken"` +} + +// DegooFileItem represents a Degoo file or folder. +type DegooFileItem struct { + ID string `json:"ID"` + ParentID string `json:"ParentID"` + Name string `json:"Name"` + Category int `json:"Category"` + Size string `json:"Size"` + URL string `json:"URL"` + CreationTime string `json:"CreationTime"` + LastModificationTime string `json:"LastModificationTime"` + LastUploadTime string `json:"LastUploadTime"` + MetadataID string `json:"MetadataID"` + DeviceID int64 `json:"DeviceID"` + FilePath string `json:"FilePath"` + IsInRecycleBin bool `json:"IsInRecycleBin"` +} + +type DegooErrors struct { + Path []string `json:"path"` + Data interface{} `json:"data"` + ErrorType string `json:"errorType"` + ErrorInfo interface{} `json:"errorInfo"` + Message string `json:"message"` +} + +// DegooGraphqlResponse is the common structure for GraphQL API responses. +type DegooGraphqlResponse struct { + Data json.RawMessage `json:"data"` + Errors []DegooErrors `json:"errors,omitempty"` +} + +// DegooGetChildren5Data is the data field for getFileChildren5. +type DegooGetChildren5Data struct { + GetFileChildren5 struct { + Items []DegooFileItem `json:"Items"` + NextToken string `json:"NextToken"` + } `json:"getFileChildren5"` +} + +// DegooGetOverlay4Data is the data field for getOverlay4. +type DegooGetOverlay4Data struct { + GetOverlay4 DegooFileItem `json:"getOverlay4"` +} + +// DegooFileRenameInfo represents a file rename operation. +type DegooFileRenameInfo struct { + ID string `json:"ID"` + NewName string `json:"NewName"` +} + +// DegooFileIDs represents a list of file IDs for move operations. +type DegooFileIDs struct { + FileIDs []string `json:"FileIDs"` +} + +// DegooGetBucketWriteAuth4Data is the data field for GetBucketWriteAuth4. +type DegooGetBucketWriteAuth4Data struct { + GetBucketWriteAuth4 []struct { + AuthData struct { + PolicyBase64 string `json:"PolicyBase64"` + Signature string `json:"Signature"` + BaseURL string `json:"BaseURL"` + KeyPrefix string `json:"KeyPrefix"` + AccessKey struct { + Key string `json:"Key"` + Value string `json:"Value"` + } `json:"AccessKey"` + ACL string `json:"ACL"` + AdditionalBody []struct { + Key string `json:"Key"` + Value string `json:"Value"` + } `json:"AdditionalBody"` + } `json:"AuthData"` + Error interface{} `json:"Error"` + } `json:"getBucketWriteAuth4"` +} + +// DegooSetUploadFile3Data is the data field for SetUploadFile3. +type DegooSetUploadFile3Data struct { + SetUploadFile3 bool `json:"setUploadFile3"` +} diff --git a/drivers/degoo/upload.go b/drivers/degoo/upload.go new file mode 100644 index 00000000..8144ee7d --- /dev/null +++ b/drivers/degoo/upload.go @@ -0,0 +1,198 @@ +package degoo + +import ( + "bytes" + "context" + "crypto/sha1" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "strconv" + "strings" + + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" +) + +func (d *Degoo) getBucketWriteAuth4(ctx context.Context, file model.FileStreamer, parentID string, checksum string) (*DegooGetBucketWriteAuth4Data, error) { + const query = `query GetBucketWriteAuth4( + $Token: String! + $ParentID: String! + $StorageUploadInfos: [StorageUploadInfo2] + ) { + getBucketWriteAuth4( + Token: $Token + ParentID: $ParentID + StorageUploadInfos: $StorageUploadInfos + ) { + AuthData { + PolicyBase64 + Signature + BaseURL + KeyPrefix + AccessKey { + Key + Value + } + ACL + AdditionalBody { + Key + Value + } + } + Error + } + }` + + variables := map[string]interface{}{ + "Token": d.AccessToken, + "ParentID": parentID, + "StorageUploadInfos": []map[string]string{{ + "FileName": file.GetName(), + "Checksum": checksum, + "Size": strconv.FormatInt(file.GetSize(), 10), + }}} + + data, err := d.apiCall(ctx, "GetBucketWriteAuth4", query, variables) + if err != nil { + return nil, err + } + + var resp DegooGetBucketWriteAuth4Data + err = json.Unmarshal(data, &resp) + if err != nil { + return nil, err + } + + return &resp, nil +} + +// checkSum calculates the SHA1-based checksum for Degoo upload API. +func (d *Degoo) checkSum(file io.Reader) (string, error) { + seed := []byte{13, 7, 2, 2, 15, 40, 75, 117, 13, 10, 19, 16, 29, 23, 3, 36} + hasher := sha1.New() + hasher.Write(seed) + + if _, err := utils.CopyWithBuffer(hasher, file); err != nil { + return "", err + } + + cs := hasher.Sum(nil) + + csBytes := []byte{10, byte(len(cs))} + csBytes = append(csBytes, cs...) + csBytes = append(csBytes, 16, 0) + + return strings.ReplaceAll(base64.StdEncoding.EncodeToString(csBytes), "/", "_"), nil +} + +func (d *Degoo) uploadS3(ctx context.Context, auths *DegooGetBucketWriteAuth4Data, tmpF model.File, file model.FileStreamer, checksum string) error { + a := auths.GetBucketWriteAuth4[0].AuthData + + _, err := tmpF.Seek(0, io.SeekStart) + if err != nil { + return err + } + + ext := utils.Ext(file.GetName()) + key := fmt.Sprintf("%s%s/%s.%s", a.KeyPrefix, ext, checksum, ext) + + var b bytes.Buffer + w := multipart.NewWriter(&b) + err = w.WriteField("key", key) + if err != nil { + return err + } + err = w.WriteField("acl", a.ACL) + if err != nil { + return err + } + err = w.WriteField("policy", a.PolicyBase64) + if err != nil { + return err + } + err = w.WriteField("signature", a.Signature) + if err != nil { + return err + } + err = w.WriteField(a.AccessKey.Key, a.AccessKey.Value) + if err != nil { + return err + } + for _, additional := range a.AdditionalBody { + err = w.WriteField(additional.Key, additional.Value) + if err != nil { + return err + } + } + err = w.WriteField("Content-Type", "") + if err != nil { + return err + } + + _, err = w.CreateFormFile("file", key) + if err != nil { + return err + } + + headSize := b.Len() + err = w.Close() + if err != nil { + return err + } + head := bytes.NewReader(b.Bytes()[:headSize]) + tail := bytes.NewReader(b.Bytes()[headSize:]) + + rateLimitedRd := driver.NewLimitedUploadStream(ctx, io.MultiReader(head, tmpF, tail)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.BaseURL, rateLimitedRd) + if err != nil { + return err + } + req.Header.Add("ngsw-bypass", "1") + req.Header.Add("Content-Type", w.FormDataContentType()) + + res, err := d.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusNoContent { + return fmt.Errorf("upload failed with status code %d", res.StatusCode) + } + return nil +} + +var _ driver.Driver = (*Degoo)(nil) + +func (d *Degoo) SetUploadFile3(ctx context.Context, file model.FileStreamer, parentID string, checksum string) (*DegooSetUploadFile3Data, error) { + const query = `mutation SetUploadFile3($Token: String!, $FileInfos: [FileInfoUpload3]!) { + setUploadFile3(Token: $Token, FileInfos: $FileInfos) + }` + + variables := map[string]interface{}{ + "Token": d.AccessToken, + "FileInfos": []map[string]string{{ + "Checksum": checksum, + "CreationTime": strconv.FormatInt(file.CreateTime().UnixMilli(), 10), + "Name": file.GetName(), + "ParentID": parentID, + "Size": strconv.FormatInt(file.GetSize(), 10), + }}} + + data, err := d.apiCall(ctx, "SetUploadFile3", query, variables) + if err != nil { + return nil, err + } + + var resp DegooSetUploadFile3Data + err = json.Unmarshal(data, &resp) + if err != nil { + return nil, err + } + + return &resp, nil +} diff --git a/drivers/degoo/util.go b/drivers/degoo/util.go new file mode 100644 index 00000000..a146e3d4 --- /dev/null +++ b/drivers/degoo/util.go @@ -0,0 +1,462 @@ +package degoo + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/op" +) + +// Thanks to https://github.com/bernd-wechner/Degoo for API research. + +const ( + // API endpoints + loginURL = "https://rest-api.degoo.com/login" + accessTokenURL = "https://rest-api.degoo.com/access-token/v2" + apiURL = "https://production-appsync.degoo.com/graphql" + + // API configuration + apiKey = "da2-vs6twz5vnjdavpqndtbzg3prra" + folderChecksum = "CgAQAg" + + // Token management + tokenRefreshThreshold = 5 * time.Minute + + // Rate limiting + minRequestInterval = 1 * time.Second + + // Error messages + errRateLimited = "rate limited (429), please try again later" + errUnauthorized = "unauthorized access" +) + +var ( + // Global rate limiting - protects against concurrent API calls + lastRequestTime time.Time + requestMutex sync.Mutex +) + +// JWT payload structure for token expiration checking +type JWTPayload struct { + UserID string `json:"userID"` + Exp int64 `json:"exp"` + Iat int64 `json:"iat"` +} + +// Rate limiting helper functions + +// applyRateLimit ensures minimum interval between API requests +func applyRateLimit() { + requestMutex.Lock() + defer requestMutex.Unlock() + + if !lastRequestTime.IsZero() { + if elapsed := time.Since(lastRequestTime); elapsed < minRequestInterval { + time.Sleep(minRequestInterval - elapsed) + } + } + lastRequestTime = time.Now() +} + +// HTTP request helper functions + +// createJSONRequest creates a new HTTP request with JSON body +func createJSONRequest(ctx context.Context, method, url string, body interface{}) (*http.Request, error) { + jsonBody, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal request body: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(jsonBody)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", base.UserAgent) + return req, nil +} + +// checkHTTPResponse checks for common HTTP error conditions +func checkHTTPResponse(resp *http.Response, operation string) error { + if resp.StatusCode == http.StatusTooManyRequests { + return fmt.Errorf("%s %s", operation, errRateLimited) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s failed: %s", operation, resp.Status) + } + return nil +} + +// isTokenExpired checks if the JWT token is expired or will expire soon +func (d *Degoo) isTokenExpired() bool { + if d.AccessToken == "" { + return true + } + + payload, err := extractJWTPayload(d.AccessToken) + if err != nil { + return true // Invalid token format + } + + // Check if token expires within the threshold + expireTime := time.Unix(payload.Exp, 0) + return time.Now().Add(tokenRefreshThreshold).After(expireTime) +} + +// extractJWTPayload extracts and parses JWT payload +func extractJWTPayload(token string) (*JWTPayload, error) { + parts := strings.Split(token, ".") + if len(parts) != 3 { + return nil, fmt.Errorf("invalid JWT format") + } + + // Decode the payload (second part) + payload, err := base64.RawURLEncoding.DecodeString(parts[1]) + if err != nil { + return nil, fmt.Errorf("failed to decode JWT payload: %w", err) + } + + var jwtPayload JWTPayload + if err := json.Unmarshal(payload, &jwtPayload); err != nil { + return nil, fmt.Errorf("failed to parse JWT payload: %w", err) + } + + return &jwtPayload, nil +} + +// refreshToken attempts to refresh the access token using the refresh token +func (d *Degoo) refreshToken(ctx context.Context) error { + if d.RefreshToken == "" { + return fmt.Errorf("no refresh token available") + } + + // Create request + tokenReq := DegooAccessTokenRequest{RefreshToken: d.RefreshToken} + req, err := createJSONRequest(ctx, "POST", accessTokenURL, tokenReq) + if err != nil { + return fmt.Errorf("failed to create refresh token request: %w", err) + } + + // Execute request + resp, err := d.client.Do(req) + if err != nil { + return fmt.Errorf("refresh token request failed: %w", err) + } + defer resp.Body.Close() + + // Check response + if err := checkHTTPResponse(resp, "refresh token"); err != nil { + return err + } + + var accessTokenResp DegooAccessTokenResponse + if err := json.NewDecoder(resp.Body).Decode(&accessTokenResp); err != nil { + return fmt.Errorf("failed to parse access token response: %w", err) + } + + if accessTokenResp.AccessToken == "" { + return fmt.Errorf("empty access token received") + } + + d.AccessToken = accessTokenResp.AccessToken + // Save the updated token to storage + op.MustSaveDriverStorage(d) + + return nil +} + +// ensureValidToken ensures we have a valid, non-expired token +func (d *Degoo) ensureValidToken(ctx context.Context) error { + // Check if token is expired or will expire soon + if d.isTokenExpired() { + // Try to refresh token first if we have a refresh token + if d.RefreshToken != "" { + if refreshErr := d.refreshToken(ctx); refreshErr == nil { + return nil // Successfully refreshed + } else { + // If refresh failed, fall back to full login + fmt.Printf("Token refresh failed, falling back to full login: %v\n", refreshErr) + } + } + + // Perform full login + if d.Username != "" && d.Password != "" { + return d.login(ctx) + } + } + + return nil +} + +// login performs the login process and retrieves the access token. +func (d *Degoo) login(ctx context.Context) error { + if d.Username == "" || d.Password == "" { + return fmt.Errorf("username or password not provided") + } + + creds := DegooLoginRequest{ + GenerateToken: true, + Username: d.Username, + Password: d.Password, + } + + jsonCreds, err := json.Marshal(creds) + if err != nil { + return fmt.Errorf("failed to serialize login credentials: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", loginURL, bytes.NewBuffer(jsonCreds)) + if err != nil { + return fmt.Errorf("failed to create login request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", base.UserAgent) + req.Header.Set("Origin", "https://app.degoo.com") + + resp, err := d.client.Do(req) + if err != nil { + return fmt.Errorf("login request failed: %w", err) + } + defer resp.Body.Close() + + // Handle rate limiting (429 Too Many Requests) + if resp.StatusCode == http.StatusTooManyRequests { + return fmt.Errorf("login rate limited (429), please try again later") + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("login failed: %s", resp.Status) + } + + var loginResp DegooLoginResponse + if err := json.NewDecoder(resp.Body).Decode(&loginResp); err != nil { + return fmt.Errorf("failed to parse login response: %w", err) + } + + if loginResp.RefreshToken != "" { + tokenReq := DegooAccessTokenRequest{RefreshToken: loginResp.RefreshToken} + jsonTokenReq, err := json.Marshal(tokenReq) + if err != nil { + return fmt.Errorf("failed to serialize access token request: %w", err) + } + + tokenReqHTTP, err := http.NewRequestWithContext(ctx, "POST", accessTokenURL, bytes.NewBuffer(jsonTokenReq)) + if err != nil { + return fmt.Errorf("failed to create access token request: %w", err) + } + + tokenReqHTTP.Header.Set("User-Agent", base.UserAgent) + + tokenResp, err := d.client.Do(tokenReqHTTP) + if err != nil { + return fmt.Errorf("failed to get access token: %w", err) + } + defer tokenResp.Body.Close() + + var accessTokenResp DegooAccessTokenResponse + if err := json.NewDecoder(tokenResp.Body).Decode(&accessTokenResp); err != nil { + return fmt.Errorf("failed to parse access token response: %w", err) + } + d.AccessToken = accessTokenResp.AccessToken + d.RefreshToken = loginResp.RefreshToken // Save refresh token + } else if loginResp.Token != "" { + d.AccessToken = loginResp.Token + d.RefreshToken = "" // Direct token, no refresh token available + } else { + return fmt.Errorf("login failed, no valid token returned") + } + + // Save the updated tokens to storage + op.MustSaveDriverStorage(d) + + return nil +} + +// apiCall performs a Degoo GraphQL API request. +func (d *Degoo) apiCall(ctx context.Context, operationName, query string, variables map[string]interface{}) (json.RawMessage, error) { + // Apply rate limiting + applyRateLimit() + + // Ensure we have a valid token before making the API call + if err := d.ensureValidToken(ctx); err != nil { + return nil, fmt.Errorf("failed to ensure valid token: %w", err) + } + + // Update the Token in variables if it exists (after potential refresh) + d.updateTokenInVariables(variables) + + return d.executeGraphQLRequest(ctx, operationName, query, variables) +} + +// updateTokenInVariables updates the Token field in GraphQL variables +func (d *Degoo) updateTokenInVariables(variables map[string]interface{}) { + if variables != nil { + if _, hasToken := variables["Token"]; hasToken { + variables["Token"] = d.AccessToken + } + } +} + +// executeGraphQLRequest executes a GraphQL request with retry logic +func (d *Degoo) executeGraphQLRequest(ctx context.Context, operationName, query string, variables map[string]interface{}) (json.RawMessage, error) { + reqBody := map[string]interface{}{ + "operationName": operationName, + "query": query, + "variables": variables, + } + + // Create and configure request + req, err := createJSONRequest(ctx, "POST", apiURL, reqBody) + if err != nil { + return nil, err + } + + // Set Degoo-specific headers + req.Header.Set("x-api-key", apiKey) + if d.AccessToken != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", d.AccessToken)) + } + + // Execute request + resp, err := d.client.Do(req) + if err != nil { + return nil, fmt.Errorf("GraphQL API request failed: %w", err) + } + defer resp.Body.Close() + + // Check for HTTP errors + if err := checkHTTPResponse(resp, "GraphQL API"); err != nil { + return nil, err + } + + // Parse GraphQL response + var degooResp DegooGraphqlResponse + if err := json.NewDecoder(resp.Body).Decode(°ooResp); err != nil { + return nil, fmt.Errorf("failed to decode GraphQL response: %w", err) + } + + // Handle GraphQL errors + if len(degooResp.Errors) > 0 { + return d.handleGraphQLError(ctx, degooResp.Errors[0], operationName, query, variables) + } + + return degooResp.Data, nil +} + +// handleGraphQLError handles GraphQL-level errors with retry logic +func (d *Degoo) handleGraphQLError(ctx context.Context, gqlError DegooErrors, operationName, query string, variables map[string]interface{}) (json.RawMessage, error) { + if gqlError.ErrorType == "Unauthorized" { + // Re-login and retry + if err := d.login(ctx); err != nil { + return nil, fmt.Errorf("%s, login failed: %w", errUnauthorized, err) + } + + // Update token in variables and retry + d.updateTokenInVariables(variables) + return d.apiCall(ctx, operationName, query, variables) + } + + return nil, fmt.Errorf("GraphQL API error: %s", gqlError.Message) +} + +// humanReadableTimes converts Degoo timestamps to Go time.Time. +func humanReadableTimes(creation, modification, upload string) (cTime, mTime, uTime time.Time) { + cTime, _ = time.Parse(time.RFC3339, creation) + if modification != "" { + modMillis, _ := strconv.ParseInt(modification, 10, 64) + mTime = time.Unix(0, modMillis*int64(time.Millisecond)) + } + if upload != "" { + upMillis, _ := strconv.ParseInt(upload, 10, 64) + uTime = time.Unix(0, upMillis*int64(time.Millisecond)) + } + return cTime, mTime, uTime +} + +// getDevices fetches and caches top-level devices and folders. +func (d *Degoo) getDevices(ctx context.Context) error { + const query = `query GetFileChildren5($Token: String! $ParentID: String $AllParentIDs: [String] $Limit: Int! $Order: Int! $NextToken: String ) { getFileChildren5(Token: $Token ParentID: $ParentID AllParentIDs: $AllParentIDs Limit: $Limit Order: $Order NextToken: $NextToken) { Items { ParentID } NextToken } }` + variables := map[string]interface{}{ + "Token": d.AccessToken, + "ParentID": "0", + "Limit": 10, + "Order": 3, + } + data, err := d.apiCall(ctx, "GetFileChildren5", query, variables) + if err != nil { + return err + } + var resp DegooGetChildren5Data + if err := json.Unmarshal(data, &resp); err != nil { + return fmt.Errorf("failed to parse device list: %w", err) + } + if d.RootFolderID == "0" { + if len(resp.GetFileChildren5.Items) > 0 { + d.RootFolderID = resp.GetFileChildren5.Items[0].ParentID + } + op.MustSaveDriverStorage(d) + } + return nil +} + +// getAllFileChildren5 fetches all children of a directory with pagination. +func (d *Degoo) getAllFileChildren5(ctx context.Context, parentID string) ([]DegooFileItem, error) { + const query = `query GetFileChildren5($Token: String! $ParentID: String $AllParentIDs: [String] $Limit: Int! $Order: Int! $NextToken: String ) { getFileChildren5(Token: $Token ParentID: $ParentID AllParentIDs: $AllParentIDs Limit: $Limit Order: $Order NextToken: $NextToken) { Items { ID ParentID Name Category Size CreationTime LastModificationTime LastUploadTime FilePath IsInRecycleBin DeviceID MetadataID } NextToken } }` + var allItems []DegooFileItem + nextToken := "" + for { + variables := map[string]interface{}{ + "Token": d.AccessToken, + "ParentID": parentID, + "Limit": 1000, + "Order": 3, + } + if nextToken != "" { + variables["NextToken"] = nextToken + } + data, err := d.apiCall(ctx, "GetFileChildren5", query, variables) + if err != nil { + return nil, err + } + var resp DegooGetChildren5Data + if err := json.Unmarshal(data, &resp); err != nil { + return nil, err + } + allItems = append(allItems, resp.GetFileChildren5.Items...) + if resp.GetFileChildren5.NextToken == "" { + break + } + nextToken = resp.GetFileChildren5.NextToken + } + return allItems, nil +} + +// getOverlay4 fetches metadata for a single item by ID. +func (d *Degoo) getOverlay4(ctx context.Context, id string) (DegooFileItem, error) { + const query = `query GetOverlay4($Token: String!, $ID: IDType!) { getOverlay4(Token: $Token, ID: $ID) { ID ParentID Name Category Size CreationTime LastModificationTime LastUploadTime URL FilePath IsInRecycleBin DeviceID MetadataID } }` + variables := map[string]interface{}{ + "Token": d.AccessToken, + "ID": map[string]string{ + "FileID": id, + }, + } + data, err := d.apiCall(ctx, "GetOverlay4", query, variables) + if err != nil { + return DegooFileItem{}, err + } + var resp DegooGetOverlay4Data + if err := json.Unmarshal(data, &resp); err != nil { + return DegooFileItem{}, fmt.Errorf("failed to parse item metadata: %w", err) + } + return resp.GetOverlay4, nil +} diff --git a/drivers/teldrive/copy.go b/drivers/teldrive/copy.go new file mode 100644 index 00000000..1118f63a --- /dev/null +++ b/drivers/teldrive/copy.go @@ -0,0 +1,137 @@ +package teldrive + +import ( + "fmt" + "net/http" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/go-resty/resty/v2" + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +func NewCopyManager(ctx context.Context, concurrent int, d *Teldrive) *CopyManager { + g, ctx := errgroup.WithContext(ctx) + + return &CopyManager{ + TaskChan: make(chan CopyTask, concurrent*2), + Sem: semaphore.NewWeighted(int64(concurrent)), + G: g, + Ctx: ctx, + d: d, + } +} + +func (cm *CopyManager) startWorkers() { + workerCount := cap(cm.TaskChan) / 2 + for i := 0; i < workerCount; i++ { + cm.G.Go(func() error { + return cm.worker() + }) + } +} + +func (cm *CopyManager) worker() error { + for { + select { + case task, ok := <-cm.TaskChan: + if !ok { + return nil + } + + if err := cm.Sem.Acquire(cm.Ctx, 1); err != nil { + return err + } + + var err error + + err = cm.processFile(task) + + cm.Sem.Release(1) + + if err != nil { + return fmt.Errorf("task processing failed: %w", err) + } + + case <-cm.Ctx.Done(): + return cm.Ctx.Err() + } + } +} + +func (cm *CopyManager) generateTasks(ctx context.Context, srcObj, dstDir model.Obj) error { + if srcObj.IsDir() { + return cm.generateFolderTasks(ctx, srcObj, dstDir) + } else { + // add single file task directly + select { + case cm.TaskChan <- CopyTask{SrcObj: srcObj, DstDir: dstDir}: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (cm *CopyManager) generateFolderTasks(ctx context.Context, srcDir, dstDir model.Obj) error { + objs, err := cm.d.List(ctx, srcDir, model.ListArgs{}) + if err != nil { + return fmt.Errorf("failed to list directory %s: %w", srcDir.GetPath(), err) + } + + err = cm.d.MakeDir(cm.Ctx, dstDir, srcDir.GetName()) + if err != nil || len(objs) == 0 { + return err + } + newDstDir := &model.Object{ + ID: dstDir.GetID(), + Path: dstDir.GetPath() + "/" + srcDir.GetName(), + Name: srcDir.GetName(), + IsFolder: true, + } + + for _, file := range objs { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + + srcFile := &model.Object{ + ID: file.GetID(), + Path: srcDir.GetPath() + "/" + file.GetName(), + Name: file.GetName(), + IsFolder: file.IsDir(), + } + + // 递归生成任务 + if err := cm.generateTasks(ctx, srcFile, newDstDir); err != nil { + return err + } + } + + return nil +} + +func (cm *CopyManager) processFile(task CopyTask) error { + return cm.copySingleFile(cm.Ctx, task.SrcObj, task.DstDir) +} + +func (cm *CopyManager) copySingleFile(ctx context.Context, srcObj, dstDir model.Obj) error { + // `override copy mode` should delete the existing file + if obj, err := cm.d.getFile(dstDir.GetPath(), srcObj.GetName(), srcObj.IsDir()); err == nil { + if err := cm.d.Remove(ctx, obj); err != nil { + return fmt.Errorf("failed to remove existing file: %w", err) + } + } + + // Do copy + return cm.d.request(http.MethodPost, "/api/files/{id}/copy", func(req *resty.Request) { + req.SetPathParam("id", srcObj.GetID()) + req.SetBody(base.Json{ + "newName": srcObj.GetName(), + "destination": dstDir.GetPath(), + }) + }, nil) +} diff --git a/drivers/teldrive/driver.go b/drivers/teldrive/driver.go new file mode 100644 index 00000000..541d2e3b --- /dev/null +++ b/drivers/teldrive/driver.go @@ -0,0 +1,217 @@ +package teldrive + +import ( + "context" + "fmt" + "math" + "net/http" + "net/url" + "strings" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/go-resty/resty/v2" + "github.com/google/uuid" +) + +type Teldrive struct { + model.Storage + Addition +} + +func (d *Teldrive) Config() driver.Config { + return config +} + +func (d *Teldrive) GetAddition() driver.Additional { + return &d.Addition +} + +func (d *Teldrive) Init(ctx context.Context) error { + d.Address = strings.TrimSuffix(d.Address, "/") + if d.Cookie == "" || !strings.HasPrefix(d.Cookie, "access_token=") { + return fmt.Errorf("cookie must start with 'access_token='") + } + if d.UploadConcurrency == 0 { + d.UploadConcurrency = 4 + } + if d.ChunkSize == 0 { + d.ChunkSize = 10 + } + + op.MustSaveDriverStorage(d) + return nil +} + +func (d *Teldrive) Drop(ctx context.Context) error { + return nil +} + +func (d *Teldrive) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) { + var listResp ListResp + err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "path": dir.GetPath(), + "limit": "1000", // overide default 500, TODO pagination + }) + }, &listResp) + if err != nil { + return nil, err + } + + return utils.SliceConvert(listResp.Items, func(src Object) (model.Obj, error) { + return &model.Object{ + ID: src.ID, + Name: src.Name, + Size: func() int64 { + if src.Type == "folder" { + return 0 + } + return src.Size + }(), + IsFolder: src.Type == "folder", + Modified: src.UpdatedAt, + }, nil + }) +} + +func (d *Teldrive) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { + if d.UseShareLink { + shareObj, err := d.getShareFileById(file.GetID()) + if err != nil || shareObj == nil { + if err := d.createShareFile(file.GetID()); err != nil { + return nil, err + } + shareObj, err = d.getShareFileById(file.GetID()) + if err != nil { + return nil, err + } + } + return &model.Link{ + URL: d.Address + "/api/shares/" + url.PathEscape(shareObj.Id) + "/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()), + }, nil + } + return &model.Link{ + URL: d.Address + "/api/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()), + Header: http.Header{ + "Cookie": {d.Cookie}, + }, + }, nil +} + +func (d *Teldrive) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { + return d.request(http.MethodPost, "/api/files/mkdir", func(req *resty.Request) { + req.SetBody(map[string]interface{}{ + "path": parentDir.GetPath() + "/" + dirName, + }) + }, nil) +} + +func (d *Teldrive) Move(ctx context.Context, srcObj, dstDir model.Obj) error { + body := base.Json{ + "ids": []string{srcObj.GetID()}, + "destinationParent": dstDir.GetID(), + } + return d.request(http.MethodPost, "/api/files/move", func(req *resty.Request) { + req.SetBody(body) + }, nil) +} + +func (d *Teldrive) Rename(ctx context.Context, srcObj model.Obj, newName string) error { + body := base.Json{ + "name": newName, + } + return d.request(http.MethodPatch, "/api/files/{id}", func(req *resty.Request) { + req.SetPathParam("id", srcObj.GetID()) + req.SetBody(body) + }, nil) +} + +func (d *Teldrive) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { + copyConcurrentLimit := 4 + copyManager := NewCopyManager(ctx, copyConcurrentLimit, d) + copyManager.startWorkers() + copyManager.G.Go(func() error { + defer close(copyManager.TaskChan) + return copyManager.generateTasks(ctx, srcObj, dstDir) + }) + return copyManager.G.Wait() +} + +func (d *Teldrive) Remove(ctx context.Context, obj model.Obj) error { + body := base.Json{ + "ids": []string{obj.GetID()}, + } + return d.request(http.MethodPost, "/api/files/delete", func(req *resty.Request) { + req.SetBody(body) + }, nil) +} + +func (d *Teldrive) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error { + fileId := uuid.New().String() + chunkSizeInMB := d.ChunkSize + chunkSize := chunkSizeInMB * 1024 * 1024 // Convert MB to bytes + totalSize := file.GetSize() + totalParts := int(math.Ceil(float64(totalSize) / float64(chunkSize))) + maxRetried := 3 + + // delete the upload task when finished or failed + defer func() { + _ = d.request(http.MethodDelete, "/api/uploads/{id}", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, nil) + }() + + if obj, err := d.getFile(dstDir.GetPath(), file.GetName(), file.IsDir()); err == nil { + if err = d.Remove(ctx, obj); err != nil { + return err + } + } + // start the upload process + if err := d.request(http.MethodGet, "/api/uploads/fileId", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, nil); err != nil { + return err + } + if totalSize == 0 { + return d.touch(file.GetName(), dstDir.GetPath()) + } + + if totalParts <= 1 { + return d.doSingleUpload(ctx, dstDir, file, up, totalParts, chunkSize, fileId) + } + + return d.doMultiUpload(ctx, dstDir, file, up, maxRetried, totalParts, chunkSize, fileId) +} + +func (d *Teldrive) GetArchiveMeta(ctx context.Context, obj model.Obj, args model.ArchiveArgs) (model.ArchiveMeta, error) { + // TODO get archive file meta-info, return errs.NotImplement to use an internal archive tool, optional + return nil, errs.NotImplement +} + +func (d *Teldrive) ListArchive(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) ([]model.Obj, error) { + // TODO list args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional + return nil, errs.NotImplement +} + +func (d *Teldrive) Extract(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) (*model.Link, error) { + // TODO return link of file args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional + return nil, errs.NotImplement +} + +func (d *Teldrive) ArchiveDecompress(ctx context.Context, srcObj, dstDir model.Obj, args model.ArchiveDecompressArgs) ([]model.Obj, error) { + // TODO extract args.InnerPath path in the archive srcObj to the dstDir location, optional + // a folder with the same name as the archive file needs to be created to store the extracted results if args.PutIntoNewDir + // return errs.NotImplement to use an internal archive tool + return nil, errs.NotImplement +} + +//func (d *Teldrive) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) { +// return nil, errs.NotSupport +//} + +var _ driver.Driver = (*Teldrive)(nil) diff --git a/drivers/teldrive/meta.go b/drivers/teldrive/meta.go new file mode 100644 index 00000000..23bae5f9 --- /dev/null +++ b/drivers/teldrive/meta.go @@ -0,0 +1,26 @@ +package teldrive + +import ( + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/op" +) + +type Addition struct { + driver.RootPath + Address string `json:"url" required:"true"` + Cookie string `json:"cookie" type:"string" required:"true" help:"access_token=xxx"` + UseShareLink bool `json:"use_share_link" type:"bool" default:"false" help:"Create share link when getting link to support 302. If disabled, you need to enable web proxy."` + ChunkSize int64 `json:"chunk_size" type:"number" default:"10" help:"Chunk size in MiB"` + UploadConcurrency int64 `json:"upload_concurrency" type:"number" default:"4" help:"Concurrency upload requests"` +} + +var config = driver.Config{ + Name: "Teldrive", + DefaultRoot: "/", +} + +func init() { + op.RegisterDriver(func() driver.Driver { + return &Teldrive{} + }) +} diff --git a/drivers/teldrive/types.go b/drivers/teldrive/types.go new file mode 100644 index 00000000..f6399e06 --- /dev/null +++ b/drivers/teldrive/types.go @@ -0,0 +1,77 @@ +package teldrive + +import ( + "context" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +type ErrResp struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type Object struct { + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + MimeType string `json:"mimeType"` + Category string `json:"category,omitempty"` + ParentId string `json:"parentId"` + Size int64 `json:"size"` + Encrypted bool `json:"encrypted"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type ListResp struct { + Items []Object `json:"items"` + Meta struct { + Count int `json:"count"` + TotalPages int `json:"totalPages"` + CurrentPage int `json:"currentPage"` + } `json:"meta"` +} + +type FilePart struct { + Name string `json:"name"` + PartId int `json:"partId"` + PartNo int `json:"partNo"` + ChannelId int `json:"channelId"` + Size int `json:"size"` + Encrypted bool `json:"encrypted"` + Salt string `json:"salt"` +} + +type chunkTask struct { + chunkIdx int + fileName string + chunkSize int64 + reader *stream.SectionReader + ss *stream.StreamSectionReader +} + +type CopyManager struct { + TaskChan chan CopyTask + Sem *semaphore.Weighted + G *errgroup.Group + Ctx context.Context + d *Teldrive +} + +type CopyTask struct { + SrcObj model.Obj + DstDir model.Obj +} + +type ShareObj struct { + Id string `json:"id"` + Protected bool `json:"protected"` + UserId int `json:"userId"` + Type string `json:"type"` + Name string `json:"name"` + ExpiresAt time.Time `json:"expiresAt"` +} diff --git a/drivers/teldrive/upload.go b/drivers/teldrive/upload.go new file mode 100644 index 00000000..168d9bef --- /dev/null +++ b/drivers/teldrive/upload.go @@ -0,0 +1,373 @@ +package teldrive + +import ( + "fmt" + "io" + "net/http" + "sort" + "strconv" + "sync" + "time" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/avast/retry-go" + "github.com/go-resty/resty/v2" + "github.com/pkg/errors" + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +// create empty file +func (d *Teldrive) touch(name, path string) error { + uploadBody := base.Json{ + "name": name, + "type": "file", + "path": path, + } + if err := d.request(http.MethodPost, "/api/files", func(req *resty.Request) { + req.SetBody(uploadBody) + }, nil); err != nil { + return err + } + + return nil +} + +func (d *Teldrive) createFileOnUploadSuccess(name, id, path string, uploadedFileParts []FilePart, totalSize int64) error { + remoteFileParts, err := d.getFilePart(id) + if err != nil { + return err + } + // check if the uploaded file parts match the remote file parts + if len(remoteFileParts) != len(uploadedFileParts) { + return fmt.Errorf("[Teldrive] file parts count mismatch: expected %d, got %d", len(uploadedFileParts), len(remoteFileParts)) + } + formatParts := make([]base.Json, 0) + for _, p := range remoteFileParts { + formatParts = append(formatParts, base.Json{ + "id": p.PartId, + "salt": p.Salt, + }) + } + uploadBody := base.Json{ + "name": name, + "type": "file", + "path": path, + "parts": formatParts, + "size": totalSize, + } + // create file here + if err := d.request(http.MethodPost, "/api/files", func(req *resty.Request) { + req.SetBody(uploadBody) + }, nil); err != nil { + return err + } + + return nil +} + +func (d *Teldrive) checkFilePartExist(fileId string, partId int) (FilePart, error) { + var uploadedParts []FilePart + var filePart FilePart + + if err := d.request(http.MethodGet, "/api/uploads/{id}", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, &uploadedParts); err != nil { + return filePart, err + } + + for _, part := range uploadedParts { + if part.PartId == partId { + return part, nil + } + } + + return filePart, nil +} + +func (d *Teldrive) getFilePart(fileId string) ([]FilePart, error) { + var uploadedParts []FilePart + if err := d.request(http.MethodGet, "/api/uploads/{id}", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, &uploadedParts); err != nil { + return nil, err + } + + return uploadedParts, nil +} + +func (d *Teldrive) singleUploadRequest(fileId string, callback base.ReqCallback, resp interface{}) error { + url := d.Address + "/api/uploads/" + fileId + client := resty.New().SetTimeout(0) + + ctx := context.Background() + + req := client.R(). + SetContext(ctx) + req.SetHeader("Cookie", d.Cookie) + req.SetHeader("Content-Type", "application/octet-stream") + req.SetContentLength(true) + req.AddRetryCondition(func(r *resty.Response, err error) bool { + return false + }) + if callback != nil { + callback(req) + } + if resp != nil { + req.SetResult(resp) + } + var e ErrResp + req.SetError(&e) + _req, err := req.Execute(http.MethodPost, url) + if err != nil { + return err + } + + if _req.IsError() { + return &e + } + return nil +} + +func (d *Teldrive) doSingleUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress, + totalParts int, chunkSize int64, fileId string) error { + + totalSize := file.GetSize() + var fileParts []FilePart + var uploaded int64 = 0 + ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up) + if err != nil { + return err + } + + for uploaded < totalSize { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + curChunkSize := min(totalSize-uploaded, chunkSize) + rd, err := ss.GetSectionReader(uploaded, curChunkSize) + if err != nil { + return err + } + filePart := &FilePart{} + if err := retry.Do(func() error { + + if _, err := rd.Seek(0, io.SeekStart); err != nil { + return err + } + + if err := d.singleUploadRequest(fileId, func(req *resty.Request) { + uploadParams := map[string]string{ + "partName": func() string { + digits := len(fmt.Sprintf("%d", totalParts)) + return file.GetName() + fmt.Sprintf(".%0*d", digits, 1) + }(), + "partNo": strconv.Itoa(1), + "fileName": file.GetName(), + } + req.SetQueryParams(uploadParams) + req.SetBody(driver.NewLimitedUploadStream(ctx, rd)) + req.SetHeader("Content-Length", strconv.FormatInt(curChunkSize, 10)) + }, filePart); err != nil { + return err + } + + return nil + }, + retry.Attempts(3), + retry.DelayType(retry.BackOffDelay), + retry.Delay(time.Second)); err != nil { + return err + } + + if filePart.Name != "" { + fileParts = append(fileParts, *filePart) + uploaded += curChunkSize + up(float64(uploaded) / float64(totalSize)) + ss.FreeSectionReader(rd) + } + + } + + return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize) +} + +func (d *Teldrive) doMultiUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress, + maxRetried, totalParts int, chunkSize int64, fileId string) error { + + concurrent := d.UploadConcurrency + g, ctx := errgroup.WithContext(ctx) + sem := semaphore.NewWeighted(int64(concurrent)) + chunkChan := make(chan chunkTask, concurrent*2) + resultChan := make(chan FilePart, concurrent) + totalSize := file.GetSize() + + ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up) + if err != nil { + return err + } + ssLock := sync.Mutex{} + g.Go(func() error { + defer close(chunkChan) + + chunkIdx := 0 + for chunkIdx < totalParts { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + offset := int64(chunkIdx) * chunkSize + curChunkSize := min(totalSize-offset, chunkSize) + + ssLock.Lock() + reader, err := ss.GetSectionReader(offset, curChunkSize) + ssLock.Unlock() + + if err != nil { + return err + } + task := chunkTask{ + chunkIdx: chunkIdx + 1, + chunkSize: curChunkSize, + fileName: file.GetName(), + reader: reader, + ss: ss, + } + // freeSectionReader will be called in d.uploadSingleChunk + select { + case chunkChan <- task: + chunkIdx++ + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + for i := 0; i < int(concurrent); i++ { + g.Go(func() error { + for task := range chunkChan { + if err := sem.Acquire(ctx, 1); err != nil { + return err + } + + filePart, err := d.uploadSingleChunk(ctx, fileId, task, totalParts, maxRetried) + sem.Release(1) + + if err != nil { + return fmt.Errorf("upload chunk %d failed: %w", task.chunkIdx, err) + } + + select { + case resultChan <- *filePart: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + var fileParts []FilePart + var collectErr error + collectDone := make(chan struct{}) + + go func() { + defer close(collectDone) + fileParts = make([]FilePart, 0, totalParts) + + done := make(chan error, 1) + go func() { + done <- g.Wait() + close(resultChan) + }() + + for { + select { + case filePart, ok := <-resultChan: + if !ok { + collectErr = <-done + return + } + fileParts = append(fileParts, filePart) + case err := <-done: + collectErr = err + return + } + } + }() + + <-collectDone + + if collectErr != nil { + return fmt.Errorf("multi-upload failed: %w", collectErr) + } + sort.Slice(fileParts, func(i, j int) bool { + return fileParts[i].PartNo < fileParts[j].PartNo + }) + + return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize) +} + +func (d *Teldrive) uploadSingleChunk(ctx context.Context, fileId string, task chunkTask, totalParts, maxRetried int) (*FilePart, error) { + filePart := &FilePart{} + retryCount := 0 + defer task.ss.FreeSectionReader(task.reader) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if existingPart, err := d.checkFilePartExist(fileId, task.chunkIdx); err == nil && existingPart.Name != "" { + return &existingPart, nil + } + + err := d.singleUploadRequest(fileId, func(req *resty.Request) { + uploadParams := map[string]string{ + "partName": func() string { + digits := len(fmt.Sprintf("%d", totalParts)) + return task.fileName + fmt.Sprintf(".%0*d", digits, task.chunkIdx) + }(), + "partNo": strconv.Itoa(task.chunkIdx), + "fileName": task.fileName, + } + req.SetQueryParams(uploadParams) + req.SetBody(driver.NewLimitedUploadStream(ctx, task.reader)) + req.SetHeader("Content-Length", strconv.Itoa(int(task.chunkSize))) + }, filePart) + + if err == nil { + return filePart, nil + } + + if retryCount >= maxRetried { + return nil, fmt.Errorf("upload failed after %d retries: %w", maxRetried, err) + } + + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + continue + } + + retryCount++ + utils.Log.Errorf("[Teldrive] upload error: %v, retrying %d times", err, retryCount) + + backoffDuration := time.Duration(retryCount*retryCount) * time.Second + if backoffDuration > 30*time.Second { + backoffDuration = 30 * time.Second + } + + select { + case <-time.After(backoffDuration): + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} diff --git a/drivers/teldrive/util.go b/drivers/teldrive/util.go new file mode 100644 index 00000000..ca3cccf9 --- /dev/null +++ b/drivers/teldrive/util.go @@ -0,0 +1,109 @@ +package teldrive + +import ( + "fmt" + "net/http" + "time" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/go-resty/resty/v2" +) + +// do others that not defined in Driver interface + +func (d *Teldrive) request(method string, pathname string, callback base.ReqCallback, resp interface{}) error { + url := d.Address + pathname + req := base.RestyClient.R() + req.SetHeader("Cookie", d.Cookie) + if callback != nil { + callback(req) + } + if resp != nil { + req.SetResult(resp) + } + var e ErrResp + req.SetError(&e) + _req, err := req.Execute(method, url) + if err != nil { + return err + } + + if _req.IsError() { + return &e + } + return nil +} + +func (d *Teldrive) getFile(path, name string, isFolder bool) (model.Obj, error) { + resp := &ListResp{} + err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "path": path, + "name": name, + "type": func() string { + if isFolder { + return "folder" + } + return "file" + }(), + "operation": "find", + }) + }, resp) + if err != nil { + return nil, err + } + if len(resp.Items) == 0 { + return nil, fmt.Errorf("file not found: %s/%s", path, name) + } + obj := resp.Items[0] + return &model.Object{ + ID: obj.ID, + Name: obj.Name, + Size: obj.Size, + IsFolder: obj.Type == "folder", + }, err +} + +func (err *ErrResp) Error() string { + if err == nil { + return "" + } + + return fmt.Sprintf("[Teldrive] message:%s Error code:%d", err.Message, err.Code) +} + +func (d *Teldrive) createShareFile(fileId string) error { + var errResp ErrResp + if err := d.request(http.MethodPost, "/api/files/{id}/share", func(req *resty.Request) { + req.SetPathParam("id", fileId) + req.SetBody(base.Json{ + "expiresAt": getDateTime(), + }) + }, &errResp); err != nil { + return err + } + + if errResp.Message != "" { + return &errResp + } + + return nil +} + +func (d *Teldrive) getShareFileById(fileId string) (*ShareObj, error) { + var shareObj ShareObj + if err := d.request(http.MethodGet, "/api/files/{id}/share", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, &shareObj); err != nil { + return nil, err + } + + return &shareObj, nil +} + +func getDateTime() string { + now := time.Now().UTC() + formattedWithMs := now.Add(time.Hour * 1).Format("2006-01-02T15:04:05.000Z") + return formattedWithMs +}