In [None]:
from configparser import ConfigParser
from pymongo import MongoClient
from kafka import KafkaConsumer
from json import loads


def read_config(config_path='../kafka_config.ini'):
    """Load the Kafka consumer settings from the ``[kafka]`` section of an INI file.

    Args:
        config_path (str): Path of the INI file to read. Defaults to
            ``'../kafka_config.ini'``.

    Returns:
        dict: ``topic_name``, ``bootstrap_servers``, ``group_id`` and
        ``auto_offset_reset`` as strings, and ``enable_auto_commit`` as a bool.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        configparser.NoSectionError: If the file has no ``[kafka]`` section.
        configparser.NoOptionError: If one of the expected options is missing.
        ValueError: If ``enable_auto_commit`` is not a recognised boolean
            spelling (``true``/``false``, ``yes``/``no``, ``on``/``off``, ``1``/``0``).
    """
    config = ConfigParser()
    # ConfigParser.read() silently skips files it cannot open; opening the file
    # ourselves makes a wrong path fail immediately with a clear error.
    with open(config_path) as config_file:
        config.read_file(config_file)

    string_options = ('topic_name', 'bootstrap_servers', 'group_id', 'auto_offset_reset')
    data = {option: config.get('kafka', option) for option in string_options}
    # Must be a real bool: the raw string 'false' is truthy and would
    # leave auto-commit switched on.
    data['enable_auto_commit'] = config.getboolean('kafka', 'enable_auto_commit')

    return data


