/
THB.py
executable file
·267 lines (229 loc) · 11.2 KB
/
THB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
#! /usr/bin/env python
# THB.py
# Skeletorfw
# 30/04/17
# Version 1.0.1
#
# Python 3.4.1
#
# Bot to pull askreddit threads and trend popularity over time
import csv
import logging
import os  # Removes a half-created DB file; will also be needed for log rotation if done in here.
import sched
import sqlite3
import threading  # V2 Multithreaded refactor
import time
from datetime import datetime
from operator import itemgetter
from os.path import isfile
from queue import Queue
from random import randint

import praw
from prawcore import RequestException
debugmode = False  # True -> the log file also receives DEBUG records

# Logging: the logger passes everything through and the file handler filters.
# NOTE(review): FileHandler raises if the log/ directory does not exist.
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(funcName)s - %(message)s')

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

handler = logging.FileHandler('log/THB.out')
handler.setLevel(logging.DEBUG if debugmode else logging.INFO)  # File logging level
handler.setFormatter(formatter)

logger.addHandler(handler)
# BOT CONFIG
# Reddit client for the 'bot1' site name (PRAW looks this up in its praw.ini config)
reddit = praw.Reddit('bot1')
sr = reddit.subreddit("askreddit")  # Subreddit streamed by SubmissionGetter

# NOTE(review): the original note said this was "calculated that over a 1/10m
# every 24h (288 runs) this should never hit 500", but one poll per 10 min for
# 24h is 144 runs -- recheck. This constant appears unused in this file.
base_threads_to_pull = 5
# New submissions are only tracked while the scheduler holds fewer than this
# many pending events (roughly one pending poll per tracked submission).
maxthreads = 500
threadpull_mod = 10  # Track only every Nth submission seen on the stream
archiveage = 24 * 60 * 60  # 24h in seconds; appears unused in this file
dbpath = "data/THB_db.sqlite3"  # SQLite file, created from tools/create_THB_db.sql if missing
# DB setup: on first run, build the schema from tools/create_THB_db.sql
if not isfile(dbpath):
    # Create db!
    logger.warning("DB not found at {}!".format(dbpath))
    logger.warning("Creating new DB file and setting up tables...")
    # The script is read before connecting, so a missing script leaves no empty DB file behind.
    with open("tools/create_THB_db.sql", "r") as f:
        createscript = f.read()
    db = sqlite3.connect(dbpath)
    try:
        cursor = db.cursor()
        cursor.executescript(createscript)
        db.commit()
    except Exception:
        # executescript() adds no transaction of its own, so unless the script
        # wraps itself in BEGIN/COMMIT a failure can leave a half-built schema.
        # Remove the file so the next start retries creation instead of seeing
        # an existing DB and skipping setup forever.
        logger.exception("Failed to create DB at {}; removing partial file.".format(dbpath))
        db.close()
        if isfile(dbpath):
            os.remove(dbpath)
        raise
    logger.warning("DB successfully created.")
    db.close()
##### THB V2
# Shared event scheduler for poll and DB-write jobs. Jobs are entered with
# absolute wall-clock times (time.time()), and main() drives it via s.run(blocking=False).
s = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)

