-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcs.py
134 lines (108 loc) · 4.24 KB
/
gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import json
import random
import time
from io import BytesIO

from googleapiclient import discovery
from googleapiclient import errors
from googleapiclient import http
from httplib2 import Http, HttpLib2Error
from oauth2client.client import SignedJwtAssertionCredentials
# 4 KiB: the default transfer chunk size for get()/put(), and the length of the
# zero-filled buffer get() returns when the requested object does not exist.
KB_4 = 4 << 10

# Bucket used whenever a caller does not name one explicitly.
DEFAULT_BUCKET = 'blkdev_1'

# Network-level exceptions that get()/put() treat as transient and retry.
RETRYABLE_ERRORS = (HttpLib2Error, IOError)

# Upper bound on the number of transient-failure retries per transfer.
MAX_RETRIES = 5
def get_gcs(secrets_file='/home/wspeirs/src/cldblkdev/cloud_backup_secret.json',
            scope='https://www.googleapis.com/auth/devstorage.read_write'):
    """
    Gets a handle to the Google Cloud Storage service

    Authenticates as a service account whose key is read from a JSON file
    containing (at least) the "client_email" and "private_key" fields.

    :param secrets_file: path to the service-account JSON key file; defaults to
                         the path this module previously hard-coded, so existing
                         zero-argument callers behave exactly as before
    :param scope: OAuth2 scope to request; defaults to GCS read/write access
    :return: handle to be used in subsequent calls
    """
    with open(secrets_file) as f:
        secrets = json.load(f)
    # NOTE(review): SignedJwtAssertionCredentials was removed in oauth2client
    # 2.0 (replacement: oauth2client.service_account.ServiceAccountCredentials).
    # Kept here because the installed oauth2client evidently still provides it.
    credentials = SignedJwtAssertionCredentials(secrets['client_email'],
                                                secrets['private_key'],
                                                scope)
    authed_http = credentials.authorize(Http())
    return discovery.build('storage', 'v1', http=authed_http)
