Skip to content

Commit 0533228

Browse files
committed
feat: add local cas generation
1 parent 7cd0253 commit 0533228

File tree

4 files changed

+318
-13
lines changed

4 files changed

+318
-13
lines changed

drivers/local/cas.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package local
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"encoding/hex"
7+
"hash"
8+
"os"
9+
"path/filepath"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"github.com/OpenListTeam/OpenList/v4/internal/model"
15+
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
16+
)
17+
18+
// localCASSliceSize is the slice length in bytes (10 MiB) at which
// casHasherWriter closes one per-slice MD5 digest and starts the next.
const localCASSliceSize int64 = 10 << 20
19+
20+
// casUploadInfo carries the digest summary of one uploaded file. It is
// produced by casHasherWriter.Info and consumed by uploadCAS and deleteSource.
type casUploadInfo struct {
	// Name is the file name the summary belongs to.
	Name string
	// Size is the number of bytes that were hashed.
	Size int64
	// MD5 is the hex digest of the whole content.
	MD5 string
	// SliceMD5 is the slice-based digest; see casHasherWriter.Info for how
	// it is derived.
	SliceMD5 string
}
26+
27+
// casPayload is the JSON document that uploadCAS marshals and then
// base64-encodes into the ".cas" file written next to an uploaded file.
type casPayload struct {
	// Name is the name of the uploaded file.
	Name string `json:"name"`
	// Size is the uploaded file's length in bytes.
	Size int64 `json:"size"`
	// MD5 is the hex digest of the whole file.
	MD5 string `json:"md5"`
	// SliceMD5 is the slice-based digest taken from casUploadInfo.
	SliceMD5 string `json:"sliceMd5"`
	// CreateTime is filled by uploadCAS with the current Unix time in
	// seconds, formatted as a decimal string.
	CreateTime string `json:"create_time"`
}
34+
35+
// casHasherWriter is a write sink that keeps two running digests of the bytes
// passed to Write: one over the whole stream and one over the current
// localCASSliceSize-byte slice.
type casHasherWriter struct {
	// fileMD5 accumulates every byte written.
	fileMD5 hash.Hash
	// sliceMD5 accumulates the bytes of the slice currently being filled
	// and is reset by finishSlice.
	sliceMD5 hash.Hash
	// written is the total number of bytes seen so far.
	written int64
	// currentSliceSize is the number of bytes in the unfinished slice.
	currentSliceSize int64
	// sliceMD5Hexs holds the upper-case hex digest of each finished slice,
	// in stream order.
	sliceMD5Hexs []string
}
42+
43+
func newCASHasherWriter() *casHasherWriter {
44+
return &casHasherWriter{
45+
fileMD5: utils.MD5.NewFunc(),
46+
sliceMD5: utils.MD5.NewFunc(),
47+
}
48+
}
49+
50+
func (w *casHasherWriter) Write(p []byte) (int, error) {
51+
total := len(p)
52+
for len(p) > 0 {
53+
remaining := localCASSliceSize - w.currentSliceSize
54+
n := len(p)
55+
if int64(n) > remaining {
56+
n = int(remaining)
57+
}
58+
chunk := p[:n]
59+
_, _ = w.fileMD5.Write(chunk)
60+
_, _ = w.sliceMD5.Write(chunk)
61+
w.written += int64(n)
62+
w.currentSliceSize += int64(n)
63+
p = p[n:]
64+
if w.currentSliceSize == localCASSliceSize {
65+
w.finishSlice()
66+
}
67+
}
68+
return total, nil
69+
}
70+
71+
func (w *casHasherWriter) finishSlice() {
72+
if w.currentSliceSize == 0 {
73+
return
74+
}
75+
w.sliceMD5Hexs = append(w.sliceMD5Hexs, strings.ToUpper(hex.EncodeToString(w.sliceMD5.Sum(nil))))
76+
w.sliceMD5.Reset()
77+
w.currentSliceSize = 0
78+
}
79+
80+
func (w *casHasherWriter) Info(name string) *casUploadInfo {
81+
if w.written > localCASSliceSize && w.currentSliceSize > 0 {
82+
w.finishSlice()
83+
}
84+
85+
fileMD5Hex := hex.EncodeToString(w.fileMD5.Sum(nil))
86+
sliceMD5Hex := fileMD5Hex
87+
if w.written > localCASSliceSize {
88+
sliceMD5Hex = utils.GetMD5EncodeStr(strings.Join(w.sliceMD5Hexs, "\n"))
89+
}
90+
91+
return &casUploadInfo{
92+
Name: name,
93+
Size: w.written,
94+
MD5: fileMD5Hex,
95+
SliceMD5: sliceMD5Hex,
96+
}
97+
}
98+
99+
func (d *Local) shouldUploadCAS(name string) bool {
100+
return (d.GenerateCAS || d.GenerateCASAndDeleteSource) && !strings.HasSuffix(strings.ToLower(name), ".cas")
101+
}
102+
103+
func (d *Local) shouldDeleteSource() bool {
104+
return d.DeleteSource || d.GenerateCASAndDeleteSource
105+
}
106+
107+
func (d *Local) uploadCAS(ctx context.Context, dstDir model.Obj, info *casUploadInfo) error {
108+
if info == nil || !d.shouldUploadCAS(info.Name) {
109+
return nil
110+
}
111+
if err := ctx.Err(); err != nil {
112+
return err
113+
}
114+
115+
content, err := utils.Json.Marshal(casPayload{
116+
Name: info.Name,
117+
Size: info.Size,
118+
MD5: info.MD5,
119+
SliceMD5: info.SliceMD5,
120+
CreateTime: strconv.FormatInt(time.Now().Unix(), 10),
121+
})
122+
if err != nil {
123+
return err
124+
}
125+
content = []byte(base64.StdEncoding.EncodeToString(content))
126+
127+
casPath := filepath.Join(dstDir.GetPath(), info.Name+".cas")
128+
if err = os.WriteFile(casPath, content, 0o666); err != nil {
129+
return err
130+
}
131+
d.updateDirSize(dstDir.GetPath())
132+
return nil
133+
}
134+
135+
func (d *Local) deleteSource(ctx context.Context, fullPath string, info *casUploadInfo) error {
136+
if info == nil || !d.shouldDeleteSource() || !d.shouldUploadCAS(info.Name) {
137+
return nil
138+
}
139+
return d.Remove(ctx, &model.Object{
140+
Path: fullPath,
141+
Name: info.Name,
142+
Size: info.Size,
143+
})
144+
}
145+
146+
func (d *Local) updateDirSize(dirPath string) {
147+
if d.directoryMap.Has(dirPath) {
148+
d.directoryMap.UpdateDirSize(dirPath)
149+
d.directoryMap.UpdateDirParents(dirPath)
150+
}
151+
}

