# Network analysis of Python libraries and AI-related packages

Este notebook constrói uma **rede de dependências entre pacotes Python** e enriquece cada nó com **métricas de vulnerabilidades Snyk**, permitindo comparar:

- **Rede completa de pacotes** (todos os packages do CSV)
- **Sub-rede apenas de libs usadas para IA/ML** (ex.: `torch`, `transformers`, `mlflow`, `langchain`, `vllm`, etc.)

> ⚠️ **Pré-requisitos de arquivos**
>
> - `VULN_CSV`: CSV com as colunas  
>   `package,cve,cwes,severity,first_affected_version,first_affected_date,disclosed_date,mitigation_version,mitigation_date,disclosure_lag_days,time_to_fix_from_first_days,time_to_fix_from_disclosure_days,fix_semver_type`  
>   (é o dataset que você já mostrou).
> - `DEPS_CSV`: CSV com o grafo de dependências entre pacotes, com **uma linha por aresta**:
>   - `source`: pacote que depende (projeto)
>   - `target`: pacote requerido (dependência, o “importado”)
>
> Este notebook **não extrai imports**; ele assume que seu pipeline anterior já gerou esse arquivo de dependências.


In [None]:
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

# Apenas para visualização mais amigável
pd.set_option("display.max_columns", 50)
pd.set_option("display.width", 140)

In [None]:
# =========================
# Configurações principais
# =========================

# Caminhos dos arquivos de entrada
VULN_CSV = "top_pypi_snyk_timeline_20231101_20251101.csv"   # <-- ajuste para o nome real do seu CSV
DEPS_CSV = "python_dependencies_edges.csv"     # <-- ajuste para o arquivo de arestas gerado no pipeline

# Mapa de severidade para valores numéricos
severity_map = {
    "low": 1,
    "moderate": 2,
    "medium": 2,   # caso apareça "medium" em algum lugar
    "high": 3,
    "critical": 4
}

# Lista curada de libs voltadas para IA/ML/LLM presentes no dataset
AI_LIBS = sorted({
    # Núcleo de DL/ML
    "torch", "scikit-learn", "lightgbm", "pytorch-lightning", "keras",
    "paddlepaddle", "onnx", "xgboost",
    # LLM / RAG / LangChain & família
    "transformers", "langchain", "langchain-core", "langchain-community",
    "langchain-experimental", "llama-index", "llama-index-core",
    "llama-index-cli", "vllm", "qdrant-client", "mlflow",
    # MLOps / pipelines / experiment tracking
    "clearml", "feast", "sagemaker", "kedro", "prefect",
    # Interfaces/ferramentas típicas de IA generativa
    "gradio", "streamlit", "browser-use", "ydata-profiling",
    # Outras libs correlatas do seu dataset
    "fastmcp", "litellm", "skops", "lightning", "pytorch-lightning"
})

print(f"Total de libs marcadas como IA: {len(AI_LIBS)}")
AI_LIBS

In [None]:
# =========================
# Carregar vulnerabilidades Snyk
# =========================

vulns = pd.read_csv(VULN_CSV)

print("Amostra do dataset de vulnerabilidades:")
display(vulns.head())

# Parse de datas
date_cols = ["first_affected_date", "disclosed_date", "mitigation_date"]
for col in date_cols:
    if col in vulns.columns:
        vulns[col] = pd.to_datetime(vulns[col], errors="coerce")

# Garantir colunas numéricas
for col in ["disclosure_lag_days", "time_to_fix_from_first_days", "time_to_fix_from_disclosure_days"]:
    if col in vulns.columns:
        vulns[col] = pd.to_numeric(vulns[col], errors="coerce")

# Severidade numérica
vulns["severity_norm"] = (
    vulns["severity"]
    .astype(str)
    .str.lower()
    .map(severity_map)
)

print("\nResumo de severidade:")
display(vulns["severity"].value_counts(dropna=False))

In [None]:
# =========================
# Agregar atributos por pacote (nó)
# =========================

