## Step 1: Setup - Import Libraries

In [1]:
import json
import os
import shutil
from pathlib import Path

from stixorm.module.definitions.os_threat import Sequence, SequenceExt
from stixorm.module.definitions.stix21 import Incident

print("‚úÖ Libraries imported successfully")

‚úÖ Libraries imported successfully


## Step 2: Setup Paths and Clear Context

In [2]:
# Define paths
context_mem_dir = Path("./generated/os-triage/context_mem")
test_incident_id = "incident--test-sequence-chain-00000000-0000-0000-0000-000000000001"
test_incident_dir = context_mem_dir / test_incident_id

# Clear the test incident directory if it exists
if test_incident_dir.exists():
    shutil.rmtree(test_incident_dir)
    print(f"üóëÔ∏è  Cleared existing test incident directory")

# Create fresh directory
test_incident_dir.mkdir(parents=True, exist_ok=True)
print(f"‚úÖ Created test incident directory: {test_incident_dir}")

# Define file paths
sequence_start_refs_file = test_incident_dir / "sequence_start_refs.json"
sequence_refs_file = test_incident_dir / "sequence_refs.json"

print(f"üìÅ Start refs file: {sequence_start_refs_file}")
print(f"üìÅ Sequence refs file: {sequence_refs_file}")

‚úÖ Created test incident directory: generated\os-triage\context_mem\incident--test-sequence-chain-00000000-0000-0000-0000-000000000001
üìÅ Start refs file: generated\os-triage\context_mem\incident--test-sequence-chain-00000000-0000-0000-0000-000000000001\sequence_start_refs.json
üìÅ Sequence refs file: generated\os-triage\context_mem\incident--test-sequence-chain-00000000-0000-0000-0000-000000000001\sequence_refs.json


## Step 3: Create Fake Incident and Update Context Map

In [4]:
# Create a minimal fake incident with proper extensions
import sys
from stixorm.module.definitions.os_threat import IncidentCoreExt

inc_ext = IncidentCoreExt(extension_type="property-extension")
inc_ext_id = "extension-definition--ef765651-680c-498d-9894-99799f2fa126"

fake_incident = Incident(
    name="Test Sequence Chaining Incident",
    description="Fake incident for testing sequence chaining logic",
    extensions={inc_ext_id: inc_ext}
)

# Convert to wrapped format
convert_dir = os.path.join(Path.cwd().parent, "Block_Families", "General", "_library")
sys.path.insert(0, convert_dir)
from convert_n_and_e import convert_node

incident_dict = json.loads(fake_incident.serialize())
nodes, edges = convert_node(incident_dict)
wrapped_incident = nodes[0]

# Save wrapped incident to file as a list
incident_file = test_incident_dir / "incident.json"
with open(incident_file, 'w') as f:
    json.dump([wrapped_incident], f, indent=2)

print(f"‚úÖ Created fake incident: {fake_incident.id}")

# Update context map to point to our test incident
context_map_file = context_mem_dir / "context_map.json"
context_map = {
    "current_incident": test_incident_id
}
with open(context_map_file, 'w') as f:
    json.dump(context_map, f, indent=2)

print(f"‚úÖ Updated context_map.json to point to test incident")

‚úÖ Created fake incident: incident--45050668-382c-489e-91f6-7aa7a365a36f
‚úÖ Updated context_map.json to point to test incident


## Step 4: Create First Fake Sequence (event type)

In [5]:
# Create first fake sequence with sequence_type = "event"
seq_ext = SequenceExt(extension_type="new-sdo")
seq_ext_id = 'extension-definition--be0c7c79-1961-43db-afde-637066a87a64'

fake_sequence_1 = Sequence(
    sequence_type="event",
    step_type="single_step",
    sequenced_object="event--11111111-1111-4111-8111-111111111111",
    extensions={seq_ext_id: seq_ext}
)

