Skip to content

Commit

Permalink
avoid reusing context object
Browse files Browse the repository at this point in the history
fix #1182
  • Loading branch information
chrislusf committed Feb 26, 2020
1 parent bd3254b commit 892e726
Show file tree
Hide file tree
Showing 86 changed files with 501 additions and 568 deletions.
6 changes: 3 additions & 3 deletions weed/command/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}

b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()

Expand Down Expand Up @@ -314,8 +314,8 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}

func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
fileGetClient, err := client.FileGet(ctx, &volume_server_pb.FileGetRequest{FileId: fid})
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid})
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions weed/command/filer_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func runCopy(cmd *Command, args []string) bool {
}

func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
Expand Down Expand Up @@ -257,13 +257,13 @@ func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask)
}

if chunkCount == 1 {
return worker.uploadFileAsOne(ctx, task, f)
return worker.uploadFileAsOne(task, f)
}

return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}

func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {

// upload the file content
fileName := filepath.Base(f.Name())
Expand All @@ -276,7 +276,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
if task.fileSize > 0 {

// assign a volume
err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

request := &filer_pb.AssignVolumeRequest{
Count: 1,
Expand All @@ -286,7 +286,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
ParentPath: task.destinationUrlPath,
}

assignResult, assignError = client.AssignVolume(ctx, request)
assignResult, assignError = client.AssignVolume(context.Background(), request)
if assignError != nil {
return fmt.Errorf("assign volume failure %v: %v", request, assignError)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}

if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Expand All @@ -342,7 +342,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
},
}

if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
Expand All @@ -353,7 +353,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
return nil
}

func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {

fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
Expand All @@ -377,7 +377,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
// assign a volume
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Expand All @@ -386,7 +386,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
ParentPath: task.destinationUrlPath,
}

assignResult, assignError = client.AssignVolume(ctx, request)
assignResult, assignError = client.AssignVolume(context.Background(), request)
if assignError != nil {
return fmt.Errorf("assign volume failure %v: %v", request, assignError)
}
Expand Down Expand Up @@ -449,7 +449,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
return uploadError
}

if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Expand All @@ -470,7 +470,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
},
}

if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
Expand Down Expand Up @@ -499,9 +499,9 @@ func detectMimeType(f *os.File) string {
return mimeType
}

func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {

return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error {
return util.WithCachedGrpcClient(func(clientConn *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(clientConn)
return fn(client)
}, filerAddress, grpcDialOption)
Expand Down
2 changes: 1 addition & 1 deletion weed/command/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s3opt *S3Options) startS3Server() bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
ctx := context.Background()

err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
Expand Down
2 changes: 1 addition & 1 deletion weed/filer2/filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Filer struct {
func NewFiler(masters []string, grpcDialOption grpc.DialOption, bucketFolder string) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
}
Expand Down
20 changes: 10 additions & 10 deletions weed/filer2/filer_client_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ func VolumeId(fileId string) string {
}

type FilerClient interface {
WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error
WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
}

func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
var vids []string
for _, chunkView := range chunkViews {
vids = append(vids, VolumeId(chunkView.FileId))
}

vid2Locations := make(map[string]*filer_pb.Locations)

err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})
if err != nil {
Expand Down Expand Up @@ -93,19 +93,19 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath F
return
}

func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {

dir, name := fullFilePath.DirAndName()

err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
}

// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
resp, err := client.LookupDirectoryEntry(ctx, request)
resp, err := client.LookupDirectoryEntry(context.Background(), request)
if err != nil {
if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
return nil
Expand All @@ -126,9 +126,9 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPat
return
}

func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {

err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

lastEntryName := ""

Expand All @@ -140,7 +140,7 @@ func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath
}

glog.V(3).Infof("read directory: %v", request)
stream, err := client.ListEntries(ctx, request)
stream, err := client.ListEntries(context.Background(), request)
if err != nil {
return fmt.Errorf("list %s: %v", fullDirPath, err)
}
Expand Down
8 changes: 4 additions & 4 deletions weed/filer2/filer_delete_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
}
if isCollection {
collectionName := entry.Name()
f.doDeleteCollection(ctx, collectionName)
f.doDeleteCollection(collectionName)
f.deleteBucket(collectionName)
}

