# Get Repos from github

Ce script Python télécharge automatiquement les fichiers Python des dépôts GitHub listés dans un fichier CSV, puis les stocke dans un dossier spécifié sur Google Drive.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import requests
import pandas as pd
import time

def telecharger_fichier(url, chemin, max_tentatives=5, delai=5, timeout=30):
    """Download ``url`` to ``chemin`` when the destination is a ``.py`` file.

    ``chemin`` is joined onto ``/content/drive/My Drive``; because of
    ``os.path.join`` semantics an absolute ``chemin`` is used as-is.
    Destinations that do not end in ``.py`` are ignored.

    Args:
        url: Address of the raw file to fetch.
        chemin: Destination path of the downloaded file.
        max_tentatives: Maximum number of download attempts.
        delai: Seconds to wait between two attempts.
        timeout: Seconds after which a stalled HTTP request is abandoned.

    Returns:
        None. Failures are reported on stdout and never raised from the
        retry loop.
    """
    chemin = os.path.join('/content/drive/My Drive', chemin)
    if not chemin.endswith('.py'):
        return

    os.makedirs(os.path.dirname(chemin), exist_ok=True)
    for attempt in range(max_tentatives):
        try:
            # Without a timeout a stalled connection would block the whole run.
            reponse = requests.get(url, timeout=timeout)
            if reponse.status_code == 200:
                with open(chemin, 'wb') as f:
                    f.write(reponse.content)
                return
            print(f"Erreur lors du téléchargement du fichier {url}: {reponse.status_code}")
        except Exception as e:  # best-effort download: report and retry
            print(f"Erreur lors du téléchargement du fichier {url}: {e}")
        # Pause after any failure (HTTP error or exception), except after the
        # last attempt where waiting would be pointless.
        if attempt < max_tentatives - 1:
            time.sleep(delai)

def parcourir_dossiers_et_telecharger(repo_url, chemin_base, nom_repo, dossier_global, token):
    """Recursively download every ``.py`` file of a GitHub repository.

    The directory ``chemin_base`` of ``repo_url`` is listed through the
    GitHub "contents" endpoint; ``.py`` files are passed to
    ``telecharger_fichier`` and sub-directories are explored recursively.

    Args:
        repo_url: Repository identifier in ``owner/name`` form.
        chemin_base: Path inside the repository ('' for the root).
        nom_repo: Folder name used for this repository under ``dossier_global``.
        dossier_global: Local root folder receiving the downloaded files.
        token: GitHub token sent in the ``Authorization`` header.

    Returns:
        None. Errors are printed; the listing is attempted up to 5 times.
    """
    headers = {'Authorization': f'token {token}'}
    api_url = f"https://api.github.com/repos/{repo_url}/contents/{chemin_base}"
    max_tentatives = 5

    contenu = None
    for attempt in range(max_tentatives):
        try:
            reponse = requests.get(api_url, headers=headers, timeout=30)
            if reponse.status_code == 200:
                contenu = reponse.json()
                break
            print(f"Erreur lors de l'accès à {api_url}: {reponse.status_code}")
        except Exception as e:  # best-effort crawl: report and retry
            print(f"Erreur lors de l'accès à {api_url}: {e}")
        # Wait after any failed attempt, not only after exceptions.
        if attempt < max_tentatives - 1:
            time.sleep(5)

    # The endpoint returns a list for a directory; anything else (for example
    # a single object when the path is a file) has no entries to walk.
    if not isinstance(contenu, list):
        return

    # Entries are processed outside the retry loop so that a failure on one
    # item does not trigger a new listing and a re-download of the directory.
    for item in contenu:
        try:
            chemin_complet = os.path.join(dossier_global, nom_repo, chemin_base, item['name'])
            if item['type'] == 'file' and item['name'].endswith('.py'):
                telecharger_fichier(item['download_url'], chemin_complet)
            elif item['type'] == 'dir':
                sous_chemin = f"{chemin_base}/{item['name']}" if chemin_base else item['name']
                parcourir_dossiers_et_telecharger(repo_url, sous_chemin, nom_repo, dossier_global, token)
        except Exception as e:
            print(f"Erreur lors de l'accès à {api_url}: {e}")

# Initial configuration: local root folder (on the mounted Drive) that receives
# one sub-folder per downloaded repository.
dossier_global = "/content/drive/My Drive/Dataset/Test/ddos/benin"
os.makedirs(dossier_global, exist_ok=True)

# Placeholder GitHub token.
# NOTE(review): replace at run time; avoid saving a real token in the notebook.
token="xxxx"
# Read the CSV file listing the repositories; the loop below reads its
# 'Author' and 'Repo' columns.
df = pd.read_csv('/content/benin_ddos.csv')

for index, row in df.iterrows():
    repo_url = f"{row['Author']}/{row['Repo']}"
    nom_repo = repo_url.split('/')[-1]
    projet_path = os.path.join(dossier_global, nom_repo)

    # Skip repositories whose folder already exists (treated as downloaded).
    if os.path.exists(projet_path):
        print(f"Projet {row['Repo']} déjà téléchargé. Passer au suivant.")
        continue

    print(f"Projet: {row['Repo']}")
    parcourir_dossiers_et_telecharger(repo_url, '', nom_repo, dossier_global, token)
    print(f"Projet num {index} : son nom est : {row['Repo']}")
    time.sleep(1)  # 1-second pause between repositories to avoid overloading the API


# Graphe d'appel

Installation de la bibliothèque PyCG, qui génère les graphes d'appel.

In [None]:
# Installation de la bibliothèque PyCG, qui génère les graphes d'appel
!git clone https://github.com/vitsalis/PyCG.git
%cd PyCG
!pip install .


In [None]:
%cd ..

In [None]:
%pwd


Ce script est conçu pour parcourir chaque sous-dossier d'un dossier global sur Google Drive, trouver tous les fichiers Python de chaque projet, puis exécuter le script principal de la bibliothèque PyCG (PyCG/__main__.py) sur ces fichiers afin de générer en sortie un fichier JSON par projet (graphe d'appel).

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
import subprocess

# Folder containing all the projects, located on Google Drive.
# NOTE(review): this path uses 'MyDrive' while dossier_json uses 'My Drive' —
# confirm both spellings resolve to the same mount.
dossier_global = '/content/drive/MyDrive/Dataset/Test/keylogger/benin'

# Output folder for the generated JSON files, also on Google Drive.
dossier_json ="/content/drive/My Drive/Dataset/Test/keylogger/json_benin"
os.makedirs(dossier_json, exist_ok=True)  # Make sure the output folder exists
i = 1  # running counter used only in the progress message

# Walk every sub-folder (one per project) of the global folder.
for projet in os.listdir(dossier_global):
    chemin_projet = os.path.join(dossier_global, projet)

    if os.path.isdir(chemin_projet):
        # Skip projects whose JSON output already exists.
        fichier_json = os.path.join(dossier_json, f'{projet}.json')
        if os.path.exists(fichier_json):
            print(f"Le fichier JSON pour {projet} existe déjà, passage au projet suivant.")
            continue

        # Build the PyCG command line for this project.
        commande = [
            'python',
            'PyCG/__main__.py',
            '--package',
            chemin_projet,
        ]
        # Collect every Python file of the project (via the `find` utility)
        # and pass them all to PyCG.
        fichiers_python = subprocess.check_output(['find', chemin_projet, '-type', 'f', '-name', '*.py']).decode().splitlines()
        commande.extend(fichiers_python)
        commande.extend(['-o', fichier_json])

        # Run the command; its exit status is not checked.
        print(f'{i} début nom = {projet}')
        i += 1
        subprocess.run(commande)

print("Terminé !")


# Visualisation




In [None]:

# visualiser le graphe
import networkx as nx
import matplotlib.pyplot as plt
import json

def visualize_call_graph(data: dict):
    """Draw a call graph given as a ``{caller: [callees]}`` mapping.

    Every key becomes a node (with a ``size`` attribute of 40) and every
    (key, value item) pair becomes a directed edge. The graph summary is
    printed and the figure is shown with matplotlib.
    """
    call_graph = nx.DiGraph()

    # Register each caller, then all of its outgoing edges.
    for caller, callees in data.items():
        call_graph.add_node(caller, size=40)
        call_graph.add_edges_from((caller, callee) for callee in callees)

    print(call_graph)
    layout = nx.spring_layout(call_graph)  # force-directed node positions
    plt.figure(figsize=(10, 10))
    nx.draw(
        call_graph,
        layout,
        with_labels=True,
        node_size=800,
        node_color="skyblue",
        font_size=10,
        font_weight="bold",
    )
    plt.title("Call Graph")
    plt.show()

In [None]:

# script de netoiyage de graphe
import json

# Remove the "<builtin>." entries from a call graph.
def supprimer_builtin_nodes(data):
    """Strip every name starting with ``"<builtin>."`` from ``data``.

    A dict is cleaned in place (matching keys are deleted, list values are
    filtered, dict values are cleaned recursively) and returned. A list is
    returned as a new filtered list. Any other value is returned unchanged.
    """
    prefix = "<builtin>."

    if isinstance(data, list):
        return [element for element in data if not element.startswith(prefix)]
    if not isinstance(data, dict):
        return data

    # Collect the keys first: a dict cannot shrink while being iterated.
    for name in [candidate for candidate in data if candidate.startswith(prefix)]:
        data.pop(name)

    for name in data:
        entry = data[name]
        if isinstance(entry, list):
            data[name] = [element for element in entry if not element.startswith(prefix)]
        elif isinstance(entry, dict):
            data[name] = supprimer_builtin_nodes(entry)
    return data


# Remove nodes that have no incoming reference and no neighbours.
def supprimer_noeuds_sans_reference(data, references):
    """Delete, in place, the isolated entries of ``data``.

    An entry is removed when its key is absent from ``references`` and its
    value is empty. Other entries whose value is a dict are processed
    recursively. Nothing is returned.
    """
    # Iterate over a snapshot of the keys because entries may be deleted.
    for name in list(data):
        children = data[name]
        if name not in references and len(children) == 0:
            data.pop(name)
        elif isinstance(children, dict):
            supprimer_noeuds_sans_reference(children, references)

# Collect every name that is referenced somewhere in the graph.
def obtenir_references(data):
    """Return the set of strings found inside the lists of ``data``.

    Dict values are explored recursively; inside a list, strings are
    collected and dicts are explored. Anything else contributes nothing.
    """
    found = set()
    if isinstance(data, dict):
        for child in data.values():
            found.update(obtenir_references(child))
        return found
    if isinstance(data, list):
        for element in data:
            if isinstance(element, str):
                found.add(element)
            elif isinstance(element, dict):
                found.update(obtenir_references(element))
    return found


In [None]:
import os
# Folder holding the call-graph JSON files to display.
dossier_json = '/content/drive/MyDrive/json'

# Draw one figure per '.json' file found directly in the folder.
for fichier_json in os.listdir(dossier_json):
    chemin_json = os.path.join(dossier_json, fichier_json)
    if os.path.isfile(chemin_json) and fichier_json.endswith('.json'):
        # Load the JSON file and display its call graph.
        with open(chemin_json, 'r') as f:
            data = json.load(f)
            visualize_call_graph(data)


#  GRAPH (TORCH)

In [None]:
# installer des packages torch
!pip install torch torchvision
!pip install torch-geometric


Ce bloc de code est très utile pour charger des données de graphe à partir de fichiers JSON, les convertir en format compatible avec PyTorch, et enfin, visualiser le graphe avec des étiquettes informatives

In [None]:
#Ce bloc de code est très utile pour charger des données de graphe à partir de fichiers JSON, les convertir en format compatible avec PyTorch, et enfin, visualiser le graphe avec des étiquettes informatives
import json
import torch
import networkx as nx
import matplotlib.pyplot as plt
from torch_geometric.data import Data

# Load a JSON file.
def load_json(filepath):
    """Return the parsed content of the JSON file at ``filepath``.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale's default encoding.
    """
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)

# Build a PyTorch Geometric graph from the call-graph JSON.
def json_to_graph(json_data):
    """Convert a ``{caller: [callees]}`` mapping into a ``Data`` object.

    Args:
        json_data: Mapping from a node name to the list of names it points to.

    Returns:
        A ``(data, node_mapping)`` tuple. ``data.edge_index`` holds one
        directed edge (caller -> callee) per column and ``node_mapping`` maps
        every node name to its integer index. Keys are numbered first, in
        dict order; a callee that is not itself a key is appended with the
        next free index instead of raising ``KeyError``.
    """
    node_mapping = {name: i for i, name in enumerate(json_data.keys())}

    sources, destinations = [], []
    for source, targets in json_data.items():
        source_idx = node_mapping[source]
        for target in targets:
            # len() is evaluated before the insertion, so an unknown callee
            # receives the next unused index.
            target_idx = node_mapping.setdefault(target, len(node_mapping))
            sources.append(source_idx)       # edge origin (caller)
            destinations.append(target_idx)  # edge end (callee)

    # Row 0 = sources, row 1 = destinations.
    edge_index_tensor = torch.tensor([sources, destinations], dtype=torch.long)

    # Record the node count explicitly so that nodes without any edge are
    # still counted.
    data = Data(edge_index=edge_index_tensor, num_nodes=len(node_mapping))
    return data, node_mapping