# FIFO queue: worker threads put record dicts here and DBWriter drains them into sqlite.
q = Queue()
class SubmissionPoller(threading.Thread):
    """One-shot worker that records a submission's stats and schedules its next poll.

    Each instance performs a single run for one submission:

    1. Fetch the current score and comment count from reddit, retrying on
       request errors until ``timeout`` seconds after construction.
    2. Push the stats onto the DB queue.
    3. While ``run_number < 144``, schedule a new poller on the module
       scheduler ``s`` for ``starttime + 600`` seconds.

    The next poll is scheduled even when this one fails, so a single bad run
    does not end tracking of the submission.
    """

    def __init__(self, submission_id, run_number, q, timeout=300, daemon=True):  # Daemon is true for testing
        threading.Thread.__init__(self)
        self.name = submission_id  # Thread name (inbuilt)
        self.submission_id = submission_id  # Reddit post submission ID
        self.run_number = run_number  # Poll number, 1 - 144; the t0 stats are queued when the post is first found
        self.q = q  # Queue object for threads to add their submissions to.
        self.timeout = timeout  # Seconds before checkstats stops retrying and schedules the next job anyway
        self.daemon = daemon  # Is this a daemon thread?
        self.starttime = time.time()  # Anchor for both the retry timeout and the next poll's start time
        self.submission_data = None  # Fetched submission object once a poll succeeds

    def run(self):
        logger.debug("Checking submission {}, run {}/144".format(self.submission_id, self.run_number))
        self.checkstats()

    def checkstats(self):
        """Fetch stats (with retries), queue them, and schedule the next poll."""
        successful = False
        while True:
            try:
                # reddit.submission() returns a lazy object: the HTTP request is
                # only made on first attribute access. Read the fields here, inside
                # the try, so request failures are retried instead of escaping
                # later from the logging call below.
                submission = reddit.submission(self.submission_id)
                _ = (submission.score, submission.num_comments)
                self.submission_data = submission
                successful = True
                break
            except RequestException:
                logger.exception("Request exception for {}".format(self.submission_id))
                if time.time() - self.starttime >= self.timeout:
                    logger.warning("Run {}/144 of submission {} timed out!"
                                   .format(self.run_number, self.submission_id))
                    break
                time.sleep(5)
            except Exception:
                # Other errors (e.g. prawcore response errors) are not retried.
                # Fall through so the next poll is still scheduled rather than
                # letting the thread die and silently end tracking.
                logger.exception("Failed to fetch {} at run {}/144"
                                 .format(self.submission_id, self.run_number))
                break
        if successful:
            # commit to db
            logger.info("{} at run {}/144: Score:{}, Comments:{}".format(
                self.submission_id, self.run_number,
                self.submission_data.score, self.submission_data.num_comments))
            self.write_to_db_queue()
        # Schedule next job for starttime + 10m, whether or not this run succeeded
        if self.run_number < 144:
            s.enterabs(self.starttime + 600, 1, launch_submissionpoller,
                       argument=(self.submission_id,
                                 self.run_number + 1,
                                 self.q,
                                 self.timeout,
                                 self.daemon))
            logger.debug("Submitted poll {} for {}".format(self.run_number + 1, self.submission_id))
        else:
            logger.info("Completed tracking of {}".format(self.submission_id))

    def write_to_db_queue(self):
        """Queue an update record for this run's c_t<N>/s_t<N> columns.

        Must only be called after a successful fetch (``submission_data`` set).
        """
        logger.debug("Writing data for {} from run {}/144 to db queue...".format(self.submission_id, self.run_number))
        self.q.put({"newentry": False,
                    "id": self.submission_id,
                    "c_name": "c_t{}".format(self.run_number),
                    "comments": self.submission_data.num_comments,
                    "s_name": "s_t{}".format(self.run_number),
                    "score": self.submission_data.score})
class SubmissionGetter(threading.Thread):
    """Stream new submissions and start tracking every ``threadpull_mod``-th one.

    A submission is only tracked while the module scheduler ``s`` holds fewer
    than ``maxthreads`` pending events. For each tracked submission:

    - its initial (t0) stats are pushed onto the DB queue;
    - a first SubmissionPoller is scheduled 10 minutes later.

    After any error the stream is restarted following a short delay.
    """

    def __init__(self, subreddit, db_queue, name="submissiongetter", daemon=True, restart_delay=5):
        threading.Thread.__init__(self)
        self.name = name  # Thread name (inbuilt)
        self.subreddit = subreddit
        self.q = db_queue
        self.daemon = daemon  # Is this a daemon thread?
        self.restart_delay = restart_delay  # Seconds to wait before restarting a failed stream

    @staticmethod
    def _new_entry(submission):
        """Build the DB-queue record for a newly tracked submission."""
        author = submission.author
        return {"newentry": True,
                "id": submission.id,
                "title": submission.title,
                # author is None for deleted accounts; calling .name on it used
                # to raise and restart the whole stream.
                "author": author.name if author is not None else "[deleted]",
                "ts": submission.created_utc,
                "comments": submission.num_comments,
                "score": submission.score}

    def _track(self, submission):
        """Queue t0 stats for ``submission`` and schedule its first poll in 10 minutes."""
        logger.info("Tracking ID: {}, Title: {}, Submitted: {}\nTracking {} threads"
                    .format(submission.id,
                            submission.title,
                            datetime.utcfromtimestamp(submission.created_utc).strftime('%Y-%m-%d %H:%M:%S'),
                            len(s.queue)))
        self.q.put(self._new_entry(submission))
        s.enterabs(time.time() + 600, 1, launch_submissionpoller, argument=(submission.id,
                                                                            1,
                                                                            self.q,
                                                                            300,
                                                                            False))
        logger.debug("Submitted next poll for {} in 10m, Now watching {} threads"
                     .format(submission.id, len(s.queue) - 1))

    def run(self):
        logger.info("Starting SubmissionGetter thread")
        logger.info("Tracking {}".format(self.subreddit.display_name))
        subcounter = 0
        while True:
            try:
                for submission in self.subreddit.stream.submissions(skip_existing=True):
                    if subcounter % threadpull_mod == 0 and len(s.queue) < maxthreads:
                        self._track(submission)
                    else:
                        logger.debug("Skipping submission.")
                    subcounter += 1
            except RequestException:
                logger.exception("Request exception occurred when streaming posts. Restarting stream.")
            except Exception:
                logger.exception("Unhandled exception occurred when streaming posts. Restarting stream.")
            # Back off before restarting, so a persistent failure (e.g. the
            # network being down) does not spin in a tight loop flooding the log.
            time.sleep(self.restart_delay)
