Skip to content

Commit

Permalink
Pylint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mickeprag committed Jun 19, 2018
1 parent 328e5d3 commit a9cb82f
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 105 deletions.
1 change: 0 additions & 1 deletion scheduler/src/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# -*- coding: utf-8 -*-

150 changes: 104 additions & 46 deletions scheduler/src/scheduler/base/Scheduler.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# -*- coding: utf-8 -*-
import copy
from datetime import date, datetime, timedelta
import random
import threading
import time
from base import Application, mainthread, Settings, Plugin, implements

from calendar import timegm
from collections import OrderedDict
from datetime import date, datetime, timedelta
from pytz import timezone
from SunCalculator import SunCalculator
from telldus import DeviceManager, Device, IDeviceChange

from base import Application, mainthread, Settings, Plugin, implements
from telldus import DeviceManager, IDeviceChange
from tellduslive.base import TelldusLive, LiveMessage, ITelldusLiveObserver

from .SunCalculator import SunCalculator

class Scheduler(Plugin):
implements(ITelldusLiveObserver, IDeviceChange)

Expand All @@ -23,12 +25,12 @@ def __init__(self):
self.maintenanceJobs = []
self.lastMaintenanceJobId = 0
self.runningJobs = {} #id:s as keys
self.s = Settings('telldus.scheduler')
self.settings = Settings('telldus.scheduler')
Application().registerShutdown(self.stop)
Application().registerMaintenanceJobHandler(self.addMaintenanceJobGeneric)
self.timezone = self.s.get('tz', 'UTC')
self.latitude = self.s.get('latitude', '55.699592')
self.longitude = self.s.get('longitude', '13.187836')
self.timezone = self.settings.get('tz', 'UTC')
self.latitude = self.settings.get('latitude', '55.699592')
self.longitude = self.settings.get('longitude', '13.187836')
self.jobs = []
self.fetchLocalJobs()
self.live = TelldusLive(self.context)
Expand Down Expand Up @@ -73,7 +75,9 @@ def calculateNextRunTime(self, job):
"""Calculates nextRunTime for a job, depending on time, weekday and timezone."""
if not job['active'] or not job['weekdays']:
job['nextRunTime'] = 253402214400 #set to max value, only run just before the end of time
self.deleteJob(job['id']) #just delete the job, until it's possible to edit schedules locally, inactive jobs has no place at all here
# just delete the job, until it's possible to edit schedules locally, inactive jobs has
# no place at all here
self.deleteJob(job['id'])
return False
today = datetime.now(timezone(self.timezone)).weekday() # normalize?
weekdays = [int(n) for n in job['weekdays'].split(',')]
Expand Down Expand Up @@ -112,32 +116,51 @@ def calculateNextRunTime(self, job):
if not runDate:
#something is wrong, no weekday to run
job['nextRunTime'] = 253402214400
self.deleteJob(job['id']) #just delete the job, until it's possible to edit schedules locally, inactive jobs has no place at all here
# just delete the job, until it's possible to edit schedules locally, inactive jobs
# has no place at all here
self.deleteJob(job['id'])
return False

job['nextRunTime'] = self.calculateRunTimeForDay(runDate, job) + random.randint(0, job['random_interval']) * 60
job['nextRunTime'] = self.calculateRunTimeForDay(runDate, job) \
+ random.randint(0, job['random_interval']) * 60
return True

def calculateNextWeekday(self, d, weekday):
days_ahead = weekday - d.weekday()

@staticmethod
def calculateNextWeekday(todayDate, weekday):
days_ahead = weekday - todayDate.weekday()
if days_ahead <= 0: # Target day already happened this week
days_ahead += 7
return d + timedelta(days_ahead)
return todayDate + timedelta(days_ahead)

