-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added new check -
ssh-files-checksum
: `Calls remote process using S…
…SH and expects: the listed files and checksums will be matching`
- Loading branch information
1 parent
f58b9cf
commit 1fe6326
Showing
2 changed files
with
156 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
#!/usr/bin/env python3 | ||
|
||
""" | ||
ssh-files-checksum | ||
------------------ | ||
Calls remote process using SSH and expects: the listed files and checksums will be matching | ||
Parameters: | ||
- USER (default: root) | ||
- HOST | ||
- PORT (default: 22) | ||
- PRIVATE_KEY | ||
- PASSWORD | ||
- SSH_BIN (default: ssh) | ||
- SSHPASS_BIN (default: sshpass) | ||
- SSH_OPTS | ||
- COMMAND (default: uname -a) | ||
- TIMEOUT: (default: 15, unit: seconds) | ||
- METHOD (default: sha256sum) | ||
- EXPECTS (json dict, example: {"/usr/bin/bahub": "d6e85b50756a08e24c1d46f07b68e288c9e7e565fd662a15baca214f576c34be"}) | ||
""" | ||
|
||
|
||
import os | ||
import sys | ||
import inspect | ||
import json | ||
import re | ||
from typing import Tuple | ||
|
||
|
||
path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) + '/../../' | ||
sys.path.insert(0, path) | ||
|
||
from infracheck.infracheck.checklib.ssh import SSH | ||
|
||
|
||
class SshChecksum(SSH): | ||
def main(self, method: str, expects: dict) -> Tuple[bool, str]: | ||
paths = [] | ||
|
||
for path, expected in expects.items(): | ||
paths.append(path) | ||
|
||
current_state = self._calculate_current_hashes(method, paths) | ||
|
||
for path, expects in expects.items(): | ||
current = current_state[path] if path in current_state else 'file-not-found' | ||
|
||
if current != expects: | ||
return False, "FAIL: '%s' checksum is not matching. Expected: '%s', current: '%s'" % (path, expects, current) | ||
|
||
return True, "All checksums are matching" | ||
|
||
def _calculate_current_hashes(self, method: str, paths: list): | ||
out, exit_code = self.execute([method] + paths) | ||
hashes = {} | ||
|
||
for line in out.split("\n"): | ||
parsed = re.search(r'([A-Za-z0-9]+)\s+(.*)', line) | ||
|
||
if not parsed: | ||
continue | ||
|
||
hashes[parsed.group(2)] = parsed.group(1) | ||
|
||
return hashes | ||
|
||
|
||
if __name__ == '__main__': | ||
expects = json.loads(os.getenv('EXPECTS', '{}')) | ||
|
||
if not os.getenv('HOST'): | ||
print("HOST is mandatory") | ||
exit(1) | ||
|
||
if not expects: | ||
print('EXPECTS is mandatory (json dictionary)') | ||
exit(1) | ||
|
||
app = SshChecksum( | ||
user=os.getenv('USER', 'root'), | ||
host=os.getenv('HOST'), | ||
port=int(os.getenv('PORT', 22)), | ||
password=os.getenv('PASSWORD', ''), | ||
private_key=os.getenv('PRIVATE_KEY', ''), | ||
ssh_bin=os.getenv('SSH_BIN', 'ssh'), | ||
sshpass_bin=os.getenv('SSHPASS_BIN', 'sshpass'), | ||
ssh_opts=os.getenv('SSH_OPTS', ''), | ||
timeout=os.getenv('TIMEOUT', 15) | ||
) | ||
|
||
status, message = app.main( | ||
method=os.getenv('METHOD', 'sha256sum'), | ||
expects=expects | ||
) | ||
|
||
print(message) | ||
sys.exit(0 if status else 1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import unittest | ||
import docker | ||
|
||
from .utils import run_check | ||
from .utils.ssh_test import TestThatRequiresSshServer | ||
|
||
|
||
class SshCommandCheckTest(TestThatRequiresSshServer, unittest.TestCase): | ||
docker_client: docker.DockerClient | ||
|
||
def test_not_passed_host_raises_human_readable_message(self): | ||
stdout: str | ||
result: int | ||
|
||
stdout, result, hooks_output = run_check('ssh-files-checksum', {}, {}) | ||
|
||
self.assertIn('HOST is mandatory', stdout) | ||
self.assertFalse(result) | ||
|
||
def test_success_case(self): | ||
stdout: str | ||
result: int | ||
|
||
stdout, result, hooks_output = run_check('ssh-files-checksum', { | ||
'HOST': 'localhost', | ||
'PORT': 3222, | ||
'USER': 'root', | ||
'PASSWORD': 'root', | ||
'SSH_OPTS': '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null', | ||
'EXPECTS': { | ||
"/dev/null": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" | ||
} | ||
}, {}) | ||
|
||
self.assertEqual('All checksums are matching', stdout.strip()) | ||
self.assertTrue(result) | ||
|
||
def test_at_least_one_checksum_not_matching(self): | ||
stdout: str | ||
result: int | ||
|
||
stdout, result, hooks_output = run_check('ssh-files-checksum', { | ||
'HOST': 'localhost', | ||
'PORT': 3222, | ||
'USER': 'root', | ||
'PASSWORD': 'root', | ||
'SSH_OPTS': '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null', | ||
'EXPECTS': { | ||
"/dev/null": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", | ||
"/bin/sh": "will-not-match-this-one" | ||
} | ||
}, {}) | ||
|
||
self.assertIn("FAIL: '/bin/sh' checksum is not matching. Expected: 'will-not-match-this-one'", stdout.strip()) | ||
self.assertFalse(result) |