print(f"‚úÖ Created first fake sequence: {fake_sequence_1.id}")
print(f"   - sequence_type: {fake_sequence_1.sequence_type}")
print(f"   - step_type: {fake_sequence_1.step_type}")

‚úÖ Created first fake sequence: sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb
   - sequence_type: event
   - step_type: single_step


## Step 5: Import Block Functions

In [6]:
import sys
import os
import importlib

# Add parent directory to path
parent_dir = os.path.dirname(os.getcwd())
save_context_dir = os.path.join(parent_dir, "Block_Families", "OS_Triage", "Save_Context")
sys.path.insert(0, save_context_dir)

import chain_sequence
import save_incident_context

# Reload modules to pick up any changes
importlib.reload(chain_sequence)
importlib.reload(save_incident_context)

from chain_sequence import chain_sequence as chain_seq_func
from save_incident_context import save_context

print("‚úÖ Imported and reloaded chain_sequence and save_context functions")

‚úÖ Imported and reloaded chain_sequence and save_context functions


## Step 6: Chain First Sequence (Should Create Start Step)

In [7]:
# Wrap the sequence object
# Add path for convert_n_and_e
convert_dir = os.path.join(parent_dir, "Block_Families", "General", "_library")
sys.path.insert(0, convert_dir)

from convert_n_and_e import convert_node

seq_1_dict = json.loads(fake_sequence_1.serialize())
nodes_1, edges_1 = convert_node(seq_1_dict)
wrapped_sequence_1 = nodes_1[0]

print("Wrapped sequence 1:")
print(f"  - id: {wrapped_sequence_1['id']}")
print(f"  - type: {wrapped_sequence_1['type']}")
print(f"  - original.sequence_type: {wrapped_sequence_1['original']['sequence_type']}")
print()

# Call chain_sequence
print("üîó Calling chain_sequence for first sequence...")
result_1 = chain_seq_func(wrapped_sequence_1)
print(f"Result: {result_1}")
print()

Wrapped sequence 1:
  - id: sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb
  - type: sequence
  - original.sequence_type: event

üîó Calling chain_sequence for first sequence...
Result:  start sequence created and registered - 
stix_id -> sequence--1affc4b7-50a8-41ec-98c1-3ca4fbfa5170



## Step 7: Check Context Memory After First Chain

In [8]:
print("üìã Checking context memory after first chain...")
print()

# Check sequence_start_refs.json
if sequence_start_refs_file.exists():
    with open(sequence_start_refs_file, 'r') as f:
        start_refs = json.load(f)
    print(f"‚úÖ sequence_start_refs.json exists with {len(start_refs)} entries:")
    for i, entry in enumerate(start_refs, 1):
        print(f"   {i}. {entry['id']} (sequence_type: {entry['original']['sequence_type']}, step_type: {entry['original']['step_type']})")
        print(f"      next_steps: {entry['original'].get('next_steps', [])}")
else:
    print("‚ùå sequence_start_refs.json does NOT exist")

print()

# Check sequence_refs.json
if sequence_refs_file.exists():
    with open(sequence_refs_file, 'r') as f:
        seq_refs = json.load(f)
    print(f"üìÑ sequence_refs.json exists with {len(seq_refs)} entries")
else:
    print("üìÑ sequence_refs.json does NOT exist yet (expected)")

üìã Checking context memory after first chain...

‚úÖ sequence_start_refs.json exists with 1 entries:
   1. sequence--1affc4b7-50a8-41ec-98c1-3ca4fbfa5170 (sequence_type: event, step_type: start_step)
      next_steps: ['sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb']

üìÑ sequence_refs.json does NOT exist yet (expected)


## Step 8: Save First Sequence to Context

In [9]:
print("üíæ Saving first sequence to incident context...")
save_result_1 = save_context(wrapped_sequence_1, "sequence")
print(f"Result: {save_result_1}")
print()

üíæ Saving first sequence to incident context...
Result:  incident context saved - 
stix_id -> sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb



