Skip to content

Commit

Permalink
[ruijie] Replace os.system and remove subprocess with shell=True (#12107
Browse files Browse the repository at this point in the history
)

Signed-off-by: maipbui <maibui@microsoft.com>
Dependency: [#12065
#### Why I did it
1. `getstatusoutput` is used without a static string and it uses `shell=True`
2. `subprocess()` - when using with `shell=True` is dangerous. Using subprocess function without a static string can lead to command injection.
3. `os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content.
#### How I did it
1. use `getstatusoutput` without shell=True
2. `subprocess()` - use `shell=False` instead. use an array string. Ref: [https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation](https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation)
3. `os` - use with `subprocess`
  • Loading branch information
maipbui authored and StormLiangMS committed Dec 10, 2022
1 parent 48d4c0a commit 5238bd7
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ def get_pcie_device(self):
pciList = []
p1 = "^(\w+):(\w+)\.(\w)\s(.*)\s*\(*.*\)*"
p2 = "^.*:.*:.*:(\w+)\s*\(*.*\)*"
command1 = "sudo lspci"
command2 = "sudo lspci -n"
command1 = ["sudo", "lspci"]
command2 = ["sudo", "lspci", "-n"]
# run command 1
proc1 = subprocess.Popen(command1, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
proc1 = subprocess.Popen(command1, universal_newlines=True, stdout=subprocess.PIPE)
output1 = proc1.stdout.readlines()
proc1.communicate()
# run command 2
proc2 = subprocess.Popen(command2, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
proc2 = subprocess.Popen(command2, universal_newlines=True, stdout=subprocess.PIPE)
output2 = proc2.stdout.readlines()
proc2.communicate()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

try:
import time
import subprocess
from sonic_platform_base.chassis_base import ChassisBase
from sonic_platform.common import Common
from sonic_platform.sfp import Sfp
from sonic_platform.sfp import PORT_START
from sonic_platform.sfp import PORTS_IN_BLOCK
from sonic_platform.logger import logger
from sonic_py_common.general import getstatusoutput_noshell
except ImportError as e:
raise ImportError(str(e) + "- required module not found")

Expand All @@ -36,17 +36,17 @@ def __init__(self):
self.SFP_STATUS_INSERTED = "1"
self.SFP_STATUS_REMOVED = "0"
self.port_dict = {}
self.enable_read= "i2cset -f -y 2 0x35 0x2a 0x01"
self.disable_read = "i2cset -f -y 2 0x35 0x2a 0x00"
self.enable_write = "i2cset -f -y 2 0x35 0x2b 0x00"
self.disable_write = "i2cset -f -y 2 0x35 0x2b 0x01"
self.enable_erase = "i2cset -f -y 2 0x35 0x2c 0x01"
self.disable_erase = "i2cset -f -y 2 0x35 0x2c 0x00"
self.read_value = "i2cget -f -y 2 0x35 0x25"
self.write_value = "i2cset -f -y 2 0x35 0x21 0x0a"
self.set_sys_led_cmd = "i2cset -f -y 2 0x33 0xb2 "
self.get_sys_led_cmd = "i2cget -f -y 2 0x33 0xb2"
self.led_status = "red"
self.enable_read= ["i2cset", "-f", "-y", "2", "0x35", "0x2a", "0x01"]
self.disable_read = ["i2cset", "-f", "-y", "2", "0x35", "0x2a", "0x00"]
self.enable_write = ["i2cset", "-f", "-y", "2", "0x35", "0x2b", "0x00"]
self.disable_write = ["i2cset", "-f", "-y", "2", "0x35", "0x2b", "0x01"]
self.enable_erase = ["i2cset", "-f", "-y", "2", "0x35", "0x2c", "0x01"]
self.disable_erase = ["i2cset", "-f", "-y", "2", "0x35", "0x2c", "0x00"]
self.read_value = ["i2cget", "-f", "-y", "2", "0x35", "0x25"]
self.write_value = ["i2cset", "-f", "-y", "2", "0x35", "0x21", "0x0a"]
self.set_sys_led_cmd = ["i2cset", "-f", "-y", "2", "0x33", "0xb2"]
self.get_sys_led_cmd = ["i2cget", "-f", "-y", "2", "0x33", "0xb2"]
self.led_status = "red"
# Initialize SFP list
# sfp.py will read eeprom contents and retrive the eeprom data.
# It will also provide support sfp controls like reset and setting
Expand Down Expand Up @@ -210,25 +210,25 @@ def get_reboot_cause(self):
try:
is_power_loss = False
# enable read
subprocess.getstatusoutput(self.disable_write)
subprocess.getstatusoutput(self.enable_read)
ret , log = subprocess.getstatusoutput(self.read_value)
getstatusoutput_noshell(self.disable_write)
getstatusoutput_noshell(self.enable_read)
ret , log = getstatusoutput_noshell(self.read_value)
if ret == 0 and "0x0a" in log:
is_power_loss = True

# erase i2c and e2
subprocess.getstatusoutput(self.enable_erase)
getstatusoutput_noshell(self.enable_erase)
time.sleep(1)
subprocess.getstatusoutput(self.disable_erase)
getstatusoutput_noshell(self.disable_erase)
# clear data
subprocess.getstatusoutput(self.enable_write)
subprocess.getstatusoutput(self.disable_read)
subprocess.getstatusoutput(self.disable_write)
subprocess.getstatusoutput(self.enable_read)
getstatusoutput_noshell(self.enable_write)
getstatusoutput_noshell(self.disable_read)
getstatusoutput_noshell(self.disable_write)
getstatusoutput_noshell(self.enable_read)
# enable write and set data
subprocess.getstatusoutput(self.enable_write)
subprocess.getstatusoutput(self.disable_read)
subprocess.getstatusoutput(self.write_value)
getstatusoutput_noshell(self.enable_write)
getstatusoutput_noshell(self.disable_read)
getstatusoutput_noshell(self.write_value)
if is_power_loss:
return(self.REBOOT_CAUSE_POWER_LOSS, None)
except Exception as e:
Expand Down Expand Up @@ -417,7 +417,8 @@ def set_status_led(self, color):
if regval is None:
print("Invaild color input.")
return False
ret , log = subprocess.getstatusoutput(self.set_sys_led_cmd + regval)
cmd = self.set_sys_led_cmd + [regval]
ret, log = getstatusoutput_noshell(cmd)
if ret != 0:
print("Cannot execute %s" % self.set_sys_led_cmd + regval)
return False
Expand All @@ -431,7 +432,7 @@ def get_status_led(self):
A string, one of the valid LED color strings which could be vendor
specified.
"""
ret , log = subprocess.getstatusoutput(self.get_sys_led_cmd)
ret , log = getstatusoutput_noshell(self.get_sys_led_cmd)
if ret != 0:
print("Cannot execute %s" % self.get_sys_led_cmd)
return False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import yaml

import subprocess
from sonic_py_common import device_info


Expand All @@ -10,13 +10,13 @@ class Common:
PMON_PLATFORM_PATH = '/usr/share/sonic/platform/'
CONFIG_DIR = 'sonic_platform_config'

HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]

def __init__(self):
(self.platform, self.hwsku) = device_info.get_platform_and_hwsku()

def is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
return subprocess.call(self.HOST_CHK_CMD) == 0

def load_json_file(self, path):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
########################################################################

try:
import subprocess
from sonic_platform_base.component_base import ComponentBase
from sonic_platform.regutil import Reg
from sonic_platform.logger import logger
from sonic_py_common.general import getstatusoutput_noshell
except ImportError as e:
raise ImportError(str(e) + "- required module not found")

Expand Down Expand Up @@ -70,12 +70,12 @@ def install_firmware(self, image_path):
"""
try:
successtips = "CPLD Upgrade succeeded!"
status, output = subprocess.getstatusoutput("which firmware_upgrade")
status, output = getstatusoutput_noshell(["which", "firmware_upgrade"])
if status or len(output) <= 0:
logger.error("no upgrade tool.")
return False
cmdstr = "%s %s cpld %d cpld"%(output,image_path,self.slot)
ret, log = subprocess.getstatusoutput(cmdstr)
cmdstr = [output, image_path, "cpld", self.slot, "cpld"]
ret, log = getstatusoutput_noshell(cmdstr)
if ret == 0 and successtips in log:
return True
logger.error("upgrade failed. ret:%d, log:\n%s" % (ret, log))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
#
#######################################################

import subprocess
import time
import glob
import re
#import chardet
from rjutil.smbus import SMBus
import time
from functools import wraps
from sonic_py_common.general import getstatusoutput_noshell


def retry(maxretry =6, delay = 0.01):
Expand Down Expand Up @@ -80,13 +80,13 @@ def rji2csetword_python(bus, addr, reg, value):

@staticmethod
def command(cmdstr):
retcode, output = subprocess.getstatusoutput(cmdstr)
retcode, output = getstatusoutput_noshell(cmdstr)
return retcode, output


@staticmethod
def geti2cword_i2ctool(bus, addr, offset):
command_line = "i2cget -f -y %d 0x%02x 0x%02x wp" % (bus, addr, offset)
command_line = ["i2cget", "-f", "-y", str(bus), "0x%02x"%addr, "0x%02x"%offset, "wp"]
retrytime = 6
ret_t = ""
for i in range(retrytime):
Expand All @@ -99,7 +99,7 @@ def geti2cword_i2ctool(bus, addr, offset):

@staticmethod
def seti2cword_i2ctool(bus, addr, offset, val):
command_line = "i2cset -f -y %d 0x%02x 0x%0x 0x%04x wp" % (bus, addr, offset, val)
command_line = ["i2cset", "-f", "-y", str(bus), "0x%02x"%addr, "0x%0x"%offset, "0x%04x"%val, "wp"]
retrytime = 6
ret_t = ""
for i in range(retrytime):
Expand All @@ -111,7 +111,7 @@ def seti2cword_i2ctool(bus, addr, offset, val):

@staticmethod
def rji2cget_i2ctool(bus, devno, address):
command_line = "i2cget -f -y %d 0x%02x 0x%02x " % (bus, devno, address)
command_line = ["i2cget", "-f", "-y", str(bus), "0x%02x"%devno, "0x%02x"%address]
retrytime = 6
ret_t = ""
for i in range(retrytime):
Expand All @@ -123,8 +123,7 @@ def rji2cget_i2ctool(bus, devno, address):

@staticmethod
def rji2cset_i2ctool(bus, devno, address, byte):
command_line = "i2cset -f -y %d 0x%02x 0x%02x 0x%02x" % (
bus, devno, address, byte)
command_line = ["i2cset", "-f", "-y", str(bus), "0x%02x"%devno, "0x%02x"%address, "0x%02x"%byte]
retrytime = 6
ret_t = ""
for i in range(retrytime):
Expand Down Expand Up @@ -166,7 +165,7 @@ def readsysfs(location):

@staticmethod
def getdevmem(addr, digit, mask):
command_line = "devmem 0x%02x %d" %(addr, digit)
command_line = ["devmem", "0x%02x"%addr, str(digit)]
retrytime = 6
ret_t = ""
for i in range(retrytime):
Expand All @@ -179,13 +178,13 @@ def getdevmem(addr, digit, mask):

@staticmethod
def rj_os_system(cmd):
status, output = subprocess.getstatusoutput(cmd)
status, output = getstatusoutput_noshell(cmd)
return status, output

@staticmethod
def getsdkreg(reg):
try:
cmd = "bcmcmd -t 1 'getr %s ' < /dev/null" % reg
cmd = ["bcmcmd", "-t", "1", "getr"+str(reg)]
ret, result = osutil.rj_os_system(cmd)
result_t = result.strip().replace("\r", "").replace("\n", "")
if ret != 0 or "Error:" in result_t:
Expand All @@ -203,8 +202,8 @@ def getmactemp():
result = {}
#waitForDocker()
#need to exec twice
osutil.rj_os_system("bcmcmd -t 1 \"show temp\" < /dev/null")
ret, log = osutil.rj_os_system("bcmcmd -t 1 \"show temp\" < /dev/null")
osutil.rj_os_system(["bcmcmd", "-t", "1", "show temp"])
ret, log = osutil.rj_os_system(["bcmcmd", "-t", "1", "show temp"])
if ret:
return False, result
else:
Expand Down
Loading

0 comments on commit 5238bd7

Please sign in to comment.