Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Onboarding MDE for Linux to LISA #3113

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup
  • Loading branch information
Zeeshan Akhter committed Dec 21, 2023
commit 49560a630d6e92c0f63c887bd556601e78c80a7a
44 changes: 1 addition & 43 deletions lisa/sut_orchestrator/azure/tools.py
Original file line number Diff line number Diff line change
@@ -2,10 +2,7 @@
# Licensed under the MIT license.

import re
import os
import sys
import json
from pathlib import PurePath, Path
from pathlib import PurePath
from typing import Any, Dict, List, Optional, Tuple, Type

from assertpy import assert_that
@@ -23,9 +20,6 @@
get_matched_str,
)
from lisa.util.process import ExecutableResult
from typing import Optional, cast
from lisa.operating_system import Posix



class Waagent(Tool):
@@ -567,39 +561,3 @@ def get_pool_records(self, pool_id: int, force_run: bool = False) -> Dict[str, s
records[content_split[i]] = content_split[i + 1]

return records

class mdatp(Tool):
@property
def command(self) -> str:
self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]")
return "mdatp"

@property
def can_install(self) -> bool:
self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]")
return False

def _install(self) -> bool:
return self._check_exists()

def get_result(
self,
arg: str,
json_out: bool = False,
sudo: bool = False,
) -> Any:
self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]")
if json_out:
arg += ' --output json'
result = self.run(
arg,
sudo=sudo,
shell=True,
force_run=True,
)

result.assert_exit_code()
if json_out:
return json.loads(result.stdout)
return result.stdout.split()

1 change: 1 addition & 0 deletions lisa/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@
from .lsvmbus import Lsvmbus
from .make import Make
from .mdadm import Mdadm
from .mde import MDE
from .mkdir import Mkdir
from .mkfs import FileSystem, Mkfs, Mkfsext, Mkfsxfs
from .modinfo import Modinfo
99 changes: 99 additions & 0 deletions lisa/tools/mde.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import os
import sys
import json
import requests
from pathlib import Path, PurePath
from typing import Any

from lisa.executable import Tool
from lisa.executable import CustomScriptBuilder, CustomScript
#from . import RemoteCopy, Whoami, Curl
from .remote_copy import RemoteCopy
from .whoami import Whoami

class MDE(Tool):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used by MDE test suite only, so it can be in the same file with the test suite.

@property
def command(self) -> str:
return "mdatp"

@property
def can_install(self) -> bool:
return True

def get_mde_installer(self) -> bool:
if not hasattr(self, '_mde_installer'):
response = requests.get("https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh")
if response.ok:
script = response.text
import tempfile
_, self.mde_installer_script = tempfile.mkstemp(prefix='mde_installer', suffix='.sh')
with open(self.mde_installer_script, 'w') as writer:
writer.write(script)
self._mde_installer = CustomScriptBuilder(Path(os.path.dirname(self.mde_installer_script)),
[os.path.basename(self.mde_installer_script)])
return True
return False
return True

def _install(self) -> bool:
if not self.get_mde_installer():
self._log.error("Unable to download mde_installer.sh script. MDE can't be installed")

mde_installer: CustomScript = self.node.tools[self._mde_installer]
self._log.info('Installing MDE')
result1 = mde_installer.run(parameters="--install", sudo=True)
self._log.info(result1)

return self._check_exists()


def onboard(self, onboarding_script: PurePath) -> bool:
if not self._check_exists():
self._log.error("MDE is not installed, onboarding not possible")
return False

username = self.node.tools[Whoami].get_username()

remote_copy = self.node.tools[RemoteCopy]
remote_copy.copy_to_remote(onboarding_script, PurePath(f"/home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py"))

if not self.get_mde_installer():
self._log.error("Unable to download mde_installer.sh script. MDE can't be onboarded")

script: CustomScript = self.node.tools[self._mde_installer]

self._log.info('Onboarding MDE')
result1 = script.run(parameters=f"--onboard /home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py/MicrosoftDefenderATPOnboardingLinuxServer.py", sudo=True)
self._log.info(result1)

output = self.get_result('health --field licensed')

self._log.info(output)

return bool(output == ['true'])


def get_result(
self,
arg: str,
json_out: bool = False,
sudo: bool = False,
) -> Any:
if json_out:
arg += ' --output json'
result = self.run(
arg,
sudo=sudo,
shell=True,
force_run=True,
)

result.assert_exit_code(include_output=True)
if json_out:
return json.loads(result.stdout)
return result.stdout.split()


4 changes: 4 additions & 0 deletions microsoft/runbook/my.yml
Original file line number Diff line number Diff line change
@@ -6,6 +6,10 @@ variable:
value: tiers/tier.yml
- name: case
value: verify_cpu_count
- name: onboarding_script
is_secret: true
is_case_visible: True
value: ""
- name: location
value: "westus3"
- name: keep_environment
82 changes: 18 additions & 64 deletions microsoft/testsuites/vm_extensions/mde.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import os
import time
import requests