## Step 9: Check Context Memory After First Save

In [10]:
print("üìã Checking context memory after first save...")
print()

# Check sequence_refs.json
if sequence_refs_file.exists():
    with open(sequence_refs_file, 'r') as f:
        seq_refs = json.load(f)
    print(f"‚úÖ sequence_refs.json now has {len(seq_refs)} entries:")
    for i, entry in enumerate(seq_refs, 1):
        print(f"   {i}. {entry['id']} (sequence_type: {entry['original']['sequence_type']}, step_type: {entry['original']['step_type']})")
        print(f"      next_steps: {entry['original'].get('next_steps', [])}")
else:
    print("‚ùå sequence_refs.json still does NOT exist")

print()

# Re-check start refs (should be unchanged)
if sequence_start_refs_file.exists():
    with open(sequence_start_refs_file, 'r') as f:
        start_refs = json.load(f)
    print(f"‚úÖ sequence_start_refs.json still has {len(start_refs)} entries (unchanged)")
else:
    print("‚ùå sequence_start_refs.json does NOT exist")

üìã Checking context memory after first save...

‚úÖ sequence_refs.json now has 1 entries:
   1. sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb (sequence_type: event, step_type: single_step)
      next_steps: []

‚úÖ sequence_start_refs.json still has 1 entries (unchanged)


## Step 10: Create Second Fake Sequence (event type)

In [11]:
# Create second fake sequence with sequence_type = "event"
fake_sequence_2 = Sequence(
    sequence_type="event",
    step_type="single_step",
    sequenced_object="event--22222222-2222-4222-8222-222222222222",
    extensions={seq_ext_id: seq_ext}
)

print(f"‚úÖ Created second fake sequence: {fake_sequence_2.id}")
print(f"   - sequence_type: {fake_sequence_2.sequence_type}")
print(f"   - step_type: {fake_sequence_2.step_type}")

‚úÖ Created second fake sequence: sequence--2c3a5610-8e81-4752-b28e-f18210191ab0
   - sequence_type: event
   - step_type: single_step


## Step 11: Chain Second Sequence (Should NOT Create New Start Step)

In [12]:
# Wrap the sequence object
seq_2_dict = json.loads(fake_sequence_2.serialize())
nodes_2, edges_2 = convert_node(seq_2_dict)
wrapped_sequence_2 = nodes_2[0]

print("Wrapped sequence 2:")
print(f"  - id: {wrapped_sequence_2['id']}")
print(f"  - type: {wrapped_sequence_2['type']}")
print(f"  - original.sequence_type: {wrapped_sequence_2['original']['sequence_type']}")
print()

# Call chain_sequence
print("üîó Calling chain_sequence for second sequence...")
result_2 = chain_seq_func(wrapped_sequence_2)
print(f"Result: {result_2}")
print()

Wrapped sequence 2:
  - id: sequence--2c3a5610-8e81-4752-b28e-f18210191ab0
  - type: sequence
  - original.sequence_type: event

üîó Calling chain_sequence for second sequence...
Result:  sequence chained and registered - 
stix_id -> sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb



## Step 12: Check Context Memory After Second Chain (CRITICAL)

In [13]:
print("üìã CRITICAL CHECK: Verifying correct chaining behavior...")
print()

# Check sequence_start_refs.json
if sequence_start_refs_file.exists():
    with open(sequence_start_refs_file, 'r') as f:
        start_refs = json.load(f)
    
    event_start_steps = [s for s in start_refs if s['original']['sequence_type'] == 'event']
    
    if len(event_start_steps) == 1:
        print(f"‚úÖ CORRECT: Only 1 event start_step exists (should be 1)")
        print(f"   {event_start_steps[0]['id']}")
    else:
        print(f"‚ùå WRONG: {len(event_start_steps)} event start_steps exist (should be 1)")
        for i, entry in enumerate(event_start_steps, 1):
            print(f"   {i}. {entry['id']}")
