<a href="https://colab.research.google.com/github/susichao/ml-project/blob/main/Copy_of_dga_ml_detec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pushbullet.py
!pip install tldextract
!pip install scapy
!pip install configparser


Collecting pushbullet.py
  Downloading pushbullet.py-0.12.0-py2.py3-none-any.whl (10 kB)
Collecting python-magic (from pushbullet.py)
  Downloading python_magic-0.4.27-py2.py3-none-any.whl (13 kB)
Installing collected packages: python-magic, pushbullet.py
Successfully installed pushbullet.py-0.12.0 python-magic-0.4.27
Collecting tldextract
  Downloading tldextract-5.1.2-py3-none-any.whl (97 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m97.6/97.6 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Collecting requests-file>=1.4 (from tldextract)
  Downloading requests_file-2.0.0-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: requests-file, tldextract
Successfully installed requests-file-2.0.0 tldextract-5.1.2
Collecting scapy
  Downloading scapy-2.5.0.tar.gz (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels 

In [None]:
!pip install --upgrade tldextract



In [24]:
import os
import pandas as pd
import numpy as np
import tldextract
import scipy
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer

def load_data() -> pd.DataFrame:
    if os.path.isfile('/content/sample_data/dga_data.csv'):
        data = pd.read_csv('/content/sample_data/dga_data.csv')
        return data
    else:
        raise ValueError("Error loading data. Please check the files.")

# Extract bigram features from domain names
def extract_bigram_features(domain: str) -> list:
    features = []
    for i in range(len(domain) - 1):
        features.append(domain[i:i+2])
    return features

# Calculate entropy of a domain name
def calculate_entropy(fqdn_counts: dict) -> float:
    entropy = 0
    total_count = sum(fqdn_counts.values())
    for count in fqdn_counts.values():
        p = count / total_count
        entropy += -p * np.log2(p)
    return entropy

# Extract lexical features from domain names
def extract_lexical_features(domain: str) -> list:
    features = []
    if domain.count('.') == 1:
        features.append(1)
    else:
        features.append(0)
    if domain.islower():
        features.append(1)
    else:
        features.append(0)
    if domain.isalnum():
        features.append(1)
    else:
        features.append(0)
    if domain.count('-') > 0:
        features.append(1)
    else:
        features.append(0)
    if domain.count('_') > 0:
        features.append(1)
    else:
        features.append(0)
    if domain.count('0') == len(domain):
        features.append(1)
    else:
        features.append(0)
    return features


import re

def preprocess_domain(domain: str) -> str:
    # Remove stop words
    stop_words = set(stopwords.words('english'))
    domain = ' '.join([word for word in domain.split() if word not in stop_words])

    # Remove non-alphabetical characters
    domain = re.sub(r'[^a-zA-Z]', '', domain)

    return domain

def extract_clustering_features(domain: str) -> list:
    if not domain:
        return []
    vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 2))
    X = vectorizer.fit_transform([domain])
    return X.toarray()[0]

# Extract features from domain names

def extract_features(domain: str) -> list:
    extract_result = tldextract.extract(domain)
    fqdn_counts = extract_result.fqdn_counts if hasattr(extract_result, 'fqdn_counts') else {}
    entropy = calculate_entropy(fqdn_counts)

    features = []
    features.extend(extract_bigram_features(domain))
    features.append(entropy)
    features.extend(extract_lexical_features(domain))
    features.extend(extract_clustering_features(domain))

    return features



def train_model():
    data = load_data()
    if data is not None:
        X, y = [], []
        for index, row in data.iterrows():
            domain = row['domain']
            features = extract_features(domain)
            X.append(features)
           # Assuming the column name for label is 'malicious'
            label = 1 if row['isDGA'] == 1 else 0
            y.append(label)
        # Split data into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Train the ML model
        model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate the model
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        print("Model accuracy:", accuracy)
    else:
        raise ValueError("Error loading data. Model training failed.")


if __name__ == "__main__":
    train_model()


