In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, col, length, regexp_replace
from pyspark.sql.types import IntegerType, BooleanType, StringType
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, ChiSqSelector
from pyspark.ml import Pipeline

In [37]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('URLs_mining')
    .getOrCreate()
)
# Bare expression: the notebook displays the session object as the cell output.
spark

In [38]:
# Load the raw URL dataset. The first line of the file is the header and
# column types are inferred by Spark from the data.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("malicious_urls.csv")
)

# Preview the first 20 rows.
data.show()

+--------------------+----------+
|                 url|      type|
+--------------------+----------+
|    br-icloud.com.br|  phishing|
|mp3raid.com/music...|    benign|
|bopsecrets.org/re...|    benign|
|http://www.garage...|defacement|
|http://adventure-...|defacement|
|http://buzzfil.ne...|    benign|
|espn.go.com/nba/p...|    benign|
|yourbittorrent.co...|    benign|
|http://www.pashmi...|defacement|
|allmusic.com/albu...|    benign|
|corporationwiki.c...|    benign|
|http://www.ikenmi...|defacement|
|myspace.com/video...|    benign|
|http://www.lebens...|defacement|
|http://www.szabad...|defacement|
|http://larcadelca...|defacement|
|quickfacts.census...|    benign|
|nugget.ca/Article...|    benign|
|uk.linkedin.com/p...|    benign|
|http://www.vnic.c...|defacement|
+--------------------+----------+
only showing top 20 rows



In [39]:
# Label distribution of the raw data, used to spot missing or corrupted labels.
data.groupBy("type").agg(count("*").alias("count")).show()

+--------------------+------+
|                type| count|
+--------------------+------+
|              benign|428103|
|          defacement| 96457|
|            phishing| 94108|
|             malware| 32520|
|                NULL|    15|
|                Ð|     1|
|PhµW\v;XyOy...|     1|
|cÔ¡æ>1\bHÇÕd...|     1|
|                spam| 12000|
+--------------------+------+



## Data pre-processing

In [40]:
# Remove incomplete rows FIRST, then de-duplicate on the URL.
# dropDuplicates keeps one arbitrary row per URL; if it ran before the null
# filter it could keep a row with a missing field and discard a complete
# duplicate, and the URL would then disappear entirely in the null filter.
# NOTE(review): when the same URL appears with different labels, which label
# survives is still arbitrary -- confirm whether that matters for this dataset.
data = data.na.drop().dropDuplicates(["url"])

# Label distribution after cleaning.
data.groupBy("type").count().show()

+--------------------+------+
|                type| count|
+--------------------+------+
|                spam| 11921|
|              benign|428080|
|          defacement| 95308|
|            phishing| 94083|
|             malware| 23645|
|                Ð|     1|
|PhµW\v;XyOy...|     1|
|cÔ¡æ>1\bHÇÕd...|     1|
+--------------------+------+



In [41]:
# Keep only rows whose label is made entirely of printable ASCII characters
# (space through tilde); this removes the corrupted labels seen above.
printable_ascii = r'^[ -~]+$'
data = data.where(col("type").rlike(printable_ascii))

data.groupBy("type").count().show()

+----------+------+
|      type| count|
+----------+------+
|      spam| 11921|
|    benign|428080|
|defacement| 95308|
|  phishing| 94083|
|   malware| 23645|
+----------+------+



In [42]:
# Binary target: 0 when the label contains "benign", 1 for every other label.
# The original multi-class `type` column is kept alongside the new one.
is_benign = col("type").contains("benign")
data = data.withColumn("url_type", when(is_benign, 0).otherwise(1))
data.show()

+--------------------+--------+--------+
|                 url|    type|url_type|
+--------------------+--------+--------+
|   H\vÖË]t¹[ÈöýE|phishing|       1|
|^oð]Â|¬|hõElò...|phishing|       1|
|"½<+U½¹1\f[...|phishing|       1|
|"äÕ3ñºT-\fTÖGÑîÊ...|phishing|       1|
|"ëËl×uÏB'JI¨GÙn"...|phishing|       1|
|'118bm.com/images...|  benign|       0|
|'1pcables-inox.co...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
|'9d345009-a-62cb3...|  benign|       0|
+--------------------+--------+--------+
only showing top

In [43]:
# Keep only rows whose URL is made entirely of printable ASCII characters
# (space through tilde); this removes the binary-garbage URLs shown above.
printable_ascii = r'^[ -~]+$'
data = data.where(col("url").rlike(printable_ascii))
data.show()

+--------------------+------+--------+
|                 url|  type|url_type|
+--------------------+------+--------+
|'118bm.com/images...|benign|       0|
|'1pcables-inox.co...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
|'9d345009-a-62cb3...|benign|       0|
+--------------------+------+--------+
only showing top 20 rows



In [44]:
# Class balance of the binary target before down-sampling.
data.groupBy("url_type").agg(count("*").alias("count")).show()

+--------+------+
|url_type| count|
+--------+------+
|       1|224323|
|       0|427801|
+--------+------+



In [45]:
# Target number of rows per class after down-sampling.
sample_size = 100000

# Get the size of every class with a single aggregation instead of one
# filter+count job per class.
class_counts = {
    row["url_type"]: row["count"]
    for row in data.groupBy("url_type").count().collect()
}

# Per-class sampling fraction. sampleBy only accepts fractions in [0, 1], so a
# class that is already smaller than sample_size is kept whole (fraction 1.0)
# instead of raising. Classes that are absent from the data get no entry.
fractions = {
    label: min(1.0, sample_size / class_size)
    for label, class_size in class_counts.items()
}

# sampleBy samples each row independently, so the resulting class sizes are
# close to sample_size, not exactly equal to it (see the counts printed below).
data = data.sampleBy("url_type", fractions=fractions, seed=42)
data.groupBy("url_type").count().show()

