Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.2
0.3.3
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ requests~=2.24.0
pycryptodome==3.9.8
responses==0.10.15
requests-mock==1.8.0
pytest==5.4.3
pytest==5.4.3
setuptools~=49.6.0
IPy~=1.0
6 changes: 5 additions & 1 deletion securenative/config/configuration_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class ConfigurationManager(object):
DEFAULT_CONFIG_FILE = "securenative.ini"
CUSTOM_CONFIG_FILE_ENV_NAME = "SECURENATIVE_COMFIG_FILE"
CUSTOM_CONFIG_FILE_ENV_NAME = "SECURENATIVE_CONFIG_FILE"
config = ConfigParser()

@classmethod
Expand Down Expand Up @@ -35,8 +35,12 @@ def _get_resource_path(cls, env_name):
@classmethod
def _get_env_or_default(cls, properties, key, default):
if os.environ.get(key):
if "," in os.environ.get(key):
return os.environ.get(key).split(",")
return os.environ.get(key)
if properties.get(key):
if "," in properties.get(key):
return properties.get(key).split(",")
return properties.get(key)
return default

Expand Down
23 changes: 9 additions & 14 deletions securenative/utils/ip_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import ipaddress
import re
import socket
from IPy import IP


class IpUtils(object):
Expand All @@ -17,21 +17,16 @@ def is_ip_address(ip_address):

@staticmethod
def is_valid_public_ip(ip_address):
try:
socket.inet_aton(ip_address)
except socket.error:
return False
ip = ipaddress.ip_address(ip_address)

ip = ipaddress.IPv4Address(ip_address)
if ip.is_loopback \
or not ip.is_global \
or ip.is_private \
or ip.is_link_local \
or ip.is_multicast \
or ip.is_reserved \
or ip.is_unspecified:
if ip.version is 4:
if not ip.is_loopback and not ip.is_reserved and not ip.is_unspecified and IP(ip_address).iptype() is not "PRIVATE":
return True
return False
return True

if not ip.is_unspecified and IP(ip_address).iptype() is not "PRIVATE":
return True
return False

@staticmethod
def is_loop_back(ip_address):
Expand Down
61 changes: 46 additions & 15 deletions securenative/utils/request_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from securenative.utils.ip_utils import IpUtils


class RequestUtils(object):
SECURENATIVE_COOKIE = "_sn"
SECURENATIVE_HEADER = "x-securenative"
IP_HEADERS = ["HTTP_X_FORWARDED_FOR", "X_FORWARDED_FOR", "REMOTE_ADDR", "x-forwarded-for", "x-client-ip", "x-real-ip", "x-forwarded", "x-cluster-client-ip", "forwarded-for", "forwarded", "via"]

@staticmethod
def get_secure_header_from_request(headers):
Expand All @@ -15,33 +19,60 @@ def get_client_ip_from_request(request, options):
for header in options.proxy_headers:
try:
if request.environ.get(header) is not None:
return request.environ.get(header)
ips = request.environ.get(header).split(',')
return RequestUtils.get_valid_ip(ips)
if request.headers[header] is not None:
return request.headers[header]
ips = request.headers[header].split(',')
return RequestUtils.get_valid_ip(ips)
except Exception:
try:
if request.headers[header] is not None:
return request.headers[header]
ips = request.headers[header].split(',')
return RequestUtils.get_valid_ip(ips)
except Exception:
continue

try:
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[-1].strip()
else:
ip = request.META.get('REMOTE_ADDR')
for header in RequestUtils.IP_HEADERS:
try:
ips = request.headers.get(header).split(',')
return RequestUtils.get_valid_ip(ips)
except Exception:
continue

if ip is None or ip == "":
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', ""))
for header in RequestUtils.IP_HEADERS:
try:
ips = request.META.get(header).split(',')
return RequestUtils.get_valid_ip(ips)
except Exception:
continue

return ip
except Exception:
return ""
return ""

@staticmethod
def get_remote_ip_from_request(request):
try:
return request.raw._original_response.fp.raw._sock.getpeername()[0]
if len(request.raw._original_response.fp.raw._sock.getpeername()) > 0:
return request.raw._original_response.fp.raw._sock.getpeername()[0]
except Exception:
return ""

@staticmethod
def get_valid_ip(ips):
if isinstance(ips, list):
for ip in ips:
ip = ip.strip()
if IpUtils.is_valid_public_ip(ip):
return ip

# No public ip found check for no loopback
for ip in ips:
ip = ip.strip()
if not IpUtils.is_loop_back(ip):
return ip

ip = ips.strip()
if IpUtils.is_valid_public_ip(ip):
return ip

if IpUtils.is_loop_back(ip):
return ip
2 changes: 1 addition & 1 deletion securenative/utils/version_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ class VersionUtils(object):

@staticmethod
def get_version():
return "0.3.2"
return "0.3.3"
2 changes: 1 addition & 1 deletion tests/api_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_track_event(self):

@responses.activate
def test_should_timeout_on_post(self):
options = SecureNativeOptions(api_key="YOUR_API_KEY", auto_send=True, timeout=0,
options = SecureNativeOptions(api_key="YOUR_API_KEY", auto_send=True, timeout=0.000001,
api_url="https://api.securenative-stg.com/collector/api/v1")

responses.add(responses.POST, "https://api.securenative-stg.com/collector/api/v1/verify",
Expand Down
Loading