class DBWriter(threading.Thread):
    """One-shot worker that flushes the DB queue into sqlite, then schedules the next flush.

    Records with ``newentry`` True are inserted as new ``submissions`` rows
    (t0 stats). Records with ``newentry`` False update that run's
    ``c_tN``/``s_tN`` columns.

    The next run is always scheduled ``delaytime`` seconds after this one
    finishes, even if this one fails. Runs are therefore chained and never
    drain the queue concurrently.
    """

    def __init__(self, db_queue, db_path, delaytime=300, name="DBWriter", daemon=None):
        threading.Thread.__init__(self)
        self.q = db_queue
        self.dbpath = db_path
        self.name = name  # Thread name (inbuilt)
        self.delaytime = delaytime  # Delay time between db runs in seconds
        self.daemon = daemon  # Is this a daemon thread?

    def run(self):
        logger.info("Starting db commit run")
        try:
            self._commit_pending()
        except Exception:
            logger.exception("An unhandled exception occurred when writing to the db.")
        finally:
            # Always queue the next run. Previously an error outside the inner
            # try (e.g. on connect) stopped all future DB writes.
            self.reschedule()

    def _commit_pending(self):
        """Drain everything currently queued and write it to the database."""
        pending = self.q.qsize()
        if not pending:
            logger.info("Nothing to commit")
            return
        # Connect before draining, so a connection failure loses no queued data.
        db = sqlite3.connect(self.dbpath)
        try:
            logger.info("Committing {} elements to the db".format(pending))
            tocommit = self._drain_queue(pending)
            newentries = [x for x in tocommit if x["newentry"] is True]
            updateentries = [x for x in tocommit if x["newentry"] is False]
            if newentries:
                logger.debug("Writing {} new entries to db".format(len(newentries)))
                self._insert_new(db, newentries)
            # Updates target rows committed in earlier runs, so they are applied
            # even if this batch's inserts failed.
            if updateentries:
                logger.debug("Updating {} entries in db".format(len(updateentries)))
                self._apply_updates(db, updateentries)
        finally:
            db.close()

    def _drain_queue(self, count):
        """Remove and return ``count`` items from this writer's queue without blocking."""
        return [self.q.get_nowait() for _ in range(count)]

    @staticmethod
    def _insert_new(db, entries):
        """Insert rows for newly tracked submissions in a single transaction."""
        if not entries:
            return
        try:
            db.executemany(''' INSERT INTO submissions(id, title, author, date, c_t0, s_t0)
                    VALUES(:id, :title, :author, :ts, :comments, :score)''',
                           entries)
            db.commit()
        except Exception:
            logger.exception("Failed to insert {} new entries into db.".format(len(entries)))
            db.rollback()

    @staticmethod
    def _apply_updates(db, entries):
        """Write each poll's stats into its c_tN/s_tN columns, committing per update."""
        for update in entries:
            try:
                # Column names cannot be bound as SQL parameters, so they are
                # formatted in. They are generated internally ("c_t<run>"/"s_t<run>"),
                # not taken from user input.
                db.execute(''' UPDATE submissions SET {} = :comments, {} = :score WHERE id=:id'''
                           .format(update["c_name"], update["s_name"]),
                           update)
                db.commit()
            except Exception:
                logger.exception("Failed to update {} in db.".format(update["id"]))
                db.rollback()

    def reschedule(self):
        """Schedule a fresh DBWriter with the same settings ``delaytime`` seconds from now."""
        s.enterabs(time.time() + self.delaytime, 1, launch_dbwriter, argument=(self.q,
                                                                               self.dbpath,
                                                                               self.delaytime,
                                                                               self.name,
                                                                               self.daemon))
def launch_submissionpoller(submission_id, run_number, queue, timeout, daemon):
    """Scheduler callback: start a SubmissionPoller for one poll of ``submission_id``.

    All arguments are forwarded unchanged to the SubmissionPoller constructor.
    """
    SubmissionPoller(submission_id, run_number, queue, timeout=timeout, daemon=daemon).start()
def launch_dbwriter(db_queue, db_path, delaytime, name, daemon):
    """Scheduler callback: start a DBWriter thread for one commit run.

    All arguments are forwarded unchanged to the DBWriter constructor.
    """
    DBWriter(db_queue, db_path, delaytime=delaytime, name=name, daemon=daemon).start()
def main():
    """Start the submission streamer and the DB writer, then drive the scheduler.

    The module scheduler ``s`` is run non-blockingly once per second, so events
    that worker threads add in the meantime are picked up on the next pass.
    Ctrl-C leaves the loop.
    """
    logger.info("Starting THBv2")
    SubmissionGetter(sr, q).start()
    DBWriter(q, dbpath, delaytime=60).start()
    try:
        while True:
            s.run(blocking=False)
            time.sleep(1)
    except KeyboardInterrupt:
        print("Exiting THB...")
# Only start the bot when run as a script; importing the module should not
# enter main()'s endless scheduler loop.
if __name__ == "__main__":
    main()