-
Notifications
You must be signed in to change notification settings - Fork 4
/
aiosqliteclass.py
377 lines (313 loc) · 12.7 KB
/
aiosqliteclass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
import asyncio
import aiosqlite
import logging
logger = logging.getLogger(__name__)
async def create_conn(db_name=None):
    """Create a ``Sqlite3Class`` and open its connection.

    Args:
        db_name: Optional path of the SQLite database file. When omitted,
            ``Sqlite3Class`` falls back to its own default file name.

    Returns:
        A ``Sqlite3Class`` instance whose ``_init()`` has already been awaited.
    """
    # Only forward the name when one was given so the class keeps ownership
    # of the default database path.
    db = Sqlite3Class() if db_name is None else Sqlite3Class(db_name)
    await db._init()
    return db
class Sqlite3Class:
    """Async helper around one aiosqlite connection for a tag/object store.

    The queries in this class touch three tables: ``tag_table`` (``id``,
    ``tag_name``), ``object_table`` (``id``, ``object_name``) and
    ``object_tag_mapping`` (``object_reference``, ``tag_reference``).

    NOTE(review): every method uses the single cursor opened by ``_init``, so
    overlapping calls on one instance from concurrent tasks could interleave
    an ``execute`` with another call's ``fetch`` - confirm callers serialize
    access.
    """

    def __init__(self, db_name="pythonsqlite.db"):
        """Remember the database path; no connection is opened here."""
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    async def _init(self):
        """Open the connection to ``self.db_name`` and create the shared cursor."""
        self.conn = await aiosqlite.connect(self.db_name)
        self.cursor = await self.conn.cursor()

    async def _close(self):
        """Close the connection; does nothing if it was never opened."""
        if self.conn is None:
            return
        await self.conn.close()
        self.conn = None
        self.cursor = None

    # ------------------------------------------------------------------
    # Private query helpers
    # ------------------------------------------------------------------
    async def _fetch_one(self, sql, params=()):
        """Execute *sql* on the shared cursor and return the first row or None."""
        await self.cursor.execute(sql, params)
        return await self.cursor.fetchone()

    async def _fetch_all(self, sql, params=()):
        """Execute *sql* on the shared cursor and return every result row."""
        await self.cursor.execute(sql, params)
        return await self.cursor.fetchall()

    async def _write(self, sql, params=()):
        """Execute *sql*, commit, and return the cursor's ``lastrowid``."""
        await self.cursor.execute(sql, params)
        await self.conn.commit()
        return self.cursor.lastrowid

    # ------------------------------------------------------------------
    # Lookups by name
    # ------------------------------------------------------------------
    async def get_tag_id_from_tag(self, intag):
        """Return the id of the tag named *intag*, or None if there is none."""
        row = await self._fetch_one(
            "SELECT id FROM tag_table WHERE tag_name=? ", (intag,)
        )
        return row[0] if row else None

    async def get_object_id_from_object(self, inurl):
        """Return the id of the object named *inurl*, or None if not found.

        Args:
            inurl (str): The stored name (URL) of the object.

        Returns:
            The ``id`` column of the matching ``object_table`` row, or None.
        """
        print("query_url", inurl)
        row = await self._fetch_one(
            "SELECT id FROM object_table WHERE object_name=? ", (inurl,)
        )
        return row[0] if row else None

    # ------------------------------------------------------------------
    # Creation
    # ------------------------------------------------------------------
    async def create_tag(self, intag):
        """Insert a tag named *intag*, commit, and return the new row id."""
        return await self._write(
            "INSERT INTO tag_table (tag_name) VALUES (?)", (intag,)
        )

    async def create_object(self, inurl):
        """Insert an object named *inurl*, commit, and return the new row id."""
        return await self._write(
            "INSERT INTO object_table (object_name) VALUES (?)", (inurl,)
        )

    async def create_mapping(self, tagid, gifid):
        """Link object *gifid* to tag *tagid* (best effort).

        The row is inserted positionally as ``(gifid, tagid)``. Any failure is
        printed and swallowed so that tagging stays best-effort - presumably
        to tolerate an already existing pair; verify against the schema.
        """
        try:
            await self._write(
                "INSERT INTO object_tag_mapping VALUES (?,?)", (gifid, tagid)
            )
        except Exception as e:
            print(e)

    async def tag(self, inurl, intag):
        """Attach tag *intag* to object *inurl*, creating either row if missing."""
        urlid = await self.get_object_id_from_object(inurl)
        print("tag urlid", urlid)
        if urlid is None:
            urlid = await self.create_object(inurl)
        tagid = await self.get_tag_id_from_tag(intag)
        print("tag tagid", tagid)
        if tagid is None:
            tagid = await self.create_tag(intag)
        await self.create_mapping(tagid, urlid)

    # ------------------------------------------------------------------
    # Joined lookups
    # ------------------------------------------------------------------
    async def get_objects_by_tag_name(self, intag):
        """Return a list of object names (strings) mapped to tag *intag*."""
        rows = await self._fetch_all(
            "SELECT object_name from object_tag_mapping JOIN object_table ON object_reference = object_table.id JOIN tag_table ON tag_reference = tag_table.id WHERE tag_name = ?",
            (intag,),
        )
        return [row[0] for row in rows]

    async def get_tag_names_by_object_name(self, inurl):
        """Return a list of tag names (strings) mapped to object *inurl*."""
        rows = await self._fetch_all(
            "SELECT tag_name from object_tag_mapping JOIN tag_table ON tag_reference = tag_table.id JOIN object_table ON object_reference = object_table.id WHERE object_name = ?",
            (inurl,),
        )
        return [row[0] for row in rows]

    async def get_tag_names_by_object_id(self, gif_id):
        """Return the raw result rows (one ``tag_name`` column each) for *gif_id*.

        Unlike ``get_tag_names_by_object_name`` the rows are not unpacked.
        """
        return await self._fetch_all(
            "SELECT tag_name FROM object_tag_mapping "
            "JOIN tag_table ON tag_reference = tag_table.id "
            "WHERE object_reference = ?",
            (gif_id,),
        )

    # ------------------------------------------------------------------
    # Untagging
    # ------------------------------------------------------------------
    async def untag(self, inurl, intag):
        """Remove tag *intag* from object *inurl* and drop rows left unused.

        After the mapping is removed, the tag is deleted when no object uses
        it any more, and the object is deleted when it has no tags left.
        Nothing is changed when either name is unknown.
        """
        print('untagging....')
        urlid = await self.get_object_id_from_object(inurl)
        tagid = await self.get_tag_id_from_tag(intag)
        print(urlid)
        print(tagid)
        if urlid is None or tagid is None:
            return
        await self.remove_mapping_by_ids(urlid, tagid)
        test_tag_has_object = await self.get_objects_by_tag_name(intag)
        print(test_tag_has_object)
        if not test_tag_has_object:
            await self._write("DELETE FROM tag_table WHERE tag_name = ?", (intag,))
        test_object_has_tags = await self.get_tag_names_by_object_name(inurl)
        print(test_object_has_tags)
        if not test_object_has_tags:
            await self.remove_object_by_object_id(urlid)

    async def untag_simple(self, in_string):
        """Untag using one string that names either a tag or an object.

        Only acts when the string resolves unambiguously: it must match a tag
        or an object (not both) and that row must have exactly one mapping.

        Returns:
            1 when a mapping was removed, otherwise None.
        """
        tag_id = await self.get_tag_id_from_tag(in_string)
        object_id = await self.get_object_id_from_object(in_string)
        if object_id and tag_id:
            # The string names both a tag and an object: refuse to guess.
            return None

        if tag_id:
            mapped_objects = await self.get_object_ids_by_tag_id(tag_id)
            if not mapped_objects or len(mapped_objects) > 1:
                return None
            object_id = mapped_objects[0][0]
            await self.remove_mapping_by_ids(object_id, tag_id)
            await self.remove_tag_by_tag_id(tag_id)
            if not await self.get_tag_ids_by_object_id(object_id):
                await self.remove_object_by_object_id(object_id)
            return 1

        if object_id:
            mapped_tags = await self.get_tag_ids_by_object_id(object_id)
            if not mapped_tags or len(mapped_tags) > 1:
                return None
            # Mapping rows are (object_reference, tag_reference).
            tag_id = mapped_tags[0][1]
            await self.remove_mapping_by_ids(object_id, tag_id)
            await self.remove_object_by_object_id(object_id)
            if not await self.get_object_ids_by_tag_id(tag_id):
                await self.remove_tag_by_tag_id(tag_id)
            return 1

        return None

    async def untag_by_tag_id(self, tag_id):
        """Remove tag *tag_id* when it is mapped to exactly one object.

        The mapping and the tag are deleted; the object is deleted as well
        when it has no other tags. Tags with zero or several objects are left
        untouched.
        """
        tuples = await self.get_object_ids_by_tag_id(tag_id)
        if not tuples or len(tuples) > 1:
            return
        object_id = tuples[0][0]
        print(object_id)
        print(tag_id)
        if not (object_id and tag_id):
            return
        await self.remove_mapping_by_ids(object_id, tag_id)
        await self.remove_tag_by_tag_id(tag_id)
        test_object_has_tags = await self.get_tag_ids_by_object_id(object_id)
        print(test_object_has_tags)
        if not test_object_has_tags:
            await self.remove_object_by_object_id(object_id)

    async def untag_by_tag_name(self, tag_name):
        """Same as ``untag_by_tag_id`` but addressed by the tag's name.

        ``get_objects_by_tag_name`` yields plain object names, not id pairs,
        so the ids are resolved here and the work is delegated to
        ``untag_by_tag_id``. Unknown tag names are ignored.
        """
        tag_id = await self.get_tag_id_from_tag(tag_name)
        if tag_id is None:
            return
        await self.untag_by_tag_id(tag_id)

    async def untag_by_object_id_and_object_data(self, object_id, object_data):
        """Delete object *object_id* and its mappings if its name matches.

        The stored name (stripped of surrounding whitespace and single quotes)
        must equal *object_data*; otherwise nothing is changed and a message
        is printed. Tags left without any object are deleted too.
        """
        tag_ids = await self.get_tag_ids_by_object_id(object_id)
        object_data_from_db = await self.get_object_by_object_id(object_id)
        if object_data_from_db is None:
            print("object not found")
            print(object_id)
            print(object_data)
            return

        # get_object_by_object_id selects a single column, so the name is [0].
        stored_name = object_data_from_db[0]
        if stored_name.strip().strip("'") != object_data:
            print("object data does not match")
            print(object_data_from_db)
            print(object_data)
            return

        print(object_data)
        # Drop the mappings before the object row they refer to.
        for mapping in tag_ids or ():
            tag_id = mapping[1]
            tag_row = await self.get_tag_name_by_tag_id(tag_id)
            tag_name = tag_row[0] if tag_row else None
            print(tag_name)
            await self.remove_mapping_by_ids(object_id, tag_id)
            if not await self.get_object_ids_by_tag_id(tag_id):
                print("dead:", tag_name)
                await self.remove_tag_by_tag_id(tag_id)
        await self.remove_object_by_object_id(object_id)

    async def info(self, in_string):
        """Look *in_string* up both as a tag name and as an object name.

        Returns:
            A tuple ``(object_names, tag_names)``: the objects carrying the
            tag *in_string* and the tags attached to the object *in_string*.
        """
        resulting_urls = [
            name for name in await self.get_objects_by_tag_name(in_string) if name
        ]
        resulting_tags = [
            name for name in await self.get_tag_names_by_object_name(in_string) if name
        ]
        print(resulting_tags, resulting_urls)
        return resulting_urls, resulting_tags

    # ------------------------------------------------------------------
    # Lookups by id
    # ------------------------------------------------------------------
    async def get_tag_ids_by_object_id(self, object_id):
        """Return the mapping rows for *object_id*, or None when there are none.

        Args:
            object_id (int): The ID of the object.

        Returns:
            The full ``object_tag_mapping`` rows; ``create_mapping`` inserts
            them as (object id, tag id). None when nothing matches.
        """
        rows = await self._fetch_all(
            "SELECT * FROM object_tag_mapping WHERE object_reference=?", (object_id,)
        )
        return rows if rows else None

    async def get_object_ids_by_tag_id(self, tag_id):
        """Return rows of ``(object_reference,)`` for *tag_id*, or None if empty."""
        rows = await self._fetch_all(
            "SELECT object_reference FROM object_tag_mapping WHERE tag_reference=?",
            (tag_id,),
        )
        return rows if rows else None

    async def get_object_by_object_id(self, object_id):
        """Return the one-column row ``(object_name,)`` for *object_id*, or None."""
        row = await self._fetch_one(
            "SELECT object_name FROM object_table WHERE id=?", (object_id,)
        )
        return row if row else None

    async def get_tag_name_by_tag_id(self, tag_id):
        """Return the one-column row ``(tag_name,)`` for *tag_id*, or None."""
        row = await self._fetch_one(
            "SELECT tag_name FROM tag_table WHERE id=?", (tag_id,)
        )
        return row if row else None

    # ------------------------------------------------------------------
    # Deletion (each call commits)
    # ------------------------------------------------------------------
    async def remove_tag_by_tag_id(self, tag_id):
        """Delete the tag row with id *tag_id* and commit."""
        await self._write("DELETE FROM tag_table WHERE id = ?", (tag_id,))

    async def remove_object_by_object_id(self, object_id):
        """Delete the object row with id *object_id* and commit."""
        await self._write("DELETE FROM object_table WHERE id = ?", (object_id,))

    async def remove_mapping_by_ids(self, object_id, tag_id):
        """Delete the mapping between *object_id* and *tag_id* and commit."""
        await self._write(
            "DELETE FROM object_tag_mapping WHERE object_reference = ? AND tag_reference = ?",
            (object_id, tag_id),
        )
async def run():
    """Open the default database and close it again straight away."""
    database = Sqlite3Class()
    await database._init()
    await database._close()
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine and closes
    # the loop afterwards, replacing the deprecated get_event_loop() /
    # run_until_complete() / close() sequence.
    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        print("[KeyboardInterrupt] Exiting.")