-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
executable file
·326 lines (283 loc) · 10.9 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
#!/usr/bin/python3
import importlib
import os
import sys
from datetime import datetime
from PyQt5 import QtCore, QtWidgets, uic
from PyQt5.Qt import QMessageBox, QWidget, pyqtSlot
from custom_tableview import CustomTableView
from model import TableModel
from paginator import QueryPaginator
# MainForm inherits from the class generated out of the UI file, so that file
# has to be loaded at import time; it is looked up next to this module.
_UI_DIR = os.path.dirname(__file__)
FORM_CLASS, _ = uic.loadUiType(os.path.join(_UI_DIR, "app.ui"))
# objectName of the "back" button
BACK_BUTTON_NAME = "pbBack"
class MainForm(QWidget, FORM_CLASS):
    """Main form: connect to a database, run a query and page its result.

    The widgets used below (``pbCheck``, ``leConnection``, ``teQuery``, ...)
    are not created in this class; presumably they come from ``app.ui``
    through ``setupUi`` -- confirm against the UI file.
    """

    def __init__(self, parent=None):
        """Set up the UI, fill the provider list and connect the signals."""
        super().__init__(parent)
        self.setupUi(self)
        self.__add_custom_tableview()
        self.leRowsPerPage.setText("001000")
        # connection, result model and imported provider module are all
        # created lazily, on the first check/execute
        self.conn = None
        self.model = None
        self.db_provider = None
        # paginator
        self.paginator = None
        # describe DB providers here: display name -> module to import
        # order needs to be maintained to ensure that SQLite is the first one
        self.providers = {"SQLite": "sqlite3", "PostgreSQL": "psycopg2"}
        # add items to the list
        self.cmbProvider.addItems(self.providers.keys())
        # connect signals
        self.pbCheck.clicked.connect(self.check_connection)
        self.pbExecute.clicked.connect(self.execute_query)
        self.pbClose.clicked.connect(self.close_all)
        self.leConnection.editingFinished.connect(self.reset_conn)
        self.cmbProvider.currentIndexChanged.connect(self.provider_changed)
        self.pbForth.clicked.connect(self.page)
        self.pbBack.clicked.connect(self.page)
        # signals and slots for CustomTableView
        self.tbvResults.top_reached_signal.connect(self.pbBack.click)
        self.tbvResults.bottom_reached_signal.connect(self.pbForth.click)

    def __add_custom_tableview(self) -> bool:
        """
        Removes standard QTableView, adds custom CustomTableView to the form.

        Always returns True.
        """
        self.verticalLayout_2.removeWidget(self.tbvResults)
        self.tbvResults.deleteLater()
        self.tbvResults = CustomTableView(self.gpbResults)
        self.tbvResults.setFrameShape(QtWidgets.QFrame.Panel)
        self.tbvResults.setObjectName("tbvResults")
        self.tbvResults.horizontalHeader().setVisible(True)
        # the view is placed once, at the top of the layout; adding the same
        # widget to the layout a second time is redundant
        self.verticalLayout_2.insertWidget(0, self.tbvResults)
        return True

    def close_all(self):
        """Close the connection (if any) and then the form."""
        # finish up the connection
        self.reset_conn()
        self.close()

    def keyPressEvent(self, *args, **kwargs):
        """
        Overriding of parent's (Qt) method, that's why camelCase used.
        Ctrl+Enter executes query, Escape triggers the close button,
        everything else is passed to QWidget.
        """
        event = args[0]
        is_enter = event.key() in (QtCore.Qt.Key_Enter, QtCore.Qt.Key_Return)
        # test the Control bit instead of comparing for equality: the keypad
        # Enter key (Key_Enter) also carries KeypadModifier, so an exact
        # comparison with ControlModifier would never match it
        ctrl_pressed = bool(event.modifiers() & QtCore.Qt.ControlModifier)
        if is_enter and ctrl_pressed:
            # execute query
            self.execute_query()
        elif event.key() == QtCore.Qt.Key_Escape:
            self.pbClose.clicked.emit()
        else:
            # regular reaction
            return QWidget.keyPressEvent(self, *args, **kwargs)

    def __provider_errors(self) -> tuple:
        """
        Returns the (Error, Warning) classes of the loaded DB provider,
        or an empty tuple while no provider module has been imported yet.
        """
        if self.db_provider is None:
            return ()
        return (self.db_provider.Error, self.db_provider.Warning)

    @pyqtSlot()
    def check_connection(self, user: bool = True) -> bool:
        """
        Check button handler.

        Tries to connect and stores the connection in ``self.conn``.
        Returns True on success and False on failure; a failure is always
        reported in a message box, a success only when ``user`` is true.
        """
        try:
            self.conn = self.try_to_connect()
        except (
            ModuleNotFoundError,
            RuntimeError,
            ValueError,
            # the provider module may not be imported at this point (e.g. its
            # import is what failed), so its classes are looked up safely
            *self.__provider_errors(),
        ) as err:
            self.message_box(
                "Error!",
                f"Connection can not be established due to: {str(err)}",
                QMessageBox.Critical,
            )
            return False
        if user:
            self.message_box(
                "Success!",
                "Connection established",
                QMessageBox.Information,
            )
        return True

    def __import_provider(self, provider_name: str):
        """Trying to import module for the provider and return it."""
        return importlib.import_module(str(self.providers[provider_name]))

    def __is_sqlite_db(self) -> bool:
        """Checks if DB provider is SQLite"""
        return self.db_provider.__name__ == "sqlite3"

    def __is_db_in_memory(self, db_type: str = ":memory:") -> bool:
        """
        Checks if database connection is of type :memory:.
        Makes sense for SQLite only.
        """
        return db_type == ":memory:"

    def __is_file(self, path_str: str = "") -> bool:
        """Checks if the path is a file."""
        return os.path.isfile(path_str)

    def __create_sqlite_file(self, file_name: str = ""):
        """
        Checks if user wants to create new SQLite file.
        Otherwise raises RuntimeError.
        """
        question = (
            f"The file {file_name} does not exist! Do you want to create it?"
        )
        res = self.message_box(
            "Attention!",
            question,
            QMessageBox.Warning,
            buttons=QMessageBox.Ok | QMessageBox.Cancel,
        )
        if res != QMessageBox.Ok:
            raise RuntimeError("no such file!")

    def try_to_connect(self):
        """
        Connects to the database with the connection string
        provided by the user.

        Returns the existing connection when there is one. Raises ValueError
        for an empty connection string and RuntimeError when the user
        declines to create a missing SQLite file; import and provider errors
        are not handled here.
        """
        if self.conn:
            return self.conn
        # it's unknown what type of provider will user choose, so import here
        self.db_provider = self.__import_provider(
            self.cmbProvider.currentData(0)
        )
        db_conn_str = self.leConnection.text().strip()
        if not db_conn_str:
            raise ValueError("no connection string provided!")
        # since SQLite DB is a file we need to check if there is such file
        # or user wants to create new one
        if (
            self.__is_sqlite_db()
            and not self.__is_db_in_memory(db_conn_str)
            and not self.__is_file(db_conn_str)
        ):
            # RuntimeError might be raised here
            self.__create_sqlite_file(db_conn_str)
        return self.db_provider.connect(db_conn_str)

    def __is_query_exists(self) -> bool:
        """Checks if query text is present, shows an error box if not."""
        if self.teQuery.toPlainText().strip():
            return True
        self.message_box("Error!", "No query provided", QMessageBox.Critical)
        return False

    @pyqtSlot()
    def execute_query(self):
        """
        Handles 'Execute' button.

        Creates a new paginator for the current query and then requests the
        first page. Returns False when the connection, the query text or the
        paginator creation fails; returns None otherwise.
        """
        if not self.check_connection(False) or not self.__is_query_exists():
            return False
        # let's try to create paginator object and execute query inside of it
        try:
            self.paginator = QueryPaginator(
                rows_num=int(self.leRowsPerPage.displayText().strip()),
                connection=self.conn,
                query=self.teQuery.toPlainText().strip(),
            )
        except (ValueError, *self.__provider_errors()) as err:
            self.lblCurrentPage.setText("")
            self.message_box("Error!", str(err), QMessageBox.Critical)
            return False
        # model is created when forth or back button pressed
        # in order to create model we must emit signal
        self.pbForth.clicked.emit()

    def __is_forward(self, sender) -> bool:
        """
        Checks if direction is forward.

        ``sender`` is a callable returning the object that emitted the
        signal. Only the back button means "backward"; a missing sender is
        treated as forward.
        """
        origin = sender()
        if origin is None:
            return True
        return origin.objectName() != BACK_BUTTON_NAME

    @pyqtSlot()
    def page(self):
        """
        Pages query results.

        Returns None when there is no paginator, False when no page could be
        shown (provider error or no rows arrived) and True when the view was
        updated.
        """
        if not self.paginator:
            return
        forward = self.__is_forward(self.sender)
        self.model = None
        # if data is not fetched from paginator yet we must recreate model
        # because we don't know beforehand if data will arrive.
        # otherwise we can end up in situation when paginator is already
        # fetched (might be partially), model is None and
        # no new data will arrive in for loop (paginator.feeder)
        # this potential situation leads to empty model in view
        if not self.paginator.fetched:
            self.model = TableModel(columns=self.paginator.headers())
        # let's save current state of the cursor
        saved_cursor = self.cursor()
        failure = None
        # set the cursor to the wait cursor
        self.setCursor(QtCore.Qt.WaitCursor)
        try:
            # feed the model
            for row in self.paginator.feeder(forward):
                # we must be ensured that model is changed when
                # actual data arrives only
                if not self.model:
                    self.model = TableModel(columns=self.paginator.headers())
                # data
                self.model.input_data.append(row[0])
                # row number
                self.model.rows.append(row[1])
        except self.__provider_errors() as err:
            failure = err
        finally:
            # return the cursor to the previous state whatever happened,
            # so an unexpected exception can not leave the wait cursor on
            self.setCursor(saved_cursor)
        if failure is not None:
            # unset current page
            self.lblCurrentPage.setText("")
            self.message_box("Error!", str(failure), QMessageBox.Critical)
            # clean the model
            self.tbvResults.setModel(None)
            return False
        if not self.model:
            return False
        # show the new model in the view
        self.__update_model_in_view()
        # update last query result time and page number
        self.__update_time_and_page()
        # going forward lands on the first row, going back on the last one
        if forward:
            self.tbvResults.selectRow(0)
        else:
            self.tbvResults.selectRow(self.model.rowCount(None) - 1)
        return True

    def __update_model_in_view(self):
        """Updates model in view and selects the first row."""
        self.tbvResults.setModel(None)
        self.tbvResults.setModel(self.model)
        self.tbvResults.selectRow(0)

    def __update_time_and_page(self):
        """Updates execution time and page number"""
        self.lblUpdateTime.setText(
            datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        )
        self.lblCurrentPage.setText(str(self.paginator.current_page))

    @pyqtSlot()
    def reset_conn(self):
        """Resets connection: closes it if it is open and forgets it."""
        if self.conn:
            self.conn.close()
        self.conn = None

    @pyqtSlot(int)
    def provider_changed(self, index):
        """Handles provider change in the providers list box. Resets conn."""
        self.reset_conn()

    def message_box(self, text, informative, icon, buttons=QMessageBox.Ok):
        """
        Wraps up the QMessageBox.

        Shows a modal box with the given text, informative text, icon and
        buttons (Ok is the default button) and returns the result of exec_().
        """
        msg = QMessageBox(self)
        msg.setStandardButtons(buttons)
        msg.setDefaultButton(QMessageBox.Ok)
        msg.setText(text)
        msg.setInformativeText(informative)
        msg.setIcon(icon)
        return msg.exec_()
if __name__ == "__main__":
    # the application object has to exist before any widget is created
    app = QtWidgets.QApplication(sys.argv)
    # build the main form and put it on screen
    form = MainForm()
    form.show()
    # run the event loop and hand its result to the interpreter as exit code
    exit_code = app.exec_()
    sys.exit(exit_code)