# Display the graph with the function names as labels.
def visualize_graph(data, node_mapping):
    """Draw the directed graph stored in ``data.edge_index``.

    Args:
        data: Object exposing ``edge_index``, a tensor whose two rows hold
            the source and destination index of every edge.
        node_mapping: Mapping from node name to node index, used for labels.

    Only nodes that appear in at least one edge are drawn. Labels are
    limited to those nodes, because a label for a node without a layout
    position makes ``draw_networkx_labels`` fail.
    """
    G = nx.DiGraph()  # directed graph
    edge_index = data.edge_index.numpy()
    G.add_edges_from(zip(edge_index[0], edge_index[1]))

    plt.figure(figsize=(12, 12))
    pos = nx.spring_layout(G)  # positions for all nodes
    nx.draw(G, pos, with_labels=False, node_color='skyblue', node_size=500, edge_color='k', linewidths=1, font_size=15, arrows=True, arrowstyle='-|>', arrowsize=10)

    # Label only the nodes that were actually laid out.
    labels = {i: name for name, i in node_mapping.items() if i in pos}
    nx.draw_networkx_labels(G, pos, labels, font_size=8)

    plt.title('Visualisation du graphe avec noms de fonctions')
    plt.show()


In [None]:

# Path to the JSON file (adjust to your environment).
json_path = '/content/fireELF.json'  # Modifiez le chemin selon votre environnement

# Load the JSON file and convert it into a graph.
json_data = load_json(json_path)
graph_data, node_mapping = json_to_graph(json_data)

# Display the graph.
visualize_graph(graph_data, node_mapping)

# Print the graph object and the name -> index mapping for inspection.
print(graph_data, node_mapping)

# EMBEDDING

In [None]:
!pip install code-bert-score
from transformers import AutoTokenizer, AutoModel
# Load the pretrained "neulab/codebert-python" tokenizer and model; both are
# used as globals by the embedding code of this notebook.
tokenizer = AutoTokenizer.from_pretrained("neulab/codebert-python")
model = AutoModel.from_pretrained("neulab/codebert-python")

In [None]:
import json
import os
import shutil
import torch



# Compute one embedding vector for a piece of source code.
def get_embedding(code, tokenizer, model):
    """Return the mean embedding of ``code`` computed with ``model``.

    The text is cut into slices of 512 characters; each slice is tokenized,
    encoded, and mean-pooled over its tokens, and the slice vectors are then
    averaged.

    Args:
        code: Source code as a string.
        tokenizer: Callable accepting ``(text, return_tensors=..., truncation=...,
            max_length=...)`` and returning the model inputs as a mapping.
        model: Callable returning an object with ``last_hidden_state``; must
            expose ``config.hidden_size``.

    Returns:
        A 1-D tensor detached from the autograd graph. For empty ``code`` it
        is a zero vector of size ``model.config.hidden_size``.
    """
    segment_size = 512
    segments = [code[i:i + segment_size] for i in range(0, len(code), segment_size)]
    if not segments:
        return torch.zeros(model.config.hidden_size)

    segment_embeddings = []
    # Inference only: skipping gradient tracking avoids keeping the autograd
    # graph of every segment in memory.
    with torch.no_grad():
        for segment in segments:
            # A 512-character slice can still produce more than 512 tokens,
            # so truncate explicitly to stay within the model's input limit.
            inputs = tokenizer(segment, return_tensors="pt", truncation=True, max_length=segment_size)
            outputs = model(**inputs)
            # Mean over the token axis, then drop the batch axis of size 1.
            segment_embeddings.append(outputs.last_hidden_state.mean(dim=1).squeeze(0))

    return torch.stack(segment_embeddings).mean(dim=0)


#################################################
# Turn a dotted call chain into a snippet that first imports its root module
# (used for library / system calls).
def transform_code_chain(code_chain):
    """Prefix ``code_chain`` with an import of its first dotted component.

    A chain without any dot is returned unchanged; otherwise the result is
    ``"import <first component>\\n<code_chain>"``.
    """
    root_module, dot, _rest = code_chain.partition('.')
    if not dot:
        # Single word: nothing to import.
        return code_chain
    return f"import {root_module}\n{code_chain}"


#



#########################################################################3

# Cette fonction extrait le code source d'une fonction à partir d'un fichier Python donné et du nom de la fonction.
import ast

def get_function_code(filename, function_name):
    """Return the source of the function ``function_name`` found in ``filename``.

    The file is parsed with ``ast`` and the first ``def`` or ``async def``
    node with a matching name (in ``ast.walk`` order) is rendered back to
    text with ``ast.unparse`` (Python 3.9+).

    Returns:
        The function source as a string, or ``None`` when the file is
        missing, cannot be read or parsed, or contains no such function.
        Errors are printed, not raised.
    """
    try:
        # Read as UTF-8 so the result does not depend on the locale.
        with open(filename, 'r', encoding='utf-8') as file:
            tree = ast.parse(file.read())
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == function_name:
                return ast.unparse(node)
    except FileNotFoundError:
        print(f"Le fichier {filename} n'a pas été trouvé.")
    except Exception as e:  # best-effort extraction: report and give up
        print(f"Erreur lors de l'analyse du fichier : {e}")
    return None



################################################################################
# List every file located under a directory, sub-directories included.
def lister_chemins_repertoire(OneRepoGitHub):
    """Return the paths of all files found by walking ``OneRepoGitHub``.

    Paths are built by joining each walked directory with its file names,
    in ``os.walk`` order. A missing directory yields an empty list.
    """
    return [
        os.path.join(racine, nom)
        for racine, _sous_dossiers, noms in os.walk(OneRepoGitHub)
        for nom in noms
    ]
################################################################################
# Work out, for every function of a call graph, which file should contain it.
def generate_file_paths(RepoGitHub, graph, json_file, path_fichiers):
    """Map each function of ``graph`` to the file expected to define it.

    Args:
        RepoGitHub: Root folder containing the downloaded repositories.
        graph: Call graph whose keys are dotted function names.
        json_file: Path of the call-graph JSON; its base name without the
            final extension is used as the repository folder name.
        path_fichiers: List of file paths of the repository. It is
            normalised IN PLACE (first '/'-separated component dropped, a
            leading '/' added), which leaves absolute paths unchanged.

    Returns:
        A dict keyed by the LAST component of each dotted name, with values
        ``{"full_path": ..., "full_function_name": ...}``. ``full_path`` is:
        * ``<repo>/<module path>.py`` when the module file exists,
        * ``<RepoGitHub>/<repo>/<dotted name as path>.py`` when the whole
          name designates a file,
        * the dotted name itself otherwise.
        Functions sharing the same last component overwrite one another.
    """
    file_paths = {}
    # splitext removes only the final extension, so repository names that
    # contain dots are kept intact.
    json_file_name = os.path.splitext(os.path.basename(json_file))[0]
    print("json_file_name", json_file_name)

    # Normalise the paths in place (callers rely on the mutated list).
    for i, path in enumerate(path_fichiers):
        path_fichiers[i] = '/' + '/'.join(path.split('/')[1:])
    known_paths = set(path_fichiers)  # constant-time membership tests

    for function in graph:
        parts = function.split('.')
        short_name = parts[-1]
        module_file = '/'.join(parts[:-1]) + '.py'
        name_as_file = function.replace('.', '/') + ".py"

        if os.path.join(RepoGitHub, json_file_name, module_file) in known_paths:
            # Case 1: "pkg.mod.func" defined in <repo>/pkg/mod.py
            file_paths[short_name] = {"full_path": f"{json_file_name}/{module_file}", "full_function_name": function}
        elif os.path.join(RepoGitHub, json_file_name, name_as_file) in known_paths:
            # Case 2: the dotted name designates a module file itself
            file_paths[short_name] = {"full_path": f'{RepoGitHub}/{json_file_name}/{name_as_file}', "full_function_name": function}
        else:
            # Case 3: no matching file in the repository; keep the dotted name
            file_paths[short_name] = {"full_path": function, "full_function_name": function}

    return file_paths


In [None]:
import json
import os
import shutil
import torch



# Compute one embedding vector for a piece of source code.
def get_embedding(code, tokenizer, model):
    """Return the mean embedding of ``code`` computed with ``model``.

    The text is cut into slices of 512 characters; each slice is tokenized,
    encoded, and mean-pooled over its tokens, and the slice vectors are then
    averaged.

    Args:
        code: Source code as a string.
        tokenizer: Callable accepting ``(text, return_tensors=..., truncation=...,
            max_length=...)`` and returning the model inputs as a mapping.
        model: Callable returning an object with ``last_hidden_state``; must
            expose ``config.hidden_size``.

    Returns:
        A 1-D tensor detached from the autograd graph. For empty ``code`` it
        is a zero vector of size ``model.config.hidden_size``.
    """
    segment_size = 512
    segments = [code[i:i + segment_size] for i in range(0, len(code), segment_size)]
    if not segments:
        return torch.zeros(model.config.hidden_size)

    segment_embeddings = []
    # Inference only: skipping gradient tracking avoids keeping the autograd
    # graph of every segment in memory.
    with torch.no_grad():
        for segment in segments:
            # A 512-character slice can still produce more than 512 tokens,
            # so truncate explicitly to stay within the model's input limit.
            inputs = tokenizer(segment, return_tensors="pt", truncation=True, max_length=segment_size)
            outputs = model(**inputs)
            # Mean over the token axis, then drop the batch axis of size 1.
            segment_embeddings.append(outputs.last_hidden_state.mean(dim=1).squeeze(0))

    return torch.stack(segment_embeddings).mean(dim=0)


#################################################
# Turn a dotted call chain into a snippet that first imports its root module
# (used for library / system calls).
def transform_code_chain(code_chain):
    """Prefix ``code_chain`` with an import of its first dotted component.

    A chain without any dot is returned unchanged; otherwise the result is
    ``"import <first component>\\n<code_chain>"``.
    """
    root_module, dot, _rest = code_chain.partition('.')
    if not dot:
        # Single word: nothing to import.
        return code_chain
    return f"import {root_module}\n{code_chain}"


#



#########################################################################3

# Cette fonction extrait le code source d'une fonction à partir d'un fichier Python donné et du nom de la fonction.
import ast

def get_function_code(filename, function_name):
    """Return the source of the function ``function_name`` found in ``filename``.

    The file is parsed with ``ast`` and the first ``def`` or ``async def``
    node with a matching name (in ``ast.walk`` order) is rendered back to
    text with ``ast.unparse`` (Python 3.9+).

    Returns:
        The function source as a string, or ``None`` when the file is
        missing, cannot be read or parsed, or contains no such function.
        Errors are printed, not raised.
    """
    try:
        # Read as UTF-8 so the result does not depend on the locale.
        with open(filename, 'r', encoding='utf-8') as file:
            tree = ast.parse(file.read())
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == function_name:
                return ast.unparse(node)
    except FileNotFoundError:
        print(f"Le fichier {filename} n'a pas été trouvé.")
    except Exception as e:  # best-effort extraction: report and give up
        print(f"Erreur lors de l'analyse du fichier : {e}")
    return None



################################################################################
# List every file located under a directory, sub-directories included.
def lister_chemins_repertoire(OneRepoGitHub):
    """Return the paths of all files found by walking ``OneRepoGitHub``.

    Paths are built by joining each walked directory with its file names,
    in ``os.walk`` order. A missing directory yields an empty list.
    """
    return [
        os.path.join(racine, nom)
        for racine, _sous_dossiers, noms in os.walk(OneRepoGitHub)
        for nom in noms
    ]
################################################################################
# Work out, for every function of a call graph, which file should contain it.
def generate_file_paths(RepoGitHub, graph, json_file, path_fichiers):
    """Map each function of ``graph`` to the file expected to define it.

    Args:
        RepoGitHub: Root folder containing the downloaded repositories.
        graph: Call graph whose keys are dotted function names.
        json_file: Path of the call-graph JSON; its base name without the
            final extension is used as the repository folder name.
        path_fichiers: List of file paths of the repository. It is
            normalised IN PLACE (first '/'-separated component dropped, a
            leading '/' added), which leaves absolute paths unchanged.

    Returns:
        A dict keyed by the LAST component of each dotted name, with values
        ``{"full_path": ..., "full_function_name": ...}``. ``full_path`` is:
        * ``<repo>/<module path>.py`` when the module file exists,
        * ``<RepoGitHub>/<repo>/<dotted name as path>.py`` when the whole
          name designates a file,
        * the dotted name itself otherwise.
        Functions sharing the same last component overwrite one another.
    """
    file_paths = {}
    # splitext removes only the final extension, so repository names that
    # contain dots are kept intact.
    json_file_name = os.path.splitext(os.path.basename(json_file))[0]
    print("json_file_name", json_file_name)

    # Normalise the paths in place (callers rely on the mutated list).
    for i, path in enumerate(path_fichiers):
        path_fichiers[i] = '/' + '/'.join(path.split('/')[1:])
    known_paths = set(path_fichiers)  # constant-time membership tests

    for function in graph:
        parts = function.split('.')
        short_name = parts[-1]
        module_file = '/'.join(parts[:-1]) + '.py'
        name_as_file = function.replace('.', '/') + ".py"

        if os.path.join(RepoGitHub, json_file_name, module_file) in known_paths:
            # Case 1: "pkg.mod.func" defined in <repo>/pkg/mod.py
            file_paths[short_name] = {"full_path": f"{json_file_name}/{module_file}", "full_function_name": function}
        elif os.path.join(RepoGitHub, json_file_name, name_as_file) in known_paths:
            # Case 2: the dotted name designates a module file itself
            file_paths[short_name] = {"full_path": f'{RepoGitHub}/{json_file_name}/{name_as_file}', "full_function_name": function}
        else:
            # Case 3: no matching file in the repository; keep the dotted name
            file_paths[short_name] = {"full_path": function, "full_function_name": function}

    return file_paths


In [None]:
#ce bloc de code est conçu pour traiter et d'extraire des informations utiles à partir du code source Python, notamment la génération de embeddings pour les fonctions

from transformers import AutoTokenizer, AutoModel
# Load the pretrained "neulab/codebert-python" tokenizer and model; the
# function below reads them as globals.
tokenizer = AutoTokenizer.from_pretrained("neulab/codebert-python")
model = AutoModel.from_pretrained("neulab/codebert-python")