agg = (
    vulns
    .groupby("package")
    .agg(
        vuln_count=("cve", "nunique"),  # número de CVEs distintos
        vuln_rows=("cve", "size"),      # número de linhas (inclui linhas sem CVE)
        max_severity=("severity_norm", "max"),
        mean_severity=("severity_norm", "mean"),
        mean_fix_from_disclosure=("time_to_fix_from_disclosure_days", "mean"),
        mean_fix_from_first=("time_to_fix_from_first_days", "mean"),
        first_disclosure=("disclosed_date", "min"),
        last_disclosure=("disclosed_date", "max")
    )
    .reset_index()
)

print("Atributos agregados por pacote:")
display(agg.head())

print("\nTotal de packages distintos com vulnerabilidades:", len(agg))

In [None]:
# =========================
# Carregar arestas de dependências e construir o grafo
# =========================

deps = pd.read_csv(DEPS_CSV)

print("Amostra do arquivo de dependências:")
display(deps.head())

# Ajuste aqui se suas colunas tiverem outros nomes
SRC_COL = "source"  # projeto que depende
DST_COL = "target"  # dependência requerida

missing_cols = {SRC_COL, DST_COL} - set(deps.columns)
if missing_cols:
    raise ValueError(f"As colunas {missing_cols} não foram encontradas em {DEPS_CSV}. Ajuste SRC_COL/DST_COL.")

# Construir grafo direcionado: source -> target
G = nx.DiGraph()

for _, row in deps.iterrows():
    src = str(row[SRC_COL])
    dst = str(row[DST_COL])
    if src and dst and src != "nan" and dst != "nan":
        G.add_edge(src, dst)

# Garantir que todos os packages vulneráveis apareçam, mesmo que sem arestas
for pkg in agg["package"].astype(str):
    if pkg not in G:
        G.add_node(pkg)

print(f"Grafo criado com {G.number_of_nodes()} nós e {G.number_of_edges()} arestas.")

In [None]:
# =========================
# Atribuir atributos aos nós (do DataFrame 'agg')
# =========================

attr_dict = (
    agg
    .set_index("package")
    .to_dict(orient="index")
)

nx.set_node_attributes(G, attr_dict)

# Flag de lib IA
ai_flags = {pkg: (pkg in AI_LIBS) for pkg in G.nodes()}
nx.set_node_attributes(G, ai_flags, "is_ai_lib")

# Verificar exemplo de nó
sample_node = list(G.nodes())[0]
print("Exemplo de nó com atributos:")
print(sample_node, G.nodes[sample_node])

In [None]:
# =========================
# Métricas de rede
# =========================

# Graus
degree_dict = dict(G.degree())
in_degree_dict = dict(G.in_degree())
out_degree_dict = dict(G.out_degree())

nx.set_node_attributes(G, degree_dict, "degree")
nx.set_node_attributes(G, in_degree_dict, "in_degree")
nx.set_node_attributes(G, out_degree_dict, "out_degree")

# Betweenness centrality (pode ser custoso em grafos muito grandes)
print("Calculando betweenness centrality (pode demorar um pouco)...")
betweenness_dict = nx.betweenness_centrality(G, normalized=True)
nx.set_node_attributes(G, betweenness_dict, "betweenness")

# Eigenvector centrality (usando versão baseada em álgebra linear)
print("Calculando eigenvector centrality (usando grafo não direcionado)...")
UG = G.to_undirected()
try:
    eigenvector_dict = nx.eigenvector_centrality_numpy(UG)
except Exception as e:
    print("Falha ao calcular eigenvector centrality com método numpy:", e)
    eigenvector_dict = {}

nx.set_node_attributes(G, eigenvector_dict, "eigenvector")

# Densidade da rede (não direcionada)
density = nx.density(UG)
print(f"\nDensidade da rede (não direcionada): {density:.6f}")

# Comunidades e modularidade
print("Calculando comunidades (greedy modularity)...")
from networkx.algorithms import community

communities = list(community.greedy_modularity_communities(UG))
modularity_value = community.modularity(UG, communities)
print(f"Modularidade (greedy): {modularity_value:.6f}")
print(f"Número de comunidades detectadas: {len(communities)}")

In [None]:
# =========================
# Consolidar atributos de nós em um DataFrame
# =========================

nodes_data = []
for node, data in G.nodes(data=True):
    row = {"package": node}
    row.update(data)
    nodes_data.append(row)

nodes_df = pd.DataFrame(nodes_data)

print("Amostra do DataFrame de nós:")
display(nodes_df.head())

