/
hikvision.py
executable file
·679 lines (571 loc) · 26.4 KB
/
hikvision.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
"""
pyhik.hikvision
~~~~~~~~~~~~~~~~~~~~
Provides api for Hikvision events
Copyright (c) 2016-2021 John Mihalic <https://github.com/mezz64>
Licensed under the MIT license.
Based on the following api documentation:
System:
http://oversea-download.hikvision.com/uploadfile/Leaflet/ISAPI/HIKVISION%20ISAPI_2.0-IPMD%20Service.pdf
Imaging:
http://oversea-download.hikvision.com/uploadfile/Leaflet/ISAPI/HIKVISION%20ISAPI_2.0-Image%20Service.pdf
"""
import time
import datetime
import logging
import uuid
try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET
import threading
import requests
from requests.auth import HTTPDigestAuth
# Make pydispatcher optional to support legacy implentations
# New usage should implement the event_callback
try:
from pydispatch import dispatcher
except ImportError:
dispatcher = None
from pyhik.watchdog import Watchdog
from pyhik.constants import (
DEFAULT_PORT, DEFAULT_HEADERS, XML_NAMESPACE, SENSOR_MAP,
CAM_DEVICE, NVR_DEVICE, CONNECT_TIMEOUT, READ_TIMEOUT, CONTEXT_INFO,
CONTEXT_TRIG, CONTEXT_MOTION, CONTEXT_ALERT, CHANNEL_NAMES, ID_TYPES,
__version__)
_LOGGING = logging.getLogger(__name__)
# Hide nuisance requests logging
logging.getLogger('urllib3').setLevel(logging.ERROR)
"""
Things still to do:
- Support status of day/night and switching
IR switch URL:
http://X.X.X.X/ISAPI/Image/channels/1/ircutFilter
report IR status and allow
"""
# pylint: disable=too-many-instance-attributes
class HikCamera(object):
"""Creates a new Hikvision api device."""
def __init__(self, host=None, port=DEFAULT_PORT,
usr=None, pwd=None, verify_ssl=True):
"""Initialize device."""
_LOGGING.debug("pyHik %s initializing new hikvision device at: %s",
__version__, host)
self.event_states = {}
self.watchdog = Watchdog(300.0, self.watchdog_handler)
if not host:
_LOGGING.error('Host not specified! Cannot continue.')
return
self.host = host
self.usr = usr
self.pwd = pwd
self.cam_id = 0
self.name = ''
self.device_type = None
self.motion_detection = None
self._motion_detection_xml = None
self.root_url = '{}:{}'.format(host, port)
self.namespace = {
CONTEXT_INFO: None,
CONTEXT_TRIG: None,
CONTEXT_ALERT: None,
CONTEXT_MOTION: None
}
# Build requests session for main thread calls
# Default to basic authentication. It will change to digest inside
# get_device_info if basic fails
self.hik_request = requests.Session()
self.hik_request.verify = verify_ssl
self.hik_request.auth = (usr, pwd)
self.hik_request.headers.update(DEFAULT_HEADERS)
# Define event stream processing thread
self.kill_thrd = threading.Event()
self.reset_thrd = threading.Event()
self.thrd = threading.Thread(
target=self.alert_stream, args=(self.reset_thrd, self.kill_thrd,))
self.thrd.daemon = False
# Callbacks
self._updateCallbacks = []
self.initialize()
@property
def get_id(self):
"""Returns unique camera/nvr identifier."""
return self.cam_id
@property
def get_name(self):
"""Return camera/nvr name."""
return self.name
@property
def get_type(self):
"""Return device type."""
return self.device_type
@property
def current_event_states(self):
"""Return Event states dictionary"""
return self.event_states
@property
def current_motion_detection_state(self):
"""Return current state of motion detection property"""
return self.motion_detection
def get_motion_detection(self):
"""Fetch current motion state from camera"""
url = ('%s/ISAPI/System/Video/inputs/'
'channels/1/motionDetection') % self.root_url
try:
response = self.hik_request.get(url, timeout=CONNECT_TIMEOUT)
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch MotionDetection, error: %s', err)
self.motion_detection = None
return self.motion_detection
if response.status_code == requests.codes.unauthorized:
_LOGGING.error('Authentication failed')
self.motion_detection = None
return self.motion_detection
if response.status_code != requests.codes.ok:
# If we didn't receive 200, abort
_LOGGING.debug('Unable to fetch motion detection.')
self.motion_detection = None
return self.motion_detection
try:
tree = ET.fromstring(response.text)
self.fetch_namespace(tree, CONTEXT_MOTION)
enabled = tree.find(self.element_query('enabled', CONTEXT_MOTION))
if enabled is not None:
self._motion_detection_xml = tree
self.motion_detection = {'true': True, 'false': False}[enabled.text]
return self.motion_detection
except AttributeError as err:
_LOGGING.error('Entire response: %s', response.text)
_LOGGING.error('There was a problem: %s', err)
self.motion_detection = None
return self.motion_detection
def enable_motion_detection(self):
"""Enable motion detection"""
self._set_motion_detection(True)
def disable_motion_detection(self):
"""Disable motion detection"""
self._set_motion_detection(False)
def _set_motion_detection(self, enable):
"""Set desired motion detection state on camera"""
url = ('%s/ISAPI/System/Video/inputs/'
'channels/1/motionDetection') % self.root_url
enabled = self._motion_detection_xml.find(self.element_query('enabled', CONTEXT_MOTION))
if enabled is None:
_LOGGING.error("Couldn't find 'enabled' in the xml")
_LOGGING.error('XML: %s', ET.tostring(self._motion_detection_xml))
return
enabled.text = 'true' if enable else 'false'
xml = ET.tostring(self._motion_detection_xml)
try:
response = self.hik_request.put(url, data=xml, timeout=CONNECT_TIMEOUT)
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to set MotionDetection, error: %s', err)
return
if response.status_code == requests.codes.unauthorized:
_LOGGING.error('Authentication failed')
return
if response.status_code != requests.codes.ok:
# If we didn't receive 200, abort
_LOGGING.error('Unable to set motion detection: %s', response.text)
self.motion_detection = enable
def add_update_callback(self, callback, sensor):
"""Register as callback for when a matching device sensor changes."""
self._updateCallbacks.append([callback, sensor])
_LOGGING.debug('Added update callback to %s on %s', callback, sensor)
def _do_update_callback(self, msg):
"""Call registered callback functions."""
for callback, sensor in self._updateCallbacks:
if sensor == msg:
_LOGGING.debug('Update callback %s for sensor %s',
callback, sensor)
callback(msg)
def element_query(self, element, context):
"""Build tree query for a given element and context."""
if context == CONTEXT_INFO:
return '{%s}%s' % (self.namespace[CONTEXT_INFO], element)
elif context == CONTEXT_TRIG:
return '{%s}%s' % (self.namespace[CONTEXT_TRIG], element)
elif context == CONTEXT_ALERT:
return '{%s}%s' % (self.namespace[CONTEXT_ALERT], element)
elif context == CONTEXT_MOTION:
return '{%s}%s' % (self.namespace[CONTEXT_MOTION], element)
else:
return '{%s}%s' % (XML_NAMESPACE, element)
def fetch_namespace(self, tree, context):
"""Determine proper namespace to find given element."""
if context == CONTEXT_INFO:
nmsp = tree.tag.split('}')[0].strip('{')
self.namespace[CONTEXT_INFO] = nmsp if nmsp.startswith('http') else XML_NAMESPACE
_LOGGING.debug('Device info namespace: %s', self.namespace[CONTEXT_INFO])
elif context == CONTEXT_TRIG:
try:
# For triggers we *typically* only care about the sub-namespace
nmsp = tree[0][1].tag.split('}')[0].strip('{')
except IndexError:
# If get a index error check on top level
nmsp = tree.tag.split('}')[0].strip('{')
self.namespace[CONTEXT_TRIG] = nmsp if nmsp.startswith('http') else XML_NAMESPACE
_LOGGING.debug('Device triggers namespace: %s', self.namespace[CONTEXT_TRIG])
elif context == CONTEXT_ALERT:
nmsp = tree.tag.split('}')[0].strip('{')
self.namespace[CONTEXT_ALERT] = nmsp if nmsp.startswith('http') else XML_NAMESPACE
_LOGGING.debug('Device alerts namespace: %s', self.namespace[CONTEXT_ALERT])
elif context == CONTEXT_MOTION:
nmsp = tree.tag.split('}')[0].strip('{')
self.namespace[CONTEXT_MOTION] = nmsp if nmsp.startswith('http') else XML_NAMESPACE
_LOGGING.debug('Device motion namespace: %s', self.namespace[CONTEXT_MOTION])
def initialize(self):
"""Initialize deviceInfo and available events."""
device_info = self.get_device_info()
if device_info is None:
self.name = None
self.cam_id = None
self.event_states = None
return
for key in device_info:
if key == 'deviceName':
self.name = device_info[key]
elif key == 'deviceID':
if len(device_info[key]) > 10:
self.cam_id = device_info[key]
else:
self.cam_id = uuid.uuid4()
events_available = self.get_event_triggers()
if events_available:
for event, channel_list in events_available.items():
for channel in channel_list:
try:
# Tracking videoloss events causes problems since they are used
# as the watchdog so ignore them if they are enabled in the triggers.
if event.lower() != 'videoloss':
self.event_states.setdefault(
SENSOR_MAP[event.lower()], []).append(
[False, channel, 0, datetime.datetime.now()])
except KeyError:
# Sensor type doesn't have a known friendly name
# We can't reliably handle it at this time...
_LOGGING.warning(
'Sensor type "%s" is unsupported.', event)
_LOGGING.debug('Initialized Dictionary: %s', self.event_states)
else:
_LOGGING.debug('No Events available in dictionary.')
self.get_motion_detection()
def get_device_info(self):
"""Parse deviceInfo into dictionary."""
device_info = {}
url = '%s/ISAPI/System/deviceInfo' % self.root_url
using_digest = False
try:
response = self.hik_request.get(url, timeout=CONNECT_TIMEOUT)
if response.status_code == requests.codes.unauthorized:
_LOGGING.debug('Basic authentication failed. Using digest.')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)
if response.status_code == requests.codes.not_found:
# Try alternate URL for deviceInfo
_LOGGING.debug('Using alternate deviceInfo URL.')
url = '%s/System/deviceInfo' % self.root_url
response = self.hik_request.get(url)
# Seems to be difference between camera and nvr, they can't seem to
# agree if they should 404 or 401 first
if not using_digest and response.status_code == requests.codes.unauthorized:
_LOGGING.debug('Basic authentication failed. Using digest.')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch deviceInfo, error: %s', err)
return None
if response.status_code == requests.codes.unauthorized:
_LOGGING.error('Authentication failed')
return None
if response.status_code != requests.codes.ok:
# If we didn't receive 200, abort
_LOGGING.debug('Unable to fetch device info.')
return None
try:
tree = ET.fromstring(response.text)
self.fetch_namespace(tree, CONTEXT_INFO)
for item in tree:
tag = item.tag.split('}')[1]
device_info[tag] = item.text
return device_info
except AttributeError as err:
_LOGGING.error('Entire response: %s', response.text)
_LOGGING.error('There was a problem: %s', err)
return None
def get_event_triggers(self):
"""
Returns dict of supported events.
Key = Event Type
List = Channels that have that event activated
"""
events = {}
nvrflag = False
event_xml = []
# different firmware versions support different endpoints.
urls = (
'%s/ISAPI/Event/triggers', # ISAPI v2.0+
'%s/Event/triggers', # Old devices?
)
response = {}
for url in urls:
try:
response = self.hik_request.get(url % self.root_url, timeout=CONNECT_TIMEOUT)
if response.status_code != requests.codes.ok:
# Try next alternate URL for triggers
_LOGGING.debug('Trying alternate triggers URL.')
continue
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch events, error: %s', err)
return None
break
else:
_LOGGING.error('Unable to fetch events. '
'Device firmware may be old/bad.')
return None
# pylint: disable=too-many-nested-blocks
try:
content = ET.fromstring(response.text)
self.fetch_namespace(content, CONTEXT_TRIG)
if content[0].find(self.element_query('EventTrigger', CONTEXT_TRIG)):
event_xml = content[0].findall(
self.element_query('EventTrigger', CONTEXT_TRIG))
elif content.find(self.element_query('EventTrigger', CONTEXT_TRIG)):
# This is either an NVR or a rebadged camera
event_xml = content.findall(
self.element_query('EventTrigger', CONTEXT_TRIG))
for eventtrigger in event_xml:
ettype = eventtrigger.find(self.element_query('eventType', CONTEXT_TRIG))
# Catch empty xml defintions
if ettype is None:
break
etnotify = eventtrigger.find(
self.element_query('EventTriggerNotificationList', CONTEXT_TRIG))
etchannel = None
etchannel_num = 0
for node_name in CHANNEL_NAMES:
etchannel = eventtrigger.find(
self.element_query(node_name, CONTEXT_TRIG))
if etchannel is not None:
try:
# Need to make sure this is actually a number
etchannel_num = int(etchannel.text)
if etchannel_num > 1:
# Must be an nvr
nvrflag = True
break
except ValueError:
# Field must not be an integer
pass
if etnotify:
for notifytrigger in etnotify:
ntype = notifytrigger.find(
self.element_query('notificationMethod', CONTEXT_TRIG))
if ntype.text == 'center' or ntype.text == 'HTTP':
"""
If we got this far we found an event that we want
to track.
"""
events.setdefault(ettype.text, []) \
.append(etchannel_num)
except (AttributeError, ET.ParseError) as err:
_LOGGING.error(
'There was a problem finding an element: %s', err)
return None
if nvrflag:
self.device_type = NVR_DEVICE
else:
self.device_type = CAM_DEVICE
_LOGGING.debug('Processed %s as %s Device.',
self.cam_id, self.device_type)
_LOGGING.debug('Found events: %s', events)
self.hik_request.close()
return events
def watchdog_handler(self):
"""Take care of threads if wachdog expires."""
_LOGGING.debug('%s Watchdog expired. Resetting connection.', self.name)
self.watchdog.stop()
self.reset_thrd.set()
def disconnect(self):
"""Disconnect from event stream."""
_LOGGING.debug('Disconnecting from stream: %s', self.name)
self.kill_thrd.set()
self.thrd.join()
_LOGGING.debug('Event stream thread for %s is stopped', self.name)
self.kill_thrd.clear()
def start_stream(self):
"""Start thread to process event stream."""
# self.watchdog.start()
self.thrd.start()
def alert_stream(self, reset_event, kill_event):
"""Open event stream."""
_LOGGING.debug('Stream Thread Started: %s, %s', self.name, self.cam_id)
start_event = False
parse_string = ""
fail_count = 0
url = '%s/ISAPI/Event/notification/alertStream' % self.root_url
# pylint: disable=too-many-nested-blocks
while True:
try:
stream = self.hik_request.get(url, stream=True,
timeout=(CONNECT_TIMEOUT,
READ_TIMEOUT))
if stream.status_code == requests.codes.not_found:
# Try alternate URL for stream
url = '%s/Event/notification/alertStream' % self.root_url
stream = self.hik_request.get(url, stream=True)
if stream.status_code != requests.codes.ok:
raise ValueError('Connection unsucessful.')
else:
_LOGGING.debug('%s Connection Successful.', self.name)
fail_count = 0
self.watchdog.start()
for line in stream.iter_lines():
# _LOGGING.debug('Processing line from %s', self.name)
# filter out keep-alive new lines
if line:
str_line = line.decode("utf-8", "ignore")
# New events start with --boundry
if str_line.find('<EventNotificationAlert') != -1:
# Start of event message
start_event = True
parse_string = str_line
elif str_line.find('</EventNotificationAlert>') != -1:
# Message end found found
parse_string += str_line
start_event = False
if parse_string:
try:
tree = ET.fromstring(parse_string)
self.process_stream(tree)
self.update_stale()
except ET.ParseError as err:
_LOGGING.warning('XML parse error in stream.')
parse_string = ""
else:
if start_event:
parse_string += str_line
if kill_event.is_set():
# We were asked to stop the thread so lets do so.
break
elif reset_event.is_set():
# We need to reset the connection.
raise ValueError('Watchdog failed.')
if kill_event.is_set():
# We were asked to stop the thread so lets do so.
_LOGGING.debug('Stopping event stream thread for %s',
self.name)
self.watchdog.stop()
self.hik_request.close()
return
elif reset_event.is_set():
# We need to reset the connection.
raise ValueError('Watchdog failed.')
except (ValueError,
requests.exceptions.ConnectionError,
requests.exceptions.ChunkedEncodingError) as err:
fail_count += 1
reset_event.clear()
_LOGGING.warning('%s Connection Failed (count=%d). Waiting %ss. Err: %s',
self.name, fail_count, (fail_count * 5) + 5, err)
parse_string = ""
self.watchdog.stop()
self.hik_request.close()
time.sleep(5)
self.update_stale()
time.sleep(fail_count * 5)
continue
def process_stream(self, tree):
"""Process incoming event stream packets."""
if not self.namespace[CONTEXT_ALERT]:
self.fetch_namespace(tree, CONTEXT_ALERT)
try:
etype = SENSOR_MAP[tree.find(
self.element_query('eventType', CONTEXT_ALERT)).text.lower()]
# Since this pasing is different and not really usefull for now, just return without error.
if len(etype) > 0 and etype == 'Ongoing Events':
return
estate = tree.find(
self.element_query('eventState', CONTEXT_ALERT)).text
for idtype in ID_TYPES:
echid = tree.find(self.element_query(idtype, CONTEXT_ALERT))
if echid is not None:
try:
# Need to make sure this is actually a number
echid = int(echid.text)
break
except (ValueError, TypeError) as err:
# Field must not be an integer or is blank
pass
ecount = tree.find(
self.element_query('activePostCount', CONTEXT_ALERT)).text
except (AttributeError, KeyError, IndexError) as err:
_LOGGING.error('Problem finding attribute: %s', err)
return
# Take care of keep-alive
if len(etype) > 0 and etype == 'Video Loss':
self.watchdog.pet()
# Track state if it's in the event list.
if len(etype) > 0:
state = self.fetch_attributes(etype, echid)
if state:
# Determine if state has changed
# If so, publish, otherwise do nothing
estate = (estate == 'active')
old_state = state[0]
attr = [estate, echid, int(ecount),
datetime.datetime.now()]
self.update_attributes(etype, echid, attr)
if estate != old_state:
self.publish_changes(etype, echid)
self.watchdog.pet()
def update_stale(self):
"""Update stale active statuses"""
# Some events don't post an inactive XML, only active.
# If we don't get an active update for 5 seconds we can
# assume the event is no longer active and update accordingly.
for etype, echannels in self.event_states.items():
for eprop in echannels:
if eprop[3] is not None:
sec_elap = ((datetime.datetime.now()-eprop[3])
.total_seconds())
# print('Seconds since last update: {}'.format(sec_elap))
if sec_elap > 5 and eprop[0] is True:
_LOGGING.debug('Updating stale event %s on CH(%s)',
etype, eprop[1])
attr = [False, eprop[1], eprop[2],
datetime.datetime.now()]
self.update_attributes(etype, eprop[1], attr)
self.publish_changes(etype, eprop[1])
def publish_changes(self, etype, echid):
"""Post updates for specified event type."""
_LOGGING.debug('%s Update: %s, %s',
self.name, etype, self.fetch_attributes(etype, echid))
signal = 'ValueChanged.{}'.format(self.cam_id)
sender = '{}.{}'.format(etype, echid)
if dispatcher:
dispatcher.send(signal=signal, sender=sender)
self._do_update_callback('{}.{}.{}'.format(self.cam_id, etype, echid))
def fetch_attributes(self, event, channel):
"""Returns attribute list for a given event/channel."""
try:
for sensor in self.event_states[event]:
if sensor[1] == int(channel):
return sensor
except KeyError:
return None
def update_attributes(self, event, channel, attr):
"""Update attribute list for current event/channel."""
try:
for i, sensor in enumerate(self.event_states[event]):
if sensor[1] == int(channel):
self.event_states[event][i] = attr
except KeyError:
_LOGGING.debug('Error updating attributes for: (%s, %s)',
event, channel)