-
Notifications
You must be signed in to change notification settings - Fork 3
/
storages.py
146 lines (122 loc) · 5.9 KB
/
storages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Abstract definition of storage connections"""
import functools
import pathlib
@functools.lru_cache(maxsize=None)
def storage(alias) -> 'Storage':
    """
    Looks up a storage configuration by its alias

    Successful lookups are memoized per alias through `functools.lru_cache`.

    Args:
        alias: the key under which the storage is registered in `config.storages()`

    Returns:
        The entry of `config.storages()` stored under `alias`

    Raises:
        KeyError: if `alias` is not contained in `config.storages()`
    """
    # imported at call time, not at module import time
    from . import config

    configured = config.storages()
    if alias in configured:
        return configured[alias]
    raise KeyError(f'storage alias "{alias}" not configured')
class Storage:
    """Generic storage connection definition"""

    # Attribute names whose values are credentials and therefore must not be
    # printed by `__repr__`. Names containing "secret" are masked in addition.
    _MASKED_ATTRIBUTES = frozenset({'password', 'account_key', 'sas', 'service_account_info'})

    def __repr__(self) -> str:
        """
        Returns `<ClassName: attr=value, ...>` for all instance attributes with a truthy value.

        Attributes are listed in the order in which they were set on the instance.
        Values of credential attributes (`password`, `account_key`, `sas`,
        `service_account_info` and everything with "secret" in its name) are
        replaced with `*****` so that they do not end up in logs or tracebacks.
        """
        parts = []
        for var in vars(self):
            value = getattr(self, var)
            if not value:
                # unset / empty attributes are left out entirely
                continue
            is_sensitive = var in self._MASKED_ATTRIBUTES or 'secret' in var
            parts.append(f'{var}={"*****" if is_sensitive else value}')
        return f'<{self.__class__.__name__}: ' + ', '.join(parts) + '>'
class LocalStorage(Storage):
    """Storage definition that points to a local path"""

    def __init__(self, base_path: pathlib.Path):
        """
        Connection information for a local path data bucket

        Args:
            base_path: the path that is kept as `base_path` on the instance
        """
        self.base_path = base_path
class SftpStorage(Storage):
    """Storage definition that points to a SFTP server"""

    def __init__(self, host: str, port: int = None, user: str = None, password: str = None,
                 insecure: bool = False, identity_file: str = None, public_identity_file: str = None):
        """
        Connection information for a SFTP server

        All arguments are stored unchanged as attributes of the same name.

        Args:
            host: host name
            port: tcp port
            user: username
            password: password
            insecure: if True, the known_hosts file will not be checked
            identity_file: path to a private key file to be used for private/public key
                           authentication
            public_identity_file: path to a public key file to be used for
                                  private/public key authentication
        """
        # The attribute order is kept as host, port, user, password, insecure,
        # identity_file, public_identity_file because `Storage.__repr__` lists
        # attributes in the order they were set.
        self.host, self.port = host, port
        self.user, self.password = user, password
        self.insecure = insecure
        self.identity_file, self.public_identity_file = identity_file, public_identity_file
class GoogleCloudStorage(Storage):
    """Storage definition that points to a Google Cloud Storage bucket"""

    def __init__(self, bucket_name: str, project_id: str = None, location: str = None,
                 service_account_file: str = None, service_account_info: dict = None):
        """
        Connection information for a Google Cloud Storage bucket

        All arguments are stored unchanged as attributes of the same name.

        Args:
            bucket_name: name of the GCS bucket
            project_id: Google Cloud project ID for new buckets
            location: Default geographic location to use when creating buckets
            service_account_file: The name of the private key file provided by Google
                                  when creating a service account. (it's a JSON file).
            service_account_info: The (parsed JSON) content of a service account file
                                  (use when you don't want to provide a
                                  `service_account_file`)
        """
        # The attribute order is kept because `Storage.__repr__` lists
        # attributes in the order they were set.
        self.bucket_name, self.project_id, self.location = bucket_name, project_id, location
        self.service_account_file, self.service_account_info = service_account_file, service_account_info

    @property
    def base_uri(self):
        """The `gs://` URI of the bucket itself, without a trailing slash"""
        return 'gs://{}'.format(self.bucket_name)

    def build_uri(self, path: str):
        """
        Builds the URI of an object in the bucket

        Args:
            path: the path below the bucket; it is appended to `base_uri`
                  after a single `/` without any further normalization

        Returns:
            `<base_uri>/<path>`
        """
        bucket_uri = self.base_uri
        return '{}/{}'.format(bucket_uri, path)
class AzureStorage(Storage):
    """Storage definition that points to a container of an Azure storage account"""

    def __init__(self, account_name: str, container_name: str, sas: str = None,
                 storage_type: str = 'blob', account_key: str = None,
                 spa_tenant: str = None, spa_application: str = None, spa_client_secret: str = None):
        """
        Connection information for an Azure storage container

        Possible authentication methods:
            SAS => "Shared access signature", see https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
            SPA => "Service principal"

        Args:
            account_name: The storage account name
            container_name: The container name within the storage
            storage_type: The storage type. Supports 'blob' or 'dfs'.
            sas: The SAS token. A leading '?' is removed before it is stored.
            account_key: The storage account key
            spa_tenant: The service principal tenant id
            spa_application: The service principal application id
            spa_client_secret: The service principal client secret

        Raises:
            ValueError: if none of `sas`, `account_key` and `spa_client_secret` is given
        """
        if sas is None and account_key is None and spa_client_secret is None:
            raise ValueError('You have to provide either parameter sas, account_key or spa_client_secret for type AzureStorage.')

        self.account_name = account_name
        self.account_key = account_key
        self.container_name = container_name
        self.storage_type = storage_type
        # Stored without the leading '?' because `build_uri` and
        # `connection_string` add their own separator in front of the token.
        self.sas = (sas[1:] if sas.startswith('?') else sas) if sas else None
        self.spa_tenant = spa_tenant
        self.spa_application = spa_application
        self.spa_client_secret = spa_client_secret

    @property
    def base_uri(self):
        """The https URI of the container, using the storage type of this instance"""
        return self.build_base_uri()

    def build_base_uri(self, storage_type: str = None):
        """
        Builds the https URI of the container

        Args:
            storage_type: endpoint type to use in the host name; falls back to
                          `self.storage_type` when not given

        Returns:
            `https://<account_name>.<storage_type>.core.windows.net/<container_name>`
        """
        return f'https://{self.account_name}.{storage_type or self.storage_type}.core.windows.net/{self.container_name}'

    def build_uri(self, path: str = None, storage_type: str = None):
        """
        Returns a URI for a path on the storage

        Args:
            path: path below the container; a leading '/' is added when missing.
                  When omitted or empty, the URI of the container itself is returned.
            storage_type: endpoint type to use in the host name; falls back to
                          `self.storage_type` when not given

        Returns:
            The container URI followed by the path and, when a SAS token is
            configured, by `?<sas>`
        """
        if not path:
            # Without this, the default `path=None` would be interpolated as
            # the literal text "None" right after the container name.
            path = ''
        elif not path.startswith('/'):
            path = '/' + path

        return (f"{self.build_base_uri(storage_type)}{path}"
                + (f'?{self.sas}' if self.sas else ''))

    def connection_string(self):
        """
        Builds an Azure storage connection string

        The account key is used when one is configured; otherwise a connection
        string with an explicit blob endpoint and the SAS token is built.

        see https://docs.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
        """
        if self.account_key:
            return f'DefaultEndpointsProtocol=https;AccountName={self.account_name};AccountKey={self.account_key}'
        else:
            # NOTE(review): when only service principal credentials were given,
            # `self.sas` is None and the text "None" ends up as the signature —
            # confirm whether callers should get an error instead.
            return ('DefaultEndpointsProtocol=https'
                    + f';BlobEndpoint=https://{self.account_name}.{self.storage_type}.core.windows.net'
                    + f';SharedAccessSignature={self.sas}')