/
status.py
354 lines (297 loc) · 13.5 KB
/
status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
"""Script for checking daily status of databases and codebases.
"""
from backup import upload_file
import classes
import datetime
import functions
import praw
import os
from shutil import copyfile
from test_db import main_test_db
import time
def init():
    """Create the database handles and the Reddit client used by this script.

    Every object is bound to a module-level global so the other functions
    in this file can reach it without it being passed around.
    """
    global hist_db, journal_db, log_db, r, soc_db, cont_db

    # Database wrappers from the project's ``classes`` module.
    hist_db = classes.HistoryDB()
    journal_db = classes.JournalDB()
    log_db = classes.LoggingDB()

    # Reddit client for the 'bot1' configuration.
    r = praw.Reddit('bot1')

    soc_db = classes.SocMediaDB()
    cont_db = classes.ContestDB()
def main():
"""Run the daily status check and deliver the report.

Builds a single report string from the checks below, prints it, tries to
send it through functions.send_reddit_message_to_self, then makes a
database backup and logs the run. Reads the module-level globals created
by init() (soc_db, hist_db, log_db, journal_db).

NOTE(review): leading indentation is not visible in this copy of the file,
so the exact nesting of the statements that follow each ``except`` clause
should be confirmed against the repository version.
"""
my_diag = classes.Diagnostic(script=str(os.path.basename(__file__)))
message = "**Daily Status Check** \n \n"
# Check Soc Media DB Fresh Count
# A count of 10 or fewer produces the warning line instead of the estimate.
fresh_count = soc_db.fresh_count
if fresh_count <= 10:
message += "* *NOTE: ONLY {} FRESH SOC MEDIA MAPS LEFT!* * \n\n".format(fresh_count)
else:
# The estimate treats 8 maps as one day and each leftover map as 3 hours.
message += "{maps} Maps Left \nThat's {days} days, {hours} hours of content. \n\n".format(
maps=fresh_count,
days=int(fresh_count/8),
hours=((fresh_count % 8) * 3)
)
# Test Functions
# test_functions() returns an empty string when every check passed.
functions_result = test_functions()
if functions_result == '':
message += 'Functions Test Passed \n'
else:
message += 'Functions Test Failed: \n{} \n\n'.format(functions_result)
message += " \n*** \n"
# Test database integrity
message += test_db_integrity()
message += " \n*** \n"
# Create report of quantities for each time zone group
message += functions.create_time_zone_table(soc_db.zone_dict)
# Make posts older than a year fresh again
# Note the threshold here is strictly below 10, while the warning above uses <= 10.
if soc_db.fresh_count < 10:
try:
soc_db.make_fresh_again(current_time=int(time.time()), limit=10)
message += "Ran the Make Fresh Again script \n"
except Exception as e:
# Record the failure in the log database, then carry on with the report.
error_message = ("Could not run soc_db.make_fresh_again \n{} \n{} \n".format(str(e), str(type(e))))
my_diag.traceback = error_message
my_diag.severity = 2
log_db.add_row_to_db(diagnostics=my_diag.make_dict(), passfail=0)
my_diag = classes.Diagnostic(script=str(os.path.basename(__file__)))  # Re-initialize the diagnostic
print(error_message)
# Get failures from last 24 hours and report on them
message += functions.fails_last_24_report(db_obj=log_db)
# Can get successes from last 24 hours and report on them, but removed it because it is too much text
# Use the function success_last_24_report(db_obj=log_db) if you'd like to bring it back
# Check Where in the World
message += check_where_in_world()
# Check count of remaining where in world maps
message += remaining_where_in_world()
# Post stats on the map contest
message += check_map_contest()
# Test the database
# main_test_db() returns a benchmark time and a text report; both go into the message.
test_db_time, report = main_test_db()
message += " \n--------------- \n"
message += "Test_DB benchmark time = {} \n".format(test_db_time)
message += "Total rows in Soc Media DB = {} \n \n".format(soc_db.rows_count)
message += "Total rows in History DB = {} \n \n".format(hist_db.rows_count)
message += report + " \n"
# Send results to myself on Reddit
print(message)
try:
functions.send_reddit_message_to_self(title="Status Report", message=message)
except Exception as e:
message += "Could not send message on Reddit. \n{} \n{}".format(str(e), str(type(e)))
# NOTE(review): cannot tell from this copy whether the file write below is
# part of the except branch (fallback only) or runs on every call - confirm.
with open('data/daily_status.txt', 'w') as text_file:
text_file.write(message)
# Make backup
try:
make_backup()
print("Backing up Database to Google Drive")
except Exception as e:
# A failed backup is logged as a failure row but does not stop the script.
error_message = ("Could not make backup! \n{} \n{} \n".format(str(e), str(type(e))))
my_diag.traceback = error_message
my_diag.severity = 2
log_db.add_row_to_db(diagnostics=my_diag.make_dict(), passfail=0)
my_diag = classes.Diagnostic(script=str(os.path.basename(__file__)))  # Re-initialize the diagnostic
print(error_message)
# Log success of script
log_db.add_row_to_db(diagnostics=my_diag.make_dict(), passfail=1)
log_db.close()
# Store the benchmark time from main_test_db() in the journal database.
journal_db.update_todays_status(benchmark_time=test_db_time)
def make_backup(source_db_path='data/mapporn.db'):
    """Copy the database to a date-stamped backup file and upload the copy.

    The copy is written to ``data/backup/backupYYYYMMDD.db`` and then handed
    to ``upload_file`` together with its bare file name.

    :param source_db_path: Path to the database to back up; defaults to the
        production database, pass another path to use a testing database
    :type source_db_path: str
    """
    date_stamp = str(time.strftime("%Y%m%d"))
    backup_filename = 'backup{}.db'.format(date_stamp)
    backup_filepath = 'data/backup/{}'.format(backup_filename)

    copyfile(source_db_path, backup_filepath)
    upload_file(backup_filepath, backup_filename)