def calculateRunTimeForDay(self, runDate, job):
"""Calculates and returns a timestamp for when this job should be run next. Takes timezone into consideration."""
"""
Calculates and returns a timestamp for when this job should be run next.
Takes timezone into consideration.
"""
runDate = datetime(runDate.year, runDate.month, runDate.day)
if job['type'] == 'time':
tt = timezone(self.timezone) #TODO, sending timezone from the server now, but it's really a client setting, can I get it from somewhere else?
runDate = runDate + timedelta(hours=job['hour'], minutes=job['minute']) #won't random here, since this time may also be used to see if it's passed today or not
return timegm(tt.localize(runDate).utctimetuple()) #returning a timestamp, corrected for timezone settings
# TODO, sending timezone from the server now, but it's really a client setting, can I
# get it from somewhere else?
tzone = timezone(self.timezone)
# won't random here, since this time may also be used to see if it's passed today or not
runDate = runDate + timedelta(hours=job['hour'], minutes=job['minute'])
# returning a timestamp, corrected for timezone settings
return timegm(tzone.localize(runDate).utctimetuple())
elif job['type'] == 'sunrise':
sunCalc = SunCalculator()
riseSet = sunCalc.nextRiseSet(timegm(runDate.utctimetuple()), float(self.latitude), float(self.longitude))
return riseSet['sunrise'] + job['offset'] * 60
riseSet = sunCalc.nextRiseSet(
timegm(runDate.utctimetuple()),
float(self.latitude),
float(self.longitude)
)
return riseSet['sunrise'] + job['offset'] * 60
elif job['type'] == 'sunset':
sunCalc = SunCalculator()
riseSet = sunCalc.nextRiseSet(timegm(runDate.utctimetuple()), float(self.latitude), float(self.longitude))
riseSet = sunCalc.nextRiseSet(
timegm(runDate.utctimetuple()),
float(self.latitude),
float(self.longitude)
)
return riseSet['sunset'] + job['offset'] * 60

def checkNewlyLoadedJob(self, job):
Expand All @@ -153,11 +176,18 @@ def checkNewlyLoadedJob(self, job):
if (currentDate.weekday() + 1) in weekdays:
#check for this day (today or yesterday)
runTime = self.calculateRunTimeForDay(currentDate, job)
runTimeMax = runTime + job['reps'] * 3 + job['retry_interval'] * 60 * (job['retries'] + 1) + 70 + job['random_interval'] * 60
runTimeMax = runTime + job['reps'] * 3 \
+ job['retry_interval'] * 60 * (job['retries'] + 1) \
+ 70 \
+ job['random_interval'] * 60
jobId = job['id']
executedJobs = self.s.get('executedJobs', {})
if (str(jobId) not in executedJobs or executedJobs[str(jobId)] < runTime) and time.time() > runTime and time.time() < runTimeMax:
#run time for this job was passed during downtime, but it was passed within the max-runtime, and the last time it was executed (successfully) was before this run time, so it should be run again...
executedJobs = self.settings.get('executedJobs', {})
if (str(jobId) not in executedJobs or executedJobs[str(jobId)] < runTime) \
and time.time() > runTime \
and time.time() < runTimeMax:
# run time for this job was passed during downtime, but it was passed within the
# max-runtime, and the last time it was executed (successfully) was before this
# run time, so it should be run again...
jobCopy = copy.deepcopy(job)
jobCopy['originalRepeats'] = job['reps']
jobCopy['nextRunTime'] = runTime
Expand All @@ -168,14 +198,16 @@ def checkNewlyLoadedJob(self, job):

def deleteJob(self, jobId):
with self.jobsLock:
self.jobs[:] = [x for x in self.jobs if x['id'] != jobId] #Test this! It should be fast and keep original reference, they say (though it will iterate all, even if it could end after one)
# Test this! It should be fast and keep original reference, they say (though it will
# iterate all, even if it could end after one)
self.jobs[:] = [x for x in self.jobs if x['id'] != jobId]
if jobId in self.runningJobs: #TODO this might require a lock too?
self.runningJobs[jobId]['retries'] = 0

executedJobs = self.s.get('executedJobs', {})
executedJobs = self.settings.get('executedJobs', {})
if str(jobId) in executedJobs:
del executedJobs[str(jobId)]
self.s['executedJobs'] = executedJobs
self.settings['executedJobs'] = executedJobs

def deviceRemoved(self, deviceId):
jobsToDelete = []
Expand All @@ -188,7 +220,7 @@ def deviceRemoved(self, deviceId):
def fetchLocalJobs(self):
"""Fetch local jobs from settings"""
try:
jobs = self.s.get('jobs', [])
jobs = self.settings.get('jobs', [])
except ValueError:
jobs = [] #something bad has been stored, just ignore it and continue?
print "WARNING: Could not fetch schedules from local storage"
Expand All @@ -210,7 +242,7 @@ def removeOneJob(self, msg):
scheduleDict = msg.argument(0).toNative()
jobId = scheduleDict['id']
self.deleteJob(jobId)
self.s['jobs'] = self.jobs #save to storage
self.settings['jobs'] = self.jobs #save to storage
self.live.pushToWeb('scheduler', 'removed', jobId)

@TelldusLive.handler('scheduler-report')
Expand All @@ -221,7 +253,7 @@ def receiveJobsFromServer(self, msg):
else:
scheduleDict = msg.argument(0).toNative()
jobs = scheduleDict['jobs']
self.s['jobs'] = jobs
self.settings['jobs'] = jobs
self.calculateJobs(jobs)

