8
8
import time
9
9
10
10
import progressbar
11
+ # This actually imports progressbar2 but `import progressbar2' itself doesn't work.
12
+ # In case of problems with the progressbar/progressbar2, check that you have the
13
+ # progressbar2 installed and the path to it or venv is specified.
14
+
11
15
import psycopg2 .extensions
12
16
13
17
import common
@@ -22,7 +26,10 @@ def setup_tpcds(config):
22
26
try :
23
27
conn = psycopg2 .connect (** config )
24
28
cur = conn .cursor ()
29
+ except Exception as e :
30
+ raise DataLoadException ('Load failed: %s' % e )
25
31
32
+ try :
26
33
# Create pg_query_state extension
27
34
cur .execute ('CREATE EXTENSION IF NOT EXISTS pg_query_state' )
28
35
@@ -55,13 +62,13 @@ def run_tpcds(config):
55
62
TPC_DS_STATEMENT_TIMEOUT = 20000 # statement_timeout in ms
56
63
57
64
print ('Preparing TPC-DS queries...' )
65
+ err_count = 0
58
66
queries = []
59
67
for query_file in sorted (os .listdir ('tmp_stress/tpcds-result-reproduction/query_qualification/' )):
60
68
with open ('tmp_stress/tpcds-result-reproduction/query_qualification/%s' % query_file , 'r' ) as f :
61
69
queries .append (f .read ())
62
70
63
71
acon , = common .n_async_connect (config )
64
- pid = acon .get_backend_pid ()
65
72
66
73
print ('Starting TPC-DS queries...' )
67
74
timeout_list = []
@@ -84,8 +91,25 @@ def run_tpcds(config):
84
91
PG_QS_DELAY , BEFORE_GETTING_QS_DELAY = 0.1 , 0.1
85
92
BEFORE_GETTING_QS , GETTING_QS = range (2 )
86
93
state , n_first_getting_qs_retries = BEFORE_GETTING_QS , 0
94
+
95
+ pg_qs_args = {
96
+ 'config' : config ,
97
+ 'pid' : acon .get_backend_pid ()
98
+ }
99
+
87
100
while True :
88
- result , notices = common .pg_query_state (config , pid )
101
+ try :
102
+ result , notices = common .pg_query_state (** pg_qs_args )
103
+ except Exception as e :
104
+ # do not consider the test failed if the "error in message
105
+ # queue data transmitting" is received, this may happen with
106
+ # some small probability, but if it happens too often it is
107
+ # a problem, we will handle this case after the loop
108
+ if "error in message queue data transmitting" in e .pgerror :
109
+ err_count += 1
110
+ else :
111
+ raise e
112
+
89
113
# run state machine to determine the first getting of query state
90
114
# and query finishing
91
115
if state == BEFORE_GETTING_QS :
@@ -109,6 +133,12 @@ def run_tpcds(config):
109
133
except psycopg2 .extensions .QueryCanceledError :
110
134
timeout_list .append (i + 1 )
111
135
136
+ if err_count > 2 :
137
+ print ("\n ERROR: error in message queue data transmitting" )
138
+ raise Exception ('error was received %d times' % err_count )
139
+ elif err_count > 0 :
140
+ print (err_count , " times there was error in message queue data transmitting" )
141
+
112
142
common .n_close ((acon ,))
113
143
114
144
if len (timeout_list ) > 0 :
0 commit comments