def test_functions():
    # TODO Add type annotation
    """Run sanity checks on helpers from the ``functions`` module.

    Each group of checks compares real return values with the expected ones
    using explicit comparisons (not ``assert``, which is removed when Python
    runs with ``-O``). A failing group adds a section to the returned report
    that names the argument, the expected value and the actual value, and
    the report accumulated so far is written to ``log_db`` as a failed run.

    :return: Error Message, empty string when every check passed
    :rtype: str
    """
    init()
    error_message = ''

    # get_time_zone(): strings containing a known place name
    mismatches = _describe_mismatches(functions.get_time_zone, (
        ('London', 0),
        ('909523[reteopipgfrtAfrica436i', 1),
        ('354tp4t[fds..dsfDenverre9sg', -7),
    ))
    if mismatches:
        error_message += "get_time_zone function not working \n{} \n".format(mismatches)
        _log_function_test_failure(error_message, severity=2)

    # get_time_zone(): a random string is expected to yield 99
    random_string = functions.create_random_string(10)
    mismatches = _describe_mismatches(functions.get_time_zone, ((random_string, 99),))
    if mismatches:
        # The mismatch line already contains the random string that was used.
        error_message += "get_time_zone function not working \n{} \n".format(mismatches)
        _log_function_test_failure(error_message, severity=1)

    # split_message(): one list item per line of input
    mismatches = _describe_mismatches(functions.split_message, (
        ("https://redd.it/9cmxi1\ntext goes here\n12",
         ['https://redd.it/9cmxi1', 'text goes here', '12']),
        ("1\n2\n3", ['1', '2', '3']),
        ('https://redd.it/9e6vbg', ['https://redd.it/9e6vbg']),
    ))
    if mismatches:
        error_message += "Could not run split_message() function \n{} \n\n".format(mismatches)
        _log_function_test_failure(error_message, severity=2)

    # create_random_string(): the result must have the requested length
    mismatches = _describe_mismatches(
        lambda length: len(functions.create_random_string(length)),
        [(length, length) for length in range(6, 15, 2)],
    )
    if mismatches:
        error_message += "create_random_string() test FAILED \n{} \n\n".format(mismatches)
        _log_function_test_failure(error_message, severity=2)

    return error_message


def _describe_mismatches(func, cases):
    """Return one line per (argument, expected) pair where func(argument) != expected."""
    lines = []
    for argument, expected in cases:
        actual = func(argument)
        if actual != expected:
            lines.append("{!r}: expected {!r}, got {!r}".format(argument, expected, actual))
    return " \n".join(lines)


def _log_function_test_failure(report_so_far, severity):
    """Write the accumulated report to log_db as a failed run and print it."""
    diag = classes.Diagnostic(script=str(os.path.basename(__file__)))
    diag.traceback = report_so_far
    diag.severity = severity
    log_db.add_row_to_db(diagnostics=diag.make_dict(), passfail=0)
    print(report_so_far)
def test_db_integrity():
    # TODO: add type annotation (waiting for rpi to upgrade to Python 3.6)
    """Run ``check_integrity()`` on each database and build a report.

    The databases are checked in the order history, social media, logging,
    journal. A result starting with ``"PASS"`` is added as a plain line, any
    other result is added emphasised. If a check raises, the exception text
    and type are added to the report and the report accumulated so far is
    written to ``log_db`` as a failed run with severity 2.

    :return: Report with one entry per database (pass, fail or error)
    :rtype: str
    """
    # Integrity Checks on databases
    init()
    error_message = ''

    # Built after init() so the names refer to the freshly created handles.
    databases = (
        ('hist_db', hist_db),
        ('soc_db', soc_db),
        ('log_db', log_db),
        ('journal_db', journal_db),
    )

    for db_name, db_obj in databases:
        try:
            result = db_obj.check_integrity()
            if result.startswith("PASS"):
                error_message += " {} \n".format(result)
            else:
                error_message += "* *{}* \n".format(result)
        except Exception as e:
            # Same layout for every database: name, exception text, exception
            # type, each followed by a line break so later entries stay separate.
            error_message += "Could not do {} Integrity Test \n{} \n{} \n".format(
                db_name, str(e), str(type(e)))
            diag = classes.Diagnostic(script=str(os.path.basename(__file__)))
            diag.traceback = error_message
            diag.severity = 2
            log_db.add_row_to_db(diagnostics=diag.make_dict(), passfail=0)
            print(error_message)

    return error_message
