Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Config/config.ini
settings.json
# C extensions
*.so
Logs/

# Distribution / packaging
.Python
Expand Down
9 changes: 7 additions & 2 deletions Common/breach_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
from . import utils
import re
import ipaddress
from Config.config import configure_logging
import logging
configure_logging()

# import argparse
# import asyncio

class BreachChecker:
def is_valid_email(self, email):
# Verify email format
# TODO: Add a list of email domains accepted..may be
if re.match(r"[^@]+@[^@]+\.[^@]+", email):
print("Input is a valid email address.")
logging.debug("Input is a valid email address.")
return True
else:
# print("Invalid email address.")
Expand All @@ -20,4 +25,4 @@ def is_valid_ip(self, ip):
ipaddress.ip_address(ip)
return True
except ValueError as err:
return False
return False
13 changes: 9 additions & 4 deletions Common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
import sys
import configparser
from . import utils
from Config.config import configure_logging
import logging


configure_logging()

def error_message(errormsg):
print(colored("Error: "+errormsg, 'red'))
logging.error(colored("Error: "+errormsg, 'red'))
# TODO: Extend error module to log to error log file
# print(errormsg)

def exit_message(exitmsg):
print("\033[91m{}\033[0m".format("Fatal Error: "+exitmsg))
logging.critical("\033[91m{}\033[0m".format("Fatal Error: "+exitmsg))
exit()

class Validator:
Expand All @@ -42,7 +47,7 @@ def is_valid_username(self, username):
# Verify username format
# TODO: Add a list of username domains accepted..may be
if re.match(r"^[a-zA-Z0-9\-\_\!\@\#\$\%\^\&\*\(\)]+", username):
print("Input is a valid username")
logging.debug("Input is a valid username")
return True
else:
# print("Invalid username ")
Expand All @@ -62,7 +67,7 @@ def check_VTAPIkey(self, VT_APIKey):
if 'error' not in data:
# Print pretty json response
# print(json.dumps(data, indent=4, sort_keys=True))
print("Virus Total Key Validation: \033[92m{}\033[0m".format("Success"))
logging.info("Virus Total Key Validation: \033[92m{}\033[0m".format("Success"))
return True
else:
error_message(json.dumps(data['error'], indent=4, sort_keys=True))
Expand Down
23 changes: 23 additions & 0 deletions Config/check_config_validity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import configparser

config = configparser.ConfigParser()
config.read('Config/logger.ini')

# check if the required sections are present
if 'loggers' not in config.sections() or 'handlers' not in config.sections():
raise ValueError('Missing required section in config.ini')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this message would have to be:

raise ValueError('Missing required section in logger.ini')


if 'keys' not in config['loggers'] or 'suspicious' not in config['loggers']:
raise ValueError('Missing required key in loggers section in config.ini')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar change required:

raise ValueError('Missing required key in loggers section in **logger**.ini')


# check if the required keys are present in the 'handlers' section
if 'keys' not in config['handlers'] or 'console' not in config['handlers']['keys']:
raise ValueError('Missing required key in handlers section in config.ini')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar change required:

raise ValueError('Missing required key in handlers section in **logger**.ini')


# check if the values of the keys are in the expected format
if not isinstance(config.getint('handlers', 'console.level'), int):
raise ValueError('Invalid value for console.level in config.ini')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar change:

raise ValueError('Invalid value for console.level in logger.ini')


if not isinstance(config.getint('loggers', 'keys.suspicious.level'), int):
raise ValueError('Invalid value for keys.suspicious.level in config.ini')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar change:

raise ValueError('Invalid value for keys.suspicious.level in logger.ini')


60 changes: 50 additions & 10 deletions Config/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,53 @@
# FILE CURRENTLY NOT USED
import logging
import logging.config
import configparser
import os
import sys

# import configparser
# from Common import utils

# try:
# config = configparser.ConfigParser()
# config.read('config.ini')
# except Exception as er:
# utils.error_message(er)
def get_config(config_file_path):
"""
Parse configuration from file and environment variables.

# # Get the Virus Total API key from the config file
# VT_APIKey = config['APIKeys']['VT_APIKey']
:param str config_file_path: path to the configuration file
:return: configuration values as a dictionary
:rtype: dict
"""
config = configparser.ConfigParser()

# read from file
config.read(config_file_path)
config.__dict__

# read from environment variables
for key in config['ENVIRONMENT']:
env_var = os.environ.get(key)
if env_var is not None:
config['ENVIRONMENT'][key] = env_var

