Skip to content

Commit

Permalink
Add smoke test for appprotect csrf policy (#1627)
Browse files Browse the repository at this point in the history
* Add smoke test for appprotect csrf policy
  • Loading branch information
vepatel committed May 26, 2021
1 parent 0128216 commit feb16da
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 25 deletions.
24 changes: 24 additions & 0 deletions tests/data/appprotect/csrf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: appprotect.f5.com/v1beta1
kind: APPolicy
metadata:
name: csrf
spec:
policy:
applicationLanguage: utf-8
name: specific_csrf_url_viol_csrf_block
template:
name: POLICY_TEMPLATE_NGINX_BASE
blocking-settings:
violations:
- name: VIOL_CSRF
alarm: true
block: true
csrf-protection:
enabled: true
csrf-urls:
- "$action": delete
method: POST
url: "*"
- enforcementAction: verify-origin
method: POST
url: "/backend2"
29 changes: 20 additions & 9 deletions tests/suite/resources_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,9 +1162,9 @@ def get_events(v1: CoreV1Api, namespace) -> []:
return res.items


def ensure_response_from_backend(req_url, host, additional_headers=None) -> None:
def ensure_response_from_backend(req_url, host, additional_headers=None, check404=False) -> None:
"""
Wait for 502|504 to disappear.
Wait for 502|504|404 to disappear.
:param req_url: url to request
:param host:
Expand All @@ -1174,10 +1174,21 @@ def ensure_response_from_backend(req_url, host, additional_headers=None) -> None
headers = {"host": host}
if additional_headers:
headers.update(additional_headers)
for _ in range(30):
resp = requests.get(req_url, headers=headers, verify=False)
if resp.status_code != 502 and resp.status_code != 504:
print(f"After {_ * 2} seconds got non 502|504 response. Continue with tests...")
return
time.sleep(1)
pytest.fail(f"Keep getting 502|504 from {req_url} after 60 seconds. Exiting...")

if check404:
for _ in range(60):
resp = requests.get(req_url, headers=headers, verify=False)
if resp.status_code != 502 and resp.status_code != 504 and resp.status_code != 404:
print(f"After {_} retries at 1 second interval, got {resp.status_code} response. Continue with tests...")
return
time.sleep(1)
pytest.fail(f"Keep getting {resp.status_code} from {req_url} after 60 seconds. Exiting...")

else:
for _ in range(30):
resp = requests.get(req_url, headers=headers, verify=False)
if resp.status_code != 502 and resp.status_code != 504:
print(f"After {_} retries at 1 second interval, got non 502|504 response. Continue with tests...")
return
time.sleep(1)
pytest.fail(f"Keep getting 502|504 from {req_url} after 60 seconds. Exiting...")
67 changes: 51 additions & 16 deletions tests/suite/test_app_protect.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class BackendSetup:
ingress_host (str):
"""

def __init__(self, req_url, ingress_host):
def __init__(self, req_url, req_url_2, ingress_host):
self.req_url = req_url
self.req_url_2 = req_url_2
self.ingress_host = ingress_host


Expand All @@ -60,6 +61,7 @@ def backend_setup(request, kube_apis, ingress_controller_endpoint, test_namespac
print("------------------------- Deploy backend application -------------------------")
create_example_app(kube_apis, "simple", test_namespace)
req_url = f"https://{ingress_controller_endpoint.public_ip}:{ingress_controller_endpoint.port_ssl}/backend1"
req_url_2 = f"https://{ingress_controller_endpoint.public_ip}:{ingress_controller_endpoint.port_ssl}/backend2"
wait_until_all_pods_are_ready(kube_apis.v1, test_namespace)
ensure_connection_to_public_endpoint(
ingress_controller_endpoint.public_ip,
Expand All @@ -82,7 +84,9 @@ def backend_setup(request, kube_apis, ingress_controller_endpoint, test_namespac
print("------------------------- Deploy ingress -----------------------------")
ingress_host = {}
src_ing_yaml = f"{TEST_DATA}/appprotect/appprotect-ingress.yaml"
create_ingress_with_ap_annotations(kube_apis, src_ing_yaml, test_namespace, policy, "True", "True", "127.0.0.1:514")
create_ingress_with_ap_annotations(
kube_apis, src_ing_yaml, test_namespace, policy, "True", "True", "127.0.0.1:514"
)
ingress_host = get_first_ingress_host_from_yaml(src_ing_yaml)
wait_before_test()

Expand All @@ -98,7 +102,7 @@ def fin():

request.addfinalizer(fin)

return BackendSetup(req_url, ingress_host)
return BackendSetup(req_url, req_url_2, ingress_host)


@pytest.mark.skip_for_nginx_oss
Expand All @@ -107,7 +111,7 @@ def fin():
@pytest.mark.parametrize(
"crd_ingress_controller_with_ap",
[{"extra_args": [f"-enable-custom-resources", f"-enable-app-protect"]}],
indirect=["crd_ingress_controller_with_ap"],
indirect=True,
)
class TestAppProtect:
@pytest.mark.parametrize("backend_setup", [{"policy": "dataguard-alarm"}], indirect=True)
Expand All @@ -120,8 +124,7 @@ def test_responses_dataguard_alarm(
print("------------- Run test for AP policy: dataguard-alarm --------------")
print(f"Request URL: {backend_setup.req_url} and Host: {backend_setup.ingress_host}")

wait_before_test(40)
ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_host)
ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_host, check404=True)

print("----------------------- Send valid request ----------------------")
resp_valid = requests.get(
Expand All @@ -144,15 +147,16 @@ def test_responses_dataguard_alarm(
assert resp_invalid.status_code == 200

@pytest.mark.parametrize("backend_setup", [{"policy": "file-block"}], indirect=True)
def test_responses_file_block(self, kube_apis, crd_ingress_controller_with_ap, backend_setup, test_namespace):
def test_responses_file_block(
self, kube_apis, crd_ingress_controller_with_ap, backend_setup, test_namespace
):
"""
Test file-block AppProtect policy: Block executing types e.g. .bat and .exe
"""
"""
print("------------- Run test for AP policy: file-block --------------")
print(f"Request URL: {backend_setup.req_url} and Host: {backend_setup.ingress_host}")

wait_before_test(40)
ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_host)
ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_host, check404=True)

print("----------------------- Send valid request ----------------------")
resp_valid = requests.get(
Expand Down Expand Up @@ -184,8 +188,7 @@ def test_responses_malformed_block(
print("------------- Run test for AP policy: malformed-block --------------")
print(f"Request URL: {backend_setup.req_url} and Host: {backend_setup.ingress_host}")

wait_before_test(40)
ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_host)
ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_host, check404=True)

print("----------------------- Send valid request with no body ----------------------")
headers = {"host": backend_setup.ingress_host}
Expand All @@ -196,18 +199,50 @@ def test_responses_malformed_block(
assert resp_valid.status_code == 200

print("----------------------- Send valid request with body ----------------------")
headers = {
"Content-Type": "application/json",
"host": backend_setup.ingress_host}
headers = {"Content-Type": "application/json", "host": backend_setup.ingress_host}
resp_valid = requests.post(backend_setup.req_url, headers=headers, data="{}", verify=False)
print(resp_valid.text)
assert valid_resp_addr in resp_valid.text
assert valid_resp_name in resp_valid.text
assert resp_valid.status_code == 200

print("---------------------- Send invalid request ---------------------")
resp_invalid = requests.post(backend_setup.req_url, headers=headers, data="{{}}", verify=False,)
resp_invalid = requests.post(
backend_setup.req_url,
headers=headers,
data="{{}}",
verify=False,
)
print(resp_invalid.text)
assert invalid_resp_title in resp_invalid.text
assert invalid_resp_body in resp_invalid.text
assert resp_invalid.status_code == 200

@pytest.mark.parametrize("backend_setup", [{"policy": "csrf"}], indirect=True)
def test_responses_csrf(
self, kube_apis, ingress_controller_endpoint, crd_ingress_controller_with_ap, backend_setup, test_namespace
):
"""
Test CSRF (Cross Site Request Forgery) AppProtect policy: Block requests with invalid/null/non-https origin-header
"""
print("------------- Run test for AP policy: CSRF --------------")
print(f"Request URL without CSRF protection: {backend_setup.req_url}")
print(f"Request URL with CSRF protection: {backend_setup.req_url_2}")

ensure_response_from_backend(backend_setup.req_url_2, backend_setup.ingress_host, check404=True)

print("----------------------- Send request with http origin header ----------------------")

headers = {"host": backend_setup.ingress_host, "Origin": "http://appprotect.example.com"}
resp_valid = requests.post(backend_setup.req_url, headers=headers, verify=False, cookies={"flavor": "darkchoco"})
resp_invalid = requests.post(backend_setup.req_url_2, headers=headers, verify=False, cookies={"flavor": "whitechoco"})

print(resp_valid.text)
print(resp_invalid.text)

assert valid_resp_addr in resp_valid.text
assert valid_resp_name in resp_valid.text
assert resp_valid.status_code == 200
assert invalid_resp_title in resp_invalid.text
assert invalid_resp_body in resp_invalid.text
assert resp_invalid.status_code == 200

0 comments on commit feb16da

Please sign in to comment.