/
bulk_import_model.py
206 lines (173 loc) · 5.83 KB
/
bulk_import_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
import time
from tdclient.model import Model
class BulkImport(Model):
    """Bulk-import session on Treasure Data Service

    Wraps the attributes of one bulk import session and exposes the
    session-level operations of the client it was created with. Most
    mutating operations call :meth:`update` afterwards so that the cached
    attributes reflect what the service reports.
    """

    # Status strings. `commit(wait=True)` polls until `status` equals
    # STATUS_COMMITTED; the other values are not referenced in this class.
    STATUS_UPLOADING = "uploading"
    STATUS_PERFORMING = "performing"
    STATUS_READY = "ready"
    STATUS_COMMITTING = "committing"
    STATUS_COMMITTED = "committed"

    def __init__(self, client, **kwargs):
        """Create a session model from keyword attributes.

        Args:
            client: client object used for every API call made by this model
            **kwargs: session attributes; ``name`` is required, see
                :meth:`_feed` for the other keys that are read
        """
        super(BulkImport, self).__init__(client)
        self._feed(kwargs)

    def _feed(self, data=None):
        """Load the cached attributes from a dict of session data.

        Args:
            data (dict): session attributes. ``name`` must be present; every
                other key is optional and defaults to ``None``.

        Raises:
            KeyError: if ``name`` is missing (including when ``data`` is
                ``None`` or empty).
        """
        data = {} if data is None else data
        self._name = data["name"]
        self._database = data.get("database")
        self._table = data.get("table")
        self._status = data.get("status")
        self._upload_frozen = data.get("upload_frozen")
        self._job_id = data.get("job_id")
        self._valid_records = data.get("valid_records")
        self._error_records = data.get("error_records")
        self._valid_parts = data.get("valid_parts")
        self._error_parts = data.get("error_parts")

    def update(self):
        """Refresh the cached attributes from ``show_bulk_import``.
        """
        data = self._client.api.show_bulk_import(self.name)
        self._feed(data)

    @property
    def name(self):
        """A name of the bulk import session
        """
        return self._name

    @property
    def database(self):
        """A database name in a string which the bulk import session is working on
        """
        return self._database

    @property
    def table(self):
        """A table name in a string which the bulk import session is working on
        """
        return self._table

    @property
    def status(self):
        """The status of the bulk import session in a string
        """
        return self._status

    @property
    def job_id(self):
        """Job ID
        """
        return self._job_id

    @property
    def valid_records(self):
        """The number of valid records.
        """
        return self._valid_records

    @property
    def error_records(self):
        """The number of error records.
        """
        return self._error_records

    @property
    def valid_parts(self):
        """The number of valid parts.
        """
        return self._valid_parts

    @property
    def error_parts(self):
        """The number of error parts.
        """
        return self._error_parts

    @property
    def upload_frozen(self):
        """Whether uploading to the session is frozen.

        :meth:`perform` refuses to run while this value is falsy.
        """
        return self._upload_frozen

    def delete(self):
        """Delete bulk import

        Returns:
            Whatever the client's ``delete_bulk_import`` returns.
        """
        return self._client.delete_bulk_import(self.name)

    def freeze(self):
        """Freeze bulk import

        Returns:
            Whatever the client's ``freeze_bulk_import`` returns.
        """
        response = self._client.freeze_bulk_import(self.name)
        self.update()
        return response

    def unfreeze(self):
        """Unfreeze bulk import

        Returns:
            Whatever the client's ``unfreeze_bulk_import`` returns.
        """
        response = self._client.unfreeze_bulk_import(self.name)
        self.update()
        return response

    def perform(self, wait=False, wait_interval=5, wait_callback=None):
        """Perform bulk import

        The session is refreshed first and must be frozen.

        Args:
            wait (bool): if true, block on ``job.wait(...)`` before returning
            wait_interval (int): forwarded to ``job.wait`` as ``wait_interval``
            wait_callback (callable): forwarded to ``job.wait`` as
                ``wait_callback``

        Returns:
            The job object returned by the client's ``perform_bulk_import``.

        Raises:
            RuntimeError: if the session is not frozen.
        """
        self.update()
        if not self.upload_frozen:
            raise RuntimeError(
                'bulk import session "%s" is not frozen' % (self.name,)
            )
        job = self._client.perform_bulk_import(self.name)
        if wait:
            # Forward the caller's callback; it used to be replaced by None,
            # which silently disabled it.
            job.wait(wait_interval=wait_interval, wait_callback=wait_callback)
        self.update()
        return job

    def commit(self, wait=False, wait_interval=5, timeout=None):
        """Commit bulk import

        Args:
            wait (bool): if true, poll until the status is
                ``STATUS_COMMITTED``
            wait_interval (int): seconds to sleep between two polls
            timeout (int): maximum number of seconds to poll; ``None`` means
                no limit

        Returns:
            Whatever the client's ``commit_bulk_import`` returns.

        Raises:
            RuntimeError: if ``timeout`` elapses before the session is
                committed.
        """
        response = self._client.commit_bulk_import(self.name)
        if wait:
            started_at = time.time()
            # The first check uses the status cached before the commit call;
            # each iteration sleeps first and then refreshes it.
            while self._status != self.STATUS_COMMITTED:
                if timeout is None or abs(time.time() - started_at) < timeout:
                    time.sleep(wait_interval)
                else:
                    raise RuntimeError("timeout")  # TODO: throw proper error
                self.update()
        else:
            self.update()
        return response

    def error_record_items(self):
        """Fetch error record rows.

        Yields:
            Error record
        """
        for record in self._client.bulk_import_error_records(self.name):
            yield record

    def upload_part(self, part_name, bytes_or_stream, size):
        """Upload a part to bulk import session

        Args:
            part_name (str): name of a part of the bulk import session
            bytes_or_stream (file-like): a file-like object contains the part
            size (int): the size of the part

        Returns:
            Whatever the client's ``bulk_import_upload_part`` returns.
        """
        response = self._client.bulk_import_upload_part(
            self.name, part_name, bytes_or_stream, size
        )
        self.update()
        return response

    def upload_file(self, part_name, fmt, file_like):
        """Upload a part to Bulk Import session, from an existing file on filesystem.

        Args:
            part_name (str): name of a part of the bulk import session
            fmt (str): format of data type (e.g. "msgpack", "json")
            file_like (str or file-like): a name of a file, or a file-like object contains the data

        Returns:
            Whatever the client's ``bulk_import_upload_file`` returns.
        """
        response = self._client.bulk_import_upload_file(
            self.name, part_name, fmt, file_like
        )
        self.update()
        return response

    def delete_part(self, part_name):
        """Delete a part of a Bulk Import session

        Args:
            part_name (str): name of a part of the bulk import session

        Returns:
            True if succeeded.
        """
        response = self._client.bulk_import_delete_part(self.name, part_name)
        self.update()
        return response

    def list_parts(self):
        """Return the list of available parts uploaded through
        :func:`~BulkImportAPI.bulk_import_upload_part`.

        Returns:
            [str]: The list of bulk import part name.
        """
        response = self._client.list_bulk_import_parts(self.name)
        self.update()
        return response