print("\nColunas disponíveis em nodes_df:")
print(nodes_df.columns.tolist())

In [None]:
# =========================
# Comparação: rede completa vs libs de IA
# =========================

nodes_total = nodes_df.copy()
nodes_ai = nodes_df[nodes_df["is_ai_lib"] == True].copy()
nodes_non_ai = nodes_df[nodes_df["is_ai_lib"] == False].copy()

print(f"Total de nós (todos os packages): {len(nodes_total)}")
print(f"Total de nós (libs IA): {len(nodes_ai)}")
print(f"Total de nós (não IA): {len(nodes_non_ai)}")

def describe_metric(df, col):
    return df[col].describe().to_frame(name=col)

metrics_to_compare = ["degree", "in_degree", "out_degree", "betweenness", "eigenvector", "max_severity", "mean_fix_from_disclosure"]

summary_total = pd.concat(
    [describe_metric(nodes_total, m) for m in metrics_to_compare if m in nodes_total.columns],
    axis=1
)

summary_ai = pd.concat(
    [describe_metric(nodes_ai, m) for m in metrics_to_compare if m in nodes_ai.columns],
    axis=1
)

summary_non_ai = pd.concat(
    [describe_metric(nodes_non_ai, m) for m in metrics_to_compare if m in nodes_non_ai.columns],
    axis=1
)

print("\nResumo das métricas - REDE COMPLETA:")
display(summary_total)

print("\nResumo das métricas - LIBS IA:")
display(summary_ai)

print("\nResumo das métricas - NÃO IA:")
display(summary_non_ai)

In [None]:
# =========================
# Top nós por centralidade (geral vs IA)
# =========================

TOP_N = 20

def top_by(df, col, label):
    if col not in df.columns:
        print(f"Coluna {col} não encontrada.")
        return
    print(f"\nTop {TOP_N} por {col} - {label}:")
    display(
        df[["package", col, "vuln_count", "max_severity", "mean_fix_from_disclosure"]]
        .sort_values(col, ascending=False)
        .head(TOP_N)
    )

for metric in ["degree", "betweenness", "eigenvector"]:
    top_by(nodes_total, metric, "Rede completa")
    top_by(nodes_ai, metric, "Apenas libs IA")

In [None]:
# =========================
# Visualizações simples: distribuição de grau e severidade
# =========================

plt.figure()
nodes_total["degree"].hist(bins=30)
plt.title("Distribuição do grau - rede completa")
plt.xlabel("degree")
plt.ylabel("frequência")
plt.show()

plt.figure()
nodes_ai["degree"].hist(bins=15)
plt.title("Distribuição do grau - libs IA")
plt.xlabel("degree")
plt.ylabel("frequência")
plt.show()

# Comparação de severidade máxima (1=low .. 4=critical)
plt.figure()
nodes_total["max_severity"].hist(bins=[0.5,1.5,2.5,3.5,4.5])
plt.title("Severidade máxima por pacote - rede completa")
plt.xlabel("max_severity (1=low, 4=critical)")
plt.ylabel("freq")
plt.show()

plt.figure()
nodes_ai["max_severity"].hist(bins=[0.5,1.5,2.5,3.5,4.5])
plt.title("Severidade máxima por pacote - libs IA")
plt.xlabel("max_severity (1=low, 4=critical)")
plt.ylabel("freq")
plt.show()

In [None]:
# =========================
# Comunidades contendo libs IA (visão rápida)
# =========================

# Mapeia cada nó para o ID da comunidade (0..N-1)
node_to_comm = {}
for i, comm in enumerate(communities):
    for node in comm:
        node_to_comm[node] = i

nodes_df["community_id"] = nodes_df["package"].map(node_to_comm)

ai_communities = (
    nodes_df[nodes_df["is_ai_lib"] == True]
    .groupby("community_id")
    .size()
    .sort_values(ascending=False)
)

print("Comunidades (community_id) com mais libs IA:")
display(ai_communities.head(10))

print("\nExemplo de comunidade com libs IA (top 1):")
if not ai_communities.empty:
    top_comm_id = ai_communities.index[0]
    display(nodes_df[nodes_df["community_id"] == top_comm_id][["package", "is_ai_lib", "degree", "max_severity"]]
            .sort_values("is_ai_lib", ascending=False))