package worker

import (
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/musabgultekin/quantumscraper/http"
	"github.com/musabgultekin/quantumscraper/metrics"
	"github.com/musabgultekin/quantumscraper/urlloader"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
)
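
// Pipeline overview: StartWorkers feeds per-host URL batches into
// hostURLsQueue, a pool of Worker goroutines fetches and parses them, and a
// single collector goroutine drains foundLinksChan so that the foundLinks set
// is only ever touched from one goroutine.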
var (
	hostURLsQueue  = make(chan []string, 1000)
	foundLinks     = make(map[string]struct{})
	foundLinksChan = make(chan map[string]struct{}, 5000)
	logger, _      = zap.NewDevelopment()
)
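
// Worker drains hostURLsQueue and crawls one host's URL list at a time. Its
// rate limiter is currently unused: the Wait call in HandleUrl is disabled.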
type Worker struct {
	id          int
	rateLimiter *rate.Limiter
	wg          *sync.WaitGroup
}
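
// NewWorker builds a worker with a limiter of 0.5 requests per second and a
// burst of 1. The error return is currently always nil.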
func NewWorker(id int, wg *sync.WaitGroup) (*Worker, error) {
	rateLimiter := rate.NewLimiter(0.5, 1)
	return &Worker{id: id, rateLimiter: rateLimiter, wg: wg}, nil
}
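
// Work consumes per-host URL batches until hostURLsQueue is closed. Errors
// are classified by substring matching: a dead host aborts the rest of that
// host's batch, while per-URL failures (proxy 403, non-200 status, non-HTML
// responses, closed connections) skip just that URL.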
func (worker *Worker) Work() error {
	defer worker.wg.Done()
	for hostUrlList := range hostURLsQueue {
		for _, targetURL := range hostUrlList {
			if err := worker.HandleUrl(targetURL); err != nil {
				if strings.Contains(err.Error(), "no such host") {
					break // The host is gone, so skip the rest of its URLs.
				}
				if strings.Contains(err.Error(), "could not connect to proxy:") &&
					strings.Contains(err.Error(), "status code: 403") {
					continue
				}
				if strings.Contains(err.Error(), "status not 200") {
					continue
				}
				if strings.Contains(err.Error(), "not HTML") {
					continue
				}
				if errors.Is(err, fasthttp.ErrConnectionClosed) {
					continue
				}
				logger.Debug(err.Error(), zap.String("url", targetURL))
				continue
			}
		}
	}
	return nil
}
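
// HandleUrl fetches a single URL, records Prometheus request metrics, extracts
// links from the HTML body, and hands them to the collector goroutine via
// foundLinksChan.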
func (worker *Worker) HandleUrl(targetURL string) error {
	// Per-worker rate limiting is currently disabled (worker.rateLimiter.Wait
	// is not called here).
	requestStartTime := time.Now()
	metrics.RequestInFlightCount.Inc()
	resp, status, err := http.GetFast(targetURL)
	metrics.RequestInFlightCount.Dec()
	metrics.RequestCount.With(prometheus.Labels{"code": strconv.Itoa(status)}).Inc()
	metrics.RequestLatency.With(prometheus.Labels{"code": strconv.Itoa(status)}).Observe(time.Since(requestStartTime).Seconds())
	// Backoff on fasthttp.StatusTooManyRequests (429) is currently disabled.
	if err != nil {
		return fmt.Errorf("http get err: %w", err)
	}
	links, err := extractLinksFromHTML(targetURL, resp)
	if err != nil {
		return fmt.Errorf("extract links from html: %w", err)
	}
	// Queue the extracted links for deduplication; persisting them to storage
	// (SaveLinks) is currently disabled.
	foundLinksChan <- links
	return nil
}
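
// StartWorkers spins up the worker pool and the link-collector goroutine,
// then streams per-host URL batches from the Parquet loader into
// hostURLsQueue. It returns once every batch has been queued and the queue is
// closed; callers can wait on wg for the workers themselves to finish.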
func StartWorkers(urlListURL string, urlListCachePath string, parquetDir string, wg *sync.WaitGroup, concurrency int) error {
	// The plain URL-list loader (urlloader.New, which used urlListURL and
	// urlListCachePath) is currently disabled in favor of the Parquet loader.
	urlLoader, err := urlloader.NewParquet(parquetDir)
	if err != nil {
		return fmt.Errorf("url loader: %w", err)
	}
	defer urlLoader.Close()

	log.Println("Starting workers")
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		worker, err := NewWorker(i, wg)
		if err != nil {
			return fmt.Errorf("new worker: %w", err)
		}
		go worker.Work()
	}

	// Collect discovered links from all workers in a single goroutine so the
	// foundLinks map needs no locking.
	go func() {
		for linksBatch := range foundLinksChan {
			for link := range linksBatch {
				foundLinks[link] = struct{}{}
			}
			length := len(foundLinks)
			metrics.FoundURLsCount.Set(float64(length))
			if length >= 100_000_000 {
				// Writing the batch to disk is currently disabled; the set
				// is simply reset so memory stays bounded.
				foundLinks = make(map[string]struct{})
			}
		}
	}()

	log.Println("Queuing URLs for each host")
	for {
		urlStrings, err := urlLoader.LoadNextHostURLs()
		if err != nil {
			return fmt.Errorf("url loader load next domain urls: %w", err)
		}
		if len(urlStrings) == 0 {
			break // end of file
		}
		hostURLsQueue <- urlStrings
	}
	log.Println("All URLs queued")

	// All hosts are queued, so the queue can be closed; workers exit once
	// they drain it.
	close(hostURLsQueue)
	return nil
}
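
// Usage sketch (an assumption about the surrounding wiring, not code from
// this repository): a caller would typically run something like
//
//	var wg sync.WaitGroup
//	if err := worker.StartWorkers("", "", "data/urls.parquet", &wg, 64); err != nil {
//		log.Fatal(err)
//	}
//	wg.Wait() // wait for workers to drain hostURLsQueue
//
// The Parquet directory path and the concurrency value are placeholders.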