else:
    print("‚ùå sequence_start_refs.json does NOT exist")

print()

# Check sequence_refs.json for chaining
if sequence_refs_file.exists():
    with open(sequence_refs_file, 'r') as f:
        seq_refs = json.load(f)
    
    print(f"üìÑ sequence_refs.json has {len(seq_refs)} entries:")
    for i, entry in enumerate(seq_refs, 1):
        print(f"   {i}. {entry['id']}")
        print(f"      sequence_type: {entry['original']['sequence_type']}")
        print(f"      step_type: {entry['original']['step_type']}")
        next_steps = entry['original'].get('next_steps', [])
        print(f"      next_steps: {next_steps}")
        
        # Check if first sequence now points to second sequence
        if i == 1 and len(next_steps) > 0:
            if next_steps[0] == wrapped_sequence_2['id']:
                print(f"      ‚úÖ CORRECT: First sequence now chains to second sequence!")
            else:
                print(f"      ‚ùå WRONG: First sequence points to {next_steps[0]}, not second sequence")
        print()
else:
    print("‚ùå sequence_refs.json does NOT exist")

üìã CRITICAL CHECK: Verifying correct chaining behavior...

‚úÖ CORRECT: Only 1 event start_step exists (should be 1)
   sequence--1affc4b7-50a8-41ec-98c1-3ca4fbfa5170

üìÑ sequence_refs.json has 1 entries:
   1. sequence--265c1981-78c9-4e92-987c-3ca15eff5cdb
      sequence_type: event
      step_type: single_step
      next_steps: ['sequence--2c3a5610-8e81-4752-b28e-f18210191ab0']
      ‚úÖ CORRECT: First sequence now chains to second sequence!



## Step 13: Save Second Sequence to Context

In [14]:
print("üíæ Saving second sequence to incident context...")
save_result_2 = save_context(wrapped_sequence_2, "sequence")
print(f"Result: {save_result_2}")
print()

üíæ Saving second sequence to incident context...
Result:  incident context saved - 
stix_id -> sequence--2c3a5610-8e81-4752-b28e-f18210191ab0



## Step 14: Final Verification

In [15]:
print("üéØ FINAL VERIFICATION")
print("=" * 60)
print()

# Check sequence_start_refs.json
if sequence_start_refs_file.exists():
    with open(sequence_start_refs_file, 'r') as f:
        start_refs = json.load(f)
    print(f"sequence_start_refs.json has {len(start_refs)} total entries")
    event_starts = [s for s in start_refs if s['original']['sequence_type'] == 'event']
    print(f"Event start_steps: {len(event_starts)} (expected: 1)")
    
    if len(event_starts) == 1:
        print("‚úÖ PASS: Correct number of event start_steps")
    else:
        print("‚ùå FAIL: Too many event start_steps created")
else:
    print("‚ùå FAIL: sequence_start_refs.json missing")

print()

# Check sequence_refs.json
if sequence_refs_file.exists():
    with open(sequence_refs_file, 'r') as f:
        seq_refs = json.load(f)
    print(f"sequence_refs.json has {len(seq_refs)} entries (expected: 2)")
    
    if len(seq_refs) == 2:
        print("‚úÖ PASS: Both sequences saved")
        
        # Check chaining
        first_seq = seq_refs[0]
        second_seq = seq_refs[1]
        
        next_steps = first_seq['original'].get('next_steps', [])
        if len(next_steps) > 0 and next_steps[0] == second_seq['id']:
            print("‚úÖ PASS: Sequences properly chained via next_steps")
        else:
            print("‚ùå FAIL: Sequences not properly chained")
    else:
        print("‚ùå FAIL: Wrong number of sequences")
else:
    print("‚ùå FAIL: sequence_refs.json missing")

print()
print("=" * 60)
print("Test complete!")

üéØ FINAL VERIFICATION

