Skip to content

Commit

Permalink
describe recursive wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jonesrussell committed Feb 27, 2024
1 parent 6d986c0 commit 23e6c9c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 31 deletions.
20 changes: 16 additions & 4 deletions cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"context"

"github.com/jonesrussell/page-prowler/internal/common"
"github.com/jonesrussell/page-prowler/internal/crawler"
"github.com/jonesrussell/page-prowler/internal/worker"
Expand All @@ -10,11 +12,21 @@ import (
var workerCmd = &cobra.Command{
Use: "worker",
Short: "Start the Asynq worker",
Run: func(cmd *cobra.Command, args []string) {
concurrency := 10 // Replace with the concurrency level you want
manager := cmd.Context().Value(common.CrawlManagerKey).(*crawler.CrawlManager)
worker.StartWorker(concurrency, manager, Debug)
PreRunE: func(cmd *cobra.Command, args []string) error {
w := worker.NewCrawlerWorker() // Create a new CrawlerWorker instance
cmd.SetContext(context.WithValue(cmd.Context(), common.CrawlerWorkerKey, w)) // Set the CrawlerWorker in the context
return nil
},
RunE: CrawlerWorkerCmdRun,
}

func CrawlerWorkerCmdRun(cmd *cobra.Command, args []string) error {
concurrency := 10 // Replace with the concurrency level you want
manager := cmd.Context().Value(common.CrawlManagerKey).(crawler.CrawlManagerInterface)

w := cmd.Context().Value(common.CrawlerWorkerKey).(worker.CrawlerWorkerInterface) // Retrieve the CrawlerWorker from the context
w.StartCrawlerWorker(concurrency, manager, Debug) // Start the CrawlerWorker
return nil
}

func init() {
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ=
github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0=
Expand All @@ -151,8 +150,6 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/vektra/mockery v1.1.2 h1:uc0Yn67rJpjt8U/mAZimdCKn9AeA97BOkjpmtBSlfP4=
github.com/vektra/mockery v1.1.2/go.mod h1:VcfZjKaFOPO+MpN4ZvwPjs4c48lkq1o3Ym8yHZJu0jU=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand All @@ -161,7 +158,6 @@ github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80=
Expand All @@ -182,14 +178,12 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
Expand All @@ -201,7 +195,6 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -245,7 +238,6 @@ golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
Expand Down
10 changes: 8 additions & 2 deletions internal/common/types.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package common

import "github.com/jonesrussell/page-prowler/internal/crawler"
import (
"github.com/jonesrussell/page-prowler/internal/crawler"
"github.com/jonesrussell/page-prowler/internal/worker"
)

type CrawlManagerKeyType = crawler.CrawlManagerInterface
type CrawlerWorkerKeyType = worker.CrawlerWorkerInterface

var crawlManagerKey crawler.CrawlManagerInterface
var crawlerWorkerKey worker.CrawlerWorkerInterface

// CrawlManagerKey is the key for storing and retrieving the CrawlManagerInterface from the context.
var CrawlManagerKey = &crawlManagerKey
var CrawlerWorkerKey = &crawlerWorkerKey

const CrawlManagerKeyStr = "crawlManagerKey"
const CrawlerWorkerKeyStr = "crawlerWorkerKey"
61 changes: 44 additions & 17 deletions internal/worker/crawl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,69 +11,89 @@ import (
"github.com/jonesrussell/page-prowler/internal/tasks"
)

// AsynqLoggerWrapper wraps the logger to provide different log levels.
type AsynqLoggerWrapper struct {
logger logger.Logger
}

// Debug logs debug level messages.
func (l *AsynqLoggerWrapper) Debug(args ...interface{}) {
l.logger.Debug(fmt.Sprint(args...), nil)
}

// Info logs info level messages.
func (l *AsynqLoggerWrapper) Info(args ...interface{}) {
l.logger.Info(fmt.Sprint(args...), nil)
}

// Warn logs warning level messages.
func (l *AsynqLoggerWrapper) Warn(args ...interface{}) {
l.logger.Warn(fmt.Sprint(args...), nil)
}

// Error logs error level messages.
func (l *AsynqLoggerWrapper) Error(args ...interface{}) {
l.logger.Error(fmt.Sprint(args...), nil)
}

// Fatal logs fatal level messages.
func (l *AsynqLoggerWrapper) Fatal(args ...interface{}) {
l.logger.Fatal(fmt.Sprint(args...), nil)
}

// Define an interface for the worker
type WorkerInterface interface {
StartWorker(concurrency int, crawlerService crawler.CrawlManagerInterface, debug bool)
// CrawlerWorkerInterface defines the methods a CrawlerWorker should have.
type CrawlerWorkerInterface interface {
StartCrawlerWorker(
concurrency int,
crawlManager crawler.CrawlManagerInterface,
debug bool,
)
}

// Define a struct that implements the Workerinterface
type Worker struct {
// CrawlerWorker is a struct that implements the CrawlerWorkerInterface.
type CrawlerWorker struct {
}

// Implement the StartWorker method for the crawlWorkerImpl struct
func (w *Worker) StartWorker(concurrency int, crawlerService crawler.CrawlManagerInterface, debug bool) {
// StartCrawlerWorker starts the worker with the specified concurrency level.
func (w *CrawlerWorker) StartCrawlerWorker(
concurrency int,
crawlManager crawler.CrawlManagerInterface,
debug bool,
) {
cm, ok := crawlManager.(*crawler.CrawlManager)
if !ok {
cm.Logger().Error("failed to assert type of crawlManager", nil)
return
}

// Initialize a new Asynq server with the default settings.
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: crawlerService.(*crawler.CrawlManager).Client.Options().Addr,
Password: crawlerService.(*crawler.CrawlManager).Client.Options().Password,
DB: crawlerService.(*crawler.CrawlManager).Client.Options().DB,
Addr: cm.Client.Options().Addr,
Password: cm.Client.Options().Password,
DB: cm.Client.Options().DB,
},
asynq.Config{
Concurrency: concurrency,
Logger: &AsynqLoggerWrapper{logger: crawlerService.Logger()}, // Use the Logger from CrawlManager
Logger: &AsynqLoggerWrapper{logger: cm.Logger()}, // Use the Logger from CrawlManager
},
)

// mux maps a task type to a handler
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.CrawlTaskType, func(ctx context.Context, task *asynq.Task) error {
return handleCrawlTask(ctx, task, crawlerService.(*crawler.CrawlManager), debug)
return handleCrawlTask(ctx, task, cm, debug)
})

// Run the server with the handler mux.
if err := srv.Run(mux); err != nil {
crawlerService.Logger().Fatal("could not run server", map[string]interface{}{"error": err})
cm.Logger().Fatal("could not run server", map[string]interface{}{"error": err})
}
}

// NewWorker returns a new instance of CrawlWorker
func NewWorker() Worker {
return Worker{}
// NewCrawlerWorker returns a new instance of CrawlerWorker that implements CrawlerWorkerInterface.
func NewCrawlerWorker() CrawlerWorkerInterface {
return &CrawlerWorker{}
}

func handleCrawlTask(ctx context.Context, task *asynq.Task, crawlerService *crawler.CrawlManager, debug bool) error {
Expand All @@ -83,5 +103,12 @@ func handleCrawlTask(ctx context.Context, task *asynq.Task, crawlerService *craw
return err
}

return crawlerService.StartCrawling(ctx, payload.URL, payload.SearchTerms, payload.CrawlSiteID, payload.MaxDepth, debug)
return crawlerService.StartCrawling(
ctx,
payload.URL,
payload.SearchTerms,
payload.CrawlSiteID,
payload.MaxDepth,
debug,
)
}

0 comments on commit 23e6c9c

Please sign in to comment.