forked from kossiitkgp/OnThisDay
-
Notifications
You must be signed in to change notification settings - Fork 3
/
db_handler.py
150 lines (130 loc) · 4.45 KB
/
db_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
handle database operations
"""
import os
import json
import sqlite3
from sqlite3 import Error
from contextlib import contextmanager
# Location of the SQLite database file: ".mnemo.db" in the user's home directory.
DB_PATH = os.path.join(os.path.expanduser("~"), ".mnemo.db")

# Query that selects every column of every row in the slack_calls table.
FETCH_QUERY = "SELECT * FROM slack_calls"

# Positions of the columns inside a row fetched from slack_calls
# (position 0 holds the integer primary key ``id``):
#   TMID -> team_id, TMNM -> team_name, CHID -> channel_id,
#   ACTK -> access_token, BTTK -> bot_access_token
TMID, TMNM, CHID, ACTK, BTTK = range(1, 6)
class DBHandler:
    """
    Provides endpoints to perform DB operations on the ``slack_calls`` table.

    :param db_path: optional path of the SQLite database file; when omitted
                    the module-level ``DB_PATH`` is used
    """

    def __init__(self, db_path=None):
        # Resolve the default lazily so an explicit path always wins.
        self._db_path = DB_PATH if db_path is None else db_path
        self._instantiate()

    def fetch(self):
        """
        gets all the rows of the table

        :return: list having all row data with the module-level indexing
                 scheme (TMID, TMNM, ...), or None if the query failed
        """
        try:
            with self._connect(self._db_path) as db_cur:
                db_cur.execute(FETCH_QUERY)
                return db_cur.fetchall()
        except Exception as err:
            print('Failed to fetch data')
            print(err)
            return None

    def remove(self, team_id):
        """
        removes entry with the given team id

        :param team_id: unique slack ID for the team
        :return: True if the statement ran and was committed (even when no
                 row matched), False otherwise
        """
        # Filter on the ``team_id`` column (``indexed_team_id`` is only the
        # name of the index) and bind the value instead of formatting it
        # into the SQL text.
        del_query = "DELETE FROM slack_calls WHERE team_id = ?"
        try:
            with self._connect(self._db_path) as db_cur:
                db_cur.execute(del_query, (team_id,))
            return True
        except Exception as err:
            print('Failed to delete for {}'.format(team_id))
            print(err)
            return False

    def add(self, path_to_json):
        """
        create a new entry in the database table

        :param path_to_json: absolute path to JSON file; it must provide
                             ``access_token``, ``bot.bot_access_token``,
                             ``incoming_webhook.channel_id``, ``team_id``
                             and ``team_name``
        :return: True if successful, False if the insert failed
        :raises OSError: if the file cannot be opened
        :raises ValueError: if the file is not valid JSON
        :raises KeyError: if a required key is missing from the JSON
        """
        with open(path_to_json, "r") as data_file:
            raw_data = json.load(data_file)
        auth_data = {
            'access_token': raw_data['access_token'],
            'bot_access_token': raw_data['bot']['bot_access_token'],
            'channel_id': raw_data['incoming_webhook']['channel_id'],
            'team_id': raw_data['team_id'],
            'team_name': raw_data['team_name'],
        }
        # Named placeholders let sqlite3 do the quoting, so values that
        # contain quotes are stored verbatim and cannot alter the statement.
        command = """INSERT INTO slack_calls
                     (team_id, team_name, channel_id,
                      access_token, bot_access_token)
                     VALUES (:team_id, :team_name, :channel_id,
                             :access_token, :bot_access_token)"""
        try:
            with self._connect(self._db_path) as db_cur:
                db_cur.execute(command, auth_data)
            return True
        except Exception as err:
            print("Failed to add {} to database".format(
                auth_data['team_name']))
            print(err)
            return False

    def _instantiate(self):
        """
        Instantiate a db file with the required table(s)

        Called from ``__init__``. Both statements use ``IF NOT EXISTS``, so
        running it against an already initialised database is harmless.

        :return: None
        """
        print("creating database with table(s) at {}".format(self._db_path))
        with self._connect(self._db_path) as db_cur:
            # create slack_calls table
            db_cur.execute("""CREATE TABLE IF NOT EXISTS slack_calls (
                id integer PRIMARY KEY ,
                team_id text UNIQUE NOT NULL ,
                team_name text ,
                channel_id text UNIQUE NOT NULL ,
                access_token text UNIQUE NOT NULL ,
                bot_access_token text UNIQUE NOT NULL
                );
                """)
            # indexing columns for faster access
            db_cur.execute("""CREATE UNIQUE INDEX IF NOT EXISTS indexed_team_id ON slack_calls (
                team_id
                );
                """)

    @staticmethod
    @contextmanager
    def _connect(db_path=None):
        """
        provides connection to the database

        The transaction is committed when the ``with`` body finishes
        normally and rolled back when it raises; the connection is closed
        in both cases.

        :param db_path: path of the SQLite file; defaults to ``DB_PATH``
        :yields: Cursor to database
        :raises Exception: if the connection cannot be opened; errors raised
                           inside the ``with`` body propagate unchanged
        """
        try:
            conn = sqlite3.connect(
                DB_PATH if db_path is None else db_path, timeout=30)
        except Error as err:
            print("Failed to connect to the database")
            print(err)
            raise Exception("Failed to connect to the database") from err
        try:
            yield conn.cursor()
            conn.commit()
        except Exception:
            # Discard the partial transaction before handing the error back.
            conn.rollback()
            raise
        finally:
            conn.close()