This repository has been archived by the owner on Jan 31, 2018. It is now read-only.
/
rsloader.go
112 lines (91 loc) · 3.09 KB
/
rsloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package loadclient
import (
"bytes"
"encoding/json"
"github.com/twitchscience/aws_utils/common"
"github.com/twitchscience/aws_utils/monitoring"
"github.com/twitchscience/rs_ingester/backend"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
"github.com/twitchscience/rs_ingester/metadata"
"github.com/twitchscience/scoop_protocol/scoop_protocol"
)
// RSLoader loads manifests into Redshift. It bundles the Redshift backend, the
// stats module used to time loads, and the S3 bucket and uploader that manifest
// files are written to.
type RSLoader struct {
	// rsBackend issues manifest copies, load-status checks and health checks.
	rsBackend backend.Backend
	// bucket is the S3 bucket manifest files are uploaded to; manifest URLs
	// handed to the backend are derived from it.
	bucket string
	// stats receives per-table load timings.
	stats monitoring.SafeStatter
	// s3Uploader writes manifest JSON objects into bucket.
	s3Uploader s3manageriface.UploaderAPI
}
// NewRSLoader wires an RSLoader from its collaborators and returns it as a
// Loader. s3Uploader and manifestBucket determine where manifest files are
// written, rsBackend performs the Redshift operations, and stats records load
// timings. The error result is currently always nil.
func NewRSLoader(s3Uploader s3manageriface.UploaderAPI, rsBackend backend.Backend, manifestBucket string, stats monitoring.SafeStatter) (Loader, error) {
	loader := &RSLoader{
		s3Uploader: s3Uploader,
		rsBackend:  rsBackend,
		bucket:     manifestBucket,
		stats:      stats,
	}
	return loader, nil
}
// LoadManifest uploads the manifest to S3 and then asks the Redshift backend
// to load it into manifest.TableName via ManifestCopy. When both steps succeed
// the elapsed time is reported to stats under the table name and nil is
// returned. A failure in either step is returned as a retryable LoadError
// carrying the underlying error text.
func (rsl *RSLoader) LoadManifest(manifest *metadata.LoadManifest) LoadError {
	began := time.Now()

	// Named "url" rather than "manifestURL" to avoid shadowing the
	// package-level manifestURL helper.
	url, uploadErr := rsl.CreateManifestInBucket(manifest)
	if uploadErr != nil {
		return &loadError{msg: uploadErr.Error(), isRetryable: true}
	}

	copyReq := &scoop_protocol.ManifestRowCopyRequest{
		ManifestURL: url,
		TableName:   manifest.TableName,
	}
	if copyErr := rsl.rsBackend.ManifestCopy(copyReq); copyErr != nil {
		return &loadError{msg: copyErr.Error(), isRetryable: true}
	}

	rsl.stats.SafeTimingDuration(manifest.TableName, time.Since(began), 1.0)
	return nil
}
// CheckLoad asks the Redshift backend for the status of the load associated
// with the manifest identified by manifestUUID. The manifest URL is derived
// from the loader's bucket and the UUID. On error, an empty status is returned
// together with the backend's error.
func (rsl *RSLoader) CheckLoad(manifestUUID string) (scoop_protocol.LoadStatus, error) {
	req := &scoop_protocol.LoadCheckRequest{
		ManifestURL: manifestURL(rsl.bucket, manifestUUID),
	}
	resp, err := rsl.rsBackend.LoadCheck(req)
	if err != nil {
		return "", err
	}
	return resp.LoadStatus, nil
}
// HealthCheck checks the Redshift connection by delegating to the backend's
// HealthCheck. It returns whatever error the backend reports, or nil.
func (rsl *RSLoader) HealthCheck() error {
	if err := rsl.rsBackend.HealthCheck(); err != nil {
		return err
	}
	return nil
}
// CreateManifestInBucket serializes the load manifest to JSON and uploads it to
// the loader's bucket under the key "<UUID>.json". On success it returns the
// normalized S3 URL of that object. If serialization or upload fails, it
// returns an empty string and the error.
func (rsl *RSLoader) CreateManifestInBucket(manifest *metadata.LoadManifest) (string, error) {
	body, err := makeManifestJSON(manifest)
	if err != nil {
		return "", err
	}

	input := &s3manager.UploadInput{
		Bucket: aws.String(rsl.bucket),
		Key:    aws.String(manifest.UUID + ".json"),
		Body:   bytes.NewReader(body),
	}
	if _, uploadErr := rsl.s3Uploader.Upload(input); uploadErr != nil {
		return "", uploadErr
	}
	return manifestURL(rsl.bucket, manifest.UUID), nil
}
// makeManifestJSON converts a load manifest into the JSON manifest document
// that is uploaded to S3. It emits one entry per element of mani.Loads; each
// entry's URL is the load's KeyName normalized by common.NormalizeS3URL, and
// each entry is marked Mandatory. It returns any error from json.Marshal.
//
// The entries slice is pre-sized to len(mani.Loads) so appends never
// reallocate. As a consequence, an empty Loads list yields an empty non-nil
// slice. encoding/json renders that as [] rather than null, provided the
// Entries field is not tagged omitempty (its declaration is outside this view).
func makeManifestJSON(mani *metadata.LoadManifest) ([]byte, error) {
	m := manifest{Entries: make([]entry, 0, len(mani.Loads))}
	for _, load := range mani.Loads {
		m.Entries = append(m.Entries, entry{
			URL:       common.NormalizeS3URL(load.KeyName),
			Mandatory: true,
		})
	}
	return json.Marshal(m)
}
// manifestURL returns the normalized S3 URL of the manifest object for uuid in
// bucketName, i.e. common.NormalizeS3URL("<bucketName>/<uuid>.json"). The
// "<uuid>.json" part matches the object key used when manifests are uploaded.
func manifestURL(bucketName, uuid string) string {
	objectKey := uuid + ".json"
	return common.NormalizeS3URL(bucketName + "/" + objectKey)
}