ValueError: empty vocabulary; perhaps the documents only contain stop words

In [26]:
data = pd.read_csv('/content/sample_data/malicious_phish.csv')
print(data.columns)


Index(['url', 'type'], dtype='object')


In [27]:
def has_valid_ngrams(domain, n=2):
    # Convert the domain to a string
    domain = str(domain)

    # Extract n-grams from the domain
    ngrams_list = list(ngrams(domain, n))

    # Check if the list of n-grams is empty
    if not ngrams_list:
        return False
    else:
        return True

In [30]:
df = pd.read_csv('/content/sample_data/malicious_phish.csv')

# Check if each domain has valid 2-grams
df['has_valid_2grams'] = df['url'].apply(has_valid_ngrams)

# Print the results
print(df.head())

                                                 url        type  \
0                                   br-icloud.com.br    phishing   
1                mp3raid.com/music/krizz_kaliko.html      benign   
2                    bopsecrets.org/rexroth/cr/1.htm      benign   
3  http://www.garage-pirenne.be/index.php?option=...  defacement   
4  http://adventure-nicaragua.net/index.php?optio...  defacement   

   has_valid_2grams  
0              True  
1              True  
2              True  
3              True  
4              True  


In [64]:
import os
import pandas as pd
import numpy as np
import tldextract
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
import logging

In [65]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s -%(lineno)s')

In [66]:
def load_data() -> pd.DataFrame:
    """Load the data from the specified file path."""
    file_path = '/content/sample_data/malicious_phish.csv'
    if os.path.isfile(file_path):
        data = pd.read_csv(file_path)
        return data
    else:
        raise ValueError(f"Error loading data. File not found: {file_path}")

In [67]:
def calculate_entropy(fqdn_counts: dict) -> float:
    """Calculate the entropy of a domain name based on the frequency counts of its components."""
    entropy = 0
    total_count = sum(fqdn_counts.values())
    for count in fqdn_counts.values():
        p = count / total_count
        entropy += -p * np.log2(p)
    return entropy

In [68]:
def extract_features(domain: str) -> list:
    """Extract features from a domain name."""
    try:
        extract_result = tldextract.extract(domain)
        fqdn_counts = extract_result.fqdn_counts if hasattr(extract_result, 'fqdn_counts') else {}
        entropy = calculate_entropy(fqdn_counts)

        vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        X = vectorizer.fit_transform([domain])
        features = list(X.toarray()[0])
        features.append(entropy)

        return features
    except ValueError as e:
        if "empty vocabulary" in str(e):
            logging.warning(f"Empty vocabulary for domain '{domain}'. Using entropy as the only feature.")
            extract_result = tldextract.extract(domain)
            fqdn_counts = extract_result.fqdn_counts if hasattr(extract_result, 'fqdn_counts') else {}
            entropy = calculate_entropy(fqdn_counts)
            return [entropy]
        else:
            raise e

In [69]:
def train_model(data: pd.DataFrame) -> None:
    """Train and evaluate the machine learning model."""
    X, y = [], []
    for index, row in data.iterrows():
        domain = row['url']
        features = extract_features(domain)
        X.append(features)
        label = 1 if row['type'] == 1 else 0
        y.append(label)
    # Normalize features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    models = [
        RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        LogisticRegression(random_state=42)
    ]

    for model in models:
        logging.info(f"Training {model.__class__.__name__}...")
        # Cross-validation
        scores = cross_val_score(model, X, y, cv=5)
        logging.info(f"Cross-validation scores: {scores}")
        logging.info(f"Mean accuracy: {scores.mean():.3f}")
         # Fit the model
        model.fit(X_train, y_train)
        # Evaluate the model
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        logging.info(f"Test accuracy: {accuracy:.3f}")
        logging.info(f"Classification report:\n{classification_report(y_test, y_pred)}")



In [None]:
if __name__ == "__main__":
    try:
        data = load_data()
        train_model(data)
    except Exception as e:
        logging.error(f"Error: {e}")