-
Notifications
You must be signed in to change notification settings - Fork 44
/
gs.py
153 lines (124 loc) · 4.9 KB
/
gs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from __future__ import absolute_import
from fnmatch import fnmatch
import logging
from io import BytesIO, StringIO
from ...exceptions import FileSystemNotSupported
from ...utils import Tokenizer, parse_file_uri
from .file_system import FileSystem
log = logging.getLogger(__name__)
try:
from gcloud import storage
except ImportError as e:
storage = None
class GS(FileSystem):
    """:class:`.FileSystem` implementation for Google Storage.

    Paths are of the form `gs://bucket_name/file_path` or
    `gs://project_name:bucket_name/file_path`.
    """

    #: Set a default project name (used when the URI carries no project).
    project_name = None

    #: Default mime type for uploads.
    mime_type = 'text/plain'

    #: Cache of ``storage.Client`` instances, keyed by project name.
    _clients = {}

    def __init__(self, file_name):
        """Resolve *file_name* to a blob handle.

        If the blob does not exist yet, a local (not yet uploaded)
        placeholder blob is created so that :meth:`dump` can write to it.

        :raises FileSystemNotSupported: if the "gcloud" package is missing.
        """
        if storage is None:
            raise FileSystemNotSupported(
                'Google Storage is not supported. Install "gcloud".'
            )

        super(GS, self).__init__(file_name)

        # Split "scheme://[project:]bucket/blob" into its components.
        t = Tokenizer(self.file_name)
        t.next('://')  # skip scheme
        bucket_name = t.next('/')
        if ':' in bucket_name:
            project_name, _, bucket_name = bucket_name.partition(':')
        else:
            project_name = GS.project_name
        blob_name = t.next()

        client = GS._get_client(project_name)
        bucket = client.get_bucket(bucket_name)
        self.blob = bucket.get_blob(blob_name)
        if not self.blob:
            # Blob does not exist on the server yet: create a placeholder
            # so dump() has something to upload into.
            self.blob = bucket.blob(blob_name)

    @staticmethod
    def _get_client(project_name):
        """Return a cached ``storage.Client`` for *project_name*.

        :raises FileSystemNotSupported: if the "gcloud" package is missing.
        """
        if project_name not in GS._clients:
            if storage is None:
                raise FileSystemNotSupported(
                    'Google Storage is not supported. Install "gcloud".'
                )
            GS._clients[project_name] = storage.Client(project_name)
        return GS._clients[project_name]

    @staticmethod
    def resolve_filenames(expr):
        """Expand a (possibly wildcarded) gs:// expression.

        Lists blobs under the non-wildcard prefix of *expr* and returns
        the fully qualified ``scheme://project:bucket/name`` URIs of every
        blob matching *expr* (or ``expr + '/part*'`` for Spark-style
        partitioned output directories).
        """
        files = []

        t = Tokenizer(expr)
        scheme = t.next('://')
        bucket_spec = t.next('/')  # raw token, may be "project:bucket"
        if ':' in bucket_spec:
            project_name, _, bucket_name = bucket_spec.partition(':')
        else:
            project_name = GS.project_name
            bucket_name = bucket_spec
        prefix = t.next(['*', '?'])

        bucket = GS._get_client(project_name).get_bucket(bucket_name)
        # Strip "scheme://<bucket_spec>/" from the original expression so
        # only the blob-name part is matched. Measuring the raw bucket
        # token (instead of re-assembling project + bucket) keeps this
        # correct when the URI carried no project prefix and avoids a
        # TypeError when GS.project_name is None.
        expr = expr[len(scheme) + 3 + len(bucket_spec) + 1:]
        for k in bucket.list_blobs(prefix=prefix):
            if fnmatch(k.name, expr) or fnmatch(k.name, expr + '/part*'):
                files.append('{0}://{1}:{2}/{3}'.format(
                    scheme, project_name, bucket_name, k.name))
        return files

    @staticmethod
    def resolve_content(expr):
        """Return URIs of all blobs (files) matching *expr*.

        Matches both blobs whose name equals the pattern and blobs nested
        inside folders that match the pattern. Blob names ending in "/"
        (folder markers) are skipped.
        """
        scheme, raw_bucket_name, folder_path, pattern = parse_file_uri(expr)
        if ':' in raw_bucket_name:
            project_name, _, bucket_name = raw_bucket_name.partition(':')
        else:
            project_name = GS.project_name
            bucket_name = raw_bucket_name

        folder_path = folder_path[1:]  # remove leading slash
        expr = "{0}{1}".format(folder_path, pattern)
        # Also match all files inside folders that match expr.
        pattern_expr = "{0}{1}*".format(expr, "" if expr.endswith("/") else "/")

        bucket = GS._get_client(project_name).get_bucket(bucket_name)
        files = []
        for k in bucket.list_blobs(prefix=folder_path):
            if not k.name.endswith("/") and (
                fnmatch(k.name, expr) or fnmatch(k.name, pattern_expr)
            ):
                files.append(
                    '{0}://{1}/{2}'.format(scheme, raw_bucket_name, k.name)
                )
        return files

    def exists(self):
        """Return True if the blob exists, or if any blob exists under it
        (i.e. the path is a non-empty "directory")."""
        t = Tokenizer(self.file_name)
        t.next('://')  # skip scheme (was '//'; '://' is consistent with __init__)
        bucket_name = t.next('/')
        if ':' in bucket_name:
            project_name, _, bucket_name = bucket_name.partition(':')
        else:
            project_name = GS.project_name
        blob_name = t.next()

        bucket = GS._get_client(project_name).get_bucket(bucket_name)
        return bool(bucket.get_blob(blob_name) or
                    list(bucket.list_blobs(prefix='{}/'.format(blob_name))))

    def load(self):
        """Download the blob and return its content as a ``BytesIO``."""
        log.debug('Loading {0} with size {1}.'
                  ''.format(self.blob.name, self.blob.size))
        return BytesIO(self.blob.download_as_string())

    def load_text(self, encoding='utf8', encoding_errors='ignore'):
        """Download the blob, decode it, and return it as a ``StringIO``.

        :param encoding: text encoding used to decode the raw bytes.
        :param encoding_errors: error handling scheme passed to ``decode``.
        """
        log.debug('Loading {0} with size {1}.'
                  ''.format(self.blob.name, self.blob.size))
        return StringIO(
            self.blob.download_as_string().decode(
                encoding, encoding_errors
            )
        )

    def dump(self, stream):
        """Upload the content of *stream* to the blob; returns ``self``."""
        log.debug('Dumping to {0}.'.format(self.blob.name))
        self.blob.upload_from_string(stream.read(),
                                     content_type=self.mime_type)
        return self

    def make_public(self, recursive=False):
        """Make the blob publicly readable; returns ``self``.

        NOTE(review): *recursive* is forwarded positionally to
        ``blob.make_public`` — confirm the installed gcloud version accepts
        this argument.
        """
        self.blob.make_public(recursive)
        return self