Expand Down Expand Up @@ -110,10 +110,10 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
return nil
}

func (f *Filer) doDeleteCollection(ctx context.Context, collectionName string) (err error) {
func (f *Filer) doDeleteCollection(collectionName string) (err error) {

return f.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
if err != nil {
Expand Down
40 changes: 20 additions & 20 deletions weed/filesys/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
},
OExcl: req.Flags&fuse.OpenExclusive != 0,
}
glog.V(1).Infof("create: %v", req.String())
glog.V(1).Infof("create %s/%s: %v", dir.Path, req.Name, req.Flags)

if err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if err := filer_pb.CreateEntry(client, request); err != nil {
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
Expand Down Expand Up @@ -167,15 +167,15 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
},
}

err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

request := &filer_pb.CreateEntryRequest{
Directory: dir.Path,
Entry: newEntry,
}

glog.V(1).Infof("mkdir: %v", request)
if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err)
return err
}
Expand All @@ -200,7 +200,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.

if entry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath)
entry, err = filer2.GetEntry(dir.wfs, fullFilePath)
if err != nil {
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
Expand Down Expand Up @@ -239,7 +239,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {

cacheTtl := 5 * time.Minute

readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, filer2.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) {
readErr := filer2.ReadDirAllEntries(dir.wfs, filer2.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) {
fullpath := filer2.NewFullPath(dir.Path, entry.Name)
inode := fullpath.AsInode()
if entry.IsDirectory {
Expand All @@ -262,29 +262,29 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {

if !req.Dir {
return dir.removeOneFile(ctx, req)
return dir.removeOneFile(req)
}

return dir.removeFolder(ctx, req)
return dir.removeFolder(req)

}

func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error {
func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {

filePath := filer2.NewFullPath(dir.Path, req.Name)
entry, err := filer2.GetEntry(ctx, dir.wfs, filePath)
entry, err := filer2.GetEntry(dir.wfs, filePath)
if err != nil {
return err
}
if entry == nil {
return nil
}

dir.wfs.deleteFileChunks(ctx, entry.Chunks)
dir.wfs.deleteFileChunks(entry.Chunks)

dir.wfs.cacheDelete(filePath)

return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

request := &filer_pb.DeleteEntryRequest{
Directory: dir.Path,
Expand All @@ -293,7 +293,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro
}

glog.V(3).Infof("remove file: %v", request)
_, err := client.DeleteEntry(ctx, request)
_, err := client.DeleteEntry(context.Background(), request)
if err != nil {
glog.V(3).Infof("not found remove file %s/%s: %v", dir.Path, req.Name, err)
return fuse.ENOENT
Expand All @@ -304,11 +304,11 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro

}

func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error {
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {

dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name))

return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

request := &filer_pb.DeleteEntryRequest{
Directory: dir.Path,
Expand All @@ -317,7 +317,7 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error
}

glog.V(3).Infof("remove directory entry: %v", request)
_, err := client.DeleteEntry(ctx, request)
_, err := client.DeleteEntry(context.Background(), request)
if err != nil {
glog.V(3).Infof("not found remove %s/%s: %v", dir.Path, req.Name, err)
return fuse.ENOENT
Expand Down Expand Up @@ -419,7 +419,7 @@ func (dir *Dir) Forget() {
func (dir *Dir) maybeLoadEntry(ctx context.Context) error {
if dir.entry == nil {
parentDirPath, name := filer2.FullPath(dir.Path).DirAndName()
entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name)
entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name)
if err != nil {
return err
}
Expand All @@ -432,15 +432,15 @@ func (dir *Dir) saveEntry(ctx context.Context) error {

parentDir, name := filer2.FullPath(dir.Path).DirAndName()

return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
Entry: dir.entry,
}

glog.V(1).Infof("save dir entry: %v", request)
_, err := client.UpdateEntry(ctx, request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
Expand Down
Loading

0 comments on commit 892e726

Please sign in to comment.