drivers/local/cas_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package local
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/base64"
7+
"os"
8+
"path/filepath"
9+
"strconv"
10+
"testing"
11+
"time"
12+
13+
"github.com/OpenListTeam/OpenList/v4/internal/driver"
14+
"github.com/OpenListTeam/OpenList/v4/internal/model"
15+
"github.com/OpenListTeam/OpenList/v4/internal/stream"
16+
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
17+
)
18+
19+
func TestLocalPutGenerateCAS(t *testing.T) {
20+
root := t.TempDir()
21+
drv := &Local{
22+
Addition: Addition{
23+
RootPath: driver.RootPath{RootFolderPath: root},
24+
GenerateCAS: true,
25+
},
26+
}
27+
if err := drv.Init(context.Background()); err != nil {
28+
t.Fatalf("init local driver: %v", err)
29+
}
30+
31+
data := []byte("hello openlist cas")
32+
modTime := time.Unix(1710000000, 0)
33+
file := &stream.FileStream{
34+
Ctx: context.Background(),
35+
Obj: &model.Object{
36+
Name: "hello.txt",
37+
Size: int64(len(data)),
38+
Modified: modTime,
39+
Ctime: modTime,
40+
},
41+
Reader: bytes.NewReader(data),
42+
}
43+
dstDir := &model.Object{
44+
Path: root,
45+
Name: filepath.Base(root),
46+
IsFolder: true,
47+
}
48+
49+
if err := drv.Put(context.Background(), dstDir, file, func(float64) {}); err != nil {
50+
t.Fatalf("put file: %v", err)
51+
}
52+
53+
if _, err := os.Stat(filepath.Join(root, "hello.txt")); err != nil {
54+
t.Fatalf("stat source file: %v", err)
55+
}
56+
57+
casContent, err := os.ReadFile(filepath.Join(root, "hello.txt.cas"))
58+
if err != nil {
59+
t.Fatalf("read cas file: %v", err)
60+
}
61+
rawPayload, err := base64.StdEncoding.DecodeString(string(casContent))
62+
if err != nil {
63+
t.Fatalf("decode cas file: %v", err)
64+
}
65+
66+
var payload casPayload
67+
if err = utils.Json.Unmarshal(rawPayload, &payload); err != nil {
68+
t.Fatalf("unmarshal cas payload: %v", err)
69+
}
70+
71+
if payload.Name != "hello.txt" {
72+
t.Fatalf("unexpected cas name: %s", payload.Name)
73+
}
74+
if payload.Size != int64(len(data)) {
75+
t.Fatalf("unexpected cas size: %d", payload.Size)
76+
}
77+
78+
expectMD5 := utils.HashData(utils.MD5, data)
79+
if payload.MD5 != expectMD5 {
80+
t.Fatalf("unexpected cas md5: %s", payload.MD5)
81+
}
82+
if payload.SliceMD5 != expectMD5 {
83+
t.Fatalf("unexpected cas slice md5: %s", payload.SliceMD5)
84+
}
85+
if _, err = strconv.ParseInt(payload.CreateTime, 10, 64); err != nil {
86+
t.Fatalf("unexpected cas create time: %v", err)
87+
}
88+
}
89+
90+
func TestLocalPutGenerateCASAndDeleteSource(t *testing.T) {
91+
root := t.TempDir()
92+
drv := &Local{
93+
Addition: Addition{
94+
RootPath: driver.RootPath{RootFolderPath: root},
95+
GenerateCAS: true,
96+
DeleteSource: true,
97+
},
98+
}
99+
if err := drv.Init(context.Background()); err != nil {
100+
t.Fatalf("init local driver: %v", err)
101+
}
102+
103+
data := []byte("hello openlist cas delete source")
104+
file := &stream.FileStream{
105+
Ctx: context.Background(),
106+
Obj: &model.Object{
107+
Name: "delete.txt",
108+
Size: int64(len(data)),
109+
Modified: time.Unix(1710000000, 0),
110+
},
111+
Reader: bytes.NewReader(data),
112+
}
113+
dstDir := &model.Object{
114+
Path: root,
115+
Name: filepath.Base(root),
116+
IsFolder: true,
117+
}
118+
119+
if err := drv.Put(context.Background(), dstDir, file, func(float64) {}); err != nil {
120+
t.Fatalf("put file: %v", err)
121+
}
122+
123+
if _, err := os.Stat(filepath.Join(root, "delete.txt")); !os.IsNotExist(err) {
124+
t.Fatalf("source file should be removed, got err=%v", err)
125+
}
126+
if _, err := os.Stat(filepath.Join(root, "delete.txt.cas")); err != nil {
127+
t.Fatalf("cas file should exist: %v", err)
128+
}
129+
}

