# Event Producer (Windows Logs → Kafka)

## Purpose of This Notebook

This notebook implements the **event producer** for the cybersecurity pipeline used in this laboratory.

Its purpose is to **simulate a real-world source of security-relevant events** and to inject these events into the pipeline **asynchronously**, using a message queue (Kafka).

This notebook does **not** perform any detection or analysis.  
It represents the *upstream data source* of the system.

---

## What This Producer Represents

Conceptually, this notebook simulates:

- Windows process execution logs  
- Windows authentication events  
- Host- and user-related security telemetry  

In a real SOC / SIEM system, similar data would come from:

- endpoint agents,
- operating system logs,
- EDR tools,
- log forwarders.

Here, the data is **synthetic**, but the **architecture is realistic**.

---

## Key Architectural Ideas Demonstrated

### 1. Event-Driven Design

Events are sent to Kafka **independently of consumers**.

The producer:
- does not know who will process the data,
- does not wait for classification results,
- does not depend on downstream components.

This decoupling is a core principle of **high-load cybersecurity pipelines**.

---

### 2. Message Queue Instead of Function Calls

Kafka is used instead of direct function calls to ensure:

- buffering under load,
- fault tolerance,
- asynchronous processing,
- scalability.

This is why production SOC pipelines typically place a queue between stages instead of calling downstream services synchronously.
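
The buffering and decoupling described above can be illustrated without a broker. The following is a minimal sketch, assuming an in-process `queue.Queue` as a stand-in for a Kafka topic; the names `buffer`, `slow_consumer`, and `processed` are hypothetical and do not appear in the lab code.

```python
import queue
import threading
import time

# Hypothetical in-process stand-in for a Kafka topic (not the lab's real broker).
buffer = queue.Queue()
processed = []

def slow_consumer():
    # Drains the queue at its own pace until it sees the sentinel value.
    while True:
        item = buffer.get()
        if item is None:
            break
        time.sleep(0.01)  # simulate slow downstream processing
        processed.append(item)

worker = threading.Thread(target=slow_consumer)
worker.start()

# The producer enqueues and moves on; it never waits for a processing result.
start = time.perf_counter()
for i in range(20):
    buffer.put({"event_id": i})
enqueue_seconds = time.perf_counter() - start

buffer.put(None)  # sentinel: no more events
worker.join()

print(f"enqueued 20 events in {enqueue_seconds:.4f}s; consumer processed {len(processed)}")
```

The producer loop finishes while the consumer is still working through the backlog, which is the behaviour a message queue provides between pipeline stages. Unlike this sketch, Kafka also persists messages, which is where the fault tolerance comes from.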

---

### 3. Distributed Tracing from the First Stage

Each generated event is assigned:

- a unique `event_id`,
- a corresponding **trace ID**,
- and a producer-side trace span.

This allows the **entire lifecycle of one event** to be observed later in Jaeger,
across multiple pipeline stages.

---

## Generated Event Structure

Each event is a JSON object with fields such as:

- `event_id`
- `timestamp`
- `user`
- `host`
- `source_ip`
- `event_type`

Depending on the event type, additional fields are included:

- process execution details, or
- authentication result details.

This structure is intentionally simple and readable.
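
As an illustration, one `process_start` event could look like the sketch below. The values are made up; the field names follow the list above and the generator code later in this notebook, where fields that do not apply to the event type are set to `None`.

```python
import json
import uuid
from datetime import datetime, timezone

# Illustrative values only; they are not taken from a real host.
example_event = {
    "event_id": str(uuid.uuid4()),
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "user": "alice",
    "host": "win-01",
    "source_ip": "10.0.0.10",
    "event_type": "process_start",
    # process execution details
    "process_name": "cmd.exe",
    "command_line": "cmd.exe /c whoami",
    "parent_process": "explorer.exe",
    # authentication field is present but unused for this event type
    "logon_type": None,
}

# Events travel through Kafka as UTF-8 encoded JSON.
payload = json.dumps(example_event).encode("utf-8")
decoded = json.loads(payload.decode("utf-8"))
print(json.dumps(decoded, indent=2))
```

Keeping every key present in both event types means a consumer can read any field without first checking `event_type`.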

---

## Tracing Model Used Here

This producer uses **OpenTelemetry** to emit traces.

Important design choice:

> **The trace ID is derived from the event ID.**

This ensures that:
- producer spans,
- consumer spans,
- and processing spans

all appear as part of **a single trace** in Jaeger, provided that every stage derives the trace ID from the event ID in the same way.

This is essential for understanding **end-to-end pipeline execution**.
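
The derivation works because a UUID and an OpenTelemetry trace ID are both 128-bit values. The sketch below uses only the standard library; the helper name `trace_id_from_event_id` and the sample UUID are hypothetical and chosen for illustration.

```python
import uuid

def trace_id_from_event_id(event_id: str) -> int:
    """Map a UUID string to a 128-bit integer usable as a trace ID."""
    return uuid.UUID(event_id).int

event_id = "123e4567-e89b-12d3-a456-426614174000"  # made-up example value

producer_side = trace_id_from_event_id(event_id)
consumer_side = trace_id_from_event_id(event_id)  # recomputed independently downstream

# Trace IDs are usually displayed as 32 lowercase hex characters.
trace_id_hex = format(producer_side, "032x")
print(trace_id_hex)  # 123e4567e89b12d3a456426614174000
```

Because the mapping is deterministic, any stage that receives the event can recompute the same trace ID from `event_id` alone, without extra tracing headers in the message.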

---

## What This Notebook Does NOT Do

- It does **not** classify events.
- It does **not** store results.
- It does **not** perform analytics.
- It does **not** optimize performance (for example, it flushes the Kafka producer after every event).

All of that happens in downstream pipeline stages.

---

## How This Notebook Is Used in the Lab

1. Start the full pipeline using `docker compose`.
2. Run this notebook to begin producing events.
3. Observe:
   - events appearing in Kafka (via Redpanda Console),
   - traces appearing in Jaeger.
4. Run the consumer notebook in parallel.

---

## Key Takeaway

> **This notebook models how security data enters a real pipeline.**  
> Understanding this stage is critical before working on detection, ML, or performance.

In cybersecurity systems, **data ingestion is as important as analysis**.


In [5]:
!pip install -q kafka-python

In [6]:
!pip install -q opentelemetry-sdk



In [7]:
!pip install -q opentelemetry-exporter-otlp

In [None]:
import json
import random
import time
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer

# ---------------- OpenTelemetry ----------------
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.trace import SpanKind, TraceFlags
from opentelemetry.trace import set_span_in_context
from opentelemetry.trace import SpanContext, NonRecordingSpan

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------

KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_TOPIC = "raw-events"

# OTLP gRPC endpoint for trace export (the `jaeger` host in this setup)
OTLP_ENDPOINT = "jaeger:4317"
SERVICE_NAME = "windows-log-producer"

EVENT_INTERVAL_SEC = (1, 5)

# ---------------------------------------------------------------------
# Tracing setup (OTLP)
# ---------------------------------------------------------------------

resource = Resource.create(
    {
        "service.name": SERVICE_NAME,
    }
)

trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter(
    endpoint=OTLP_ENDPOINT,
    insecure=True,  # plaintext gRPC; acceptable inside the Docker network
)

span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# ---------------------------------------------------------------------
# Kafka producer
# ---------------------------------------------------------------------

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# ---------------------------------------------------------------------
# Event generation
# ---------------------------------------------------------------------

USERS = ["alice", "bob", "charlie", "admin"]
HOSTS = ["win-01", "win-02", "win-03"]
IPS = ["10.0.0.10", "10.0.0.11", "10.0.0.12"]

PROCESS_SCENARIOS = [
    {
        "process_name": "powershell.exe",
        "command_line": "powershell -EncodedCommand SQBFAFgA",
        "parent_process": "explorer.exe",
    },
    {
        "process_name": "cmd.exe",
        "command_line": "cmd.exe /c whoami",
        "parent_process": "explorer.exe",
    },
    {
        "process_name": "notepad.exe",
        "command_line": "notepad.exe",
        "parent_process": "explorer.exe",
    },
]

LOGIN_SCENARIOS = [
    {"logon_type": "success"},
    {"logon_type": "failure"},
]

def generate_event():
    event_id = str(uuid.uuid4())
    event_type = random.choice(["process_start", "user_login"])

    event = {
        "event_id": event_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user": random.choice(USERS),
        "host": random.choice(HOSTS),
        "source_ip": random.choice(IPS),
        "event_type": event_type,
    }

    if event_type == "process_start":
        s = random.choice(PROCESS_SCENARIOS)
        event.update(
            {
                "process_name": s["process_name"],
                "command_line": s["command_line"],
                "parent_process": s["parent_process"],
                "logon_type": None,
            }
        )
    else:
        s = random.choice(LOGIN_SCENARIOS)
        event.update(
            {
                "process_name": None,
                "command_line": None,
                "parent_process": None,
                "logon_type": s["logon_type"],
            }
        )

    return event

# ---------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------

print("Starting Windows log producer (OTLP)...")
print(f"Kafka topic: {KAFKA_TOPIC}")
print(f"OTLP endpoint: {OTLP_ENDPOINT}")

while True:
    event = generate_event()
    event_id = event["event_id"]

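    # event_id → trace_id (UUID → 128-bit int)
    # The synthetic parent context built below is never exported; it only
    # carries this trace ID so that the producer spans join the event's trace.
    trace_id = uuid.UUID(event_id).int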

    parent_ctx = set_span_in_context(
        NonRecordingSpan(
            SpanContext(
                trace_id=trace_id,
                span_id=random.getrandbits(64),
                is_remote=False,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
                trace_state={},
            )
        )
    )

    with tracer.start_as_current_span(
        "produce_event",
        context=parent_ctx,
        kind=SpanKind.PRODUCER,
    ) as span:

        span.set_attribute("event.id", event_id)
        span.set_attribute("event.type", event["event_type"])
        span.set_attribute("host.name", event["host"])
        span.set_attribute("user.name", event["user"])

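        with tracer.start_as_current_span("kafka_produce"):
            producer.send(KAFKA_TOPIC, event)
            # flush() blocks until buffered messages are sent, so the span
            # covers delivery; simple, but it limits throughput.
            producer.flush()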

        print(f"Produced event {event_id} ({event['event_type']})")

    time.sleep(random.uniform(*EVENT_INTERVAL_SEC))