-
Notifications
You must be signed in to change notification settings - Fork 1
/
history.py
176 lines (141 loc) · 7.58 KB
/
history.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import sqlite3
import browser_history
from datetime import datetime
import time
from urllib.parse import urlparse
import threading
import logging
import os
import sys
import copy
# logging.basicConfig(filename=r'C:\Users\Nam Anh\twert\logs\history.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
# logging.warning("path of history.py: {}".format(os.path.dirname(os.path.abspath(__file__))))
# Path of the SQLite database file handed to create_connection(). It is a
# relative path, so it is resolved against the process's current working
# directory at connect time.
database_path = "nblocker.sqlite3"
# Naive local timestamp taken when this module is first imported.
# NOTE(review): not referenced anywhere in the visible part of this file.
program_start_time = datetime.now()
def find_nth(haystack, needle, n):
    """Return the index of the n-th non-overlapping occurrence of *needle*.

    The first occurrence is located with ``haystack.find``; each further
    occurrence is searched for starting just past the end of the previous
    match.  ``-1`` is returned as soon as a search fails.  For ``n <= 1``
    the result is simply the index of the first occurrence.
    """
    position = haystack.find(needle)
    remaining = n
    while remaining > 1 and position >= 0:
        resume_at = position + len(needle)
        position = haystack.find(needle, resume_at)
        remaining -= 1
    return position
def absolute_history_path(path):
    """Return *path* as an absolute path, resolving relative paths via the DB.

    Absolute paths are returned unchanged.  A relative path is joined onto
    the value of the ``username`` column of the first row of the
    ``current_user`` table in the database at ``database_path``.
    NOTE(review): despite its name the column is used as a directory
    prefix, presumably the monitored user's profile directory -- confirm
    against the code that populates the table.

    Args:
        path: A browser history path, absolute or relative.

    Returns:
        The absolute path as a string.

    Raises:
        AttributeError: If the database could not be opened
            (``create_connection`` returned ``None``).
        IndexError: If the ``current_user`` table has no rows.
        sqlite3.Error: If the query itself fails.
    """
    if os.path.isabs(path):
        return path

    conn = create_connection(database_path)
    cursor = conn.cursor()
    try:
        # Only the first row is needed, so do not materialise the whole table.
        row = cursor.execute("select username from current_user").fetchone()
    finally:
        # Close the connection even when the query raises, so a failing
        # lookup does not leak the handle.
        conn.close()

    if row is None:
        # Same exception type the previous ``fetchall()[0][0]`` produced.
        raise IndexError("current_user table is empty; cannot resolve user path")
    return os.path.join(row[0], path)
def copy_history_file(absolute_og_path, new_path):
    """Placeholder: not implemented yet; accepts both paths and does nothing."""
    return None
def convert_browser_path():
    """Rewrite ``windows_path`` on each supported ``browser_history`` class.

    For every browser class named below, the class attribute
    ``windows_path`` is replaced by the result of
    ``absolute_history_path`` applied to its current value.
    """
    # Original author's note: read from copy location; absolute_history_path
    # is the path of the original file.
    browser_names = (
        "Chromium",
        "Chrome",
        "Firefox",
        "Edge",
        "Opera",
        "OperaGX",
        "Brave",
        "Vivaldi",
    )
    for browser_name in browser_names:
        browser_cls = getattr(browser_history.browsers, browser_name)
        browser_cls.windows_path = absolute_history_path(browser_cls.windows_path)
def parse_domain(domain: str):
    """Return the network-location (``netloc``) component of a URL string.

    The result is an empty string when the input has no ``//`` authority
    part, e.g. a bare ``"example.com/path"``.
    """
    parsed_url = urlparse(domain)
    return parsed_url.netloc
# start_time = time.time()
# history = browser_history.get_history()
# for num, entry in enumerate(history.histories):
# print("{}. time:{} , domain:{} ".format(num, entry[0], parse_domain(entry[1])))
# print("time to print history: {}".format(time.time() - start_time))
def create_connection(db_file : str):
    """Open a SQLite connection to *db_file*.

    Returns the connection on success.  If connecting raises, the
    exception is reported through ``logging.error`` and ``None`` is
    returned instead, so callers must be prepared for ``None``.
    """
    try:
        return sqlite3.connect(db_file)
    except Exception as exc:
        logging.error(exc)
        return None
def diff_history(datetime_history_list, history_list_after):
    """Return the trailing history entries that are newer than a cutoff.

    The list is scanned from its last entry backwards and scanning stops
    at the first entry that is not strictly newer than the cutoff, so the
    input is expected to be ordered oldest-to-newest.

    Args:
        datetime_history_list: Cutoff value (despite the name, a single
            timestamp, not a list) compared against ``entry[0]``.
        history_list_after: Sequence of entries whose first element is the
            timestamp.

    Returns:
        A new list of the newer entries, newest first.  Empty when the
        input is empty or its last entry is not newer than the cutoff.
    """
    newer_entries = []
    # reversed() walks down to index 0 inclusive; the previous
    # range(len - 1, 0, -1) never examined the first entry and dropped it
    # whenever every entry was newer than the cutoff.
    for entry in reversed(history_list_after):
        if entry[0] > datetime_history_list:
            newer_entries.append(entry)
        else:
            break
    return newer_entries
