# In [4]:
from stix2 import Indicator, ObservedData, NetworkTraffic, IPv4Address, Bundle
from datetime import datetime
import pandas as pd
import logging

# Input CSV; each row is later read for its timestamp, threat type, IPs, ports and protocol.
csv_file_path = 'TRG_dataset01_test_ml.csv'
chunk_size = 10000  # Rows per chunk handed to pandas; adjust based on performance needs
# Passing chunksize makes read_csv return an iterator of DataFrames rather than
# one DataFrame, so the file is processed chunk by chunk in the loop below.
chunks = pd.read_csv(csv_file_path, chunksize=chunk_size)

def parse_timestamp(timestamp):
    """Convert a raw timestamp cell into a ``datetime``.

    Accepted string layouts, tried in order:
    ``YYYY-MM-DD HH:MM:SS.ffffff``, ``YYYY-MM-DD HH:MM:SS`` and ``YYYY-MM-DD``.
    A value that already is a ``datetime`` is returned unchanged.

    Args:
        timestamp: The raw value read from the 'timestamp' column.

    Returns:
        The parsed ``datetime``, or ``None`` when the value is not a string in
        one of the accepted layouts (the failure is logged).
    """
    if isinstance(timestamp, datetime):
        return timestamp

    # Only strings can be handed to strptime; anything else (None, a NaN float
    # from an empty cell, ...) would raise TypeError, so it is reported as a
    # parse failure instead.
    if isinstance(timestamp, str):
        for layout in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
            try:
                return datetime.strptime(timestamp, layout)
            except ValueError:
                continue

    logging.error("Failed to parse timestamp: %s", timestamp)
    return None  # Return None if parsing fails


def create_indicator(row):
    """Build a STIX Indicator matching the source IP of one CSV row.

    Args:
        row: Record supporting ``row[column]`` lookups for 'threat_type',
            'source_ip' and 'timestamp'.

    Returns:
        The ``Indicator``, or ``None`` when the row's timestamp cannot be
        parsed, so the caller can skip the row.
    """
    valid_from = parse_timestamp(row['timestamp'])
    if valid_from is None:
        # Skip the row the same way create_observed_data does rather than
        # handing None to Indicator as its valid_from.
        return None

    return Indicator(
        name=f"Indicator for {row['threat_type']}",
        pattern=f"[ipv4-addr:value = '{row['source_ip']}']",
        pattern_type="stix",
        valid_from=valid_from,
    )

def _port_or_none(value):
    """Return *value* as an int port number, or None when it is not convertible."""
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError covers NaN and non-numeric text, TypeError covers None,
        # OverflowError covers infinite floats.
        return None


def create_observed_data(row):
    """Build a STIX ObservedData object describing one row's network flow.

    Args:
        row: Record supporting ``row[column]`` lookups for 'timestamp',
            'source_ip', 'destination_ip', 'source_port', 'destination_port'
            and 'protocol'.

    Returns:
        An ``ObservedData`` embedding the two ``IPv4Address`` objects and the
        ``NetworkTraffic`` object that references them, or ``None`` when the
        row's timestamp cannot be parsed.
    """
    # The row carries a single timestamp, so it is parsed once and used as both
    # ends of the observation window.
    observed_at = parse_timestamp(row['timestamp'])
    if observed_at is None:
        logging.error("Invalid timestamp for row: %s", row)
        return None  # Skip this row if the timestamp is invalid

    src_ip = IPv4Address(value=row['source_ip'])
    dst_ip = IPv4Address(value=row['destination_ip'])

    src_port = _port_or_none(row['source_port'])
    dst_port = _port_or_none(row['destination_port'])

    network_traffic = NetworkTraffic(
        protocols=[str(row['protocol']).lower()],
        src_ref=src_ip.id,
        dst_ref=dst_ip.id,
        src_port=src_port,
        dst_port=dst_port,
    )

    return ObservedData(
        first_observed=observed_at,
        last_observed=observed_at,
        number_observed=1,
        objects={
            src_ip.id: src_ip,
            dst_ip.id: dst_ip,
            network_traffic.id: network_traffic,
        },
    )


# Accumulators for the STIX objects built from every chunk.
all_indicators = []
all_observed_data = []

for chunk in chunks:
    for index, row in chunk.iterrows():
        # One bad row must not abort the whole export: log it and move on.
        try:
            indicator = create_indicator(row)
            observed_data = create_observed_data(row)

            if indicator:
                all_indicators.append(indicator)
            if observed_data:
                all_observed_data.append(observed_data)
        except Exception as e:
            logging.error("Error processing row %s: %s", index, e)


bundle = Bundle(objects=all_indicators + all_observed_data)

# Write the JSON as UTF-8 explicitly instead of relying on the platform default.
with open('stix_bundle.json', 'w', encoding='utf-8') as f:
    f.write(bundle.serialize(pretty=True))
# NOTE: the root logger's default level is WARNING, so this message is only
# shown when logging has been configured for INFO or lower.
logging.info("STIX bundle created and exported to stix_bundle.json")


# In [10]:
from stix2 import Indicator, ObservedData, NetworkTraffic, IPv4Address, Bundle
from datetime import datetime
import pandas as pd
import logging