def generate_code_from_json(json_file, RepoGitHub):
    """Extract the code of every function of a call graph and embed it.

    For each key of the call graph stored in ``json_file``:
    * if its file belongs to the repository, the function source is extracted
      with ``get_function_code``; when extraction returns nothing, the whole
      file is used instead;
    * otherwise the dotted name is turned into an import snippet with
      ``transform_code_chain``.
    The resulting text is written (or the file copied) to
    ``code/<repo>/<function>.py`` and embedded with the global ``tokenizer``
    and ``model``.

    Args:
        json_file: Path of the call-graph JSON; its base name (without
            extension) is the repository folder name.
        RepoGitHub: Root folder containing the downloaded repositories.

    Returns:
        Dict mapping each full function name to its embedding (list of floats).
    """
    repo_name, _extension = os.path.splitext(os.path.basename(json_file))
    dossier_code = os.path.join('code', repo_name)
    os.makedirs(dossier_code, exist_ok=True)
    function_embeddings = {}

    with open(json_file, "r", encoding="utf-8") as file:
        graph = json.load(file)

    chemins = lister_chemins_repertoire(os.path.join(RepoGitHub, repo_name))
    print("chemins", chemins)

    # Work out the file expected to hold each function.
    file_paths = generate_file_paths(RepoGitHub, graph, json_file, chemins)
    # Built after generate_file_paths because that call normalises `chemins`
    # in place; a set gives constant-time membership tests.
    chemins_connus = set(chemins)

    for func_name, details in file_paths.items():
        full_path = os.path.join(RepoGitHub, details['full_path'])
        path = os.path.join(dossier_code, f"{func_name}.py")  # output file for the code
        dans_le_depot = full_path in chemins_connus

        if dans_le_depot:
            function_code = get_function_code(full_path, func_name)
        else:
            function_code = transform_code_chain(details['full_path'])

        if function_code:
            embedding = get_embedding(function_code, tokenizer, model)
            if embedding.requires_grad:
                embedding = embedding.detach()  # detach before converting to numpy
            function_embeddings[details['full_function_name']] = embedding.numpy().tolist()

            with open(path, 'w', encoding='utf-8') as file:
                file.write(function_code)
        elif dans_le_depot:
            # No function could be extracted: fall back to the whole file.
            shutil.copyfile(full_path, path)
            # errors='replace' keeps one badly-encoded source file from
            # aborting the processing of the whole repository.
            with open(path, 'r', encoding='utf-8', errors='replace') as file:
                entire_code = file.read()
            embedding = get_embedding(entire_code, tokenizer, model)
            if embedding.requires_grad:
                embedding = embedding.detach()
            function_embeddings[details['full_function_name']] = embedding.numpy().tolist()

    return function_embeddings


In [None]:

import os
import json
import shutil
from transformers import AutoTokenizer, AutoModel

# Load the pretrained CodeBERT tokenizer and model ("neulab/codebert-python");
# the function below reads them as globals.
tokenizer = AutoTokenizer.from_pretrained("neulab/codebert-python")
model = AutoModel.from_pretrained("neulab/codebert-python")

def generate_code_from_json(json_file, RepoGitHub):
    """Embed the code of every function listed in a call-graph JSON.

    For each key of the call graph stored in ``json_file``:
    * if its file belongs to the repository, the function source is extracted
      with ``get_function_code``; when extraction returns nothing, the whole
      file is embedded instead;
    * otherwise the dotted name is turned into an import snippet with
      ``transform_code_chain`` and that snippet is embedded.
    Embeddings are computed with the global ``tokenizer`` and ``model``.

    Args:
        json_file: Path of the call-graph JSON; its base name (without
            extension) is the repository folder name.
        RepoGitHub: Root folder containing the downloaded repositories.

    Returns:
        Dict mapping each full function name to its embedding (list of floats).
    """
    repo_name, _extension = os.path.splitext(os.path.basename(json_file))
    # The output folder is still created, although this version writes no file.
    dossier_code = os.path.join('code', repo_name)
    os.makedirs(dossier_code, exist_ok=True)
    function_embeddings = {}

    with open(json_file, "r", encoding="utf-8") as file:
        graph = json.load(file)

    chemins = lister_chemins_repertoire(os.path.join(RepoGitHub, repo_name))
    print("chemins", chemins)

    # Work out the file expected to hold each function.
    file_paths = generate_file_paths(RepoGitHub, graph, json_file, chemins)
    # Built after generate_file_paths because that call normalises `chemins`
    # in place; a set gives constant-time membership tests.
    chemins_connus = set(chemins)

    for func_name, details in file_paths.items():
        full_path = os.path.join(RepoGitHub, details['full_path'])
        dans_le_depot = full_path in chemins_connus

        if dans_le_depot:
            code_a_encoder = get_function_code(full_path, func_name)
        else:
            code_a_encoder = transform_code_chain(details['full_path'])

        if not code_a_encoder:
            if not dans_le_depot:
                continue
            # No function could be extracted: embed the whole file.
            # errors='replace' keeps one badly-encoded source file from
            # aborting the processing of the whole repository.
            with open(full_path, 'r', encoding='utf-8', errors='replace') as file:
                code_a_encoder = file.read()

        embedding = get_embedding(code_a_encoder, tokenizer, model)
        if embedding.requires_grad:
            embedding = embedding.detach()  # detach before converting to numpy
        function_embeddings[details['full_function_name']] = embedding.numpy().tolist()

    return function_embeddings

In [None]:
import json
import torch
import networkx as nx
import matplotlib.pyplot as plt
from torch_geometric.data import Data

# Load a JSON file.
def load_json(filepath):
    """Return the parsed content of the JSON file at ``filepath``.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale's default encoding.
    """
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)

def json_to_graph(json_data, embeddings, embedding_dim=None):
    """Convert a call graph and its embeddings into a ``Data`` object.

    Args:
        json_data: Mapping from a node name to the list of names it points to.
        embeddings: Mapping from node name to its embedding (flat sequence of
            numbers). Nodes without an entry keep a zero feature vector.
        embedding_dim: Feature width used only when ``embeddings`` is empty;
            defaults to 0 in that case.

    Returns:
        A ``(data, node_mapping)`` tuple. ``data.edge_index`` holds one
        directed edge (caller -> callee) per column, ``data.x`` one feature
        row per node, and ``node_mapping`` maps every node name to its index.
        Keys are numbered first; a callee that is not itself a key is
        appended with the next free index instead of raising ``KeyError``.

    Raises:
        ValueError: If an embedding is not one-dimensional.
    """
    node_mapping = {name: i for i, name in enumerate(json_data.keys())}

    sources, destinations = [], []
    for source, targets in json_data.items():
        source_idx = node_mapping[source]
        for target in targets:
            # len() is evaluated before the insertion, so an unknown callee
            # receives the next unused index.
            target_idx = node_mapping.setdefault(target, len(node_mapping))
            sources.append(source_idx)
            destinations.append(target_idx)

    edge_index_tensor = torch.tensor([sources, destinations], dtype=torch.long)

    num_nodes = len(node_mapping)
    # All embeddings are assumed to share the width of the first one. With no
    # embedding at all, fall back to the caller-provided width.
    if embeddings:
        feature_dim = len(next(iter(embeddings.values())))
    else:
        feature_dim = embedding_dim or 0
    node_features = torch.zeros((num_nodes, feature_dim))

    for node, idx in node_mapping.items():
        if node in embeddings:
            embedding_tensor = torch.tensor(embeddings[node])
            if embedding_tensor.ndim != 1:
                raise ValueError("Each embedding should be a single-dimensional tensor")
            node_features[idx] = embedding_tensor

    data = Data(edge_index=edge_index_tensor, x=node_features)
    return data, node_mapping

# Display the graph with the function names as labels.
def visualize_graph(data, node_mapping):
    """Draw the directed graph stored in ``data.edge_index``.

    Args:
        data: Object exposing ``edge_index``, a tensor whose two rows hold
            the source and destination index of every edge.
        node_mapping: Mapping from node name to node index, used for labels.

    Only nodes that appear in at least one edge are drawn. Labels are
    limited to those nodes, because a label for a node without a layout
    position makes ``draw_networkx_labels`` fail.
    """
    G = nx.DiGraph()  # directed graph
    edge_index = data.edge_index.numpy()
    G.add_edges_from(zip(edge_index[0], edge_index[1]))

    plt.figure(figsize=(12, 12))
    pos = nx.spring_layout(G)  # positions for all nodes
    nx.draw(G, pos, with_labels=False, node_color='skyblue', node_size=500, edge_color='k', linewidths=1, font_size=15, arrows=True, arrowstyle='-|>', arrowsize=10)

    # Label only the nodes that were actually laid out.
    labels = {i: name for name, i in node_mapping.items() if i in pos}
    nx.draw_networkx_labels(G, pos, labels, font_size=8)

    plt.title('Visualisation du graphe avec noms de fonctions')
    plt.show()





# DATASET

In [None]:
def get_embedding(code, tokenizer, model):
    """Return the mean embedding of ``code`` computed with ``model``.

    The text is cut into slices of 512 characters; each slice is tokenized,
    encoded, and mean-pooled over its tokens, and the slice vectors are then
    averaged.

    Args:
        code: Source code as a string.
        tokenizer: Callable accepting ``(text, return_tensors=..., truncation=...,
            max_length=...)`` and returning the model inputs as a mapping.
        model: Callable returning an object with ``last_hidden_state``; must
            expose ``config.hidden_size``.

    Returns:
        A 1-D tensor detached from the autograd graph. For empty ``code`` it
        is a zero vector of size ``model.config.hidden_size``.
    """
    segment_size = 512
    segments = [code[i:i + segment_size] for i in range(0, len(code), segment_size)]
    if not segments:
        return torch.zeros(model.config.hidden_size)  # fixed size

    segment_embeddings = []
    # Inference only: skipping gradient tracking avoids keeping the autograd
    # graph of every segment in memory.
    with torch.no_grad():
        for segment in segments:
            # A 512-character slice can still produce more than 512 tokens,
            # so truncate explicitly to stay within the model's input limit.
            inputs = tokenizer(segment, return_tensors="pt", truncation=True, max_length=segment_size)
            outputs = model(**inputs)
            # Mean over the token axis, then drop the batch axis of size 1.
            segment_embeddings.append(outputs.last_hidden_state.mean(dim=1).squeeze(0))

    return torch.stack(segment_embeddings).mean(dim=0)


In [None]:
import networkx as nx
import matplotlib.pyplot as plt

def visualize_graph(data, node_mapping):
    """Draw the directed graph stored in ``data.edge_index`` with name labels.

    Args:
        data: Object exposing ``edge_index``, a tensor whose two rows hold
            the source and destination index of every edge.
        node_mapping: Mapping from node name to node index.

    Only nodes that appear in at least one edge are drawn and labelled. A
    warning is printed for graph nodes whose index is absent from
    ``node_mapping``.
    """
    G = nx.DiGraph()  # directed graph
    edge_index = data.edge_index.numpy()

    # One edge per column of edge_index.
    for i in range(edge_index.shape[1]):
        src, dest = edge_index[:, i]
        G.add_edge(src, dest)

    # Graph nodes that have no name in the mapping.
    missing_nodes = set(G.nodes) - set(node_mapping.values())

    # Keep labels only for nodes present in the graph. Membership is tested
    # on the graph itself (hash lookup) rather than on a list of its nodes.
    filtered_node_mapping = {i: name for name, i in node_mapping.items() if i in G}

    pos = nx.spring_layout(G)

    plt.figure(figsize=(12, 12))
    nx.draw(G, pos, with_labels=False, node_color='skyblue', node_size=500, edge_color='k', linewidths=1, font_size=15, arrows=True, arrowstyle='-|>', arrowsize=10)

    # Label the nodes with the function names.
    nx.draw_networkx_labels(G, pos, filtered_node_mapping, font_size=8)

    plt.title('Visualisation du graphe avec noms de fonctions')
    plt.show()

    if missing_nodes:
        print(f"Warning: Nodes not found in mapping: {missing_nodes}")


In [None]:
class GraphDataset:
    """In-memory collection of labelled graphs.

    Each stored entry is a dict with the keys ``'data'``, ``'mapping'``,
    ``'label'`` and ``'name'``.
    """

    def __init__(self):
        # Entries are kept in insertion order.
        self.graphs = list()

    def add_graph(self, graph_data, node_mapping, label, name ):
        """Append one graph together with its mapping, label and name."""
        entry = dict(data=graph_data, mapping=node_mapping, label=label, name=name)
        self.graphs.append(entry)

    def get_graphs(self):
        """Return the internal list of entries (not a copy)."""
        return self.graphs

    def get_filenames(self):
        """Return the ``'name'`` of every stored entry, in insertion order."""
        names = []
        for entry in self.graphs:
            names.append(entry['name'])
        return names


In [None]:
 # NOTE(review): stand-alone copy of GraphDataset.get_filenames. Defined in its
 # own cell it is not attached to the class — presumably a leftover; confirm
 # it can be removed.
 def get_filenames(self):
        # Return the 'name' of every entry of self.graphs.
        return [graph['name'] for graph in self.graphs]

In [None]:
# Create the GraphDataset instance that the following cells fill.
dataset = GraphDataset()

In [None]:
# Create a fresh GraphDataset and point to the call-graph JSON folder and to
# the folder holding the corresponding repositories.
# NOTE(review): json_directory uses 'MyDrive' while myRepoGitHub uses
# 'My Drive' — confirm both spellings resolve to the same mount.
dataset = GraphDataset()
json_directory='/content/drive/MyDrive/Benin_total_json'
myRepoGitHub ='/content/drive/My Drive/Benin_total'