return config


def get_logging_config(file_location="Config/logger.ini"):
"""
Get logging configuration from configuration file.

:return: logging configuration as a dictionary
:rtype: dict
"""
try:
open(file_location, "r")
except FileNotFoundError:
logging.critical(f"File location invalid: {file_location}")
sys.exit(1)

config = get_config('Config/logger.ini')
return config


def configure_logging():
"""
Configure logging for the application.
"""
logging_config = get_logging_config()
return logging_config
24 changes: 24 additions & 0 deletions Config/formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging


class Formatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None, levels=None):
super().__init__(fmt, datefmt)
self.levels = levels or {}

def format(self, record):
record.levelprefix = self.levels.get(record.levelno, '')
emoji = ''
if record.levelno == logging.CRITICAL:
emoji = '💣'
elif record.levelno == logging.ERROR:
emoji = '🔥'
elif record.levelno == logging.WARNING:
emoji = '⚠️'
elif record.levelno == logging.INFO:
emoji = 'ℹ️'
elif record.levelno == logging.DEBUG:
emoji = '🔍'
record.emoji = emoji
return super().format(record)

35 changes: 35 additions & 0 deletions Config/logger.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[ENVIRONMENT]
production = False

[logging]
level = INFO
handler = console,file
formatter = default
output = Logs/traceback.log
debug_emoji = 🐛
info_emoji = 💡
warning_emoji = ⚠️
error_emoji = 🚨
critical_emoji = 💣

[handlers]
keys = console,file

[formatters]
keys = default

[handler_console]
class = logging.StreamHandler
level = INFO
formatter = default
args = (sys.stdout,)

[handler_file]
class = logging.FileHandler
level = INFO
formatter = default
args = ('Logs/traceback.log', 'a')

[formatter_default]
format = %(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt = %Y-%m-%d %H:%M:%S
34 changes: 21 additions & 13 deletions Email/email_reputation_checker.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
from logging.config import fileConfig
from logging import Logger, getLogger
import requests
import json
from termcolor import colored
from Common import utils as utils
from Common.utils import KeyFetcher
from Common.breach_checker import BreachChecker
from Config.config import configure_logging

import logging

configure_logging()


class EmailBreachChecker:
def __init__(self, email):
Expand All @@ -23,7 +31,7 @@ def periodicBreachDownloader(self):
}
try:
response = requests.get(url, headers=headers, data=payload).text
print("Downloading full breach data from HIBP")
logging.info("Downloading full breach data from HIBP")
breaches = json.loads(response)
with open('all_breaches.json', 'w') as f:
json.dump(breaches, f)
Expand All @@ -46,12 +54,12 @@ def checkEmailBreach(self, email):
except Exception as e:
utils.error_message(e.text)
if response.status_code == 404:
print(colored("No breaches found for this email","green"))
logger.debug("{}".format(colored("No breaches found for this email","green")))
exit()
data = json.loads(response.text)
# Validating the response during execution instead of wasting a call for validation
if 'statusCode' not in data:
print("Query successful")
logging.info("Query successful")
for breach_name in data:
# print("Breach name: ", breach_name['Name'])
with open('all_breaches.json', 'r') as f:
Expand All @@ -61,25 +69,25 @@ def checkEmailBreach(self, email):
# print(breaches)
for breach in breaches:
if 'Name' in breach and breach['Name'] == search_name:
print("\n\n" + colored("Account name:", "blue"), colored(email, "red"))
logging.info("\n\n{}{}".format(colored("Account name:", "blue"), colored(email, "red")))
if 'Name' in breach:
print(colored("Breach name:", "blue"), colored(breach_name['Name'], "red"))
logging.info("{} {}".format(colored("Breach name:", "blue"), colored(breach_name['Name'], "red")))
if 'Description' in breach:
print(colored("Breach description:", "blue"), colored(breach['Description'], "red"))
logging.info("{} {}".format(colored("Breach description:", "blue"), colored(breach['Description'], "red")))
if 'BreachDate' in breach:
print(colored("Breach date:", "blue"), colored(breach['BreachDate'], "white"))
logging.info("{} {}".format(colored("Breach date:", "blue"), colored(breach['BreachDate'], "white")))
if 'DataClasses' in breach:
print(colored("Data classes that were part of this breach:", "blue"), colored(breach['DataClasses'], "red"))
logging.info("{} {}".format(colored("Data classes that were part of this breach:", "blue"), colored(breach['DataClasses'], "red")))
if 'IsSensitive' in breach and breach['IsSensitive'] == True:
print(colored("Breach is sensitive:", "blue"), colored(breach['IsSensitive'], "yellow"))
logging.info("{} {}".format(colored("Breach is sensitive:", "blue"), colored(breach['IsSensitive'], "yellow")))
if 'IsSensitive' in breach and breach['IsSensitive'] == False:
print(colored("Breach is sensitive:", "blue"), colored(breach['IsSensitive'], "green"))
logging.info("{} {}".format(colored("Breach is sensitive:", "blue"), colored(breach['IsSensitive'], "green")))
if 'IsVerified' in breach and breach['IsVerified'] == True:
print(colored("Breach is verified:", "blue"), colored(breach['IsVerified'], "green"))
logging.info("{} {}".format(colored("Breach is verified:", "blue"), colored(breach['IsVerified'], "green")))
if 'IsVerified' in breach and breach['IsVerified'] == False:
print(colored("Breach is verified:", "blue"), colored(breach['IsVerified'], "red"))
logging.info("{} {}".format(colored("Breach is verified:", "blue"), colored(breach['IsVerified'], "red")))
# print(colored("Number of accounts compromised:", "blue"), colored(breach['PwnCount'], "white"))
print(colored("Number of accounts compromised:", "blue"), colored('{:,}'.format(breach['PwnCount']), "white"))
logging.info("{} {}".format(colored("Number of accounts compromised:", "blue"), colored('{:,}'.format(breach['PwnCount']), "white")))
else:
pass
else:
Expand Down
12 changes: 8 additions & 4 deletions IP/ip_reputation_checker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
from Common import utils as utils
from Common.utils import KeyFetcher
from Common.utils import Validator
import requests
import json
from Config.config import configure_logging