from typing import Any
from pathlib import Path, PurePath
from pathlib import PurePath

from assertpy import assert_that

@@ -16,12 +14,9 @@
simple_requirement,
)
from lisa.operating_system import BSD
from lisa.sut_orchestrator.azure.tools import mdatp
from lisa.testsuite import TestResult
from lisa.tools import RemoteCopy, Whoami, Curl
from lisa import CustomScriptBuilder, CustomScript
from lisa.util import LisaException

from lisa.tools import Curl, MDE as mdatp
from lisa.util import LisaException, SkippedException

@TestSuiteMetadata(
area="vm_extension",
@@ -33,17 +28,10 @@
class MDE(TestSuite):

def before_case(self, log: Logger, **kwargs: Any) -> None:
response = requests.get("https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh")
if response.ok:
script = response.text
import tempfile
_, self.mde_installer = tempfile.mkstemp(prefix='mde_installer', suffix='.sh')
with open(self.mde_installer, 'w') as writer:
writer.write(script)
self._echo_script = CustomScriptBuilder(Path(os.path.dirname(self.mde_installer)),
[os.path.basename(self.mde_installer)])
else:
log.error('Unable to download mde_installer.sh script')
variables = kwargs["variables"]
self.onboarding_script = variables.get("onboarding_script", "")
if not self.onboarding_script:
raise SkippedException("Onboarding script is not provided.")

@TestCaseMetadata(
description="""
@@ -54,12 +42,9 @@ def before_case(self, log: Logger, **kwargs: Any) -> None:
min_memory_mb=1024,
unsupported_os=[BSD])
)
def verify_install(self, node: Node, log: Logger, result: TestResult) -> None:
script: CustomScript = node.tools[self._echo_script]
log.info('Installing MDE')
result1 = script.run(parameters="--install", sudo=True)
log.info(result1)
def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None:

#Invoking tools first time, intalls the tool.
try:
output = node.tools[mdatp]._check_exists()
except LisaException as e:
@@ -68,60 +53,30 @@ def verify_install(self, node: Node, log: Logger, result: TestResult) -> None:

assert_that(output).described_as('Unable to install MDE').is_equal_to(True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the description (described_as) supposed to be opposite of what is being checked (is_equal_to) here and elsewhere?


@TestCaseMetadata(
description="""
Verify if MDE is healthy
""",
priority=1,
requirement=simple_requirement(min_core_count=2,
min_memory_mb=1024,
unsupported_os=[BSD])
)
def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None:
username = node.tools[Whoami].get_username()
self.verify_onboard(node, log, result)

remote_copy = node.tools[RemoteCopy]
remote_copy.copy_to_remote(
PurePath("/home/zakhter/projects/lab/MicrosoftDefenderATPOnboardingLinuxServer.py"), PurePath(f"/home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py"))
self.verify_health(node, log, result)

script: CustomScript = node.tools[self._echo_script]
self.verify_eicar_detection(node, log, result)

log.info('Onboarding MDE')
result1 = script.run(parameters=f"--onboard /home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py/MicrosoftDefenderATPOnboardingLinuxServer.py", sudo=True)
log.info(result1)
def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None:

output = node.tools[mdatp].get_result('health --field licensed')
onboarding_result = node.tools[mdatp].onboard(PurePath(self.onboarding_script))

log.info(output)
assert_that(onboarding_result).is_equal_to(True)

output = node.tools[mdatp].get_result('health --field licensed')

assert_that(output).is_equal_to(['true'])

@TestCaseMetadata(
description="""
Verify if MDE is healthy
""",
priority=1,
requirement=simple_requirement(min_core_count=2,
min_memory_mb=1024,
unsupported_os=[BSD])
)
def verify_health(self, node: Node, log: Logger, result: TestResult) -> None:
output = node.tools[mdatp].get_result('health', json_out=True)

log.info(output)

assert_that(output['healthy']).is_equal_to(True)

@TestCaseMetadata(
description="""
Verify if MDE is healthy
""",
priority=1,
requirement=simple_requirement(min_core_count=2,
min_memory_mb=1024,
unsupported_os=[BSD])
)
def eicar_test(self, node: Node, log: Logger, result: TestResult) -> None:
def verify_eicar_detection(self, node: Node, log: Logger, result: TestResult) -> None:
log.info('Running EICAR test')

output = node.tools[mdatp].get_result('health --field real_time_protection_enabled')
@@ -144,7 +99,6 @@ def eicar_test(self, node: Node, log: Logger, result: TestResult) -> None:
eicar_detect = ' '.join(new_threat_list).replace(' '.join(current_threat_list), '')

log.info(eicar_detect)
log.info(eicar_detect.find('Name: Virus:DOS/EICAR_Test_File'))
assert_that('Name: Virus:DOS/EICAR_Test_File' in eicar_detect).is_equal_to(True)