Permalink
Browse files

fs: Protect filesystem access with sync.RWMutex.

Make service methods safe for concurrent use by protecting filesystem
access with a reader/writer mutex.

Similar to shurcooL/reactions@dc7624d.
  • Loading branch information...
dmitshur committed Oct 6, 2018
1 parent 176b1ea commit 627ab5aea122bf029d0f8f6c3d44e614b4a33361
Showing with 31 additions and 11 deletions.
  1. +2 −2 fs/copier.go
  2. +29 −9 fs/fs.go
View
@@ -9,9 +9,9 @@ import (
"github.com/shurcooL/users"
)
var _ notifications.CopierFrom = service{}
var _ notifications.CopierFrom = &service{}
func (s service) CopyFrom(ctx context.Context, src notifications.Service, dst users.UserSpec) error {
func (s *service) CopyFrom(ctx context.Context, src notifications.Service, dst users.UserSpec) error {
// List all accessible notifications.
ns, err := src.List(ctx, notifications.ListOptions{})
if err != nil {
View
@@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"sync"
"time"
"github.com/shurcooL/notifications"
@@ -17,19 +18,20 @@ import (
// NewService creates a virtual filesystem-backed notifications.Service,
// using root for storage.
func NewService(root webdav.FileSystem, users users.Service) notifications.Service {
return service{
return &service{
fs: root,
users: users,
}
}
type service struct {
fs webdav.FileSystem
fsMu sync.RWMutex
fs webdav.FileSystem
users users.Service
}
func (s service) List(ctx context.Context, opt notifications.ListOptions) (notifications.Notifications, error) {
func (s *service) List(ctx context.Context, opt notifications.ListOptions) (notifications.Notifications, error) {
currentUser, err := s.users.GetAuthenticatedSpec(ctx)
if err != nil {
return nil, err
@@ -38,6 +40,9 @@ func (s service) List(ctx context.Context, opt notifications.ListOptions) (notif
return nil, os.ErrPermission
}
s.fsMu.RLock()
defer s.fsMu.RUnlock()
var ns notifications.Notifications
fis, err := vfsutil.ReadDir(ctx, s.fs, notificationsDir(currentUser))
@@ -129,7 +134,7 @@ func (s service) List(ctx context.Context, opt notifications.ListOptions) (notif
return ns, nil
}
func (s service) Count(ctx context.Context, opt interface{}) (uint64, error) {
func (s *service) Count(ctx context.Context, opt interface{}) (uint64, error) {
currentUser, err := s.users.GetAuthenticatedSpec(ctx)
if err != nil {
return 0, err
@@ -138,6 +143,9 @@ func (s service) Count(ctx context.Context, opt interface{}) (uint64, error) {
return 0, os.ErrPermission
}
s.fsMu.RLock()
defer s.fsMu.RUnlock()
// TODO: Consider reading/parsing entries, in case there's .DS_Store, etc., that should be skipped?
notifications, err := vfsutil.ReadDir(ctx, s.fs, notificationsDir(currentUser))
if os.IsNotExist(err) {
@@ -148,7 +156,7 @@ func (s service) Count(ctx context.Context, opt interface{}) (uint64, error) {
return uint64(len(notifications)), nil
}
func (s service) Notify(ctx context.Context, repo notifications.RepoSpec, threadType string, threadID uint64, nr notifications.NotificationRequest) error {
func (s *service) Notify(ctx context.Context, repo notifications.RepoSpec, threadType string, threadID uint64, nr notifications.NotificationRequest) error {
currentUser, err := s.users.GetAuthenticatedSpec(ctx)
if err != nil {
return err
@@ -157,6 +165,9 @@ func (s service) Notify(ctx context.Context, repo notifications.RepoSpec, thread
return os.ErrPermission
}
s.fsMu.Lock()
defer s.fsMu.Unlock()
type subscription struct {
Participating bool
}
@@ -242,7 +253,7 @@ func (s service) Notify(ctx context.Context, repo notifications.RepoSpec, thread
return nil
}
func (s service) Subscribe(ctx context.Context, repo notifications.RepoSpec, threadType string, threadID uint64, subscribers []users.UserSpec) error {
func (s *service) Subscribe(ctx context.Context, repo notifications.RepoSpec, threadType string, threadID uint64, subscribers []users.UserSpec) error {
currentUser, err := s.users.GetAuthenticatedSpec(ctx)
if err != nil {
return err
@@ -251,6 +262,9 @@ func (s service) Subscribe(ctx context.Context, repo notifications.RepoSpec, thr
return os.ErrPermission
}
s.fsMu.Lock()
defer s.fsMu.Unlock()
for _, subscriber := range subscribers {
err := createEmptyFile(ctx, s.fs, subscriberPath(repo, threadType, threadID, subscriber))
if err != nil {
@@ -261,7 +275,7 @@ func (s service) Subscribe(ctx context.Context, repo notifications.RepoSpec, thr
return nil
}
func (s service) MarkRead(ctx context.Context, repo notifications.RepoSpec, threadType string, threadID uint64) error {
func (s *service) MarkRead(ctx context.Context, repo notifications.RepoSpec, threadType string, threadID uint64) error {
currentUser, err := s.users.GetAuthenticatedSpec(ctx)
if err != nil {
return err
@@ -270,6 +284,9 @@ func (s service) MarkRead(ctx context.Context, repo notifications.RepoSpec, thre
return os.ErrPermission
}
s.fsMu.Lock()
defer s.fsMu.Unlock()
// Return early if the notification doesn't exist, before creating readDir for currentUser.
key := notificationKey(repo, threadType, threadID)
_, err = vfsutil.Stat(ctx, s.fs, notificationPath(currentUser, key))
@@ -303,7 +320,7 @@ func (s service) MarkRead(ctx context.Context, repo notifications.RepoSpec, thre
return nil
}
func (s service) MarkAllRead(ctx context.Context, repo notifications.RepoSpec) error {
func (s *service) MarkAllRead(ctx context.Context, repo notifications.RepoSpec) error {
currentUser, err := s.users.GetAuthenticatedSpec(ctx)
if err != nil {
return err
@@ -312,6 +329,9 @@ func (s service) MarkAllRead(ctx context.Context, repo notifications.RepoSpec) e
return os.ErrPermission
}
s.fsMu.Lock()
defer s.fsMu.Unlock()
// Iterate all user's notifications.
fis, err := vfsutil.ReadDir(ctx, s.fs, notificationsDir(currentUser))
if os.IsNotExist(err) {
@@ -364,7 +384,7 @@ func (s service) MarkAllRead(ctx context.Context, repo notifications.RepoSpec) e
return nil
}
func (s service) user(ctx context.Context, user users.UserSpec) users.User {
func (s *service) user(ctx context.Context, user users.UserSpec) users.User {
u, err := s.users.Get(ctx, user)
if err != nil {
return users.User{

0 comments on commit 627ab5a

Please sign in to comment.