-
Notifications
You must be signed in to change notification settings - Fork 683
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #977 from heeplr/patch-1
DOCSIS status monitoring
- Loading branch information
Showing
1 changed file
with
267 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,267 @@ | ||
#!/usr/bin/env python3 | ||
|
||
""" | ||
=head1 NAME | ||
arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85 | ||
and compatible cable modems | ||
=head1 DESCRIPTION | ||
Connect to the web-frontend and get current DOCSIS status of upstream and | ||
downstream channels. (Signal Power, SNR, Lock Status) | ||
=head1 REQUIREMENTS | ||
- BeautifulSoup | ||
- pycryptodome | ||
=head1 CONFIGURATION | ||
=head2 Example | ||
[arris] | ||
env.url http://192.168.100.1 | ||
env.username admin | ||
env.password yourpassword | ||
=head2 Parameters | ||
url - URL to web-frontend | ||
username - defaults to "admin" | ||
password - valid password | ||
=head1 REFERENCES | ||
https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/ | ||
=head1 AUTHOR | ||
Copyright (c) 2019 Daniel Hiepler <d-munin@coderdu.de> | ||
Copyright (c) 2004-2009 Nicolas Stransky <Nico@stransky.cx> | ||
Copyright (c) 2018 Lars Kruse <devel@sumpfralle.de> | ||
=head1 LICENSE | ||
Permission to use, copy, and modify this software with or without fee | ||
is hereby granted, provided that this entire notice is included in | ||
all source code copies of any software which is or includes a copy or | ||
modification of this software. | ||
THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR | ||
IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY | ||
REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE | ||
MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR | ||
PURPOSE. | ||
=head1 MAGIC MARKERS | ||
#%# family=contrib | ||
=cut | ||
""" | ||
|
||
import binascii | ||
from bs4 import BeautifulSoup | ||
from Crypto.Cipher import AES | ||
import hashlib | ||
import json | ||
import re | ||
import requests | ||
import sys | ||
import os | ||
|
||
|
||
def login(session, url, username, password): | ||
"""login to """ | ||
# get login page | ||
r = session.get(f"{url}") | ||
# parse HTML | ||
h = BeautifulSoup(r.text, "lxml") | ||
# get session id from javascript in head | ||
current_session_id = re.search(r".*var currentSessionId = '(.+)';.*", h.head.text)[1] | ||
|
||
# encrypt password | ||
salt = os.urandom(8) | ||
iv = os.urandom(8) | ||
key = hashlib.pbkdf2_hmac( | ||
'sha256', | ||
bytes(password.encode("ascii")), | ||
salt, | ||
iterations=1000, | ||
dklen=128/8 | ||
) | ||
secret = { "Password": password, "Nonce": current_session_id } | ||
plaintext = bytes(json.dumps(secret).encode("ascii")) | ||
associated_data = "loginPassword" | ||
# initialize cipher | ||
cipher = AES.new(key, AES.MODE_CCM, iv) | ||
# set associated data | ||
cipher.update(bytes(associated_data.encode("ascii"))) | ||
# encrypt plaintext | ||
encrypt_data = cipher.encrypt(plaintext) | ||
# append digest | ||
encrypt_data += cipher.digest() | ||
# return | ||
login_data = { | ||
'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"), | ||
'Name': username, | ||
'Salt': binascii.hexlify(salt).decode("ascii"), | ||
'Iv': binascii.hexlify(iv).decode("ascii"), | ||
'AuthData': associated_data | ||
} | ||
|
||
# login | ||
r = session.put( | ||
f"{url}/php/ajaxSet_Password.php", | ||
headers={ | ||
"Content-Type": "application/json", | ||
"csrfNonce": "undefined" | ||
}, | ||
data=json.dumps(login_data) | ||
) | ||
|
||
# parse result | ||
result = json.loads(r.text) | ||
# success? | ||
if result['p_status'] == "Fail": | ||
print("login failure", file=sys.stderr) | ||
exit(-1) | ||
# remember CSRF nonce | ||
csrf_nonce = result['nonce'] | ||
|
||
# prepare headers | ||
session.headers.update({ | ||
"X-Requested-With": "XMLHttpRequest", | ||
"csrfNonce": csrf_nonce, | ||
"Origin": f"{url}/", | ||
"Referer": f"{url}/" | ||
}) | ||
# set credentials cookie | ||
session.cookies.set( | ||
"credential", | ||
"eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiV" | ||
"EcyNDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOj" | ||
"AsICJ3aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2h" | ||
"hbmdlZCI6IllFUyIgfQ==" | ||
) | ||
|
||
# set session | ||
r = session.post(f"{url}/php/ajaxSet_Session.php") | ||
|
||
|
||
def docsis_status(session): | ||
"""get current DOCSIS status page, parse and return channel data""" | ||
r = session.get(f"{url}/php/status_docsis_data.php") | ||
# extract json from javascript | ||
json_downstream_data = re.search(r".*json_dsData = (.+);.*", r.text)[1] | ||
json_upstream_data = re.search(r".*json_usData = (.+);.*", r.text)[1] | ||
# parse json | ||
downstream_data = json.loads(json_downstream_data) | ||
upstream_data = json.loads(json_upstream_data) | ||
# convert lock status to numeric values | ||
for d in [ upstream_data, downstream_data ]: | ||
for c in d: | ||
if c['LockStatus'] == "ACTIVE" or c['LockStatus'] == "Locked": | ||
c['LockStatus'] = 1 | ||
else: | ||
c['LockStatus'] = 0 | ||
return downstream_data, upstream_data | ||
|
||
|
||
# ----------------------------------------------------------------------------- | ||
if __name__ == "__main__": | ||
# get config | ||
url = os.getenv("url") | ||
username = os.getenv("username") | ||
password = os.getenv("password") | ||
# validate config | ||
if not url or not username or not password: | ||
print("Set url, username and password first.", file=sys.stderr) | ||
exit(1) | ||
# create session | ||
session = requests.Session() | ||
# login with username and password | ||
login(session, url, username, password) | ||
# get DOCSIS status | ||
downstream, upstream = docsis_status(session) | ||
# prepare munin graph info | ||
graph_descriptions = [ | ||
{ | ||
"name": "up_signal", | ||
"title": "DOCSIS Upstream signal strength", | ||
"vlabel": "dBmV", | ||
"info": "DOCSIS upstream signal strength by channel", | ||
"data": upstream, | ||
"key": "PowerLevel" | ||
}, | ||
{ | ||
"name": "up_lock", | ||
"title": "DOCSIS Upstream lock", | ||
"vlabel": "locked", | ||
"info": "DOCSIS upstream channel lock status", | ||
"data": upstream, | ||
"key": "LockStatus" | ||
}, | ||
{ | ||
"name": "down_signal", | ||
"title": "DOCSIS Downstream signal strength", | ||
"vlabel": "dBmV", | ||
"info": "DOCSIS downstream signal strength by channel", | ||
"data": downstream, | ||
"key": "PowerLevel" | ||
}, | ||
{ | ||
"name": "down_lock", | ||
"title": "DOCSIS Downstream lock", | ||
"vlabel": "locked", | ||
"info": "DOCSIS downstream channel lock status", | ||
"data": downstream, | ||
"key": "LockStatus" | ||
}, | ||
{ | ||
"name": "down_snr", | ||
"title": "DOCSIS Downstream signal/noise ratio", | ||
"vlabel": "dB", | ||
"info": "SNR/MER", | ||
"data": downstream, | ||
"key": "SNRLevel" | ||
} | ||
] | ||
|
||
# configure ? | ||
if len(sys.argv) > 1 and "config" == sys.argv[1]: | ||
# process all graphs | ||
for g in graph_descriptions: | ||
# graph config | ||
print( | ||
f"multigraph docsis_{g['name']}\n" | ||
f"graph_title {g['title']}\n" \ | ||
f"graph_category network\n" \ | ||
f"graph_vlabel {g['vlabel']}\n" \ | ||
f"graph_info {g['info']}\n" \ | ||
f"graph_scale no\n" | ||
) | ||
|
||
# channels | ||
for c in g['data']: | ||
# only use channels with PowerLevel | ||
if not c['PowerLevel']: | ||
continue | ||
print( | ||
f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)\n" | ||
f"channel_{c['ChannelID']}.info Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}" | ||
) | ||
|
||
# output values ? | ||
else: | ||
# process all graphs | ||
for g in graph_descriptions: | ||
print(f"multigraph docsis_{g['name']}") | ||
# channels | ||
for c in g['data']: | ||
# only use channels with PowerLevel | ||
if not c['PowerLevel']: | ||
continue | ||
print(f"channel_{c['ChannelID']}.value {c[g['key']]}") |