def check_where_in_world(directory='WW', max_bytes=3200000):
    """Check that the Where in the World files are small enough for Twitter.

    Every entry of ``directory`` is examined. The size test is an explicit
    comparison rather than ``assert``, so it still runs when Python is
    started with ``-O``.

    :param directory: Folder holding the Where in the World files
    :type directory: str
    :param max_bytes: Largest allowed file size in bytes
    :type max_bytes: int
    :return: Error Message with one line per oversized file, empty string
        when every file is within the limit
    :rtype: str
    """
    error_message = ''
    # Sorted so the report order does not depend on the file system.
    for file_name in sorted(os.listdir(directory)):
        fsize = os.stat(os.path.join(directory, file_name)).st_size
        if fsize > max_bytes:
            # Sizes are shown with decimals; a whole-number size would make a
            # 3.3 MB file look as if it were under the 3.2 MB limit.
            error_message += 'WhereWorld image files must be smaller than {:.1f}mb ' \
                             'File: {} is size {:.2f}mb \n'.format(
                                 max_bytes / 1000000.0, file_name, fsize / 1000000.0)
    return error_message
def check_map_contest():
    """Checks status of map contest

    Status message:
        How long since last contest?
        How many maps are submitted currently?

    The id of the last voting post is read from ``data/votingpostdata.txt``
    and looked up on Reddit to find out when the last contest was posted.
    Any problem along the way is reported in the returned message instead of
    being raised.

    :return: status message
    :rtype: str
    """
    message = ''
    try:
        map_subms = cont_db.live_count
        with open('data/votingpostdata.txt', 'r') as f:
            # Strip surrounding whitespace: a trailing newline in the file
            # would otherwise become part of the submission id.
            last_contest_id = f.read().strip()
        # created_utc is the Unix-time creation stamp, directly comparable
        # with the current timestamp below.
        last_contest_time = r.submission(id=last_contest_id).created_utc
        seconds_since_contest = datetime.datetime.now().timestamp() - last_contest_time
        days_since_contest = int(seconds_since_contest / 60 / 60 / 24)
        message = "**{}** maps submitted for this month's map contest. \n \n".format(str(map_subms))
        if days_since_contest > 45:
            message += "It's been **{}** days since a map contest, " \
                       "perhaps it's time to run the voting post script. \n \n".format(days_since_contest)
        else:
            message += "**{}** days since the last map contest. \n \n".format(str(days_since_contest))
    except Exception as e:
        message += 'Could not check map contest, problem with script \n{} \n \n'.format(e)
    return message
def remaining_where_in_world(directory='WW', now=None):
    """Return string with how many where in world maps are left

    Map files are named ``YYWW.png`` (two-digit year followed by a two-digit
    week number). A map counts as remaining when its number is greater than
    the number of the current week. The current number is built from the ISO
    year and ISO week together, so it stays correct in the days around New
    Year when the ISO year differs from the calendar year.

    :param directory: Folder holding the Where in the World files
    :type directory: str
    :param now: Moment to treat as "now"; defaults to the current local time
    :type now: datetime.datetime
    :return: Message with count of how many where in world maps are left
    :rtype: str
    """
    if now is None:
        now = datetime.datetime.now()
    iso_year, iso_week = now.isocalendar()[:2]
    current_code = (iso_year % 100) * 100 + iso_week

    count = 0
    for file_name in os.listdir(directory):
        name = str(file_name)
        if name == 'submissions.csv' or not name.endswith('.png'):
            continue
        try:
            map_code = int(name[:-4])
        except ValueError:
            # Not a YYWW-named map; ignore it rather than abort the report.
            continue
        if map_code > current_code:
            count += 1

    if count == 0:
        return "**NO WHERE IN WORLD MAPS LEFT!!!** \n"
    elif count <= 5:
        return "**Only {} Where in World maps left!!** \nConsider adding more!! \n \n".format(count)
    else:
        return "Remaining Where in World Maps: {} \n \n".format(count)
if __name__ == "__main__":
    # main() reads the module-level globals that init() creates, so init()
    # has to run first.
    init()
    main()