kafka_config = read_config()
# Build the Kafka consumer; every message value is decoded from UTF-8 JSON.
my_consumer = KafkaConsumer(
    kafka_config['topic_name'],
    bootstrap_servers=kafka_config['bootstrap_servers'],
    auto_offset_reset=kafka_config['auto_offset_reset'],
    enable_auto_commit=kafka_config['enable_auto_commit'],
    group_id=kafka_config['group_id'],
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

connectString = 'mongodb://10.8.0.3:27021/?directConnection=true'

try:
    my_client = MongoClient(connectString)
    # MongoClient connects lazily, so constructing it proves nothing; a ping
    # forces a round-trip to the server and surfaces connection problems here.
    my_client.admin.command('ping')
    my_collection = my_client.my_application.users
    print("Connected successfully!")
except Exception:
    print("Could not connect to MongoDB")
    # Without a collection the consume loop below cannot do anything useful,
    # so stop instead of failing on every message.
    raise


# Store each consumed message as one document. A failed insert is reported and
# skipped so that a single bad message does not stop the consumer.
for message in my_consumer:
    document = message.value
    try:
        my_collection.insert_one(document)
        print("Data inserted successfull!")
    except Exception as insert_error:
        # `Exception` (not a bare except) keeps KeyboardInterrupt/SystemExit
        # working, so the kernel can still be interrupted.
        print("Could not insert into database!")
        print(repr(insert_error))


In [3]:
from xml.etree import ElementTree as ET
import json
import xmltodict
import dateutil.parser

def xml_to_json(xml_string):
    """Convert an XML document into a JSON string.

    The XML is parsed with ``xmltodict.parse`` and the resulting mapping is
    serialised with ``json.dumps``.

    Args:
        xml_string (str): The XML document to convert.

    Returns:
        str: JSON text produced from the parsed XML.
    """
    return json.dumps(xmltodict.parse(xml_string))

# Names for the numeric <Level> value found in an event's System section.
# parse_XML_log reports any code missing from this table as "unknown".
evt_level_dict = {
    0: 'LogAlways',
    1: 'Critical',
    2: 'Error',
    3: 'Warning',
    4: 'Informational',
    5: 'Verbose',
}

# Names for the numeric <Opcode> value found in an event's System section.
# parse_XML_log reports any code missing from this table as "unknown".
evt_opcode_dict = {
    0: 'Info',
    1: 'Start',
    2: 'Stop',
    3: 'DataCollectionStart',
    4: 'DataCollectionStop',
    5: 'Extension',
    6: 'Reply',
    7: 'Resume',
    8: 'Suspend',
    9: 'Send',
    240: 'Receive',
}


def _event_code_name(text, names):
    """Return the name mapped to the integer code in ``text``, or "unknown"."""
    try:
        return names.get(int(text), "unknown")
    except (TypeError, ValueError):
        # text is None for an empty element, or is not an integer literal.
        return "unknown"


def _read_system_section(system, ns, data):
    """Copy the fields found under a ``System`` element into ``data``."""
    for node in system.iter():
        tag = node.tag
        if tag == f"{ns}System":
            # iter() also yields the System element itself; it carries no field.
            continue
        if tag == f"{ns}Provider":
            data["Provider"] = node.attrib.get('Name')
        elif tag == f"{ns}TimeCreated":
            system_time = node.attrib.get('SystemTime')
            # A missing SystemTime attribute yields None instead of crashing.
            data["TimeCreated"] = (
                dateutil.parser.isoparse(system_time).strftime('%Y-%m-%d %H:%M:%S')
                if system_time else None
            )
        elif tag == f"{ns}Correlation":
            data["ActivityID"] = node.attrib.get('ActivityID')
        elif tag == f"{ns}Execution":
            data["ProcessID"] = node.attrib.get('ProcessID')
            data["ThreadID"] = node.attrib.get('ThreadID')
        elif tag == f"{ns}Level":
            data['Level'] = _event_code_name(node.text, evt_level_dict)
        elif tag == f"{ns}Opcode":
            data['Opcode'] = _event_code_name(node.text, evt_opcode_dict)
        else:
            # Any other element is stored under its tag name without the namespace.
            data[tag.replace(ns, "")] = node.text


def _read_event_data_section(event_data, ns, data):
    """Copy the ``Data`` / ``Binary`` values under an ``EventData`` element into ``data``."""
    for node in event_data.iter():
        if node.tag == f"{ns}Data":
            name = node.get('Name')
            # Unnamed <Data> values all share the "Data" key, so a later one
            # replaces an earlier one.
            data["Data" if name is None else name] = node.text
        elif node.tag == f"{ns}Binary":
            data["Binary"] = node.text


def parse_XML_log(event):
    """Parse a Windows event log entry in XML format into a dictionary of properties.

    Fields under ``System`` become top-level keys: ``Provider`` (its ``Name``
    attribute), ``TimeCreated`` (``SystemTime`` reformatted as
    ``YYYY-MM-DD HH:MM:SS``, or None when the attribute is absent),
    ``ActivityID``, ``ProcessID``, ``ThreadID``, ``Level`` and ``Opcode``
    (translated through ``evt_level_dict`` / ``evt_opcode_dict``, "unknown" when
    the code is missing, empty or not an integer), plus the text of every other
    element keyed by its tag name. Under ``EventData``, each ``Data`` element is
    stored under its ``Name`` attribute (or ``"Data"`` when unnamed) and
    ``Binary`` under ``"Binary"``.

    Args:
        event (string): The XML string representing the Windows event log entry.

    Returns:
        dict: A dictionary of properties extracted from the XML string.

    Raises:
        xml.etree.ElementTree.ParseError: If ``event`` is not well-formed XML.

    Errors raised by ``dateutil.parser.isoparse`` for a malformed ``SystemTime``
    value are not caught.
    """
    ns = "{http://schemas.microsoft.com/win/2004/08/events/event}"
    root = ET.fromstring(event)
    data = {}
    # ".//*" visits every descendant of the root in document order.
    for section in root.findall(".//*"):
        if section.tag == f"{ns}System":
            _read_system_section(section, ns, data)
        elif section.tag == f"{ns}EventData":
            _read_event_data_section(section, ns, data)

    return data

In [4]:
# Parse the log file, treating every non-blank line as one XML event.
data = []
# Read-only mode: the file is never written, so "r+" would only add a needless
# requirement for write permission.
with open("xml_logs.txt", "r") as file1:
    for line in file1:
        # A blank line (e.g. a trailing newline) is not XML and would make the
        # parser raise, so skip it.
        if line.strip():
            data.append(parse_XML_log(line))

data

[{'Provider': 'Microsoft-Windows-Security-Auditing',
  'EventID': '4798',
  'Version': '0',
  'Level': 'Critical',
  'Task': '13824',
  'Opcode': 'Info',
  'Keywords': '0x8020000000000000',
  'TimeCreated': '2023-05-27 11:07:27',
  'EventRecordID': '470695',
  'ActivityID': '{b4860852-8edd-0002-c708-86b4dd8ed901}',
  'ProcessID': '680',
  'ThreadID': '844',
  'Channel': 'Security',
  'Computer': 'DESKTOP-1LIHC4I',
  'Security': None,
  'TargetUserName': 'Louki',
  'TargetDomainName': 'DESKTOP-1LIHC4I',
  'TargetSid': 'S-1-5-21-2384755564-1941788603-3002086374-1001',
  'SubjectUserSid': 'S-1-5-21-2384755564-1941788603-3002086374-1001',
  'SubjectUserName': 'Louki',
  'SubjectDomainName': 'DESKTOP-1LIHC4I',
  'SubjectLogonId': '0x63a1c67',
  'CallerProcessId': '0x2b74',
  'CallerProcessName': 'C:\\Program Files\\Git\\usr\\bin\\sh.exe'},
 {'Provider': 'Microsoft-Windows-Security-Auditing',
  'EventID': '5379',
  'Version': '0',
  'Level': 'Error',
  'Task': '13824',
  'Opcode': 'Info',
  