Skip to content

Commit

Permalink
Use the database connection class in the scheduler. #483
Browse files Browse the repository at this point in the history
  • Loading branch information
mfeit-internet2 committed Jan 18, 2020
1 parent 142d2c5 commit b2a84de
Showing 1 changed file with 31 additions and 52 deletions.
83 changes: 31 additions & 52 deletions pscheduler-server/pscheduler-server/daemons/scheduler
Expand Up @@ -781,10 +781,8 @@ class SchedulerWorker(object):
self.task, str(ex))

try:
with self.pg.cursor() as cursor:
cursor.execute(
"""SELECT api_run_post(%s, %s, NULL, %s)""",
[self.task, self.trynext, str(ex)])
self.pg.query("""SELECT api_run_post(%s, %s, NULL, %s)""",
[self.task, self.trynext, str(ex)])
except Exception as nsex:
log.error("%d: Insertion of non-starter failed: %s",
self.number, str(nsex))
Expand All @@ -808,19 +806,16 @@ def main_program():

# TODO: All DB transactions need to be error checked

pg = pscheduler.pg_connection(dsn)
with pg.cursor() as cursor:
cursor.execute("LISTEN task_change")

with pg.cursor() as cursor:
cursor.execute("SELECT heartbeat_boot('scheduler')")
pg = pscheduler.PgConnection(dsn, name="scheduler")
pg.listen("task_change")
pg.query("SELECT heartbeat_boot('scheduler')")

# This is separate so non-start activity doesn't get queued up
# behind the connection that's doing the work in this function's
# loop. Having the non-starers queue up isn't a big deal because
# they're quick.

nonstart_pg = pscheduler.pg_connection(dsn)
nonstart_pg = pscheduler.PgConnection(dsn)

# What workers are running
max_workers = options.max_parallel
Expand All @@ -835,28 +830,16 @@ def main_program():

if wait:

with pg.cursor() as cursor:
cursor.execute("SELECT heartbeat('scheduler', %s)", [refresh])
pg.query("SELECT heartbeat('scheduler', %s)", [refresh])

log.debug("Waiting.")

try:
if pscheduler.polled_select(
[pg], [], [],
pscheduler.timedelta_as_seconds(refresh)) \
!= ([], [], []):
# Notified
log.debug("Task change")
pg.poll()
del pg.notifies[:]

except select.error as ex:
err_no, message = ex
if err_no != errno.EINTR:
log.exception()
raise ex
else:
log.debug("Interrupted during select")
if pg.wait(pscheduler.timedelta_as_seconds(refresh)):
log.debug("Task change.")
# Discard pending notifications
print(pg.notifications())
# _ = pg.notifications()
else:
log.debug("Timed out.")

else:

Expand Down Expand Up @@ -890,29 +873,23 @@ def main_program():
log.debug("Limiting query to %d tasks", limit)
query += " LIMIT %d" % (limit)

for row in pg.query(query, args):

with pg.cursor() as cursor:
cursor.execute(query, args)

# Check if any notifications arrived while this query executed.
if pg.notifies:
wait = False
del pg.notifies[:]

for row in cursor:
number, task, key, runs, trynext, anytime, json, participants = row

number, task, key, runs, trynext, anytime, json, participants = row
with workers_lock:
if number not in workers:
worker = SchedulerWorker(number, nonstart_pg, log,
workers, workers_lock, task, key,
runs, trynext, anytime, json,
participants)
log.debug("Starting thread for ID %d", number)
workers[number] = worker
worker.start()
wait = False

with workers_lock:
if number not in workers:
worker = SchedulerWorker(number, nonstart_pg, log,
workers, workers_lock, task, key,
runs, trynext, anytime, json,
participants)
log.debug("Starting thread for ID %d", number)
workers[number] = worker
worker.start()
wait = False
if pg.notifications():
wait = False



Expand All @@ -922,5 +899,7 @@ if options.daemon:
with daemon.DaemonContext(pidfile=pidfile):
pscheduler.safe_run(lambda: main_program())
else:
pscheduler.safe_run(lambda: main_program())
# TODO: Re-enable this.
#pscheduler.safe_run(lambda: main_program())
main_program()

0 comments on commit b2a84de

Please sign in to comment.