Skip to content
This repository has been archived by the owner on Aug 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #237 from verless/fix-plugin-concurrency
Browse files Browse the repository at this point in the history
Make ProcessPage of the plugins safe for concurrent usage.
  • Loading branch information
aligator committed Nov 19, 2020
2 parents da9b759 + 1f9d270 commit 83e8762
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 16 deletions.
51 changes: 48 additions & 3 deletions core/build.go
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
// parallelism specifies the number of parallel workers.
// parallelism specifies the number of parallel workers
parallelism int = 4
)

Expand Down Expand Up @@ -172,6 +172,10 @@ func (b *Build) Run() error {
}
}()

if err := b.preProcessing(); err != nil {
return err
}

wg := sync.WaitGroup{}
wg.Add(parallelism)

Expand Down Expand Up @@ -207,6 +211,18 @@ func (b *Build) Run() error {
return fmt.Errorf("errors while processing files: %v", collectedErrors)
}

if err := b.postProcessing(); err != nil {
return err
}

if err := b.render(); err != nil {
return err
}

return nil
}

func (b *Build) render() error {
site, err := b.Builder.Dispatch()
if err != nil {
return err
Expand All @@ -231,6 +247,35 @@ func (b *Build) Run() error {
return nil
}

func (b *Build) preProcessing() error {
for _, p := range b.Plugins {
prePostPlugin, ok := p.(plugin.PrePostProcessPlugin)
if !ok {
continue
}

if err := prePostPlugin.PreProcessPages(); err != nil {
return err
}
}

return nil
}

func (b *Build) postProcessing() error {
for _, p := range b.Plugins {
prePostPlugin, ok := p.(plugin.PrePostProcessPlugin)
if !ok {
continue
}

if err := prePostPlugin.PostProcessPages(); err != nil {
return err
}
}
return nil
}

func (b *Build) processFile(contentDir, file string) error {
src, err := ioutil.ReadFile(filepath.Join(contentDir, file))
if err != nil {
Expand All @@ -256,8 +301,8 @@ func (b *Build) processFile(contentDir, file string) error {
return err
}

for _, plugin := range b.Plugins {
if err := plugin.ProcessPage(&page); err != nil {
for _, p := range b.Plugins {
if err := p.ProcessPage(&page); err != nil {
return err
}
}
Expand Down
52 changes: 51 additions & 1 deletion plugin/atom/atom.go
Expand Up @@ -34,6 +34,8 @@ func New(meta *model.Meta, fs afero.Fs, outputDir string) *atom {
outputDir: outputDir,
}

a.feedItems = make(chan *feeds.Item)

return &a
}

Expand All @@ -42,8 +44,47 @@ func New(meta *model.Meta, fs afero.Fs, outputDir string) *atom {
type atom struct {
meta *model.Meta
feed *feeds.Feed
feedItems chan *feeds.Item
fs afero.Fs
outputDir string

// workerShouldStop is a channel which indicates that the worker should stop working.
// To stop it, pass true to it. The worker will close the channel.
workerShouldStop chan bool
// workerFinishedSignal gets closed by the worker to indicate that it finished all it's work.
workerFinishedSignal chan bool
}

// PreProcessPages starts a worker goroutine which handles the a.feed.Add.
// This improves speed as the ProcessPage can add new items in a non blocking way.
func (a *atom) PreProcessPages() error {
a.workerShouldStop = make(chan bool)
a.workerFinishedSignal = make(chan bool)

go func() {
defer func() {
close(a.workerShouldStop)
close(a.workerFinishedSignal)
}()

for {
select {
case shouldStop := <-a.workerShouldStop:
if shouldStop {
return
}
default:
}

select {
case feedItem := <-a.feedItems:
a.feed.Add(feedItem)
default:
}
}
}()

return nil
}

// ProcessPage takes a page to be processed by the plugin, reads
Expand All @@ -63,7 +104,16 @@ func (a *atom) ProcessPage(page *model.Page) error {
Created: page.Date,
}

a.feed.Add(item)
a.feedItems <- item
return nil
}

// PostProcessPages stops and waits for the worker from PreProcessPages to finish.
func (a *atom) PostProcessPages() error {
if a.workerShouldStop != nil {
a.workerShouldStop <- true
}
_, _ = <-a.workerFinishedSignal
return nil
}

Expand Down
12 changes: 12 additions & 0 deletions plugin/atom/atom_test.go
Expand Up @@ -38,6 +38,13 @@ func TestAtom_ProcessPage(t *testing.T) {
Base: "https://example.com",
}, afero.NewOsFs(), "")

// Note that hte ProcessPage functionality is heavily coupled with the Pre/Post methods.
// That's why we have to call it here also.
err := a.PreProcessPages()
if test.ExpectedError(t, nil, err) != test.IsCorrectNil {
return
}

for i, page := range testCase.pages {
t.Logf("process page number %v, route '%v'", i, page.Route)
err := a.ProcessPage(&page)
Expand All @@ -52,6 +59,11 @@ func TestAtom_ProcessPage(t *testing.T) {
test.Equals(t, canonicalLink, item.Link.Href)
}

err = a.PostProcessPages()
if test.ExpectedError(t, nil, err) != test.IsCorrectNil {
return
}

test.Equals(t, len(testCase.pages), len(a.feed.Items))
}
}
11 changes: 11 additions & 0 deletions plugin/plugin.go
Expand Up @@ -20,6 +20,17 @@ type Plugin interface {
PostWrite() error
}

type PrePostProcessPlugin interface {
Plugin

// PreProcessPages will be invoked before all pages get processed.
// So this will be called only once.
PreProcessPages() error
// PostProcessPages will be invoked after all pages were processed.
// So this will be called only once.
PostProcessPages() error
}

// LoadAll returns a map of all available plugins. Each entry
// is a function that returns a fully initialized plugin instance.
func LoadAll(cfg *config.Config, fs afero.Fs, outputDir string) map[string]func() Plugin {
Expand Down
7 changes: 6 additions & 1 deletion plugin/related/related.go
Expand Up @@ -2,12 +2,15 @@
package related

import (
"sync"

"github.com/verless/verless/model"
"github.com/verless/verless/tree"
)

type related struct {
pages map[string]*model.Page
pages map[string]*model.Page
pagesMutex sync.RWMutex
}

// New initializes and returns a related plugin instance.
Expand All @@ -20,7 +23,9 @@ func New() *related {
// ProcessPage adds a given pointer to a Page instance to the plugin's page
// map. This prevents that each page has to be resolved from the tre later.
func (r *related) ProcessPage(page *model.Page) error {
r.pagesMutex.Lock()
r.pages[page.Href] = page
r.pagesMutex.Unlock()
return nil
}

Expand Down
21 changes: 14 additions & 7 deletions plugin/tags/tags.go
Expand Up @@ -4,6 +4,7 @@ package tags
import (
"path/filepath"
"strings"
"sync"

"github.com/verless/verless/model"
"github.com/verless/verless/tree"
Expand All @@ -18,7 +19,7 @@ const (
// build path and outputs the tag directories to outputDir.
func New() *tags {
t := tags{
m: make(map[string]*model.ListPage),
tags: make(map[string]*model.ListPage),
}

return &t
Expand All @@ -27,21 +28,27 @@ func New() *tags {
// tags is the actual tags plugin that maintains a map with all
// tags from all processed pages.
type tags struct {
m map[string]*model.ListPage
tags map[string]*model.ListPage
tagsMutex sync.Mutex
}

// ProcessPage creates a new map entry for each tag in the processed
// page and adds the page to the entry's list page.
func (t *tags) ProcessPage(page *model.Page) error {
for _, tag := range page.Tags {
//sanitizing the tags like "Making Coffee" to "making-coffee"
// Sanitizing the tags like "Making Coffee" to "making-coffee".
tag.Name = strings.Replace(tag.Name, " ", "-", -1)
tag.Name = strings.ToLower(tag.Name)

if _, exists := t.m[tag.Name]; !exists {
t.tagsMutex.Lock()
_, tagExists := t.tags[tag.Name]

if !tagExists {
t.createListPage(tag.Name)
}
t.m[tag.Name].Pages = append(t.m[tag.Name].Pages, page)

t.tags[tag.Name].Pages = append(t.tags[tag.Name].Pages, page)
t.tagsMutex.Unlock()
}

return nil
Expand All @@ -57,7 +64,7 @@ func (t *tags) PreWrite(site *model.Site) error {
return err
}

for tag, listPage := range t.m {
for tag, listPage := range t.tags {
path := filepath.ToSlash(filepath.Join(tagsDir, tag))

node := model.NewNode()
Expand All @@ -78,7 +85,7 @@ func (t *tags) PostWrite() error {

// createListPage initializes a new list page for a given key.
func (t *tags) createListPage(key string) {
t.m[key] = &model.ListPage{
t.tags[key] = &model.ListPage{
Pages: make([]*model.Page, 0),
Page: model.Page{
Route: tagsDir + "/" + key,
Expand Down
8 changes: 4 additions & 4 deletions plugin/tags/tags_test.go
Expand Up @@ -53,11 +53,11 @@ func TestTags_ProcessPage(t *testing.T) {
}

for _, tag := range page.Tags {
taggerTag, exists := tagger.m[tag.Name]
taggerTag, exists := tagger.tags[tag.Name]

test.Assert(t, exists, "tag should exist")
test.NotEquals(t, nil, taggerTag)
test.Assert(t, len(tagger.m[tag.Name].Pages) > 0, "tag should exist")
test.Assert(t, len(tagger.tags[tag.Name].Pages) > 0, "tag should exist")
}
}
}
Expand All @@ -83,7 +83,7 @@ func TestTags_PreWrite(t *testing.T) {
t.Log(name)

tagger := New()
tagger.m = testCase.tagsListPages
tagger.tags = testCase.tagsListPages
s := model.NewSite()
err := tagger.PreWrite(&s)
if test.ExpectedError(t, testCase.expectedError, err) != test.IsCorrectNil {
Expand All @@ -94,7 +94,7 @@ func TestTags_PreWrite(t *testing.T) {
test.Equals(t, true, ok)
test.NotEquals(t, nil, tags)

for tag := range tagger.m {
for tag := range tagger.tags {
child, ok := tags.Children()[tag]
test.Equals(t, true, ok)
test.NotEquals(t, nil, child)
Expand Down

0 comments on commit 83e8762

Please sign in to comment.