In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) the Spark session for this notebook, with
# spark.task.maxFailures set to "8", then print the Spark version
# as a quick check that the session is usable.
spark = (
    SparkSession.builder
    .appName("PhishTounsi_Urls")
    .config("spark.task.maxFailures", "8")
    .getOrCreate()
)

print(spark.version)

In [None]:
# Read the URL dataset from CSV: the first row is treated as the header
# and column types are inferred from the data.
urls_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("balanced_urls.csv")
)

# Preview the first 10 rows.
urls_df.show(10)

In [None]:
from pyspark.sql.functions import when

# NOTE(review): `urls_merged_df` is not assigned in any earlier cell visible
# in this notebook (only `urls_df` is loaded above), so on a fresh kernel this
# cell would presumably fail with NameError — TODO restore/add the cell that
# builds `urls_merged_df` (and confirm it has `is_spam` and `urls` columns).

# Add a binary `label` column: 1 where `is_spam` equals "true", otherwise 0.
urls_merged_df = urls_merged_df.withColumn(
    "label",
    when(urls_merged_df["is_spam"] == "true", 1).otherwise(0),
)

# Preview the first 20 rows with the new column.
urls_merged_df.show(20)

In [None]:
from pyspark.sql.functions import col

# Clean-up in two steps, in this order:
#   1. keep a single row per distinct value of `urls`;
#   2. keep only rows whose URL begins with "http://" or "https://".
# NOTE(review): both steps are case-sensitive and run before the notebook
# lowercases `urls` in a later cell — confirm that ordering is intended.
urls_merged_df = (
    urls_merged_df
    .dropDuplicates(["urls"])
    .where(col("urls").rlike(r"^https?://"))
)


In [None]:
from pyspark.sql.functions import length, regexp_replace, split

# Add three per-URL features in one pass:
#   url_length         - number of characters in the URL
#   special_char_count - how many of the characters @ # ? = % & - it contains
#                        (everything else is stripped, then the remainder is measured)
#   contains_ip        - 1 if a dotted group of four numbers appears anywhere
#                        in the URL, otherwise 0
urls_merged_df = (
    urls_merged_df
    .withColumn("url_length", length(col("urls")))
    .withColumn(
        "special_char_count",
        length(regexp_replace(col("urls"), "[^@#?=%&-]", "")),
    )
    .withColumn(
        "contains_ip",
        when(col("urls").rlike(r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+"), 1).otherwise(0),
    )
)
urls_merged_df.show(10)

In [None]:
from pyspark.sql.functions import lower

# Normalise `urls` to lowercase, then drop short URLs: only rows whose URL
# is longer than 10 characters are kept (10 characters or fewer are removed).
urls_merged_df = (
    urls_merged_df
    .withColumn("urls", lower(col("urls")))
    .where(length(col("urls")) > 10)
)


In [None]:
from pyspark.sql.functions import length, when, split, col, size

# Host portion of the URL. An earlier cell keeps only URLs that begin with
# "http://" or "https://", so splitting on "/" yields ["http(s):", "", host, ...]
# and the host sits at index 2. It may still carry a ":port" suffix.
url_host_col = split(col("urls"), "/")[2]

# Number of dot-separated labels in the host (e.g. "a.b.example.com" -> 4).
host_label_count = size(split(url_host_col, "\\."))

# Feature: Domain length
urls_merged_df = urls_merged_df.withColumn(
    "domain_length",
    length(url_host_col)
)

# Feature: Suspicious TLDs
# Checked against the host, not the whole URL: anchoring the pattern to the end
# of the full URL would miss every URL that has a path or query after the host
# (e.g. "http://evil.xyz/login"). An optional ":port" suffix is tolerated.
urls_merged_df = urls_merged_df.withColumn(
    "suspicious_tld",
    when(url_host_col.rlike(r"\.(xyz|info|tk|top|icu)(:[0-9]+)?$"), 1).otherwise(0)
)

# Feature: HTTPS usage
urls_merged_df = urls_merged_df.withColumn(
    "uses_https",
    when(col("urls").startswith("https"), 1).otherwise(0)
)

# Feature: Subdomain count
# Counted on the host only, so dots in the path or query (e.g. "index.html")
# no longer inflate the value. Two labels are treated as the registered domain
# ("example.com"); hosts with two or fewer labels are clamped to 0 instead of
# going negative.
urls_merged_df = urls_merged_df.withColumn(
    "subdomain_count",
    when(host_label_count > 2, host_label_count - 2).otherwise(0)
)

# Feature: Presence of phishing keywords
urls_merged_df = urls_merged_df.withColumn(
    "phishing_keywords",
    when(col("urls").rlike(r"(login|verify|secure|bank|update)"), 1).otherwise(0)
)

# Feature: URL encoding (any percent-encoded byte such as "%2F")
urls_merged_df = urls_merged_df.withColumn(
    "url_encoded",
    when(col("urls").rlike(r"%[0-9A-Fa-f]{2}"), 1).otherwise(0)
)

# Show the resulting DataFrame with the new features
urls_merged_df.show(10)


In [None]:
from pyspark.ml.feature import VectorAssembler

# Numeric feature columns (created in the earlier feature cells) that are
# packed into the model's input vector, in this order.
feature_columns = [
    "url_length",
    "special_char_count",
    "contains_ip",
    "domain_length",
    "suspicious_tld",
    "uses_https",
    "subdomain_count",
    "phishing_keywords",
    "url_encoded",
]

# Combine the columns above into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
urls_merged_df = assembler.transform(urls_merged_df)


In [None]:
# Randomly split the rows into train/test using weights
# [train_ratio, 1 - train_ratio] and a fixed seed for repeatability.
train_ratio = 0.8
urls_train_df, urls_test_df = urls_merged_df.randomSplit(
    [train_ratio, 1 - train_ratio],
    seed=42,
)

# Per-label row counts for each split, to check how the labels are distributed.
train_counts = urls_train_df.groupBy("label").count()
test_counts = urls_test_df.groupBy("label").count()

# Display training counts first, then testing counts.
train_counts.show()
test_counts.show()