+--------+------+
|url_type| count|
+--------+------+
|       1|100473|
|       0|100153|
+--------+------+



## Feature engineering

In [46]:
# Lexical features derived from the raw URL string.

# (feature name, NEGATED character class) pairs. Removing every character that
# is outside the class and measuring the length of what remains gives the
# number of occurrences of that class in the URL.
char_count_patterns = (
    ("digit_count", "[^0-9]"),
    ("ampersand_count", "[^&]"),
    ("underscore_count", "[^_]"),
    ("dot_count", "[^.]"),
    ("percent_count", "[^%]"),
    ("at_count", "[^@]"),
    ("tilde_count", "[^~]"),
    ("hash_count", "[^#]"),
)

# Total number of characters in the URL.
data = data.withColumn("url_length", length("url"))

# One count column per character class, added in the order listed above.
for feature_name, negated_class in char_count_patterns:
    data = data.withColumn(
        feature_name,
        length(regexp_replace(col("url"), negated_class, "")),
    )

# Binary prefix flags: 1 when the URL matches, 0 otherwise.
# NOTE(review): "http" is a prefix of "https", so has_http is also 1 for every
# URL where has_https is 1. Confirm whether has_http is meant to flag plain
# http only; if so the check should be on "http://".
data = data.withColumn("has_https", when(col("url").startswith("https"), 1).otherwise(0))
data = data.withColumn("has_http", when(col("url").startswith("http"), 1).otherwise(0))
data = data.withColumn("starts_with_digit", when(col("url").rlike("^[0-9]"), 1).otherwise(0))

In [47]:
# Preview the frame with the newly added feature columns.
data.show()

+--------------------+------+--------+----------+-----------+---------------+----------------+---------+-------------+--------+-----------+----------+---------+--------+-----------------+
|                 url|  type|url_type|url_length|digit_count|ampersand_count|underscore_count|dot_count|percent_count|at_count|tilde_count|hash_count|has_https|has_http|starts_with_digit|
+--------------------+------+--------+----------+-----------+---------------+----------------+---------+-------------+--------+-----------+----------+---------+--------+-----------------+
|'9d345009-a-62cb3...|benign|       0|       363|         46|              2|               1|        3|            3|       0|          0|         0|        0|       0|                0|
|'9d345009-a-62cb3...|benign|       0|       371|         51|              2|               6|        3|            3|       0|          0|         0|        0|       0|                0|
|'9d345009-a-62cb3...|benign|       0|       371|         50

### Feature selection

In [48]:
# Names of the model input columns, grouped by kind. The final order is the
# order in which they are listed here: length, then counts, then flags.
count_features = [
    f"{symbol}_count"
    for symbol in ("digit", "ampersand", "underscore", "dot", "percent", "at", "tilde", "hash")
]
flag_features = ["has_https", "has_http", "starts_with_digit"]

feature_columns = ["url_length", *count_features, *flag_features]

In [49]:
# Pack the individual feature columns into a single vector column
# ("features_vector"), in the order given by feature_columns.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_vector",
)
data = assembler.transform(data)

In [50]:
# Preview the frame including the assembled features_vector column.
data.show()

+--------------------+------+--------+----------+-----------+---------------+----------------+---------+-------------+--------+-----------+----------+---------+--------+-----------------+--------------------+
|                 url|  type|url_type|url_length|digit_count|ampersand_count|underscore_count|dot_count|percent_count|at_count|tilde_count|hash_count|has_https|has_http|starts_with_digit|     features_vector|
+--------------------+------+--------+----------+-----------+---------------+----------------+---------+-------------+--------+-----------+----------+---------+--------+-----------------+--------------------+
|'9d345009-a-62cb3...|benign|       0|       363|         46|              2|               1|        3|            3|       0|          0|         0|        0|       0|                0|(12,[0,1,2,3,4,5]...|
|'9d345009-a-62cb3...|benign|       0|       371|         51|              2|               6|        3|            3|       0|          0|         0|        0|    

#### Min-Max scaling

In [51]:
# Min-max scaler: reads "features_vector" and writes "scaled_features".
scaler = MinMaxScaler(
    inputCol="features_vector",
    outputCol="scaled_features",
)

# Single-stage pipeline wrapping the scaler.
pipeline = Pipeline(stages=[scaler])

# Fit the pipeline on the data, then add the scaled column to the frame.
# NOTE(review): the scaler is fit on the full dataset, before the train/test
# split performed later in the notebook, so test rows influence the scaling
# statistics. Consider fitting on the training split only.
model = pipeline.fit(data)
data = model.transform(data)

In [None]:
# Show the scaled feature vectors in full (no column truncation).
scaled_only = data.select("scaled_features")
scaled_only.show(truncate=False)

#### Train/test split

In [None]:
# Random split with weights 0.8 (train) / 0.2 (test) and a fixed seed.
split_weights = [0.8, 0.2]
train, test = data.randomSplit(split_weights, seed=2018)

## Classification models

In [None]:
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, LinearSVC, GBTClassifier

### Random forest classifier

In [None]:
# Random forest on the scaled feature vector, predicting the binary url_type label.
rf = RandomForestClassifier(
    featuresCol='scaled_features',
    labelCol='url_type',
)

In [None]:
# Fit the random forest on the training split.
rfModel = rf.fit(train)
# Apply the fitted model to the held-out test split.
predictions = rfModel.transform(test)

In [None]:
# Compare the true label with the model's prediction for the first test rows.
label_vs_prediction = predictions.select(["url_type", "prediction"])
label_vs_prediction.show()

### Naive Bayes

### Support Vector Machine

### Gradient-Boosted Trees 