In [12]:
# 11 October 2023

import json
import sqlite3
import csv
from datetime import datetime
from typing import List, Dict, Union, Tuple, Optional, Any

# Module-level configuration
# Path of the SQLite database file that the functions below connect to.
DATABASE_NAME = 'events2.db'
# Path of the JSON file that is loaded and passed to insert_data() further down.
DATAFILE_NAME = 'data_2023-10-11.json'


def create_database(db_path: Optional[str] = None) -> None:
    """
    Create the SQLite tables 'events', 'threat_actors', and 'sources'.

    Every statement is CREATE TABLE IF NOT EXISTS, so calling this against a
    database that already has the tables leaves them (and their rows) alone.

    Parameters:
        db_path (Optional[str]): Path of the database file to create/open.
            When omitted, the module-level DATABASE_NAME is used.

    Returns:
        None
    """
    conn = sqlite3.connect(DATABASE_NAME if db_path is None else db_path)
    # try/finally guarantees the connection is released even if a statement fails.
    try:
        cursor = conn.cursor()

        # Create events table.
        # NOTE: the FOREIGN KEY clauses in this schema are declarative only
        # unless foreign-key enforcement is switched on for the connection
        # (PRAGMA foreign_keys = ON); no such PRAGMA is issued here.
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            _key TEXT PRIMARY KEY,
            description TEXT,
            eventConfidence TEXT,
            eventDateFrom TEXT,
            eventName TEXT,
            eventType TEXT,
            country TEXT,
            countryAbbreviation TEXT,
            threatActorKey TEXT,
            FOREIGN KEY (threatActorKey) REFERENCES threat_actors(_key)
        );
        ''')

        # Create threat_actors table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS threat_actors (
            _key TEXT PRIMARY KEY,
            name TEXT,
            type TEXT,
            profiled TEXT,
            identifiers TEXT,
            active TEXT,
            apt TEXT,
            allegiance TEXT,
            origin TEXT,
            targetedSectors TEXT,
            description TEXT
        );
        ''')

        # Create sources table (one row per source, linked to an event by eventKey)
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS sources (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            eventKey TEXT,
            URL TEXT,
            title TEXT,
            sourceName TEXT,
            FOREIGN KEY (eventKey) REFERENCES events(_key)
        );
        ''')

        conn.commit()
    finally:
        conn.close()


def insert_data(data: List[Dict[str, Any]], db_path: Optional[str] = None) -> None:
    """
    Insert the data into SQLite database tables: events, threat_actors, and sources.

    Each record may carry the keys 'event', 'threatActor', 'location' and
    'sources'; a missing or null value is treated as empty. Rows are only
    added, never updated:
      * an event / threat actor is skipped when it has no '_key' or when a row
        with that '_key' already exists;
      * a source is skipped when a row with the same URL already exists.
    All inserts are committed together at the end; if anything raises, the
    connection is closed without committing.

    Parameters:
        data (List[Dict[str, Any]]): A list of dictionaries, each representing an event.
        db_path (Optional[str]): Path of the database file. When omitted, the
            module-level DATABASE_NAME is used.

    Returns:
        None
    """
    conn = sqlite3.connect(DATABASE_NAME if db_path is None else db_path)
    # try/finally guarantees the connection is released even if an insert fails.
    try:
        cursor = conn.cursor()

        for record in data:
            # "or" also covers keys that are present but explicitly null.
            event = record.get('event') or {}
            threat_actor = record.get('threatActor') or {}
            location = record.get('location') or {}
            sources = record.get('sources') or []

            # Insert into events table; OR IGNORE skips a _key that is already stored.
            if event.get('_key'):
                cursor.execute(
                    "INSERT OR IGNORE INTO events (_key, description, eventConfidence, eventDateFrom, eventName, eventType, country, countryAbbreviation, threatActorKey) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        event.get('_key'),
                        event.get('description'),
                        event.get('eventConfidence'),
                        event.get('eventDateFrom'),
                        event.get('eventName'),
                        event.get('type'),
                        location.get('country'),
                        location.get('countryAbbreviation'),
                        threat_actor.get('_key'),
                    ),
                )

            # Insert into threat_actors table; OR IGNORE skips a _key that is already stored.
            if threat_actor.get('_key'):
                cursor.execute(
                    "INSERT OR IGNORE INTO threat_actors (_key, name, type, profiled, identifiers, active, apt, allegiance, origin, targetedSectors, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        threat_actor.get('_key'),
                        threat_actor.get('name'),
                        threat_actor.get('type'),
                        threat_actor.get('profiled'),
                        threat_actor.get('identifiers'),
                        threat_actor.get('active'),
                        threat_actor.get('apt'),
                        threat_actor.get('allegiance'),
                        threat_actor.get('origin'),
                        threat_actor.get('targetedSectors'),
                        # The input uses a capitalised 'Description' key for threat actors.
                        threat_actor.get('Description'),
                    ),
                )

            # Insert into sources table. URL has no UNIQUE constraint, so the
            # duplicate check has to be an explicit lookup.
            for source in sources:
                cursor.execute("SELECT 1 FROM sources WHERE URL=?", (source.get('URL'),))
                if cursor.fetchone() is None:
                    cursor.execute(
                        "INSERT INTO sources (eventKey, URL, title, sourceName) VALUES (?, ?, ?, ?)",
                        (event.get('_key'), source.get('URL'), source.get('title'), source.get('sourceName')),
                    )

        conn.commit()
    finally:
        conn.close()

def search_threat_actor(threat_actor_key: str, db_path: Optional[str] = None) -> None:
    """
    Search for a threat actor in the 'threat_actors' table and print their profile.

    Prints one "Label: value" line per stored column when a row with the given
    key exists, otherwise prints "Threat actor not found.".

    Parameters:
        threat_actor_key (str): Value of the '_key' column to look up.
        db_path (Optional[str]): Path of the database file. When omitted, the
            module-level DATABASE_NAME is used.

    Returns:
        None

    Example:
        search_threat_actor("ITARMYOFUKRAINE")
    """
    # Labels in print order; they line up one-to-one with the columns named in
    # the SELECT below, so the output does not depend on the table's column order.
    labels = (
        "Key", "Name", "Type", "Profiled", "Identifiers", "Active",
        "APT", "Allegiance", "Origin", "Targeted Sectors", "Description",
    )

    conn = sqlite3.connect(DATABASE_NAME if db_path is None else db_path)
    # try/finally guarantees the connection is released even if the query fails.
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT _key, name, type, profiled, identifiers, active, apt, "
            "allegiance, origin, targetedSectors, description "
            "FROM threat_actors WHERE _key = ?",
            (threat_actor_key,),
        )
        result = cursor.fetchone()
    finally:
        conn.close()

    if result is None:
        print("Threat actor not found.")
        return

    print("Threat Actor Profile:")
    for label, value in zip(labels, result):
        print(f"{label}: {value}")

def search_events_by_date_range(start_date: str, end_date: str, existing_results: Optional[List[Tuple]] = None, db_path: Optional[str] = None) -> List[Tuple]:
    """
    Search for events within a specific date range. If existing_results is provided, filters those results instead of querying the database.

    Both bounds are inclusive and are compared as strings against the
    'eventDateFrom' value, using the timestamps '<start_date>T00:00:00Z' and
    '<end_date>T00:00:00Z'. The upper bound is therefore midnight at the START
    of end_date: events later on end_date are not returned, so pass the
    following day to include all of the last day.

    Args:
        start_date (str): The start date in the format 'YYYY-MM-DD'.
        end_date (str): The end date in the format 'YYYY-MM-DD'.
        existing_results (Optional[List[Tuple]]): Existing results to filter, if available.
            Each tuple must hold the event date at index 3 (the position of
            'eventDateFrom' in a SELECT * row from the events table).
        db_path (Optional[str]): Path of the database file, used only when
            existing_results is None. Defaults to the module-level DATABASE_NAME.

    Returns:
        List[Tuple]: List of tuples containing event data within the specified date range.

    Raises:
        ValueError: If start_date or end_date is not a valid 'YYYY-MM-DD' date.
    """
    # Parse to validate the input, then render exactly one ISO 8601 timestamp.
    # datetime.isoformat() already contains a 'T00:00:00' part, so appending the
    # suffix to it would yield a malformed 'YYYY-MM-DDT00:00:00T00:00:00Z' bound.
    start_date_iso = datetime.strptime(start_date, '%Y-%m-%d').date().isoformat() + 'T00:00:00Z'
    end_date_iso = datetime.strptime(end_date, '%Y-%m-%d').date().isoformat() + 'T00:00:00Z'

    if existing_results is not None:
        # Rows without a date are skipped, matching the SQL branch where a NULL
        # eventDateFrom never satisfies the comparison.
        return [
            event for event in existing_results
            if event[3] is not None and start_date_iso <= event[3] <= end_date_iso
        ]

    conn = sqlite3.connect(DATABASE_NAME if db_path is None else db_path)
    # try/finally guarantees the connection is released even if the query fails.
    try:
        cursor = conn.cursor()
        cursor.execute('''
        SELECT * FROM events
        WHERE eventDateFrom >= ? AND eventDateFrom <= ?;
        ''', (start_date_iso, end_date_iso))
        return cursor.fetchall()
    finally:
        conn.close()

In [14]:
# Initialize the database (creates the tables if they do not exist yet)
create_database()

# Load the records from the JSON data file. The encoding is given explicitly
# so decoding does not depend on the platform's default locale encoding.
with open(DATAFILE_NAME, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Insert the data into the database
insert_data(data)



In [15]:

# Look up one threat actor by its key and print the stored profile
search_threat_actor("ITARMYOFUKRAINE")


Threat Actor Profile:
Key: ITARMYOFUKRAINE
Name: IT Army of Ukraine
Type: Nation State
Profiled: Yes
Identifiers: 
Active: 2022
APT: No
Allegiance: Ukraine
Origin: UA
Targeted Sectors: Finance; Government; Military; Telecommunitcations
Description: The IT Army of Ukraine is a collective of volunteers from around the world. The collective was formed on February 26, 2022 when Ukraine's Vice Prime Minister - Mykhailo Fedorov - called on cyberattacks against a list of Russian organizations. The group primarily engages in DDoS attacks against Russian organizations deemed to have a stake in the war. Volunteers of the group are involved in both defensive and offensive cyber operations.