drivers/local/driver.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"io"
89
"io/fs"
910
"net/http"
1011
"os"
@@ -406,28 +407,49 @@ func (d *Local) Remove(ctx context.Context, obj model.Obj) error {
406407
}
407408

408409
func (d *Local) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
410+
var err error
409411
fullPath := filepath.Join(dstDir.GetPath(), stream.GetName())
410412
out, err := os.Create(fullPath)
411413
if err != nil {
412414
return err
413415
}
416+
closed := false
414417
defer func() {
415-
_ = out.Close()
418+
if !closed {
419+
_ = out.Close()
420+
}
416421
if errors.Is(err, context.Canceled) {
417422
_ = os.Remove(fullPath)
418423
}
419424
}()
420-
err = utils.CopyWithCtx(ctx, out, stream, stream.GetSize(), up)
425+
426+
var info *casUploadInfo
427+
if d.shouldUploadCAS(stream.GetName()) {
428+
casHasher := newCASHasherWriter()
429+
err = utils.CopyWithCtx(ctx, io.MultiWriter(out, casHasher), stream, stream.GetSize(), up)
430+
if err == nil {
431+
info = casHasher.Info(stream.GetName())
432+
}
433+
} else {
434+
err = utils.CopyWithCtx(ctx, out, stream, stream.GetSize(), up)
435+
}
421436
if err != nil {
422437
return err
423438
}
439+
if err = out.Close(); err != nil {
440+
return err
441+
}
442+
closed = true
424443
err = os.Chtimes(fullPath, stream.ModTime(), stream.ModTime())
425444
if err != nil {
426445
log.Errorf("[local] failed to change time of %s: %s", fullPath, err)
427446
}
428-
if d.directoryMap.Has(dstDir.GetPath()) {
429-
d.directoryMap.UpdateDirSize(dstDir.GetPath())
430-
d.directoryMap.UpdateDirParents(dstDir.GetPath())
447+
d.updateDirSize(dstDir.GetPath())
448+
if err = d.uploadCAS(ctx, dstDir, info); err != nil {
449+
return err
450+
}
451+
if err = d.deleteSource(ctx, fullPath, info); err != nil {
452+
return err
431453
}
432454

433455
return nil

drivers/local/meta.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ import (
77

88
type Addition struct {
99
driver.RootPath
10-
DirectorySize bool `json:"directory_size" default:"false" help:"This might impact host performance"`
11-
Thumbnail bool `json:"thumbnail" required:"true" help:"enable thumbnail"`
12-
ThumbCacheFolder string `json:"thumb_cache_folder"`
13-
ThumbConcurrency string `json:"thumb_concurrency" default:"16" required:"false" help:"Number of concurrent thumbnail generation goroutines. This controls how many thumbnails can be generated in parallel."`
14-
VideoThumbPos string `json:"video_thumb_pos" default:"20%" required:"false" help:"The position of the video thumbnail. If the value is a number (integer ot floating point), it represents the time in seconds. If the value ends with '%', it represents the percentage of the video duration."`
15-
ShowHidden bool `json:"show_hidden" default:"true" required:"false" help:"show hidden directories and files"`
16-
MkdirPerm string `json:"mkdir_perm" default:"777"`
17-
RecycleBinPath string `json:"recycle_bin_path" default:"delete permanently" help:"path to recycle bin, delete permanently if empty or keep 'delete permanently'"`
10+
DirectorySize bool `json:"directory_size" default:"false" help:"This might impact host performance"`
11+
Thumbnail bool `json:"thumbnail" required:"true" help:"enable thumbnail"`
12+
ThumbCacheFolder string `json:"thumb_cache_folder"`
13+
ThumbConcurrency string `json:"thumb_concurrency" default:"16" required:"false" help:"Number of concurrent thumbnail generation goroutines. This controls how many thumbnails can be generated in parallel."`
14+
VideoThumbPos string `json:"video_thumb_pos" default:"20%" required:"false" help:"The position of the video thumbnail. If the value is a number (integer ot floating point), it represents the time in seconds. If the value ends with '%', it represents the percentage of the video duration."`
15+
ShowHidden bool `json:"show_hidden" default:"true" required:"false" help:"show hidden directories and files"`
16+
MkdirPerm string `json:"mkdir_perm" default:"777"`
17+
RecycleBinPath string `json:"recycle_bin_path" default:"delete permanently" help:"path to recycle bin, delete permanently if empty or keep 'delete permanently'"`
18+
GenerateCAS bool `json:"generate_cas" help:"上传文件后,在同目录生成一个同名的 .cas 元数据文件"`
19+
DeleteSource bool `json:"delete_source" help:"成功生成 .cas 文件后,自动删除原始源文件"`
20+
GenerateCASAndDeleteSource bool `json:"generate_cas_and_delete_source" ignore:"true"`
1821
}
1922

2023
var config = driver.Config{

0 commit comments

Comments
 (0)