pdcp result chunked upload #4662

Merged
merged 3 commits on Jan 20, 2024
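In short: this PR reworks -cup/-cloud-upload from a write-to-temp-file-then-upload model into chunked streaming. Results are piped through an io.Pipe into an in-memory buffer and flushed to the PDCP dashboard whenever a chunk reaches MaxChunkSize (4 MB) or the one-minute flush timer fires, and a new -sid/-scan-id flag appends results to an existing scan (HTTP PATCH) instead of creating a new one (HTTP POST).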
README.md (5 changes: 3 additions & 2 deletions)
@@ -290,8 +290,9 @@ STATISTICS:
    -mp, -metrics-port int  port to expose nuclei metrics on (default 9092)

 CLOUD:
-   -auth                configure projectdiscovery cloud (pdcp) api key
-   -cup, -cloud-upload  upload scan results to pdcp dashboard
+   -auth                  configure projectdiscovery cloud (pdcp) api key
+   -cup, -cloud-upload    upload scan results to pdcp dashboard
+   -sid, -scan-id string  upload scan results to given scan id


 EXAMPLES:
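With the new flag in place, a run such as `nuclei -l targets.txt -cup -sid <scan-id>` (target list name illustrative) streams results into an existing scan, while `-cup` alone still creates a fresh scan on the dashboard.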
README_CN.md (7 changes: 4 additions & 3 deletions)
@@ -293,9 +293,10 @@ UNCOVER引擎:
    -si, -stats-inerval int 显示统计信息更新的间隔秒数(默认:5)
    -mp, -metrics-port int  更改metrics服务的端口(默认:9092)

-云服务:
-   -auth                配置projectdiscovery云(pdcp)API密钥
-   -cup, -cloud-upload  将扫描结果上传到pdcp仪表板
+云服务:
+   -auth                  配置projectdiscovery云服务(pdcp)API密钥
+   -cup, -cloud-upload    将扫描结果上传到pdcp仪表板
+   -sid, -scan-id string  将扫描结果上传到指定的扫描ID

 例子:
 扫描一个单独的URL:
README_ID.md (5 changes: 3 additions & 2 deletions)
@@ -266,8 +266,9 @@ STATISTICS:
    -mp, -metrics-port int  port to expose nuclei metrics on (default 9092)

 CLOUD:
-   -auth                configure projectdiscovery cloud (pdcp) api key
-   -cup, -cloud-upload  upload scan results to pdcp dashboard
+   -auth                  configure projectdiscovery cloud (pdcp) api key
+   -cup, -cloud-upload    upload scan results to pdcp dashboard
+   -sid, -scan-id string  upload scan results to given scan id


 EXAMPLES:
README_KR.md (5 changes: 3 additions & 2 deletions)
@@ -264,8 +264,9 @@ STATISTICS:
    -mp, -metrics-port int  nuclei 메트릭스를 노출할 포트 (기본값 9092)

 CLOUD:
-   -auth                projectdiscovery cloud (pdcp) api 키 설정
-   -cup, -cloud-upload  스캔 결과를 pdcp 대시보드에 업로드
+   -auth                  projectdiscovery 클라우드 (pdcp) API 키 구성
+   -cup, -cloud-upload    스캔 결과를 pdcp 대시보드에 업로드
+   -sid, -scan-id string  주어진 스캔 ID에 스캔 결과 업로드


 예시:
cmd/nuclei/main.go (7 changes: 7 additions & 0 deletions)
@@ -127,6 +127,9 @@ func main() {
 	defer cancel()
 	stackMonitor.RegisterCallback(func(dumpID string) error {
 		resumeFileName := fmt.Sprintf("crash-resume-file-%s.dump", dumpID)
+		if options.EnableCloudUpload {
+			gologger.Info().Msgf("Uploading scan results to cloud...")
+		}
 		nucleiRunner.Close()
 		gologger.Info().Msgf("Creating resume file: %s\n", resumeFileName)
 		err := nucleiRunner.SaveResumeConfig(resumeFileName)
@@ -143,6 +146,9 @@ func main() {
 	for range c {
 		gologger.Info().Msgf("CTRL+C pressed: Exiting\n")
 		gologger.Info().Msgf("Attempting graceful shutdown...")
+		if options.EnableCloudUpload {
+			gologger.Info().Msgf("Uploading scan results to cloud...")
+		}
 		nucleiRunner.Close()
 		if options.ShouldSaveResume() {
 			gologger.Info().Msgf("Creating resume file: %s\n", resumeFileName)
@@ -380,6 +386,7 @@ on extensive configurability, massive extensibility and ease of use.`)
 	flagSet.CreateGroup("cloud", "Cloud",
 		flagSet.BoolVar(&pdcpauth, "auth", false, "configure projectdiscovery cloud (pdcp) api key"),
 		flagSet.BoolVarP(&options.EnableCloudUpload, "cloud-upload", "cup", false, "upload scan results to pdcp dashboard"),
+		flagSet.StringVarP(&options.ScanID, "scan-id", "sid", "", "upload scan results to given scan id"),
 	)

 	flagSet.SetCustomHelpText(`EXAMPLES:
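Note the ordering in both shutdown paths: the new log line is printed before nucleiRunner.Close(), since closing the runner is what triggers the upload writer's final chunk flush (see Close() in writer.go below), which can take a moment on large result sets.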
internal/pdcp/utils.go (18 changes: 18 additions & 0 deletions)
@@ -0,0 +1,18 @@
+package pdcp
+
+import (
+	pdcpauth "github.com/projectdiscovery/utils/auth/pdcp"
+	urlutil "github.com/projectdiscovery/utils/url"
+)
+
+func getScanDashBoardURL(id string) string {
+	ux, _ := urlutil.Parse(pdcpauth.DashBoardURL)
+	ux.Path = "/scans/" + id
+	ux.Update()
+	return ux.String()
+}
+
+type uploadResponse struct {
+	ID      string `json:"id"`
+	Message string `json:"message"`
+}
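As a rough illustration of the helper above, assuming pdcpauth.DashBoardURL points at the dashboard root (e.g. https://cloud.projectdiscovery.io) and using a made-up scan id, the URL composes like this:

	package main

	import (
		"fmt"

		urlutil "github.com/projectdiscovery/utils/url"
	)

	func main() {
		// same steps as getScanDashBoardURL, with the assumed base URL inlined
		ux, _ := urlutil.Parse("https://cloud.projectdiscovery.io") // assumed value of pdcpauth.DashBoardURL
		ux.Path = "/scans/" + "cn0example0scan0id"                  // hypothetical scan id
		ux.Update()
		fmt.Println(ux.String()) // https://cloud.projectdiscovery.io/scans/cn0example0scan0id
	}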
internal/pdcp/writer.go (224 changes: 149 additions & 75 deletions)
@@ -1,30 +1,30 @@
 package pdcp

 import (
+	"bufio"
+	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
 	"net/http"
 	"net/url"
-	"os"
-	"path/filepath"
-	"strconv"
 	"sync/atomic"
 	"time"

 	"github.com/projectdiscovery/gologger"
-	"github.com/projectdiscovery/nuclei/v3/pkg/catalog/config"
 	"github.com/projectdiscovery/nuclei/v3/pkg/output"
 	"github.com/projectdiscovery/retryablehttp-go"
 	pdcpauth "github.com/projectdiscovery/utils/auth/pdcp"
 	errorutil "github.com/projectdiscovery/utils/errors"
-	fileutil "github.com/projectdiscovery/utils/file"
-	folderutil "github.com/projectdiscovery/utils/folder"
 	urlutil "github.com/projectdiscovery/utils/url"
 )

 const (
 	uploadEndpoint = "/v1/scans/import"
+	appendEndpoint = "/v1/scans/%s/import"
+	flushTimer     = time.Duration(1) * time.Minute
+	MaxChunkSize   = 1024 * 1024 * 4 // 4 MB
 )

 var _ output.Writer = &UploadWriter{}
@@ -34,31 +34,28 @@ var _ output.Writer = &UploadWriter{}
 type UploadWriter struct {
 	*output.StandardWriter
 	creds     *pdcpauth.PDCPCredentials
-	tempFile  *os.File
-	done      atomic.Bool
 	uploadURL *url.URL
+	client    *retryablehttp.Client
+	cancel    context.CancelFunc
+	done      chan struct{}
+	scanID    string
+	counter   atomic.Int32
 }

 // NewUploadWriter creates a new upload writer
-func NewUploadWriter(creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) {
+func NewUploadWriter(ctx context.Context, creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) {
 	if creds == nil {
 		return nil, fmt.Errorf("no credentials provided")
 	}
-	u := &UploadWriter{creds: creds}
-	// create a temporary file in cache directory
-	cacheDir := folderutil.AppCacheDirOrDefault("", config.BinaryName)
-	if !fileutil.FolderExists(cacheDir) {
-		_ = fileutil.CreateFolder(cacheDir)
+	u := &UploadWriter{
+		creds: creds,
+		done:  make(chan struct{}, 1),
 	}

 	var err error
-	// tempfile is created in nuclei-results-<unix-timestamp>.json format
-	u.tempFile, err = os.OpenFile(filepath.Join(cacheDir, "nuclei-results-"+strconv.Itoa(int(time.Now().Unix()))+".json"), os.O_RDWR|os.O_CREATE, 0600)
-	if err != nil {
-		return nil, errorutil.NewWithErr(err).Msgf("could not create temporary file")
-	}
+	reader, writer := io.Pipe()
 	// create standard writer
 	u.StandardWriter, err = output.NewWriter(
-		output.WithWriter(u.tempFile),
+		output.WithWriter(writer),
 		output.WithJson(true, true),
 	)
 	if err != nil {
@@ -71,87 +68,164 @@ func NewUploadWriter(creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) {
 	tmp.Path = uploadEndpoint
 	tmp.Update()
 	u.uploadURL = tmp.URL
+
+	// create http client
+	opts := retryablehttp.DefaultOptionsSingle
+	opts.NoAdjustTimeout = true
+	opts.Timeout = time.Duration(3) * time.Minute
+	u.client = retryablehttp.NewClient(opts)
+
+	// create context
+	ctx, u.cancel = context.WithCancel(ctx)
+	// start auto commit
+	// upload every 1 minute or when buffer is full
+	go u.autoCommit(ctx, reader)
 	return u, nil
 }

-type uploadResponse struct {
-	ID      string `json:"id"`
-	Message string `json:"message"`
-}
+// SetScanID sets the scan id for the upload writer
+func (u *UploadWriter) SetScanID(id string) {
+	u.scanID = id
+}

-// Upload uploads the results to pdcp server
-func (u *UploadWriter) Upload() {
-	defer u.done.Store(true)
-
-	_ = u.tempFile.Sync()
-	info, err := u.tempFile.Stat()
-	if err != nil {
-		gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
-		return
-	}
-	if info.Size() == 0 {
-		gologger.Verbose().Msgf("Scan results upload to cloud skipped, no results found to upload")
-		return
-	}
-	_, _ = u.tempFile.Seek(0, 0)
-
-	id, err := u.upload()
-	if err != nil {
-		gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
-		return
-	}
-	gologger.Info().Msgf("Scan results uploaded! View them at %v", getScanDashBoardURL(id))
-}
+func (u *UploadWriter) autoCommit(ctx context.Context, r *io.PipeReader) {
+	reader := bufio.NewReader(r)
+	ch := make(chan string, 4)
+
+	// continuously read from the reader and send to channel
+	go func() {
+		defer r.Close()
+		defer close(ch)
+		for {
+			data, err := reader.ReadString('\n')
+			if err != nil {
+				return
+			}
+			u.counter.Add(1)
+			ch <- data
+		}
+	}()
+
+	// wait for context to be done
+	defer func() {
+		u.done <- struct{}{}
+		close(u.done)
+		// if no scanid is generated no results were uploaded
+		if u.scanID == "" {
+			gologger.Verbose().Msgf("Scan results upload to cloud skipped, no results found to upload")
+		} else {
+			gologger.Info().Msgf("%v Scan results uploaded to cloud, you can view scan results at %v", u.counter.Load(), getScanDashBoardURL(u.scanID))
+		}
+	}()
+	// temporary buffer to store the results
+	buff := &bytes.Buffer{}
+	ticker := time.NewTicker(flushTimer)
+
+	for {
+		select {
+		case <-ctx.Done():
+			// flush before exit
+			if buff.Len() > 0 {
+				if err := u.uploadChunk(buff); err != nil {
+					gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
+				}
+			}
+			return
+		case <-ticker.C:
+			// flush the buffer
+			if buff.Len() > 0 {
+				if err := u.uploadChunk(buff); err != nil {
+					gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
+				}
+			}
+		case line, ok := <-ch:
+			if !ok {
+				if buff.Len() > 0 {
+					if err := u.uploadChunk(buff); err != nil {
+						gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
+					}
+				}
+				return
+			}
+			if buff.Len()+len(line) > MaxChunkSize {
+				// flush existing buffer
+				if err := u.uploadChunk(buff); err != nil {
+					gologger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
+				}
+			} else {
+				buff.WriteString(line)
+			}
+		}
+	}
+}

+// uploadChunk uploads a chunk of data to the server
+func (u *UploadWriter) uploadChunk(buff *bytes.Buffer) error {
+	if err := u.upload(buff.Bytes()); err != nil {
+		return errorutil.NewWithErr(err).Msgf("could not upload chunk")
+	}
+	// if successful, reset the buffer
+	buff.Reset()
+	// log in verbose mode
+	gologger.Warning().Msgf("Uploaded results chunk, you can view scan results at %v", getScanDashBoardURL(u.scanID))
+	return nil
+}

-func (u *UploadWriter) upload() (string, error) {
-	req, err := retryablehttp.NewRequest(http.MethodPost, u.uploadURL.String(), u.tempFile)
+func (u *UploadWriter) upload(data []byte) error {
+	req, err := u.getRequest(data)
 	if err != nil {
-		return "", errorutil.NewWithErr(err).Msgf("could not create cloud upload request")
+		return errorutil.NewWithErr(err).Msgf("could not create upload request")
 	}
-	req.Header.Set(pdcpauth.ApiKeyHeaderName, u.creds.APIKey)
-	req.Header.Set("Content-Type", "application/octet-stream")
-	req.Header.Set("Accept", "application/json")
-
-	opts := retryablehttp.DefaultOptionsSingle
-	// we are uploading nuclei results which can be large
-	// server has a size limit of ~20ish MB
-	opts.Timeout = time.Duration(3) * time.Minute
-	client := retryablehttp.NewClient(opts)
-	resp, err := client.Do(req)
+	resp, err := u.client.Do(req)
 	if err != nil {
-		return "", errorutil.NewWithErr(err).Msgf("could not upload results")
+		return errorutil.NewWithErr(err).Msgf("could not upload results")
 	}
 	defer resp.Body.Close()
 	bin, err := io.ReadAll(resp.Body)
 	if err != nil {
-		return "", errorutil.NewWithErr(err).Msgf("could not get id from response")
+		return errorutil.NewWithErr(err).Msgf("could not get id from response")
 	}
 	if resp.StatusCode != http.StatusOK {
-		return "", fmt.Errorf("could not upload results got status code %v", resp.StatusCode)
+		return fmt.Errorf("could not upload results got status code %v on %v", resp.StatusCode, resp.Request.URL.String())
 	}
 	var uploadResp uploadResponse
 	if err := json.Unmarshal(bin, &uploadResp); err != nil {
-		return "", errorutil.NewWithErr(err).Msgf("could not unmarshal response got %v", string(bin))
+		return errorutil.NewWithErr(err).Msgf("could not unmarshal response got %v", string(bin))
 	}
-	u.removeTempFile()
-	return uploadResp.ID, nil
+	if uploadResp.ID != "" && u.scanID == "" {
+		u.scanID = uploadResp.ID
+	}
+	return nil
 }

-// removeTempFile removes the temporary file
-func (u *UploadWriter) removeTempFile() {
-	_ = os.Remove(u.tempFile.Name())
-}
+// getRequest returns a new request for upload
+// if scanID is not provided create new scan by uploading the data
+// if scanID is provided append the data to existing scan
+func (u *UploadWriter) getRequest(bin []byte) (*retryablehttp.Request, error) {
+	var method, url string
+
+	if u.scanID == "" {
+		u.uploadURL.Path = uploadEndpoint
+		method = http.MethodPost
+		url = u.uploadURL.String()
+	} else {
+		u.uploadURL.Path = fmt.Sprintf(appendEndpoint, u.scanID)
+		method = http.MethodPatch
+		url = u.uploadURL.String()
+	}
+	req, err := retryablehttp.NewRequest(method, url, bytes.NewReader(bin))
+	if err != nil {
+		return nil, errorutil.NewWithErr(err).Msgf("could not create cloud upload request")
+	}
+	req.Header.Set(pdcpauth.ApiKeyHeaderName, u.creds.APIKey)
+	req.Header.Set("Content-Type", "application/octet-stream")
+	req.Header.Set("Accept", "application/json")
+	return req, nil
+}

 // Close closes the upload writer
 func (u *UploadWriter) Close() {
-	if !u.done.Load() {
-		u.Upload()
-	}
-}
-
-func getScanDashBoardURL(id string) string {
-	ux, _ := urlutil.Parse(pdcpauth.DashBoardURL)
-	ux.Path = "/scans/" + id
-	ux.Update()
-	return ux.String()
-}
+	u.cancel()
+	<-u.done
+	u.StandardWriter.Close()
+}
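Taken together, the new writer's lifecycle looks roughly like the sketch below. This is a hand-written illustration, not code from the PR: the credentials literal, the Server value, and the scan id are placeholder assumptions, and nuclei constructs all of this internally when -cup is set.

	ctx := context.Background()
	creds := &pdcpauth.PDCPCredentials{
		APIKey: "<pdcp-api-key>",                  // placeholder
		Server: "https://api.projectdiscovery.io", // assumed API base
	}

	// starts the autoCommit goroutine: results are buffered up to
	// MaxChunkSize (4 MB) and flushed at least once per flushTimer (1 minute)
	writer, err := pdcp.NewUploadWriter(ctx, creds)
	if err != nil {
		panic(err)
	}

	// optional: append to an existing scan (PATCH) instead of creating one (POST)
	writer.SetScanID("<existing-scan-id>") // placeholder

	// ... nuclei writes results through the embedded output.StandardWriter,
	// which feeds the io.Pipe consumed by autoCommit ...

	// cancel autoCommit, wait for the final chunk flush, and close the writer
	writer.Close()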