configure_logging()

# IPReputationChecker class inherits methods from BreachChecker super class
class IPReputationChecker:
Expand All @@ -16,7 +20,7 @@ def checkIPReputationVT(self, ip_address):
validator = Validator()
validator.check_VTAPIkey(vtkey)

print("Checking IP reputation from Virus Total for ip:", ip_address)
logging.info("Checking IP reputation from Virus Total for ip:", ip_address)
# Check IP reputation from Virus Total
url = "https://www.virustotal.com/api/v3/ip_addresses/"+ip_address
payload={}
Expand All @@ -35,15 +39,15 @@ def checkIPReputationTalos(self, ip_list):
# Check IP reputation from Cisco Talos
# https://talosintelligence.com/reputation_center/lookup?search=
# User requests to check IP reputation from Cisco Talos
print("Checking IP reputation from Cisco Talos")
logging.info("Checking IP reputation from Cisco Talos")
for ip in ip_list:
url = "https://talosintelligence.com/reputation_center/lookup?search="+ip
response = requests.request("GET", url).text
if "This IP address has been observed to be associated with malicious activity." in response:
print("IP: "+ip+" is malicious")
logging.info("IP: "+ip+" is malicious")
self.bad_ip_list.append(ip)
else:
print("IP: "+ip+" is not malicious")
logging.info("IP: "+ip+" is not malicious")

# # ==================
# def queryIP(apikey, ip_quad, bad_ip_list):
Expand Down
33 changes: 33 additions & 0 deletions Tests/Config/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
import unittest
from configparser import ConfigParser

class TestConfig(unittest.TestCase):
def setUp(self):
self.path = os.path.join('Config/logger.ini')

if not os.path.isdir('Logs'):
os.mkdir('Logs')
if not os.path.exists(os.path.join('Logs', 'traceback.log')):
with open(os.path.join('Logs', 'traceback.log')) as fp:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have encountered an issue while running the test when traceback.log file isn't already created. Like in line 10, where Logs directory is created if not exists, we may need to create traceback.log if it doesn't exist.

fp.close()

def test_config_exists(self):
self.assertTrue(self.path)

def test_config_is_valid(self):
parser = ConfigParser()
if self.test_config_exists:
parser.read(self.path)

# Check if the required sections are present in the config
self.assertIn('formatters', parser.sections())
self.assertIn('handler_console', parser.sections())

# Check if the required keys are present in the sections
self.assertIn('keys', parser['formatters'])
self.assertIn('level', parser['handler_console'])


if __name__ == "__main__":
unittest.main()
Loading