# CTI Analysis of UNC1549 Domain & IP Infrastructure

This notebook analyzes UNC1549 IOCs (domains and IPs) from a cyber threat intelligence perspective.

## 1) Data Loading
Load the CSV dataset and inspect its schema.

In [None]:
import re
import math
import socket
from collections import Counter

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import networkx as nx

from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

# Shared plot styling for every seaborn/matplotlib figure in this notebook.
sns.set_theme(style="whitegrid")

# Location of the IOC export, relative to the notebook's working directory.
DATA_PATH = "ioc_data.csv"
df = pd.read_csv(DATA_PATH)

# Fail early with a clear message if the columns the later cells rely on are absent.
REQUIRED_COLUMNS = {"type", "id"}
missing_columns = REQUIRED_COLUMNS - set(df.columns)
assert not missing_columns, f"{DATA_PATH} is missing required columns: {sorted(missing_columns)}"

# Show each artefact with its own rich repr instead of one tuple rendered as plain text.
print(f"Rows x columns: {df.shape}")
display(df.head())
df['type'].value_counts()

## 2) Domain vs IP Counts
Separate domains and IP addresses and visualize count distribution.

In [None]:
# Normalise the type label so variants such as ' Domain ' and 'DOMAIN' compare equal.
df['type_norm'] = df['type'].str.strip().str.lower()

# Clean both indicator lists the same way: drop missing values, trim surrounding
# whitespace, discard blanks, then de-duplicate. Domains are additionally
# lower-cased so that differently-cased spellings collapse to one entry.
domain_ids = df.loc[df['type_norm'] == 'domain', 'id'].dropna().astype(str).str.strip().str.lower()
domains = domain_ids[domain_ids != ''].drop_duplicates().reset_index(drop=True)

ip_ids = df.loc[df['type_norm'].isin(['ip', 'ip_address']), 'id'].dropna().astype(str).str.strip()
ips = ip_ids[ip_ids != ''].drop_duplicates().reset_index(drop=True)

count_df = pd.DataFrame({
    'IOC Type': ['Domain', 'IP Address'],
    'Count': [len(domains), len(ips)]
})

plt.figure(figsize=(6, 4))
# `palette` is tied to `hue` (passing it without `hue` is deprecated in recent
# seaborn); dodge=False keeps each bar centred on its category.
ax = sns.barplot(
    data=count_df, x='IOC Type', y='Count', hue='IOC Type',
    palette=['#2a9d8f', '#e76f51'], dodge=False,
)
# The hue legend would only repeat the x tick labels, so drop it if one was drawn.
if ax.get_legend() is not None:
    ax.get_legend().remove()
# Label bars from the data itself: category i is drawn at x position i.
for bar_pos, bar_count in enumerate(count_df['Count']):
    ax.annotate(f"{int(bar_count)}", (bar_pos, bar_count),
                ha='center', va='bottom', fontsize=10)
plt.title('UNC1549 IOC Counts: Domains vs IPs')
plt.tight_layout()
plt.show()

count_df

## 3) Keyword Frequency in Domains
Tokenize domain names and extract frequent keywords.

In [None]:
# Labels that describe TLD / hosting plumbing rather than the naming theme of a domain.
# Built once at definition time instead of on every call.
_DOMAIN_STOPWORDS = frozenset({
    'com', 'net', 'org', 'co', 'io', 'www', 'azurewebsites',
    'azure', 'cloudapp', 'eastus', 'westus', 'uaenorth',
    'qatarcentral'
})


def domain_tokens(domain: str, stopwords=None, min_len: int = 3):
    """Split a domain name into its meaningful alphabetic keywords.

    The input is coerced to ``str`` and lower-cased, then cut into maximal runs
    of the letters a-z; digits, dots and hyphens act as separators.

    Args:
        domain: Domain name to tokenize.
        stopwords: Optional iterable of tokens to discard. When ``None`` the
            built-in hosting/TLD stop list is used.
        min_len: Minimum token length to keep (default 3, i.e. tokens of one or
            two letters are dropped).

    Returns:
        List of tokens in order of appearance; duplicates are preserved.
    """
    stop = _DOMAIN_STOPWORDS if stopwords is None else frozenset(stopwords)
    candidates = re.findall(r"[a-z]+", str(domain).lower())
    return [tok for tok in candidates if len(tok) >= min_len and tok not in stop]

# Flatten the per-domain token lists into one keyword corpus.
all_tokens = [tok for name in domains for tok in domain_tokens(name)]

# Tally the keywords and rank them from most to least frequent.
freq = Counter(all_tokens)
kw_df = (
    pd.DataFrame(freq.items(), columns=['Keyword', 'Frequency'])
    .sort_values('Frequency', ascending=False)
)

top_n = 20
top_kw = kw_df.head(top_n)

# Horizontal bars keep long keywords readable.
kw_fig, kw_ax = plt.subplots(figsize=(10, 6))
sns.barplot(data=top_kw, y='Keyword', x='Frequency', color='#264653', ax=kw_ax)
kw_ax.set_title(f'Top {top_n} Domain Keywords')
kw_fig.tight_layout()
plt.show()

top_kw

## 4) Domain Name Entropy
Compute Shannon entropy for each domain and inspect randomness distribution.

In [None]:
def shannon_entropy(s: str) -> float:
    """Return the Shannon entropy of a string in bits per character.

    The input is coerced with ``str()``. Each distinct character contributes
    ``-p * log2(p)``, where ``p`` is its relative frequency in the string.

    Args:
        s: Text to measure (for example a domain name).

    Returns:
        Entropy in bits per character; ``0.0`` for an empty string or a string
        made of a single repeated character.
    """
    text = str(s)
    if not text:
        return 0.0
    total = len(text)
    # Negate each term rather than the finished sum: negating a sum of exactly
    # 0.0 (single distinct character) would return -0.0, which prints as "-0.0".
    return sum(
        -(count / total) * math.log2(count / total)
        for count in Counter(text).values()
    )

# One row per domain with simple lexical features derived from the name itself.
entropy_df = pd.DataFrame({'domain': domains}).assign(
    length=lambda frame: frame['domain'].str.len(),
    entropy=lambda frame: frame['domain'].apply(shannon_entropy),
    digit_count=lambda frame: frame['domain'].str.count(r'\d'),
    hyphen_count=lambda frame: frame['domain'].str.count(r'-'),
    # Number of dot-separated labels = number of dots + 1.
    label_count=lambda frame: frame['domain'].str.count(r'\.') + 1,
)

# Histogram (with KDE overlay) of per-domain entropy.
ent_fig, ent_ax = plt.subplots(figsize=(8, 5))
sns.histplot(entropy_df['entropy'], bins=20, kde=True, color='#1d3557', ax=ent_ax)
ent_ax.set_title('Distribution of Domain Shannon Entropy')
ent_ax.set_xlabel('Entropy (bits/char)')
ent_ax.set_ylabel('Domain Count')
ent_fig.tight_layout()
plt.show()

# Ten highest-entropy domains.
entropy_df.sort_values('entropy', ascending=False)[['domain', 'entropy']].head(10)

## 5) Infrastructure Graph (Domain -> IP)
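Resolve domain A records via live DNS and build a bipartite graph linking each domain to any IOC IP it currently resolves to. Note that this step sends DNS queries for the IOC domains from the machine running the notebook.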

In [None]:
def resolve_domain_ips(domain: str):
    """Resolve a host name to its IPv4 addresses using the system resolver.

    This is a best-effort lookup: any resolution failure yields an empty list
    instead of raising, so one dead domain does not abort a batch run.

    Args:
        domain: Host name to resolve. ``None`` and blank/whitespace-only values
            are rejected without touching the resolver.

    Returns:
        Sorted list of unique address strings reported by
        ``socket.gethostbyname_ex``; ``[]`` when nothing could be resolved.

    NOTE(review): this performs a live, blocking lookup. Queries for
    actor-controlled domains may be observable by whoever operates their name
    servers - confirm this is acceptable for the environment the notebook runs in.
    """
    host = "" if domain is None else str(domain).strip()
    if not host:
        # Never hand an empty name to the resolver; there is nothing to look up.
        return []
    try:
        _, _, addrs = socket.gethostbyname_ex(host)
    except (OSError, ValueError):
        # OSError covers socket.gaierror / socket.herror (lookup failures);
        # ValueError covers UnicodeError raised for names that cannot be encoded.
        return []
    return sorted(set(addrs))

# Resolve every IOC domain and emit one (domain, ip) pair per address returned.
resolved_rows = [
    (name, addr)
    for name in domains
    for addr in resolve_domain_ips(name)
]

resolved_df = pd.DataFrame(resolved_rows, columns=['domain', 'resolved_ip'])
ioc_ip_set = set(ips)

# Keep only the resolutions that land on an IP already present in the IOC list.
edges_df = (
    pd.DataFrame(columns=['domain', 'resolved_ip'])
    if resolved_df.empty
    else resolved_df[resolved_df['resolved_ip'].isin(ioc_ip_set)].copy()
)

print(f"Resolved domain->IP pairs: {len(resolved_df)}")
print(f"Edges that match IOC IP list: {len(edges_df)}")
edges_df.head()

In [None]:
# Every IOC becomes a node tagged with its kind; an edge records that a domain
# resolved to an IP that is itself on the IOC list.
G = nx.Graph()
G.add_nodes_from(domains, node_type='domain')
G.add_nodes_from(ips, node_type='ip')
G.add_edges_from(zip(edges_df['domain'], edges_df['resolved_ip']))

if G.number_of_edges() > 0:
    # Fixed seed so the layout is reproducible between runs.
    layout = nx.spring_layout(G, seed=42, k=0.6)

    # Plotly draws all edges as one line trace; a None entry breaks the line
    # between consecutive segments.
    seg_x, seg_y = [], []
    for src, dst in G.edges():
        (sx, sy), (tx, ty) = layout[src], layout[dst]
        seg_x.extend((sx, tx, None))
        seg_y.extend((sy, ty, None))

    # Per-node coordinates, colour (blue = domain, red = anything else) and hover label.
    typed_nodes = list(G.nodes(data='node_type'))
    pts_x = [layout[node][0] for node, _ in typed_nodes]
    pts_y = [layout[node][1] for node, _ in typed_nodes]
    pts_color = ['#457b9d' if kind == 'domain' else '#e63946' for _, kind in typed_nodes]
    pts_label = [f"{kind}: {node}" for node, kind in typed_nodes]

    edge_trace = go.Scatter(
        x=seg_x, y=seg_y,
        mode='lines',
        line=dict(width=1, color='#999'),
        hoverinfo='none'
    )
    node_trace = go.Scatter(
        x=pts_x, y=pts_y,
        mode='markers',
        marker=dict(size=10, color=pts_color, line=dict(width=1, color='black')),
        text=pts_label,
        hoverinfo='text'
    )

    fig = go.Figure(data=[edge_trace, node_trace])
    fig.update_layout(
        title='UNC1549 Infrastructure Graph (Domains linked to IOC IPs)',
        showlegend=False,
        template='plotly_white',
        margin=dict(l=20, r=20, t=50, b=20)
    )
    fig.show()
else:
    print("No matching domain-to-IOC-IP edges were found in live DNS resolution.")

## 6) Clustering Analysis of Domains
Create numeric features from domain strings, run KMeans clustering, and visualize in 2D PCA space.

In [None]:
# Extend the lexical feature frame with two more character-class counts.
cluster_df = entropy_df.copy()
cluster_df['alpha_count'] = cluster_df['domain'].str.count(r'[A-Za-z]')
cluster_df['special_count'] = cluster_df['domain'].str.count(r'[^A-Za-z0-9]')

feature_cols = ['length', 'digit_count', 'hyphen_count', 'label_count', 'entropy', 'alpha_count', 'special_count']
summary_cols = ['domain', 'cluster', 'entropy', 'length', 'digit_count', 'hyphen_count']

# The 2-component PCA projection below needs at least two samples, and scaling /
# KMeans need at least one; guard so a tiny or empty domain list does not raise.
MIN_DOMAINS_FOR_CLUSTERING = 2

if len(cluster_df) < MIN_DOMAINS_FOR_CLUSTERING:
    print(
        f"Clustering skipped: need at least {MIN_DOMAINS_FOR_CLUSTERING} domains, "
        f"found {len(cluster_df)}."
    )
    cluster_summary = pd.DataFrame(columns=summary_cols)
else:
    X = cluster_df[feature_cols].astype(float)

    # Standardise so that no single feature dominates the distance metric.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Never ask for more clusters than there are domains.
    k = min(4, len(cluster_df))
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=20)
    cluster_df['cluster'] = kmeans.fit_predict(X_scaled)

    # Project to two principal components purely for visualisation.
    pca = PCA(n_components=2, random_state=42)
    coords = pca.fit_transform(X_scaled)
    cluster_df['pc1'] = coords[:, 0]
    cluster_df['pc2'] = coords[:, 1]

    fig = px.scatter(
        cluster_df,
        x='pc1',
        y='pc2',
        # Cast to str so plotly treats cluster ids as categories, not a colour scale.
        color=cluster_df['cluster'].astype(str),
        hover_data=['domain', 'entropy', 'length', 'digit_count', 'hyphen_count'],
        title='KMeans Clusters of UNC1549 Domains (PCA 2D Projection)',
        template='plotly_white'
    )
    fig.update_traces(marker=dict(size=10, line=dict(width=0.5, color='black')))
    fig.show()

    cluster_summary = cluster_df[summary_cols].head(15)

cluster_summary

## 7) GeoIP Mapping (Optional)
Set `RUN_GEOIP = True` to fetch IP geolocation (online API) and plot distribution.

In [None]:
# Opt-in switch: the lookup sends every IOC IP to a third-party web API.
RUN_GEOIP = False

if RUN_GEOIP:
    import requests  # optional dependency, only needed when the lookup is enabled

    geo_rows = []
    geo_failures = []  # (ip, reason) for every lookup that produced no usable coordinates

    # One session reuses the HTTP connection across all lookups.
    with requests.Session() as session:
        for ip in ips:
            try:
                r = session.get(f"https://ipapi.co/{ip}/json/", timeout=8)
                r.raise_for_status()
                data = r.json()
            except (requests.RequestException, ValueError) as exc:
                # Network/HTTP errors and undecodable bodies: record and move on.
                geo_failures.append((ip, repr(exc)))
                continue

            if not isinstance(data, dict):
                geo_failures.append((ip, 'unexpected response shape'))
                continue

            lat, lon = data.get('latitude'), data.get('longitude')
            if lat is None or lon is None:
                # Presumably an error payload from the API; 'reason' is used when present.
                geo_failures.append((ip, str(data.get('reason', 'no coordinates in response'))))
                continue

            geo_rows.append({
                'ip': ip,
                'city': data.get('city'),
                'country': data.get('country_name'),
                'lat': lat,
                'lon': lon
            })

    # Surface skipped lookups instead of dropping them silently.
    if geo_failures:
        print(f"GeoIP lookup failed or returned no coordinates for {len(geo_failures)} IP(s):")
        for failed_ip, reason in geo_failures[:10]:
            print(f"  {failed_ip}: {reason}")

    geo_df = pd.DataFrame(geo_rows)
    if geo_df.empty:
        print('No geolocation data returned.')
    else:
        fig = px.scatter_geo(
            geo_df,
            lat='lat',
            lon='lon',
            hover_name='ip',
            hover_data=['city', 'country'],
            title='GeoIP Distribution of UNC1549 IP Infrastructure',
            projection='natural earth'
        )
        fig.show()
        display(geo_df)
else:
    print('GeoIP step is disabled. Set RUN_GEOIP = True to execute API lookups.')

## Notes
- Domain-to-IP links depend on live DNS and may differ from historical malicious infrastructure.
- For higher-fidelity CTI enrichment, integrate passive DNS, WHOIS, ASN, TLS cert, and historical resolution datasets.