# Input CSV; each row is later read for its timestamp, threat type, IPs, ports and protocol.
csv_file_path = 'TRG_dataset01_test_ml.csv'
chunk_size = 10000  # Rows per chunk handed to pandas; adjust based on performance needs
# Passing chunksize makes read_csv return an iterator of DataFrames rather than
# one DataFrame, so the file is processed chunk by chunk in the loop below.
chunks = pd.read_csv(csv_file_path, chunksize=chunk_size)

def parse_timestamp(timestamp):
    """Parse a timestamp value into a ``datetime``, most specific layout first.

    Args:
        timestamp: Raw value from the 'timestamp' column. Strings may be
            ``YYYY-MM-DD HH:MM:SS.ffffff``, ``YYYY-MM-DD HH:MM:SS`` or
            ``YYYY-MM-DD``; a ``datetime`` instance is passed through as is.

    Returns:
        The resulting ``datetime``, or ``None`` if the value matches none of
        the layouts or is not a string at all (the failure is logged).
    """
    if isinstance(timestamp, datetime):
        return timestamp

    candidate_formats = (
        '%Y-%m-%d %H:%M:%S.%f',  # full datetime with microseconds
        '%Y-%m-%d %H:%M:%S',     # full datetime without microseconds
        '%Y-%m-%d',              # date only
    )
    # strptime raises TypeError for non-strings (e.g. None, or the NaN float
    # produced by an empty cell), so those go straight to the failure path.
    if isinstance(timestamp, str):
        for fmt in candidate_formats:
            try:
                return datetime.strptime(timestamp, fmt)
            except ValueError:
                pass

    logging.error("Failed to parse timestamp entirely: %s", timestamp)
    return None  # Return None if parsing fails

def create_indicator(row):
    """Create the STIX Indicator for a row's source IP address.

    Args:
        row: Record supporting ``row[column]`` lookups for 'threat_type',
            'source_ip' and 'timestamp'.

    Returns:
        The ``Indicator``, or ``None`` if the row has no parsable timestamp.
    """
    valid_from = parse_timestamp(row['timestamp'])
    if valid_from is None:
        # Mirror create_observed_data: a row without a usable timestamp is
        # skipped instead of being emitted with valid_from=None.
        return None

    indicator_name = f"Indicator for {row['threat_type']}"
    stix_pattern = f"[ipv4-addr:value = '{row['source_ip']}']"
    return Indicator(
        name=indicator_name,
        pattern=stix_pattern,
        pattern_type="stix",
        valid_from=valid_from,
    )

def create_observed_data(row):
    """Create the STIX ObservedData object for the network flow in one row.

    Args:
        row: Record supporting ``row[column]`` lookups for 'timestamp',
            'source_ip', 'destination_ip', 'source_port', 'destination_port'
            and 'protocol'.

    Returns:
        An ``ObservedData`` whose ``objects`` hold the source and destination
        ``IPv4Address`` and the ``NetworkTraffic`` linking them, or ``None``
        when the row's timestamp cannot be parsed.
    """
    # first_observed and last_observed come from the same column, so one parse
    # is enough (and a failure is logged once rather than twice).
    observed_at = parse_timestamp(row['timestamp'])
    if observed_at is None:
        logging.error("Invalid timestamp for row: %s", row)
        return None  # Skip this row if the timestamp is invalid

    src_ip = IPv4Address(value=row['source_ip'])
    dst_ip = IPv4Address(value=row['destination_ip'])

    # Ports are optional: anything that cannot be turned into an int
    # (NaN, None, non-numeric text, infinity) is passed on as None.
    ports = {}
    for column in ('source_port', 'destination_port'):
        try:
            ports[column] = int(row[column])
        except (ValueError, TypeError, OverflowError):
            ports[column] = None

    network_traffic = NetworkTraffic(
        protocols=[str(row['protocol']).lower()],
        src_ref=src_ip.id,
        dst_ref=dst_ip.id,
        src_port=ports['source_port'],
        dst_port=ports['destination_port'],
    )

    embedded = {sco.id: sco for sco in (src_ip, dst_ip, network_traffic)}
    return ObservedData(
        first_observed=observed_at,
        last_observed=observed_at,
        number_observed=1,
        objects=embedded,
    )

# Collected STIX objects across all chunks.
all_indicators, all_observed_data = [], []

for chunk in chunks:
    for index, row in chunk.iterrows():
        try:
            # Build both objects first; a failure in either leaves the row out entirely.
            indicator = create_indicator(row)
            observed_data = create_observed_data(row)

            for stix_object, collected in (
                (indicator, all_indicators),
                (observed_data, all_observed_data),
            ):
                if stix_object:
                    collected.append(stix_object)
        except Exception as e:
            logging.error(f"Error processing row {index}: {e}")

# Indicators first, then observed data, in one bundle.
bundle = Bundle(objects=[*all_indicators, *all_observed_data])

with open('stix_bundle.json', 'w') as bundle_file:
    bundle_file.write(bundle.serialize(pretty=True))
logging.info("STIX bundle created and exported to stix_bundle.json")