@TelldusLive.handler('scheduler-update')
Expand All @@ -239,8 +271,11 @@ def receiveOneJobFromServer(self, msg):
with self.jobsLock:
self.jobs.append(job)
self.jobs.sort(key=lambda job: job['nextRunTime'])
self.s['jobs'] = self.jobs #save to storage
#self.live.pushToWeb('scheduler', 'updated', job['id']) #TODO is this a good idea? Trying to avoid cache problems where updates haven't come through? But this may not work if the same schedule is saved many times in a row, or if changes wasn't saved correctly to the database (not possible yet, only one database for schedules)
self.settings['jobs'] = self.jobs #save to storage
# TODO is this a good idea? Trying to avoid cache problems where updates haven't come through?
# But this may not work if the same schedule is saved many times in a row, or if changes
# wasn't saved correctly to the database (not possible yet, only one database for schedules)
# self.live.pushToWeb('scheduler', 'updated', job['id'])

def requestJobsFromServer(self):
self.live.send(LiveMessage("scheduler-requestjob"))
Expand All @@ -264,14 +299,20 @@ def run(self):

if jobCopy:
jobCopy['originalRepeats'] = job['reps']
jobCopy['maxRunTime'] = jobCopy['nextRunTime'] + jobCopy['reps'] * 3 + jobCopy['retry_interval'] * 60 * (jobCopy['retries'] + 1) + 70 + jobCopy['random_interval'] * 60 #approximate maxRunTime, sanity check
# approximate maxRunTime, sanity check
jobCopy['maxRunTime'] = jobCopy['nextRunTime'] \
+ jobCopy['reps'] * 3 \
+ jobCopy['retry_interval'] * 60 * (jobCopy['retries'] + 1) \
+ 70 \
+ jobCopy['random_interval'] * 60
self.runningJobs[jobId] = jobCopy
self.calculateNextRunTime(job)
with self.jobsLock:
self.jobs.sort(key=lambda job: job['nextRunTime'])

jobsToRun = [] #jobs to run in a separate list, to avoid deadlocks (necessary?)
for runningJobId in self.runningJobs.keys():
jobsToRun = [] # jobs to run in a separate list, to avoid deadlocks (necessary?)
# Iterating using .keys(9 since we are modifiyng the dict while iterating
for runningJobId in self.runningJobs.keys(): # pylint: disable=C0201
runningJob = self.runningJobs[runningJobId]
if runningJob['nextRunTime'] < time.time():
if runningJob['maxRunTime'] > time.time():
Expand Down Expand Up @@ -303,18 +344,24 @@ def run(self):
for jobToRun in jobsToRun:
self.runJob(jobToRun)

time.sleep(5) # TODO decide on a time (how often should we check for jobs to run, what resolution?)
# TODO decide on a time (how often should we check for jobs to run, what resolution?)
time.sleep(5)

def stop(self):
self.running = False

def successfulJobRun(self, jobId, state, stateValue):
"""Called when job run was considered successful (acked by Z-Wave or sent away from 433), repeats should still be run"""
#save timestamp for when this was executed, to avoid rerun within maxRunTime on restart, TODO is this too much writing?
executedJobs = self.s.get('executedJobs', {})
"""
Called when job run was considered successful (acked by Z-Wave or sent away from 433),
repeats should still be run
"""
del state, stateValue
# save timestamp for when this was executed, to avoid rerun within maxRunTime on restart
# TODO is this too much writing?
executedJobs = self.settings.get('executedJobs', {})
executedJobs[str(jobId)] = time.time() #doesn't work well with int type, for some reason
self.s['executedJobs'] = executedJobs
#executedJobsTest = self.s.get('executedJobs', {})
self.settings['executedJobs'] = executedJobs
#executedJobsTest = self.settings.get('executedJobs', {})
if jobId in self.runningJobs:
self.runningJobs[jobId]['retries'] = 0

Expand All @@ -329,12 +376,23 @@ def runJob(self, jobData):
if 'value' in jobData:
value = jobData['value']

device.command(method, value=value, origin='Scheduler', success=self.successfulJobRun, callbackArgs=[jobData['id']])
device.command(
method,
value=value,
origin='Scheduler',
success=self.successfulJobRun,
callbackArgs=[jobData['id']]
)

@mainthread
def runMaintenanceJob(self, jobData):
if not jobData:
return
if jobData['recurrence']:
self.addMaintenanceJob(time.time() + jobData['recurrence'], jobData['callback'], jobData['recurrence']) # readd the job for another run
# readd the job for another run
self.addMaintenanceJob(
time.time() + jobData['recurrence'],
jobData['callback'],
jobData['recurrence']
)
jobData['callback']()
Loading

0 comments on commit a9cb82f

Please sign in to comment.