def list(gcs, bucket=DEFAULT_BUCKET):
    """
    Returns a list of all of the sectors found in the bucket

    Every page of results is fetched by following 'nextPageToken'. Object
    names are expected to be decimal sector numbers: each is converted with
    int(), so a non-numeric object name raises ValueError.

    :param gcs: handle to the Google Cloud Storage service
    :param bucket: optional bucket name
    :return: list of sectors (ints) in the bucket, in the order the API returns them
    """
    sectors = []
    page_token = None
    while True:
        params = {'bucket': bucket, 'projection': 'noAcl'}
        # The first request carries no pageToken, matching the original call.
        if page_token:
            params['pageToken'] = page_token
        response = gcs.objects().list(**params).execute()
        # A page with no objects omits 'items' entirely; indexing it directly
        # would raise KeyError, so treat a missing key as an empty page.
        sectors.extend(int(item['name']) for item in response.get('items', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return sectors
def get(gcs, file_name, bucket=DEFAULT_BUCKET, chunk_size=KB_4):
    """
    Reads a file from Google Cloud Storage

    Transient failures (HTTP 408, 429 and 5xx responses, plus the network
    errors in RETRYABLE_ERRORS) are retried with exponential backoff, at most
    MAX_RETRIES times in total across the whole download.

    :param gcs: handle to the Google Cloud Storage service
    :param file_name: the file to get (passed through str() for the request)
    :param bucket: the name of the bucket
    :param chunk_size: the size of the chunks to download
    :return: the bytes of the file, or a bytearray of KB_4 zero bytes if the
             object does not exist (HTTP 404)
    :raises errors.HttpError: on a non-retryable HTTP error, or on a retryable
             one once MAX_RETRIES has been exceeded
    :raises RETRYABLE_ERRORS: once MAX_RETRIES has been exceeded
    """
    req = gcs.objects().get_media(bucket=bucket, object=str(file_name))
    buff = BytesIO()
    downloader = http.MediaIoBaseDownload(buff, req, chunksize=chunk_size)
    done = False
    retry_count = 0
    while not done:
        try:
            progress, done = downloader.next_chunk()
        except errors.HttpError as err:
            print(err)
            status = err.resp.status
            if status == 404:
                # A missing object reads back as an all-zero buffer.
                # bytearray(n) is n zero bytes on both Python 2 and 3, whereas
                # bytearray("\0" * n) raises TypeError on Python 3.
                return bytearray(KB_4)
            # Other client errors are permanent, except request timeout (408)
            # and rate limiting (429), which GCS documents as retryable.
            if status < 500 and status not in (408, 429):
                raise
            # Server errors count against the retry budget too; previously
            # they did not, so a persistent 5xx looped forever.
            retry_count += 1
            if retry_count > MAX_RETRIES:
                print("Too many failures")
                raise
        except RETRYABLE_ERRORS as err:
            print(err)
            retry_count += 1
            if retry_count > MAX_RETRIES:
                print("Too many failures")
                raise
        else:
            print("Progress: %d%%\tDone: %s" % (int(progress.progress() * 100), str(done)))
            continue
        # Only reached after a retryable failure: exponential backoff
        # (1s, 2s, 4s, ...) plus up to 1s of jitter before trying again.
        time.sleep(2 ** (retry_count - 1) + random.random())
    return buff.getvalue()
def put(gcs, file_name, file_contents, bucket=DEFAULT_BUCKET, chunk_size=KB_4):
    """
    Creates or updates a file in Google Cloud Storage

    Performs a resumable upload. Transient failures (HTTP 408, 429 and 5xx
    responses, plus the network errors in RETRYABLE_ERRORS) are retried with
    exponential backoff, at most MAX_RETRIES times in total across the upload.

    NOTE(review): the GCS resumable-upload protocol requires every chunk except
    the last to be a multiple of 256 KiB, so the 4 KiB default chunk_size is
    only safe when file_contents fits in a single chunk -- confirm callers
    only upload single sectors.

    :param gcs: handle to the Google Cloud Storage service
    :param file_name: the name of the file to create
    :param file_contents: the contents of the file (anything BytesIO accepts)
    :param bucket: the name of the bucket
    :param chunk_size: the size of the chunks to upload
    :raises errors.HttpError: on a non-retryable HTTP error, or on a retryable
             one once MAX_RETRIES has been exceeded
    :raises RETRYABLE_ERRORS: once MAX_RETRIES has been exceeded
    """
    buff = BytesIO(file_contents)
    # application/octet-stream is the registered MIME type for arbitrary binary
    # data; "binary/octet-stream" is not a registered type.
    upload = http.MediaIoBaseUpload(buff, mimetype='application/octet-stream',
                                    chunksize=chunk_size, resumable=True)
    req = gcs.objects().insert(bucket=bucket, name=file_name, media_body=upload)
    done = False
    retry_count = 0
    while not done:
        try:
            progress, done = req.next_chunk()
        except errors.HttpError as err:
            print(err)
            status = err.resp.status
            # Client errors are permanent, except request timeout (408) and
            # rate limiting (429), which GCS documents as retryable.
            if status < 500 and status not in (408, 429):
                raise
            # Server errors count against the retry budget too; previously
            # they did not, so a persistent 5xx looped forever.
            retry_count += 1
            if retry_count > MAX_RETRIES:
                print("Too many failures")
                raise
        except RETRYABLE_ERRORS as err:
            print(err)
            retry_count += 1
            if retry_count > MAX_RETRIES:
                print("Too many failures")
                raise
        else:
            # progress can be None (e.g. once the upload has completed).
            if progress:
                print("Progress: %d%%" % (int(progress.progress() * 100)))
            continue
        # Only reached after a retryable failure: exponential backoff
        # (1s, 2s, 4s, ...) plus up to 1s of jitter before trying again.
        time.sleep(2 ** (retry_count - 1) + random.random())
def delete(gcs, file_name, bucket=DEFAULT_BUCKET):
    """
    Delete one object from a Google Cloud Storage bucket.

    The delete request is built and executed immediately, with no retry;
    whatever execute() raises propagates to the caller, and nothing is
    returned.

    :param gcs: handle to the Google Cloud Storage service
    :param file_name: name of the object to remove
    :param bucket: bucket holding the object
    """
    objects_api = gcs.objects()
    delete_request = objects_api.delete(bucket=bucket, object=file_name)
    delete_request.execute()