-
Notifications
You must be signed in to change notification settings - Fork 18
/
result.py
123 lines (97 loc) · 3.43 KB
/
result.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from clickhouse_driver.progress import Progress
class QueryResult(object):
    """
    Accumulates the blocks produced by a packet generator into one result.

    Packets are pulled from ``packet_generator`` with ``async for``; every
    packet carrying a ``block`` attribute contributes either its rows or,
    in columnar mode, its columns to ``self.data``.
    """

    def __init__(
            self, packet_generator,
            with_column_types=False, columnar=False):
        """
        :param packet_generator: asynchronous iterable of packets.
        :param with_column_types: when true, ``get_result`` also returns
                                  the stored ``columns_with_types``.
        :param columnar: when true, data is collected per column instead
                         of per row.
        """
        self.packet_generator = packet_generator
        self.with_column_types = with_column_types
        self.columnar = columnar

        self.data = []
        self.columns_with_types = []

        super().__init__()

    def store(self, packet):
        """
        Merge the block carried by ``packet`` (if any) into the result.

        :param packet: object that may expose a ``block`` attribute;
                       packets without one (or with ``None``) are ignored.
        """
        block = getattr(packet, 'block', None)
        if block is None:
            return

        if not block.rows:
            # A block without rows is a header: it only describes columns.
            # Only the first non-empty description is kept.
            if not self.columns_with_types:
                self.columns_with_types = block.columns_with_types
            return

        if not self.columnar:
            self.data.extend(block.get_rows())
            return

        columns = block.get_columns()
        if not self.data:
            # First data block: its columns become the accumulators.
            self.data.extend(columns)
            return

        # Later blocks: append each column to the matching accumulator.
        for index, column in enumerate(columns):
            self.data[index] += column

    async def get_result(self):
        """
        Drain the packet generator and return everything collected.

        :return: ``self.data``, or the tuple
                 ``(self.data, self.columns_with_types)`` when
                 ``with_column_types`` is set.
        """
        async for packet in self.packet_generator:
            self.store(packet)

        if not self.with_column_types:
            return self.data

        return self.data, self.columns_with_types
class ProgressQueryResult(QueryResult):
    """
    Query result that additionally tracks progress packets.

    Asynchronously iterating over an instance yields a
    ``(rows, total_rows)`` tuple read from ``progress_totals`` each time a
    progress packet arrives; non-progress packets seen in between are
    stored as result data.
    """

    def __init__(
            self, packet_generator,
            with_column_types=False, columnar=False):
        """
        :param packet_generator: asynchronous iterator of packets; it is
                                 advanced through ``__anext__`` directly.
        :param with_column_types: forwarded to ``QueryResult``.
        :param columnar: forwarded to ``QueryResult``.
        """
        self.progress_totals = Progress()

        super().__init__(packet_generator, with_column_types, columnar)

    def __aiter__(self):
        return self

    async def __anext__(self):
        """
        Advance to the next progress packet.

        :return: tuple ``(progress_totals.rows, progress_totals.total_rows)``
                 after the packet has been added to the totals.
        """
        while True:
            packet = await self.packet_generator.__anext__()
            progress = getattr(packet, 'progress', None)

            if not progress:
                # Not a progress packet: treat it as (potential) data.
                self.store(packet)
                continue

            self.progress_totals.increment(progress)
            return (
                self.progress_totals.rows, self.progress_totals.total_rows
            )

    async def get_result(self):
        """
        Consume all remaining packets and return the stored result.

        :return: same value as ``QueryResult.get_result``.
        """
        # Iterating over self drains the generator, storing data packets
        # along the way; the yielded progress tuples are not needed here.
        async for _progress in self:
            pass

        return await super().get_result()
class IterQueryResult(object):
    """
    Streams query data chunk by chunk through asynchronous iteration.

    Each step consumes one packet from ``packet_generator`` and yields the
    rows of its block.
    """

    def __init__(
            self, packet_generator,
            with_column_types=False):
        """
        :param packet_generator: asynchronous iterator of packets; it is
                                 advanced through ``__anext__`` directly.
        :param with_column_types: when true, the first block's chunk is
                                  prefixed with its ``columns_with_types``.
        """
        self.packet_generator = packet_generator
        self.with_column_types = with_column_types
        # Stays true until the column description has been emitted.
        self.first_block = True

        super().__init__()

    def __aiter__(self):
        return self

    async def __anext__(self):
        """
        Fetch the next packet and return its chunk of rows.

        :return: ``[]`` for a packet without a block; otherwise the block's
                 rows, preceded by ``columns_with_types`` for the first
                 block when ``with_column_types`` is set.
        """
        packet = await self.packet_generator.__anext__()
        block = getattr(packet, 'block', None)
        if block is None:
            return []

        prepend_types = self.first_block and self.with_column_types
        if not prepend_types:
            return block.get_rows()

        self.first_block = False
        return [block.columns_with_types, *block.get_rows()]