# Process every JSON file of the directory.
for json_filename in os.listdir(json_directory):
    if json_filename.endswith('.json'):
        json_path = os.path.join(json_directory, json_filename)

        # Compute the embedding of every function of the call graph.
        vecteur = generate_code_from_json(json_path, myRepoGitHub)

        # Load the call graph itself.
        json_data = load_json(json_path)

        # Convert the call graph and its embeddings into a graph object.
        graph_data, node_mapping = json_to_graph(json_data, vecteur)


        # Every graph of this folder receives label 1.
        # NOTE(review): a later cell labels the same folder 0 — confirm which
        # label is intended.
        label =1

        # Add the graph to the dataset.
        dataset.add_graph(graph_data, node_mapping,label,json_filename)



In [None]:
# Add the graphs of a JSON folder to the existing `dataset` (no new
# GraphDataset is created in this cell).

json_directory='/content/drive/MyDrive/Benin_total_json'
xRepoGitHub ='/content/drive/MyDrive/Benin_total'
import os
i = 1  # running counter used only in the progress message
# Process every JSON file of the directory.
for json_filename in os.listdir(json_directory):

    if json_filename.endswith('.json'):
        print(f'fichier nume : {i} : nom : {json_filename}')
        i = i+1
        json_path = os.path.join(json_directory, json_filename)

        # Compute the embedding of every function of the call graph.
        ft = generate_code_from_json(json_path, xRepoGitHub)

        # Load the call graph itself.
        json_data = load_json(json_path)

        # Convert the call graph and its embeddings into a graph object.
        graph_data, node_mapping = json_to_graph(json_data, ft)


        # Every graph of this folder receives label 0.
        label =0

        # Add the graph to the dataset; files already present are not
        # detected here, so re-running the cell appends duplicates.
        dataset.add_graph(graph_data, node_mapping,label,json_filename)



In [None]:
# Same as the label-0 loading loop, but files whose name is already present
# in `dataset` are skipped.
existing_files = set(dataset.get_filenames())

json_directory = '/content/drive/MyDrive/Benin_total_json'
xRepoGitHub = '/content/drive/MyDrive/Benin_total'
import os
i = 1
for json_filename in os.listdir(json_directory):
    # Only JSON files are processed.
    if not json_filename.endswith('.json'):
        continue

    # Do not add the same file name twice.
    if json_filename in existing_files:
        print(f'Le fichier {json_filename} existe déjà dans le dataset. Ne pas ajouter une deuxième fois.')
        continue

    print(f'fichier nume : {i} : nom : {json_filename}')
    i += 1
    json_path = os.path.join(json_directory, json_filename)

    # Helpers defined elsewhere in the notebook: the result of
    # generate_code_from_json is passed to json_to_graph together with the
    # loaded JSON content.
    ft = generate_code_from_json(json_path, xRepoGitHub)
    json_data = load_json(json_path)
    graph_data, node_mapping = json_to_graph(json_data, ft)

    # Every graph added by this cell is labelled 0.
    label = 0
    dataset.add_graph(graph_data, node_mapping, label, json_filename)

    # Remember the name so a later duplicate in this run is skipped as well.
    existing_files.add(json_filename)


In [None]:
# Record list held by `dataset` (get_graphs returns the list itself, not a copy).
labeled_graphs = dataset.get_graphs()

In [None]:
import pickle

# Assumes labeled_graphs already holds the graph records.
#labeled_graphs = dataset.get_graphs()

# Serialise the graph records to a file on Drive.
# NOTE(review): this writes to .../MyDrive/Dataset/labeled_graphs.pkl while the
# loading cells read .../MyDrive/labeled_graphs.pkl - confirm which path is
# the intended one.
with open('/content/drive/MyDrive/Dataset/labeled_graphs.pkl', 'wb') as f:
    pickle.dump(labeled_graphs, f)


In [None]:
import pickle
# Load the pickled graph records and print how many there are.
# NOTE(review): unpickling can execute arbitrary code; only load files you
# produced yourself.
with open('/content/drive/MyDrive/labeled_graphs.pkl', 'rb') as f:
    mon_dataset= pickle.load(f)
print(len(mon_dataset))

In [None]:
# Print and draw every graph record stored in mon_dataset.
for i, graph_entry in enumerate(mon_dataset, start=1):
    graph_data, node_mapping = graph_entry['data'], graph_entry['mapping']
    name, label = graph_entry['name'], graph_entry['label']

    # Header and raw graph object.
    print(f"Graphe {i}:", "Données du graphe:", sep="\n")
    print(graph_data)

    # Node mapping followed by label and name.
    print("Mapping des nœuds:")
    print(node_mapping)
    print(f"  Label: {label}", f"  Name:   {name}", sep="\n")

    visualize_graph(graph_data, node_mapping)



In [None]:

# Print every graph record (including the per-node feature vectors) and draw it.
# NOTE(review): `x` is not assigned anywhere in the visible part of the
# notebook - confirm it refers to the loaded list of graph records.
for i, graph_entry in enumerate(x, start=1):
    graph_data, node_mapping = graph_entry['data'], graph_entry['mapping']
    label, name = graph_entry['label'], graph_entry['name']

    print(f"\nGraphe {i}:", "Données du graphe:", sep="\n")
    print(graph_data)

    # Node feature matrix: first dimension is read as the node count,
    # second as the feature width.
    node_features = graph_data.x
    num_nodes, feature_dim = node_features.size(0), node_features.size(1)

    print(f"  Nombre de nœuds: {num_nodes}", f"  Dimension des features des nœuds: {feature_dim}", sep="\n")

    # One line per node with its full feature vector.
    for node_index, features in enumerate(node_features):
        print(f"    Nœud {node_index}: Features = {features.tolist()}")

    print("Mapping des nœuds:")
    print(node_mapping)
    print(f"  Label: {label}", f"  Name : {name}", sep="\n")

    # Draw the graph.
    visualize_graph(graph_data, node_mapping)


#  GNN

In [None]:
import pickle
# Load the pickled graph records used by the GNN section.
# NOTE(review): unpickling can execute arbitrary code; only load files you
# produced yourself.
with open('/content/drive/MyDrive/labeled_graphs.pkl', 'rb') as f:
    mon_dataset= pickle.load(f)

In [None]:
from torch_geometric.data import Data
import torch
# Wrap every record of mon_dataset in a torch_geometric Data object: node
# features and edge index are copied from the stored graph, the label becomes
# a one-element tensor `y`, and the record name is attached as `name`.
pytorch_geometric_graphs = [
    Data(x=entry['data'].x, edge_index=entry['data'].edge_index, y=torch.tensor([entry['label']]) , name=entry['name'])
    for entry in mon_dataset
]

In [None]:

from sklearn.model_selection import train_test_split
from torch_geometric.loader import DataLoader

In [None]:

# Split the dataset into training and test sets (80% / 20%, fixed seed).

from sklearn.model_selection import train_test_split
from torch_geometric.loader import DataLoader
train_graphs, test_graphs = train_test_split(pytorch_geometric_graphs, test_size=0.2, random_state=42)

# Create the training and test loaders; only the training loader shuffles.
train_loader = DataLoader(train_graphs, batch_size=32, shuffle=True)
test_loader = DataLoader(test_graphs, batch_size=32, shuffle=False)

In [None]:
from torch_geometric.nn import GCNConv, global_mean_pool, BatchNorm
from torch_geometric.nn import GCNConv, global_mean_pool
import torch.nn.functional as F
import torch
class GCN(torch.nn.Module):
    """Three-stage GCN encoder that returns one pooled vector per graph.

    Every stage is a ``GCNConv`` followed by ``BatchNorm``. The first two
    stages additionally apply ReLU and dropout (p=0.2); the output of the
    third stage is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # Stage 1: input_dim -> hidden_dim
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = BatchNorm(hidden_dim)
        # Stage 2: hidden_dim -> hidden_dim
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.bn2 = BatchNorm(hidden_dim)
        # Stage 3: hidden_dim -> output_dim
        self.conv3 = GCNConv(hidden_dim, output_dim)
        self.bn3 = BatchNorm(output_dim)
        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        h = self.dropout(F.relu(self.bn2(self.conv2(h, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn3(self.conv3(h, edge_index))
        return global_mean_pool(h, data.batch)


In [None]:
# Model, optimizer and loss criterion initialisation.
input_dim = 768  # Adjust to your data
hidden_dim = 384
output_dim = 64 # Adjust to match the classes

# NOTE(review): GCNWithNorm is not defined in the visible part of the
# notebook (the class defined just above is named GCN) - confirm the name.
modelGNN = GCNWithNorm(input_dim, hidden_dim, output_dim)
optimizer = torch.optim.Adam(modelGNN.parameters(), lr=0.0001, weight_decay=5e-4)

# NOTE(review): the model output has output_dim (64) columns while the labels
# assigned when building the dataset are 0/1 - confirm this is intended.
criterion = torch.nn.CrossEntropyLoss()

In [None]:

# Fonction d'entraînement
def train(modeltrain):
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``train_loader``, ``optimizer`` and ``criterion``.

    Args:
        modeltrain: Model to optimise; it is switched to training mode and
            called once per batch.

    Returns:
        The sum of the per-batch loss values divided by ``len(train_loader)``.
    """
    modeltrain.train()
    batch_losses = []
    for batch in train_loader:
        optimizer.zero_grad()
        batch_loss = criterion(modeltrain(batch), batch.y)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(train_loader)
# Fonction de Test
# Fonction de Test
def test(loader, modeltest):
    """Evaluate ``modeltest`` on every batch of ``loader``.

    Uses the module-level ``criterion``. Each batch loss is weighted by
    ``data.num_graphs`` so the returned loss is a per-graph average.

    Args:
        loader: Iterable of batches exposing ``y`` and ``num_graphs``.
        modeltest: Model to evaluate; it is switched to eval mode.

    Returns:
        Tuple ``(accuracy, average_loss)``.
    """
    modeltest.eval()
    n_correct = 0
    loss_sum = 0
    n_graphs = 0
    with torch.no_grad():
        for batch in loader:
            logits = modeltest(batch)
            batch_size = batch.num_graphs
            # Weight the batch loss by the number of graphs it contains.
            loss_sum += criterion(logits, batch.y).item() * batch_size
            n_correct += (logits.argmax(dim=1) == batch.y).sum().item()
            n_graphs += batch_size
    return n_correct / n_graphs, loss_sum / n_graphs




In [None]:
import matplotlib.pyplot as plt

# Training loop with visualisation.
epochs = 500
train_losses = []
test_losses = []  # Per-epoch test losses
test_accuracies = []
train_accuracies = []