def log_a_list(prefix, tmp_list, num):
    """Walk the last *num* entries of *tmp_list* for debug logging.

    The logging call is currently commented out, so this only clamps
    *num* to the list length and iterates without producing any output.
    """
    count = min(len(tmp_list), num)
    for _offset in range(1, count + 1):
        # logging.warning("{}. {} :{}".format(count - _offset + 1, prefix, tmp_list[-_offset]))
        continue
def add_all_history(conn, datetime_history_list):
    """Record browser history entries newer than a cutoff in ``domains``.

    Fetches the combined browser history, keeps the entries whose
    timestamp is newer than the cutoff, and upserts one row per entry into
    the ``domains`` table: a new domain is inserted with ``hit_count`` 1,
    an existing one has ``hit_count`` incremented and ``last_seen``
    updated.  Entries are written oldest first.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
        datetime_history_list: Cutoff timestamp (despite the name, a single
            value, not a list); only newer entries are recorded.

    Returns:
        The timestamp of the last entry of the fetched history, to be used
        as the next cutoff.  When the history could not be fetched or is
        empty, the cutoff is returned unchanged so the caller can simply
        retry later (previously this case raised ``IndexError``).
    """
    # Original author's note: copy new file over, check against old file,
    # add diff to admin, delete old file.
    convert_browser_path()

    history_list_future = []
    try:
        history_list_future = browser_history.get_history().histories
        log_a_list("history_list_future current entry", history_list_future, 100)
    except Exception as e:
        # Best effort: a failed fetch must not stop polling, but it should
        # not vanish silently either.
        logging.error(e)

    if not history_list_future:
        # Nothing fetched: keep the previous cutoff instead of indexing
        # into an empty list.
        return datetime_history_list

    diff = diff_history(datetime_history_list, history_list_future)
    log_a_list("diff current entry", diff, 10)
    # diff_history yields newest first; insert oldest first so first_seen
    # and last_seen evolve chronologically.
    diff.reverse()

    # NOTE(review): the ``browser`` column is always written as the literal
    # string 'TRUE' -- confirm this is intended.
    sql = """INSERT INTO domains (domain,hit_count,first_seen,last_seen, browser)
    VALUES (?,1,?,?, ?)
    ON CONFLICT (domain)
    DO UPDATE SET hit_count=hit_count+1, last_seen=?, browser = ?
    """
    cursor = conn.cursor()
    for entry in diff:
        timestamp = entry[0]
        domain = parse_domain(entry[1])
        cursor.execute(sql, (domain, timestamp, timestamp, 'TRUE', timestamp, 'TRUE'))
        # Commit per row so already-recorded visits survive a failure on a
        # later row.
        conn.commit()
    return history_list_future[-1][0]
def thread_run():
    """Poll browser history in a loop and record new visits.

    Resolves the browser history paths, opens the database at
    ``database_path``, takes the timestamp of the newest existing history
    entry as the starting cutoff, and then calls ``add_all_history`` every
    90 seconds, feeding each returned timestamp back in as the next cutoff.
    The loop ends on ``KeyboardInterrupt`` or ``SystemExit``; any other
    exception propagates to the caller.  The database connection is closed
    on every exit path.

    Returns without polling when the database cannot be opened
    (``create_connection`` has already logged the reason in that case).
    """
    convert_browser_path()
    conn = create_connection(database_path)
    if conn is None:
        return

    try:
        histories = browser_history.get_history().histories
        if histories:
            datetime_current_history = histories[-1][0]
        else:
            # No history yet (e.g. a fresh profile): start from "now"
            # instead of failing on histories[-1].
            # NOTE(review): assumes browser_history yields timezone-aware
            # datetimes, so an aware local timestamp is comparable with
            # them -- confirm against the installed library version.
            datetime_current_history = datetime.now().astimezone()

        while True:
            try:
                datetime_current_history = add_all_history(conn, datetime_current_history)
                time.sleep(90)
            except (KeyboardInterrupt, SystemExit):
                try:
                    log_a_list("all history", browser_history.get_history().histories, 1000)
                except Exception as e:
                    logging.error("error when adding all history: {}".format(e))
                break
    finally:
        # Single close for every exit path (previously closed twice on one
        # path and never on an unexpected exception).
        conn.close()
def start_thread():
    """Launch ``thread_run`` on a background daemon thread and return at once."""
    threading.Thread(target=thread_run, daemon=True).start()
# Known issue (unresolved): browsers sometimes insert entries into their history non-linearly, so old entries can show up after newer ones have already been processed.
# Mitigation attempted: add the new browser history entries periodically (originally every 15 minutes; thread_run currently sleeps 90 seconds between polls).
# Possible optimization: change history_sql in browsers.py and generic.py. The current approach is clunky; the diff function could be replaced with the correct SQL query filtering on visit_time > datetime_current_history.