sequence_start_refs.json has 1 total entries
Event start_steps: 1 (expected: 1)
‚úÖ PASS: Correct number of event start_steps

sequence_refs.json has 2 entries (expected: 2)
‚úÖ PASS: Both sequences saved
‚úÖ PASS: Sequences properly chained via next_steps

Test complete!


## Step 15: Test with TASK Sequences (Like Step 3)

Now let's test with sequence_type="task" to match what Step 3 does.

In [16]:
print("\nüîÑ Testing TASK sequence type (matching Step 2 & Step 3 behavior)...")
print("=" * 60)

# Create first TASK sequence
fake_task_seq_1 = Sequence(
    sequence_type="task",
    step_type="single_step",
    sequenced_object="task--11111111-1111-4111-8111-111111111111",
    extensions={seq_ext_id: seq_ext}
)

# Wrap and chain it
task_seq_1_dict = json.loads(fake_task_seq_1.serialize())
task_nodes_1, task_edges_1 = convert_node(task_seq_1_dict)
wrapped_task_seq_1 = task_nodes_1[0]

print(f"\n‚úÖ Created first TASK sequence: {wrapped_task_seq_1['id']}")
result_task_1 = chain_seq_func(wrapped_task_seq_1)
print(f"Chain result: {result_task_1}")

# Save it
save_result_task_1 = save_context(wrapped_task_seq_1, "sequence")
print(f"Save result: {save_result_task_1}")


üîÑ Testing TASK sequence type (matching Step 2 & Step 3 behavior)...

‚úÖ Created first TASK sequence: sequence--ff64d00b-0ee7-4874-9ad1-fc59545bdeff
Chain result:  start sequence created and registered - 
stix_id -> sequence--e47a3156-9056-46a4-b1d6-555e935467b3
Save result:  incident context saved - 
stix_id -> sequence--ff64d00b-0ee7-4874-9ad1-fc59545bdeff


In [17]:
# Create second TASK sequence (THIS IS WHERE STEP 3 HANGS!)
print("\nüîó Creating second TASK sequence (like Step 3 does)...")

fake_task_seq_2 = Sequence(
    sequence_type="task",
    step_type="single_step",
    sequenced_object="task--22222222-2222-4222-8222-222222222222",
    extensions={seq_ext_id: seq_ext}
)

# Wrap it
task_seq_2_dict = json.loads(fake_task_seq_2.serialize())
task_nodes_2, task_edges_2 = convert_node(task_seq_2_dict)
wrapped_task_seq_2 = task_nodes_2[0]

print(f"‚úÖ Created second TASK sequence: {wrapped_task_seq_2['id']}")
print(f"‚ö†Ô∏è  Calling chain_sequence... (this is where Step 3 hangs)")

# THIS IS THE CRITICAL TEST - will it hang like Step 3?
result_task_2 = chain_seq_func(wrapped_task_seq_2)
print(f"‚úÖ SUCCESS! Chain completed: {result_task_2}")

# Save it
save_result_task_2 = save_context(wrapped_task_seq_2, "sequence")
print(f"Save result: {save_result_task_2}")


üîó Creating second TASK sequence (like Step 3 does)...
‚úÖ Created second TASK sequence: sequence--52879b26-2037-4e4b-aa9d-607fba5e74a5
‚ö†Ô∏è  Calling chain_sequence... (this is where Step 3 hangs)
‚úÖ SUCCESS! Chain completed:  sequence chained and registered - 
stix_id -> sequence--ff64d00b-0ee7-4874-9ad1-fc59545bdeff
Save result:  incident context saved - 
stix_id -> sequence--52879b26-2037-4e4b-aa9d-607fba5e74a5
‚úÖ SUCCESS! Chain completed:  sequence chained and registered - 
stix_id -> sequence--ff64d00b-0ee7-4874-9ad1-fc59545bdeff
Save result:  incident context saved - 
stix_id -> sequence--52879b26-2037-4e4b-aa9d-607fba5e74a5