for epoch in range(1, epochs + 1):
    train_loss = train(modelGNN)
    train_acc, _ = test(train_loader, modelGNN)  # Reuse test() to get the accuracy on the training set
    test_acc, test_loss = test(test_loader, modelGNN)  # Accuracy and loss on the test set
    train_losses.append(train_loss)
    test_losses.append(test_loss)  # Store the test loss
    train_accuracies.append(train_acc)
    test_accuracies.append(test_acc)

    print(f'Epoch {epoch:02d}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

# Plotting the results
plt.figure(figsize=(12, 5))



# Left panel: training and test loss per epoch
plt.subplot(1, 2, 1)
plt.plot(range(1, epochs + 1), train_losses, label='Training Loss')
plt.plot(range(1, epochs + 1), test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Epochs')
plt.legend()

# Right panel: training and test accuracy per epoch
plt.subplot(1, 2, 2)
plt.plot(range(1, epochs + 1), train_accuracies, label='Training Accuracy')
plt.plot(range(1, epochs + 1), test_accuracies, label='Test Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy over Epochs')
plt.legend()



plt.show()


In [None]:
# Show the module structure of the current model.
print(modelGNN)

In [None]:
# Le modèle GNN Simplifié
from torch_geometric.nn import GCNConv, global_mean_pool
import torch.nn.functional as F
import torch



# Fonction pour afficher les embeddings après chaque époque
def display_graph_embeddings(loader, epoch , modeldisplay):
    """Print the model output for every batch of ``loader``.

    Args:
        loader: Iterable of batches exposing a ``name`` attribute.
        epoch: Value shown in the header line.
        modeldisplay: Model whose ``forward`` is applied to each batch; it is
            switched to eval mode first.
    """
    modeldisplay.eval()

    # Collect (batch.name, output) pairs without tracking gradients.
    collected = []
    with torch.no_grad():
        for batch in loader:
            output = modeldisplay.forward(batch)
            collected.append((batch.name, output))

    # One header line, then one line per collected pair.
    print(f"\nEmbeddings des graphes à l'époque {epoch}:")
    for name, embedding in collected:
        print(f"Graphe {name}: Embedding = {embedding.numpy()}")

# Boucle d'entraînement



In [None]:
# Accéder et afficher un graphe par son index
def display_graph_by_index(dataset, index):
    """Print the record stored at position ``index`` of ``dataset``.

    Args:
        dataset: Object exposing ``get_graphs()``, which returns a sequence of
            record dicts with the keys ``'data'``, ``'mapping'``, ``'label'``
            and optionally ``'name'``.
        index: Position of the record; a message is printed and nothing else
            happens when it is out of range.
    """
    graphs = dataset.get_graphs()

    # Reject positions outside the list.
    if index < 0 or index >= len(graphs):
        print(f"Index {index} hors limites.")
        return

    entry = graphs[index]
    graph_data = entry['data']
    node_mapping = entry['mapping']
    label = entry['label']
    name = entry.get('name', f'Graph {index}')

    # Header and raw graph object.
    print(f"\nGraphe {index} ({name}):")
    print("Données du graphe:")
    print(graph_data)

    # Node feature matrix: first dimension is read as the node count,
    # second as the feature width.
    node_features = graph_data.x
    print(f"  Nombre de nœuds: {node_features.size(0)}")
    print(f"  Dimension des features des nœuds: {node_features.size(1)}")

    # One line per node with its full feature vector.
    for position, row in enumerate(node_features):
        print(f"    Nœud {position}: Features = {row.tolist()}")

    print("Mapping des nœuds:")
    print(node_mapping)
    print(f"  Label: {label}")


In [None]:
# Print the record stored at position 1 of `dataset`.
display_graph_by_index(dataset , 1)


In [None]:
def get_all_embeddings(model, loader):
    """Collect the model output, names and labels for every batch of ``loader``.

    Args:
        model: Model whose ``forward`` is applied to each batch; it is
            switched to eval mode first.
        loader: Iterable of batches; ``name`` and ``y`` are collected only
            when the batch has those attributes.

    Returns:
        Tuple ``(all_embeddings, all_names, all_labels)`` of flat lists. The
        embeddings are the rows of each batch output converted to NumPy; the
        labels are the elements obtained by iterating ``batch.y``.
    """
    model.eval()
    vectors, graph_names, graph_labels = [], [], []

    for batch in loader:
        # Forward pass without gradient tracking.
        with torch.no_grad():
            batch_output = model.forward(batch)

        # One entry per row of the batch output.
        vectors += list(batch_output.cpu().numpy())

        if hasattr(batch, 'name'):
            graph_names += list(batch.name)

        if hasattr(batch, 'y'):
            graph_labels += list(batch.y)

    return vectors, graph_names, graph_labels

# Run the function over the whole dataset (batches of 2, shuffled).
loader_test = DataLoader(pytorch_geometric_graphs, batch_size=2, shuffle=True)
embeddings, names , labels = get_all_embeddings(modelGNN, loader_test)

# Print the results as a sanity check.
for name, embedding ,label   in zip(names, embeddings, labels):
    print(f"Nom du Graphe: {name}, Label du Graphe: {label}, Embedding: {embedding}")




In [None]:
# Load the saved model weights into a fresh GCN3 instance.
input_dim = 768  # Adjust to your data
hidden_dim = 384
output_dim = 64
# NOTE(review): GCN3 is defined in a later cell (MODELS section); that cell
# has to be executed before this one.
modelGNN = GCN3(input_dim, hidden_dim, output_dim)
modelGNN.load_state_dict(torch.load('/content/drive/MyDrive/gcn_model.pth'))


In [None]:
# Loader over the whole dataset (batches of 2, shuffled).
loader_test = DataLoader(pytorch_geometric_graphs, batch_size=2, shuffle=True)

In [None]:
# Same 80% / 20% split with a fixed seed, this time with batches of 2.
train_graphs, test_graphs = train_test_split(pytorch_geometric_graphs, test_size=0.2, random_state=42)

# Create the training and test loaders; only the training loader shuffles.
train_loader = DataLoader(train_graphs, batch_size=2, shuffle=True)
test_loader = DataLoader(test_graphs, batch_size=2, shuffle=False)

In [None]:
def get_all_embeddings(model, loaders):
    """Collect the model output, names and labels over several loaders.

    Args:
        model: Model whose ``forward`` is applied to each batch; it is
            switched to eval mode first.
        loaders: Iterable of loaders, each yielding batches. ``name`` and
            ``y`` are collected only when the batch has those attributes.

    Returns:
        Tuple ``(all_embeddings, all_names, all_labels)`` of flat lists. Each
        name is cut at its first ``'.'``; the labels are the elements
        obtained by iterating ``batch.y``.
    """
    model.eval()
    vectors, graph_names, graph_labels = [], [], []

    for current_loader in loaders:
        for batch in current_loader:
            # Forward pass without gradient tracking.
            with torch.no_grad():
                batch_output = model.forward(batch)

            # One entry per row of the batch output.
            vectors += list(batch_output.cpu().numpy())

            if hasattr(batch, 'name'):
                # Keep only the part of each name before the first dot.
                for full_name in batch.name:
                    graph_names.append(full_name.split('.')[0])

            if hasattr(batch, 'y'):
                graph_labels += list(batch.y)

    return vectors, graph_names, graph_labels

# # Utilisation de la fonction avec plusieurs DataLoader
# loaders = [train_loader, test_loader]
# loader_test = DataLoader(pytorch_geometric_graphs, batch_size=2, shuffle=True)
# embeddings, names , labels = get_all_embeddings(modelGNN, loader_test)

# Afficher les résultats pour vérifier
#for name, embedding , label  in zip(names, embeddings,labels):
    #print(f"Nom du Graphe: {name}, Label {label}, Embedding: {embedding}")


In [None]:
def get_all_embeddings(model, batchs):
    """Collect the model output, names and labels for every batch in ``batchs``.

    Args:
        model: Model whose ``forward`` is applied to each batch; it is
            switched to eval mode first.
        batchs: Iterable of batches (for example a single loader). ``name``
            and ``y`` are collected only when the batch has those attributes.

    Returns:
        Tuple ``(all_embeddings, all_names, all_labels)`` of flat lists. Each
        name is cut at its first ``'.'``; the labels are the elements
        obtained by iterating ``batch.y``.
    """
    model.eval()
    vectors, graph_names, graph_labels = [], [], []

    for batch in batchs:
        # Forward pass without gradient tracking.
        with torch.no_grad():
            batch_output = model.forward(batch)

        # One entry per row of the batch output.
        vectors += list(batch_output.cpu().numpy())

        if hasattr(batch, 'name'):
            # Keep only the part of each name before the first dot.
            for full_name in batch.name:
                graph_names.append(full_name.split('.')[0])

        if hasattr(batch, 'y'):
            graph_labels += list(batch.y)

    return vectors, graph_names, graph_labels

# # Utilisation de la fonction avec plusieurs DataLoader
# loaders = [train_loader, test_loader]
# loader_test = DataLoader(pytorch_geometric_graphs, batch_size=2, shuffle=True)
# embeddings, names , labels = get_all_embeddings(modelGNN, loader_test)

# Afficher les résultats pour vérifier
#for name, embedding , label  in zip(names, embeddings,labels):
    #print(f"Nom du Graphe: {name}, Label {label}, Embedding: {embedding}")


In [None]:
# Both loaders, training first, for functions that iterate over several loaders.
loaders = [train_loader, test_loader]

In [None]:
def sum_embeddings_with_metadata(embeddings, names, labels, metadata):
    """Add the metadata vector of each graph to its embedding.

    Args:
        embeddings: Sequence of embedding vectors, aligned with ``names``.
        names: Sequence of graph names used as keys into ``metadata``.
        labels: Sequence of labels, aligned with ``names``.
        metadata: Mapping ``name -> {'embedding': vector}``.

    Returns:
        Tuple ``(summed_vectors, final_labels)``: one NumPy array per input
        triple plus the labels in the same order. A graph whose name is not
        in ``metadata`` keeps its embedding unchanged and a message is
        printed.
    """
    summed_vectors, final_labels = [], []

    for name, embedding, label in zip(names, embeddings, labels):
        vector = torch.tensor(embedding)

        if name not in metadata:
            print(f"No metadata available for {name}, using embedding as is.")
        else:
            extra = torch.tensor(metadata[name]['embedding'])
            # Bring both operands onto the same device before adding.
            if extra.device != vector.device:
                extra = extra.to(vector.device)
            vector = vector + extra

        summed_vectors.append(vector.numpy())
        final_labels.append(label)

    return summed_vectors, final_labels


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Convert the data to NumPy arrays.
# NOTE(review): summed_vectors and final_labels are not assigned in the visible
# part of the notebook - presumably they come from a call to
# sum_embeddings_with_metadata; confirm.
X = np.array(summed_vectors)  # The summed vectors
y = np.array(final_labels)    # The matching labels

# Split the data into training and test sets (80% / 20%, fixed seed).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create the random forest classifier.
clf = RandomForestClassifier(n_estimators=100, random_state=42)

# Fit the classifier.
clf.fit(X_train, y_train)

# Predict on the test set.
y_pred = clf.predict(X_test)

# Compute and print the accuracy.
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy of the classifier: {accuracy:.2f}")


# Metadata


In [None]:
!pip install gensim


In [None]:
import os
import requests
import base64
import pandas as pd
import re
import nltk

from nltk.corpus import stopwords
nltk.download('stopwords')


# Définition des fonctions de nettoyage et de prétraitement du texte


def preprocess_text(text):
    """Normalise a piece of text for the metadata corpus.

    Runs of non-word characters and runs of digits are replaced by a space,
    the text is lower-cased, and every word found in the module-level
    ``stop_words`` collection is dropped.

    Args:
        text: Text to clean. ``None`` yields an empty string; any other
            non-string value is converted with ``str()`` first.

    Returns:
        The remaining words joined by single spaces.
    """
    # A missing value must not be stringified: str(None) would inject the
    # literal word "none" into the cleaned text.
    if text is None:
        return ''
    if not isinstance(text, str):
        text = str(text)
    text = re.sub(r'\W+|\d+', ' ', text).lower()
    return ' '.join(word for word in text.split() if word not in stop_words)

# Fonction pour créer et enregistrer les métadonnées des dépôts
def creer_metadata(repo_url, headers):
    """Fetch a repository's metadata from the GitHub API and clean it.

    Two requests are made: one for the repository itself and one for its
    ``README.md``. Name, description, topics and README text are passed
    through ``preprocess_text`` and combined into a single text.

    Args:
        repo_url: Repository path in the form ``"owner/repo"``.
        headers: HTTP headers sent with both requests.

    Returns:
        A list with one dict ``{'name': ..., 'text': ...}``, or an empty list
        (after printing an error message) when the repository request does
        not return status 200.
    """
    api_url = f"https://api.github.com/repos/{repo_url}"
    repo_response = requests.get(api_url, headers=headers)

    if repo_response.status_code != 200:
        print(f"Erreur lors de la récupération des données pour le dépôt {repo_url}: {repo_response.status_code}")
        return []

    repo_data = repo_response.json()
    # A field that is present with a JSON null value comes back as None, and
    # dict.get(key, default) does not substitute the default in that case, so
    # fall back explicitly.
    name = repo_data.get('name') or ''
    description = repo_data.get('description') or ''
    topics = ', '.join(repo_data.get('topics') or [])

    readme_url = f"https://api.github.com/repos/{repo_url}/contents/README.md"
    readme_response = requests.get(readme_url, headers=headers)

    titre = preprocess_text(name)
    topics = preprocess_text(topics)
    description = preprocess_text(description)

    # The README stays empty when the request does not return status 200.
    readme_text = ''
    if readme_response.status_code == 200:
        content_base64 = readme_response.json().get('content') or ''
        raw_readme = base64.b64decode(content_base64)
        # Undecodable bytes are replaced instead of aborting the whole run.
        readme_text = preprocess_text(raw_readme.decode('utf-8', errors='replace'))

    # Single entry for the repository with its cleaned content.
    return [{
        'name': name,
        'text': f"titre : {titre}, description : {description}, topics : {topics}, readme_text : {readme_text}"
    }]

# Configuration des headers

# Security fix: the personal access token is read from the GITHUB_TOKEN
# environment variable instead of being hard-coded in the notebook. The token
# that used to be committed here must be considered leaked and be revoked.
token = os.environ.get('GITHUB_TOKEN', '')
headers = {'Accept': 'application/vnd.github.v3+json'}
# Send the Authorization header only when a token is actually configured.
if token:
    headers['Authorization'] = f'token {token}'



In [None]:
import os

i = 1
metadata_dict_malware = {}  # Maps repository name -> {'text': cleaned metadata text}
df = pd.read_csv('/content/popularMalware_sans_duplication.csv')

# Security fix: the personal access token is read from the GITHUB_TOKEN
# environment variable instead of being hard-coded in the notebook. The token
# that used to be committed here must be considered leaked and be revoked.
token = os.environ.get('GITHUB_TOKEN', '')
headers = {'Accept': 'application/vnd.github.v3+json'}
# Send the Authorization header only when a token is actually configured.
if token:
    headers['Authorization'] = f'token {token}'

# Read by preprocess_text() at call time.
stop_words = set(stopwords.words('english'))

# One API lookup per CSV row; the 'Author' and 'Repo' columns form the repo path.
for index, row in df.iterrows():
    repo_url = f"{row['Author']}/{row['Repo']}"
    print(f"nom repo {i}: {row['Repo']}")
    i += 1
    metadata = creer_metadata(repo_url, headers)
    for data in metadata:
        metadata_dict_malware[data['name']] = {'text': data['text']}  # Store the metadata

# Print the collected metadata as a sanity check.
print(metadata_dict_malware)

In [None]:
import json

# Assumes metadata_dict_malware is already defined; write it as sorted,
# indented JSON.
with open('metadataMalware.json', 'w') as json_file:
    json.dump(metadata_dict_malware, json_file, indent=4, sort_keys=True)


In [None]:
import json

# Load the metadata JSON file back into metadata_dict_malware.
with open('metadataMalware.json', 'r') as json_file:
    metadata_dict_malware = json.load(json_file)


In [None]:
import json
import gensim
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
import torch

# Load the metadata from the JSON file.
with open('metadataMalware.json', 'r') as json_file:
    metadata_dict = json.load(json_file)

# Prepare the data for Doc2Vec: one tagged document per repository, tagged
# with the repository name and tokenised on whitespace.
tagged_data = []
for name, content in metadata_dict.items():
    text = content['text']
    tagged_data.append(TaggedDocument(words=text.split(), tags=[name]))

# Train the Doc2Vec model (64-dimensional vectors).
model_metadata = Doc2Vec(tagged_data, vector_size=64, window=2, min_count=1, workers=4, dm=0)

# Infer one vector per repository and build the new structure.
embedding_dict = {}
for name, content in metadata_dict.items():
    text = content['text']
    vector = model_metadata.infer_vector(text.split())
    tensor_vector = torch.tensor(vector).tolist()  # Convert the vector to a plain list for JSON storage
    embedding_dict[name] = {'embedding': tensor_vector}

# Save the new structure as sorted, indented JSON.
with open('metadata_with_embeddingsMalware.json', 'w') as json_file:
    json.dump(embedding_dict, json_file, indent=4, sort_keys=True)

# Print the stored embeddings as a sanity check.
print(embedding_dict)


In [None]:
import json

# Load the JSON file into metadataVectors.
# NOTE(review): the visible cells write 'metadataMalware.json' and
# 'metadata_with_embeddingsMalware.json', not 'metadata.json' - confirm which
# file is meant here.
with open('metadata.json', 'r') as json_file:
    metadataVectors = json.load(json_file)

# MODELS

In [None]:
from torch_geometric.nn import GCNConv, global_mean_pool, BatchNorm
from torch_geometric.nn import GCNConv, global_mean_pool
import torch.nn.functional as F
import torch
class GCN3(torch.nn.Module):
    """Three-stage GCN encoder that returns one pooled vector per graph.

    Every stage is a ``GCNConv`` followed by ``BatchNorm``. The first two
    stages additionally apply ReLU and dropout (p=0.2); the output of the
    third stage is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # Stage 1: input_dim -> hidden_dim
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = BatchNorm(hidden_dim)
        # Stage 2: hidden_dim -> hidden_dim
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.bn2 = BatchNorm(hidden_dim)
        # Stage 3: hidden_dim -> output_dim
        self.conv3 = GCNConv(hidden_dim, output_dim)
        self.bn3 = BatchNorm(output_dim)
        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        h = self.dropout(F.relu(self.bn2(self.conv2(h, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn3(self.conv3(h, edge_index))
        return global_mean_pool(h, data.batch)


In [None]:
from torch_geometric.nn import GCNConv, global_mean_pool, BatchNorm
from torch_geometric.nn import GCNConv, global_mean_pool
import torch.nn.functional as F
import torch
class GCN1(torch.nn.Module):
    """Single-stage GCN encoder that returns one pooled vector per graph.

    One ``GCNConv`` (input_dim -> output_dim) followed by ``BatchNorm``; the
    result is mean-pooled over the nodes of each graph. ``hidden_dim`` is
    accepted for signature parity with the deeper variants but is not used.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.conv1 = GCNConv(input_dim, output_dim)
        self.bn1 = BatchNorm(output_dim)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        h = self.bn1(self.conv1(data.x, data.edge_index))
        return global_mean_pool(h, data.batch)



from torch_geometric.nn import GCNConv, global_mean_pool, BatchNorm
from torch_geometric.nn import GCNConv, global_mean_pool
import torch.nn.functional as F
import torch
class GCN2(torch.nn.Module):
    """Two-stage GCN encoder that returns one pooled vector per graph.

    Every stage is a ``GCNConv`` followed by ``BatchNorm``. The first stage
    additionally applies ReLU and dropout (p=0.2); the output of the second
    stage is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # Stage 1: input_dim -> hidden_dim
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = BatchNorm(hidden_dim)
        # Stage 2: hidden_dim -> output_dim
        self.conv2 = GCNConv(hidden_dim, output_dim)
        self.bn2 = BatchNorm(output_dim)
        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn2(self.conv2(h, edge_index))
        return global_mean_pool(h, data.batch)

class GCN4(torch.nn.Module):
    """Four-stage GCN encoder that returns one pooled vector per graph.

    Every stage is a ``GCNConv`` followed by ``BatchNorm``. The first three
    stages additionally apply ReLU and dropout (p=0.2); the output of the
    fourth stage is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # Stage 1: input_dim -> hidden_dim
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = BatchNorm(hidden_dim)
        # Stages 2 and 3: hidden_dim -> hidden_dim
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.bn2 = BatchNorm(hidden_dim)
        self.conv3 = GCNConv(hidden_dim, hidden_dim)
        self.bn3 = BatchNorm(hidden_dim)
        # Stage 4: hidden_dim -> output_dim
        self.conv4 = GCNConv(hidden_dim, output_dim)
        self.bn4 = BatchNorm(output_dim)
        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        h = self.dropout(F.relu(self.bn2(self.conv2(h, edge_index))))
        h = self.dropout(F.relu(self.bn3(self.conv3(h, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn4(self.conv4(h, edge_index))
        return global_mean_pool(h, data.batch)


class GCN5(torch.nn.Module):
    """Five-stage GCN encoder that returns one pooled vector per graph.

    Every stage is a ``GCNConv`` followed by ``BatchNorm``. The first four
    stages additionally apply ReLU and dropout (p=0.2); the output of the
    fifth stage is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # Stage 1: input_dim -> hidden_dim
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = BatchNorm(hidden_dim)
        # Stages 2 to 4: hidden_dim -> hidden_dim
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.bn2 = BatchNorm(hidden_dim)
        self.conv3 = GCNConv(hidden_dim, hidden_dim)
        self.bn3 = BatchNorm(hidden_dim)
        self.conv4 = GCNConv(hidden_dim, hidden_dim)
        self.bn4 = BatchNorm(hidden_dim)
        # Stage 5: hidden_dim -> output_dim
        self.conv5 = GCNConv(hidden_dim, output_dim)
        self.bn5 = BatchNorm(output_dim)
        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        h = self.dropout(F.relu(self.bn2(self.conv2(h, edge_index))))
        h = self.dropout(F.relu(self.bn3(self.conv3(h, edge_index))))
        h = self.dropout(F.relu(self.bn4(self.conv4(h, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn5(self.conv5(h, edge_index))
        return global_mean_pool(h, data.batch)


In [None]:
from torch_geometric.nn import SAGEConv, global_mean_pool, BatchNorm
import torch.nn.functional as F
import torch

class GraphSAGEWithLayers(torch.nn.Module):
    """GraphSAGE encoder with a configurable number of stages.

    Every stage is a ``SAGEConv`` followed by ``BatchNorm``. All stages but
    the last additionally apply ReLU and dropout (p=0.2); the output of the
    last stage is mean-pooled over the nodes of each graph.

    With ``num_layers`` of 1 or less a single stage maps ``input_dim``
    directly to ``output_dim``; otherwise the first stage maps to
    ``hidden_dim``, ``num_layers - 2`` stages keep ``hidden_dim``, and the
    last stage maps to ``output_dim``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=1):
        super().__init__()
        self.num_layers = num_layers
        self.convs = torch.nn.ModuleList()
        self.bns = torch.nn.ModuleList()

        multi_stage = num_layers > 1

        # First stage: its width depends on whether more stages follow.
        first_width = hidden_dim if multi_stage else output_dim
        self.convs.append(SAGEConv(input_dim, first_width))
        self.bns.append(BatchNorm(first_width))

        # Intermediate stages keep the hidden width.
        for _ in range(num_layers - 2):
            self.convs.append(SAGEConv(hidden_dim, hidden_dim))
            self.bns.append(BatchNorm(hidden_dim))

        # Separate output stage only when there is more than one stage.
        if multi_stage:
            self.convs.append(SAGEConv(hidden_dim, output_dim))
            self.bns.append(BatchNorm(output_dim))

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = data.x

        # All stages except the last: conv -> batch norm -> ReLU -> dropout.
        for idx in range(self.num_layers - 1):
            h = self.dropout(F.relu(self.bns[idx](self.convs[idx](h, edge_index))))

        # Last stage: conv -> batch norm only.
        h = self.bns[-1](self.convs[-1](h, edge_index))
        return global_mean_pool(h, data.batch)




In [None]:
from torch_geometric.nn import GATConv

class GAT1(torch.nn.Module):
    """Single-stage GAT encoder that returns one pooled vector per graph.

    One ``GATConv`` (input_dim -> output_dim, ``concat=False``) followed by
    ``BatchNorm``, ReLU and dropout (p=0.2); the result is mean-pooled over
    the nodes of each graph. ``hidden_dim`` is accepted for signature parity
    with the deeper variants but is not used.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, heads=1):
        super().__init__()
        # concat=False keeps the layer output at output_dim features, which
        # is the width the following BatchNorm is built for.
        self.conv1 = GATConv(input_dim, output_dim, heads=heads, concat=False)
        self.bn1 = BatchNorm(output_dim)
        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        h = self.bn1(self.conv1(data.x, data.edge_index))
        # This variant applies ReLU and dropout before pooling.
        h = self.dropout(F.relu(h))
        return global_mean_pool(h, data.batch)


class GAT2(torch.nn.Module):
    """Two-stage GAT encoder that returns one pooled vector per graph.

    Every stage is a ``GATConv`` followed by ``BatchNorm``. The first stage
    concatenates its attention heads (width ``hidden_dim * heads``) and
    applies ReLU and dropout (p=0.2); the second stage uses a single head
    and its output is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, heads=1):
        super().__init__()
        wide = hidden_dim * heads

        # Stage 1: input_dim -> hidden_dim per head, heads concatenated.
        self.conv1 = GATConv(input_dim, hidden_dim, heads=heads, concat=True)
        self.bn1 = BatchNorm(wide)

        # Stage 2: concatenated width -> output_dim with one head.
        self.conv2 = GATConv(wide, output_dim, heads=1, concat=False)
        self.bn2 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn2(self.conv2(h, edge_index))
        return global_mean_pool(h, data.batch)

class GAT3(torch.nn.Module):
    """Three-stage GAT encoder that returns one pooled vector per graph.

    Every stage is a ``GATConv`` followed by ``BatchNorm``. The first two
    stages concatenate their attention heads (width ``hidden_dim * heads``)
    and apply ReLU and dropout (p=0.2); the third stage uses a single head
    and its output is mean-pooled over the nodes of each graph.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, heads=1):
        super().__init__()
        wide = hidden_dim * heads

        # Stage 1: input_dim -> hidden_dim per head, heads concatenated.
        self.conv1 = GATConv(input_dim, hidden_dim, heads=heads, concat=True)
        self.bn1 = BatchNorm(wide)

        # Stage 2: concatenated width -> hidden_dim per head, heads concatenated.
        self.conv2 = GATConv(wide, hidden_dim, heads=heads, concat=True)
        self.bn2 = BatchNorm(wide)

        # Stage 3: concatenated width -> output_dim with one head.
        self.conv3 = GATConv(wide, output_dim, heads=1, concat=False)
        self.bn3 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode a batch; reads ``data.x``, ``data.edge_index`` and ``data.batch``."""
        edge_index = data.edge_index
        h = self.dropout(F.relu(self.bn1(self.conv1(data.x, edge_index))))
        h = self.dropout(F.relu(self.bn2(self.conv2(h, edge_index))))
        # No activation or dropout after the last stage.
        h = self.bn3(self.conv3(h, edge_index))
        return global_mean_pool(h, data.batch)

class GAT4(torch.nn.Module):
    """Four-layer graph attention encoder that returns one vector per graph.

    The first three ``GATConv`` layers use ``heads`` attention heads with
    concatenated outputs and are each followed by batch normalisation, ReLU
    and dropout (p=0.2). The last ``GATConv`` uses a single head, maps to
    ``output_dim`` and is followed by batch normalisation only. Node features
    are finally pooled with ``global_mean_pool``.

    Args:
        input_dim: Size of the input node features.
        hidden_dim: Per-head width of the hidden layers.
        output_dim: Width of the last layer.
        heads: Number of attention heads in the hidden layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, heads=1):
        super().__init__()

        # Heads are concatenated, so hidden activations are this wide.
        hidden_width = hidden_dim * heads

        self.conv1 = GATConv(input_dim, hidden_dim, heads=heads, concat=True)
        self.bn1 = BatchNorm(hidden_width)

        self.conv2 = GATConv(hidden_width, hidden_dim, heads=heads, concat=True)
        self.bn2 = BatchNorm(hidden_width)

        self.conv3 = GATConv(hidden_width, hidden_dim, heads=heads, concat=True)
        self.bn3 = BatchNorm(hidden_width)

        # Output layer: one head, no concatenation.
        self.conv4 = GATConv(hidden_width, output_dim, heads=1, concat=False)
        self.bn4 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode ``data`` (needs ``x``, ``edge_index`` and ``batch``) and pool it."""
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # Hidden stages: convolution -> batch norm -> ReLU -> dropout.
        hidden_stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in hidden_stages:
            x = self.dropout(F.relu(norm(conv(x, edge_index))))

        # Output stage: no activation and no dropout before pooling.
        x = self.bn4(self.conv4(x, edge_index))
        return global_mean_pool(x, batch)
class GAT5(torch.nn.Module):
    """Five-layer graph attention encoder that returns one vector per graph.

    The first four ``GATConv`` layers use ``heads`` attention heads with
    concatenated outputs and are each followed by batch normalisation, ReLU
    and dropout (p=0.2). The last ``GATConv`` uses a single head, maps to
    ``output_dim`` and is followed by batch normalisation only. Node features
    are finally pooled with ``global_mean_pool``.

    Args:
        input_dim: Size of the input node features.
        hidden_dim: Per-head width of the hidden layers.
        output_dim: Width of the last layer.
        heads: Number of attention heads in the hidden layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, heads=1):
        super().__init__()

        # Heads are concatenated, so hidden activations are this wide.
        hidden_width = hidden_dim * heads

        self.conv1 = GATConv(input_dim, hidden_dim, heads=heads, concat=True)
        self.bn1 = BatchNorm(hidden_width)

        self.conv2 = GATConv(hidden_width, hidden_dim, heads=heads, concat=True)
        self.bn2 = BatchNorm(hidden_width)

        self.conv3 = GATConv(hidden_width, hidden_dim, heads=heads, concat=True)
        self.bn3 = BatchNorm(hidden_width)

        self.conv4 = GATConv(hidden_width, hidden_dim, heads=heads, concat=True)
        self.bn4 = BatchNorm(hidden_width)

        # Output layer: one head, no concatenation.
        self.conv5 = GATConv(hidden_width, output_dim, heads=1, concat=False)
        self.bn5 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode ``data`` (needs ``x``, ``edge_index`` and ``batch``) and pool it."""
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # Hidden stages: convolution -> batch norm -> ReLU -> dropout.
        hidden_stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, norm in hidden_stages:
            x = self.dropout(F.relu(norm(conv(x, edge_index))))

        # Output stage: no activation and no dropout before pooling.
        x = self.bn5(self.conv5(x, edge_index))
        return global_mean_pool(x, batch)

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import SAGEConv, BatchNorm, global_mean_pool

class GraphSAGEWithLayers(torch.nn.Module):
    """GraphSAGE encoder with up to three ``SAGEConv`` layers and mean pooling.

    Layers keep the explicit attribute names ``conv1..conv3`` / ``bn1..bn3``
    so that they line up with the keys of a saved state dict.

    Args:
        input_dim: Size of the input node features.
        hidden_dim: Width of the hidden layers.
        output_dim: Width of the last layer when ``num_layers`` is 1 or >= 3.
        num_layers: Number of layers to build; values above 3 behave like 3.

    NOTE(review): with ``num_layers == 2`` the last layer built is ``conv2``,
    whose width is ``hidden_dim`` rather than ``output_dim``. Only the third
    layer skips ReLU and dropout.
    NOTE(review): this notebook defines ``GraphSAGEWithLayers`` more than
    once; the definition executed last is the one in effect.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        super().__init__()
        self.num_layers = num_layers

        # A single-layer model maps straight to the output width.
        first_width = hidden_dim if num_layers > 1 else output_dim
        self.conv1 = SAGEConv(input_dim, first_width)
        self.bn1 = BatchNorm(first_width)

        if num_layers > 1:
            self.conv2 = SAGEConv(hidden_dim, hidden_dim)
            self.bn2 = BatchNorm(hidden_dim)

        if num_layers > 2:
            self.conv3 = SAGEConv(hidden_dim, output_dim)
            self.bn3 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode ``data`` (needs ``x``, ``edge_index`` and ``batch``) and pool it."""
        x, edge_index, batch = data.x, data.edge_index, data.batch
        depth = self.num_layers

        x = self.dropout(F.relu(self.bn1(self.conv1(x, edge_index))))

        if depth > 1:
            x = self.dropout(F.relu(self.bn2(self.conv2(x, edge_index))))

        if depth > 2:
            # Last layer: batch norm only, no activation or dropout.
            x = self.bn3(self.conv3(x, edge_index))

        return global_mean_pool(x, batch)


class GraphSAGEWithLayers(torch.nn.Module):
    """GraphSAGE encoder with up to four ``SAGEConv`` layers and mean pooling.

    Layers keep the explicit attribute names ``conv1..conv4`` / ``bn1..bn4``
    so that they line up with the keys of a saved state dict.

    Args:
        input_dim: Size of the input node features.
        hidden_dim: Width of the hidden layers.
        output_dim: Width of the last layer when ``num_layers`` is 1 or >= 4.
        num_layers: Number of layers to build; values above 4 behave like 4.

    NOTE(review): with ``num_layers`` of 2 or 3 the last layer built has
    width ``hidden_dim`` rather than ``output_dim``. Only the fourth layer
    skips ReLU and dropout.
    NOTE(review): this notebook defines ``GraphSAGEWithLayers`` more than
    once; the definition executed last is the one in effect.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=4):
        super().__init__()
        self.num_layers = num_layers

        # A single-layer model maps straight to the output width.
        first_width = hidden_dim if num_layers > 1 else output_dim
        self.conv1 = SAGEConv(input_dim, first_width)
        self.bn1 = BatchNorm(first_width)

        if num_layers > 1:
            self.conv2 = SAGEConv(hidden_dim, hidden_dim)
            self.bn2 = BatchNorm(hidden_dim)

        if num_layers > 2:
            self.conv3 = SAGEConv(hidden_dim, hidden_dim)
            self.bn3 = BatchNorm(hidden_dim)

        if num_layers > 3:
            self.conv4 = SAGEConv(hidden_dim, output_dim)
            self.bn4 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode ``data`` (needs ``x``, ``edge_index`` and ``batch``) and pool it."""
        x, edge_index, batch = data.x, data.edge_index, data.batch
        depth = self.num_layers

        x = self.dropout(F.relu(self.bn1(self.conv1(x, edge_index))))

        if depth > 1:
            x = self.dropout(F.relu(self.bn2(self.conv2(x, edge_index))))

        if depth > 2:
            x = self.dropout(F.relu(self.bn3(self.conv3(x, edge_index))))

        if depth > 3:
            # Last layer: batch norm only, no activation or dropout.
            x = self.bn4(self.conv4(x, edge_index))

        return global_mean_pool(x, batch)

class GraphSAGEWithLayers(torch.nn.Module):
    """GraphSAGE encoder with up to five ``SAGEConv`` layers and mean pooling.

    Layers keep the explicit attribute names ``conv1..conv5`` / ``bn1..bn5``
    so that they line up with the keys of a saved state dict.

    Args:
        input_dim: Size of the input node features.
        hidden_dim: Width of the hidden layers.
        output_dim: Width of the last layer when ``num_layers`` is 1 or >= 5.
        num_layers: Number of layers to build; values above 5 behave like 5.

    NOTE(review): with ``num_layers`` between 2 and 4 the last layer built
    has width ``hidden_dim`` rather than ``output_dim``. Only the fifth layer
    skips ReLU and dropout.
    NOTE(review): this notebook defines ``GraphSAGEWithLayers`` more than
    once; the definition executed last is the one in effect.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=5):
        super().__init__()
        self.num_layers = num_layers

        # A single-layer model maps straight to the output width.
        first_width = hidden_dim if num_layers > 1 else output_dim
        self.conv1 = SAGEConv(input_dim, first_width)
        self.bn1 = BatchNorm(first_width)

        if num_layers > 1:
            self.conv2 = SAGEConv(hidden_dim, hidden_dim)
            self.bn2 = BatchNorm(hidden_dim)

        if num_layers > 2:
            self.conv3 = SAGEConv(hidden_dim, hidden_dim)
            self.bn3 = BatchNorm(hidden_dim)

        if num_layers > 3:
            self.conv4 = SAGEConv(hidden_dim, hidden_dim)
            self.bn4 = BatchNorm(hidden_dim)

        if num_layers > 4:
            self.conv5 = SAGEConv(hidden_dim, output_dim)
            self.bn5 = BatchNorm(output_dim)

        self.dropout = torch.nn.Dropout(p=0.2)

    def forward(self, data):
        """Encode ``data`` (needs ``x``, ``edge_index`` and ``batch``) and pool it."""
        x, edge_index, batch = data.x, data.edge_index, data.batch
        depth = self.num_layers

        x = self.dropout(F.relu(self.bn1(self.conv1(x, edge_index))))

        if depth > 1:
            x = self.dropout(F.relu(self.bn2(self.conv2(x, edge_index))))

        if depth > 2:
            x = self.dropout(F.relu(self.bn3(self.conv3(x, edge_index))))

        if depth > 3:
            x = self.dropout(F.relu(self.bn4(self.conv4(x, edge_index))))

        if depth > 4:
            # Last layer: batch norm only, no activation or dropout.
            x = self.bn5(self.conv5(x, edge_index))

        return global_mean_pool(x, batch)



# train classifier

In [None]:
# Compute the graph embeddings, then combine them with the metadata vectors.
# NOTE(review): `modelGNN`, `loaders` and `metadataVectors` must already exist
# in the notebook session; they are not created in this cell.
embeddings, names, labels = get_all_embeddings(modelGNN, loaders)

summed_vectors, final_labels = sum_embeddings_with_metadata(embeddings, names, labels, metadataVectors)

# Print the results as a sanity check
for name, summed_vector, label in zip(names, summed_vectors, final_labels):
    print(f"Nom du Graphe: {name}, Label: {label}, Vecteur Sommé: {summed_vector}")

In [None]:
# Recompute embeddings, graph names and labels from `loaders` with `modelGNN`.
embeddings, names, labels = get_all_embeddings(modelGNN, loaders)

In [None]:
import numpy as np
import torch
from torch_geometric.data import Data, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# Visualisation avec t-SNE
def plot_embeddings(X, y, title):
    """Project ``X`` to two dimensions with t-SNE and scatter-plot it per class.

    Args:
        X: Feature vectors, one row per sample (anything ``TSNE.fit_transform``
            accepts and that supports ``len``).
        y: Class labels aligned with the rows of ``X``; lists are accepted.
        title: Title of the figure.
    """
    # A plain list would make `y == label` a single bool instead of a mask.
    y = np.asarray(y)

    # t-SNE requires perplexity < n_samples, so the default of 30 raises on
    # small datasets; cap it just below the sample count.
    tsne = TSNE(n_components=2, perplexity=min(30, len(X) - 1), random_state=42)
    X_tsne = tsne.fit_transform(X)

    plt.figure(figsize=(10, 7))
    for label in np.unique(y):
        # Boolean mask selects the rows of this class as plain 1-D arrays.
        mask = y == label
        plt.scatter(X_tsne[mask, 0], X_tsne[mask, 1], label=f"Class {label}")

    plt.title(title)
    plt.xlabel("t-SNE component 1")
    plt.ylabel("t-SNE component 2")
    plt.legend()
    plt.show()

# Plotting the training set
# NOTE(review): X_train, y_train, X_test and y_pred are not assigned in this
# cell; in this notebook they are assigned by the Random Forest training cell,
# which has to be executed before this one.
plot_embeddings(X_train, y_train, "Training set")

# Plotting the test set with predicted labels
plot_embeddings(X_test, y_pred, "Test set with predicted labels")

In [None]:
import pickle

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Train a Random Forest on (graph embedding + metadata) vectors and save it.
# get_all_embeddings, sum_embeddings_with_metadata, modelGNN, loaders and
# metadataVectors must already be defined in the notebook session.

# Get embeddings, names, and labels
embeddings, names, labels = get_all_embeddings(modelGNN, loaders)
summed_vectors, final_labels = sum_embeddings_with_metadata(embeddings, names, labels, metadataVectors)

# Convert data to NumPy arrays
X = np.array(summed_vectors)  # The summed vectors
y = np.array(final_labels)    # The corresponding labels

# Split data into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create the Random Forest classifier with 100 trees
rf_clf = RandomForestClassifier(n_estimators=100, random_state=42)

# Train the Random Forest classifier
rf_clf.fit(X_train, y_train)

# Make predictions on the test set
y_pred = rf_clf.predict(X_test)

# Evaluate the accuracy
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy of the Random Forest classifier: {accuracy:.2f}")

# You can further use the trained model (rf_clf) to predict labels for new data points
# by calling rf_clf.predict(new_summed_vector)

# Persist the trained classifier; `pickle` is imported at the top of this cell
# so that the cell does not depend on a later cell having imported it.
with open('graphe_appel_classifier.pkl', 'wb') as model_file:
    pickle.dump(rf_clf, model_file)


In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# Reduce the dimensionality with t-SNE.
# t-SNE requires perplexity < n_samples, so the default of 30 raises on small
# datasets; cap it just below the sample count.
tsne = TSNE(n_components=2, perplexity=min(30, len(X) - 1), random_state=42)
X_embedded = tsne.fit_transform(X)

# Scatter-plot the projection, one colour per class label
plt.figure(figsize=(10, 8))
for label in np.unique(y):
    plt.scatter(X_embedded[y == label, 0], X_embedded[y == label, 1], label=label)
plt.title('t-SNE Projection of Embeddings')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.legend()
plt.show()


# test

In [None]:
!pip install torch_geometric

In [None]:
!pip install scikit-learn==1.3.2
import pickle
# Load the pickled classifier into `rf_clf_loaded`.
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load a pickle that comes from a trusted source.
with open('graphe_appel_classifier.pkl', 'rb') as f:
  rf_clf_loaded = pickle.load(f)


In [None]:
def get_all_embeddings(model, batchs):
    """Run ``model`` over every batch and collect embeddings, names and labels.

    Args:
        model: A ``torch.nn.Module`` that is called as ``model(batch)``. It is
            switched to evaluation mode before the first batch.
        batchs: Iterable of batches (for example a DataLoader). A batch may
            carry a ``name`` attribute (iterable of strings) and a ``y``
            attribute (iterable of labels).

    Returns:
        A tuple ``(all_embeddings, all_names, all_labels)`` of lists:
        one NumPy row per row of the model output, each name cut at its first
        ``'.'``, and the items obtained by iterating ``batch.y`` (left as they
        are, not converted). A batch without ``name`` or ``y`` adds nothing to
        the corresponding list, so the three lists may differ in length.
    """
    model.eval()
    all_embeddings = []
    all_names = []
    all_labels = []

    for batch in batchs:
        with torch.no_grad():
            # Call the module itself rather than .forward() so that any
            # registered forward hooks are executed as well.
            embeddings = model(batch)

        all_embeddings.extend(embeddings.cpu().numpy())

        if hasattr(batch, 'name'):
            # Keep only the part of each name that precedes the first dot.
            all_names.extend(name.split('.')[0] for name in batch.name)

        if hasattr(batch, 'y'):
            all_labels.extend(batch.y)

    return all_embeddings, all_names, all_labels

# # Utilisation de la fonction avec plusieurs DataLoader
# loaders = [train_loader, test_loader]
# loader_test = DataLoader(pytorch_geometric_graphs, batch_size=2, shuffle=True)
# embeddings, names , labels = get_all_embeddings(modelGNN, loader_test)

# Afficher les résultats pour vérifier
#for name, embedding , label  in zip(names, embeddings,labels):
    #print(f"Nom du Graphe: {name}, Label {label}, Embedding: {embedding}")


In [None]:
import pickle
import torch
from torch_geometric.data import Data, DataLoader

# Load the malware samples
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load dataset files that come from a trusted source.
with open('/content/drive/MyDrive/Dataset/Test/windows/windows_malware.pkl', 'rb') as f:
    loader_malware = pickle.load(f)

# Load the benign samples
with open('/content/drive/MyDrive/Dataset/Test/windows/windows_benin.pkl', 'rb') as f:
    loader_benign = pickle.load(f)
# Rebuild each malware entry as a PyTorch Geometric Data object; every entry
# is indexed by 'data' (an object with .x and .edge_index), 'label' and 'name'.
pytorch_geometric_graphs_malware = [
    Data(x=entry['data'].x, edge_index=entry['data'].edge_index, y=torch.tensor([entry['label']]), name=entry['name'])
    for entry in loader_malware
]

# Rebuild each benign entry the same way
pytorch_geometric_graphs_benign = [
    Data(x=entry['data'].x, edge_index=entry['data'].edge_index, y=torch.tensor([entry['label']]), name=entry['name'])
    for entry in loader_benign
]
# Combine both sets (malware and benign)
pytorch_geometric_graphs_combined = pytorch_geometric_graphs_malware + pytorch_geometric_graphs_benign
# Create a shuffled DataLoader over the combined graphs, two graphs per batch
myloader = DataLoader(pytorch_geometric_graphs_combined, batch_size=2, shuffle=True)




In [None]:
input_dim = 768  # Input dimension
hidden_dim = 384  # Hidden dimension (passed to GCN3 below)
output_dim = 64  # Output dimension
heads = 8# Number of attention heads (not used in this cell)

# Instantiate the model and restore its weights from the saved state dict.
# NOTE(review): GCN3 is not defined in this cell; it must already exist in the
# notebook session.
modelGNN = GCN3(input_dim, hidden_dim, output_dim)
modelGNN.load_state_dict(torch.load('/content/drive/MyDrive/gcn_model.pth'))

print(modelGNN)

In [None]:
import pickle

# Load the malware samples
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load dataset files that come from a trusted source.
with open('/content/drive/MyDrive/Dataset/Test/windows/windows_malware.pkl', 'rb') as f:
    loader_malware = pickle.load(f)

# Load the benign samples
with open('/content/drive/MyDrive/Dataset/Test/windows/windows_benin.pkl', 'rb') as f:
    loader_benign = pickle.load(f)

# Combine both sets (malware and benign)
# NOTE(review): `+` presumably concatenates two lists here — confirm the type
# stored in the pickles.
combined_data = loader_malware + loader_benign

# Save the combined data to a single pickle file
combined_data_path = '/content/windows.pkl'
with open(combined_data_path, 'wb') as f:
    pickle.dump(combined_data, f)

print(f"Les données combinées ont été sauvegardées dans {combined_data_path}")


In [None]:
import pickle
from torch_geometric.data import DataLoader
from torch_geometric.data import Data
import torch

# Alternative input (disabled): the virus dataset.
#with open('/content/drive/MyDrive/Dataset/Test/virus/virus.pkl', 'rb') as f:
# Load the combined Windows dataset.
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load dataset files that come from a trusted source.
with open('/content/windows.pkl', 'rb') as f:
   loader = pickle.load(f)



# Rebuild each entry as a PyTorch Geometric Data object; every entry is
# indexed by 'data' (an object with .x and .edge_index), 'label' and 'name'.
pytorch_geometric_graphs = [
    Data(x=entry['data'].x, edge_index=entry['data'].edge_index, y=torch.tensor([entry['label']]) , name=entry['name'])
    for entry in loader
]

# Batch the graphs two at a time (shuffled) and embed them with modelGNN.
myloader = DataLoader(pytorch_geometric_graphs, batch_size=2, shuffle=True)
embeddings, names, labels = get_all_embeddings(modelGNN,myloader)

In [None]:
import json

# Load the two JSON metadata files.
# NOTE(review): the first path is under windows/ (benign) while the second is
# under android/ (malware) — confirm that mixing the two platforms is intended.
with open('/content/drive/MyDrive/Dataset/Test/windows/metadata_windows_benin.json', 'r') as f1, open('/content/drive/MyDrive/Dataset/Test/android/metadata_android_malware.json', 'r') as f2:
    data1 = json.load(f1)
    data2 = json.load(f2)

# Show the top-level type of each file
print(f"Structure de data1: {type(data1)}")
print(f"Structure de data2: {type(data2)}")

# Merge according to the top-level type
if isinstance(data1, dict) and isinstance(data2, dict):
    data_merged = {**data1, **data2}  # Merge the dicts; on duplicate keys data2 wins
elif isinstance(data1, list) and isinstance(data2, list):
    data_merged = data1 + data2  # Concatenate the lists
else:
    raise ValueError("Les structures des fichiers JSON ne sont pas compatibles pour la fusion.")

# Save the merged data to a new JSON file
with open('file.json', 'w') as f_out:
    json.dump(data_merged, f_out, indent=4)


In [None]:
def sum_embeddings_with_metadata(embeddings, names, labels, metadata):
    """Add each graph's metadata embedding to its graph embedding.

    Args:
        embeddings: Sequence of embedding vectors, one per graph.
        names: Sequence of graph names used as keys into ``metadata``.
        labels: Sequence of labels, one per graph.
        metadata: Container supporting ``name in metadata`` and
            ``metadata[name]['embedding']``.

    Returns:
        A tuple ``(summed_vectors, final_labels)``: a list of NumPy arrays and
        the list of matching labels. Graphs whose name is absent from
        ``metadata`` keep their embedding unchanged and a message is printed.
        The three input sequences are walked in lockstep with ``zip``, so
        iteration stops at the shortest one.
    """
    combined = []
    kept_labels = []

    for graph_name, graph_embedding, graph_label in zip(names, embeddings, labels):
        base = torch.tensor(graph_embedding)

        if graph_name not in metadata:
            print(f"No metadata available for {graph_name}, using embedding as is.")
            total = base
        else:
            # Bring the metadata vector onto the embedding's device before adding.
            extra = torch.tensor(metadata[graph_name]['embedding']).to(base.device)
            total = base + extra

        combined.append(total.numpy())
        kept_labels.append(graph_label)

    return combined, kept_labels


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import pickle
from sklearn.model_selection import train_test_split, cross_val_score
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

# Assuming get_all_embeddings and sum_embeddings_with_metadata functions are defined elsewhere

# Get embeddings, names, and labels


# Embed the graphs in `myloader` and add the merged metadata to each embedding.
embeddings, names, labels = get_all_embeddings(modelGNN,myloader)
summed_vectors, final_labels = sum_embeddings_with_metadata(embeddings, names, labels, data_merged)


# Predict on the new test dataset with the loaded classifier
y_pred_new = rf_clf_loaded.predict(summed_vectors)

# Compute the performance metrics
# NOTE(review): average='binary' assumes a two-class problem — confirm the
# label set of this dataset.
accuracy = accuracy_score(final_labels, y_pred_new)
precision = precision_score(final_labels, y_pred_new, average='binary')
recall = recall_score(final_labels, y_pred_new, average='binary')
f1 = f1_score(final_labels, y_pred_new, average='binary')

# Print the results
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")





In [None]:
# Save the summed vectors and their labels as one pickled tuple.
# NOTE(review): the file is named 'notre_android.pkl' although the preceding
# cells load the windows dataset — confirm the intended file name.
with open('notre_android.pkl', 'wb') as f:
    # pickle.dump writes the objects to the file
    pickle.dump((summed_vectors, final_labels), f)

In [None]:
import pickle
import torch
from torch_geometric.data import Data, DataLoader

# Load the malware samples
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load dataset files that come from a trusted source.
with open('/content/drive/MyDrive/Dataset/Test/virus/virus_malware.pkl', 'rb') as f:
    loader_malware = pickle.load(f)

# Load the benign samples
with open('/content/drive/MyDrive/Dataset/Test/virus/virus_benin.pkl', 'rb') as f:
    loader_benign = pickle.load(f)
# Rebuild each malware entry as a PyTorch Geometric Data object; every entry
# is indexed by 'data' (an object with .x and .edge_index), 'label' and 'name'.
pytorch_geometric_graphs_malware = [
    Data(x=entry['data'].x, edge_index=entry['data'].edge_index, y=torch.tensor([entry['label']]), name=entry['name'])
    for entry in loader_malware
]

# Rebuild each benign entry the same way
pytorch_geometric_graphs_benign = [
    Data(x=entry['data'].x, edge_index=entry['data'].edge_index, y=torch.tensor([entry['label']]), name=entry['name'])
    for entry in loader_benign
]
# Combine both sets (malware and benign)
pytorch_geometric_graphs_combined = pytorch_geometric_graphs_malware + pytorch_geometric_graphs_benign
# Create a shuffled DataLoader over the combined graphs, two graphs per batch
myloader = DataLoader(pytorch_geometric_graphs_combined, batch_size=2, shuffle=True)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

# get_all_embeddings and sum_embeddings_with_metadata must already be defined
# in the notebook session.

# Get the embeddings, names and labels, then add the merged metadata
embeddings, names, labels = get_all_embeddings(modelGNN, myloader)
summed_vectors, final_labels = sum_embeddings_with_metadata(embeddings, names, labels, data_merged)

# Convert the vectors and labels to NumPy arrays
summed_vectors = np.array(summed_vectors)
final_labels = np.array(final_labels)

# Predict on the test dataset with the loaded classifier
y_pred_new = rf_clf_loaded.predict(summed_vectors)

# Compute the performance metrics
# NOTE(review): average='binary' assumes a two-class problem — confirm the
# label set of this dataset.
accuracy = accuracy_score(final_labels, y_pred_new)
precision = precision_score(final_labels, y_pred_new, average='binary')
recall = recall_score(final_labels, y_pred_new, average='binary')
f1 = f1_score(final_labels, y_pred_new, average='binary')

# Print the results
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")

# Classification report
# NOTE(review): target_names presumes the sorted labels map to benign first,
# malware second — verify against how the dataset encodes its labels.
print("\nClassification Report:")
print(classification_report(final_labels, y_pred_new, target_names=['benign', 'malware']))

# Visualisation avec t-SNE
def plot_embeddings(X, y, title):
    """Project ``X`` to two dimensions with t-SNE and draw a coloured scatter plot.

    Args:
        X: Feature vectors, one row per sample; must support ``len``.
        y: Values used to colour the points (passed to ``scatter`` as ``c``).
        title: Title of the figure.
    """
    # Cap the perplexity just below the number of samples.
    capped_perplexity = min(30, len(X)-1)
    projector = TSNE(n_components=2, perplexity=capped_perplexity, random_state=42)
    projected = projector.fit_transform(X)

    plt.figure(figsize=(10, 7))
    first_axis = projected[:, 0]
    second_axis = projected[:, 1]
    points = plt.scatter(
        first_axis,
        second_axis,
        c=y,
        cmap='coolwarm',
        s=50,
        alpha=0.7,
        edgecolor='k',
    )
    plt.colorbar(points, label='Class')
    plt.title(title)
    plt.xlabel("t-SNE component 1")
    plt.ylabel("t-SNE component 2")
    plt.show()

# Make sure the labels are numeric before using them as colours
if np.issubdtype(final_labels.dtype, np.number):
    numeric_labels = final_labels
    numeric_pred = y_pred_new
else:
    # Map the string labels to numeric values; a label outside this mapping
    # raises KeyError.
    label_mapping = {'benign': 0, 'malware': 1}
    numeric_labels = np.array([label_mapping[label] for label in final_labels])
    numeric_pred = np.array([label_mapping[label] for label in y_pred_new])

# Plot the embeddings coloured by the predicted labels
plot_embeddings(summed_vectors, numeric_pred, "t-SNE Visualization of Test Set with Predicted Labels")

# Demo


In [None]:
!pip install torch_geometric
!pip install streamlit
!pip install pyngrok
!pip install streamlit-option-menu
!pip install streamlit-lottie

In [None]:
# Install PyCG, the library that generates call graphs: clone the repository
# and pip-install it from the checkout.
!git clone https://github.com/vitsalis/PyCG.git
%cd PyCG
!pip install .


In [None]:
%cd ..

In [None]:
# Register the ngrok auth token with the ngrok CLI.
# NOTE(review): this hard-codes a credential in the notebook; rotate the token
# and supply it from a secret store or environment variable instead.
!ngrok authtoken 2K5Fn0LG8Miigzi0zMCW2oCoyKd_797ecEbvtBoxk5HCzCcP3

In [None]:

!pip install scikit-learn==1.3.2


In [None]:
import nltk
# Download the NLTK stopwords corpus before importing it.
nltk.download('stopwords')
from nltk.corpus import stopwords

# Load the English stopwords into a set
stop_words = set(stopwords.words('english'))


In [None]:
import os
from getpass import getpass

from pyngrok import ngrok

# Authenticate ngrok without embedding the credential in the notebook: take the
# token from the NGROK_AUTH_TOKEN environment variable, or prompt for it when
# the variable is unset or empty.
ngrok_token = os.environ.get("NGROK_AUTH_TOKEN") or getpass("ngrok auth token: ")
ngrok.set_auth_token(ngrok_token)

# Start ngrok tunnel to streamlit port
public_url = ngrok.connect(8501)
print(f"Streamlit is available at: {public_url}")


In [None]:
!streamlit run app.py &>/dev/null&

In [None]:
from pyngrok import ngrok
# List the tunnels reported by ngrok and print them.
tunnels = ngrok.get_tunnels()
print("Active tunnels:", tunnels)