In [20]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, IntegerType, StructType,StructField
import numpy as np
from math import log, exp
from pyspark.sql import SparkSession
import os
import sys
JAVA_HOME_PATH = "/usr/lib/jvm/java-17-openjdk-amd64"

# Установка критических переменных окружения
os.environ["JAVA_HOME"] = JAVA_HOME_PATH
os.environ["PATH"] = f"{JAVA_HOME_PATH}/bin:{os.environ['PATH']}"
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

spark = SparkSession.builder \
    .appName("SpamFilter") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "4") \
    .config("spark.default.parallelism", "32") \
    .config("spark.sql.shuffle.partitions", "32") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.driver.extraJavaOptions", 
            "--add-opens=java.base/java.lang=ALL-UNNAMED "
            "--add-opens=java.base/java.nio=ALL-UNNAMED "
            "--add-opens=java.base/java.util=ALL-UNNAMED "
            "-XX:+UseG1GC "  # Использовать Garbage Collector G1
            "-XX:+IgnoreUnrecognizedVMOptions") \
    .config("spark.executor.extraJavaOptions", 
            "--add-opens=java.base/java.lang=ALL-UNNAMED "
            "--add-opens=java.base/java.nio=ALL-UNNAMED "
            "--add-opens=java.base/java.util=ALL-UNNAMED "
            "-XX:+UseG1GC "
            "-XX:+IgnoreUnrecognizedVMOptions") \
    .getOrCreate()

In [47]:
schema = StructType([
    StructField("label", IntegerType(), True),  # Метка как целое число
    StructField("email", StringType(), True)    # Текст письма как строка
])
df = spark.read \
    .format("csv") \
    .option("header", True) \
    .option("delimiter", ",") \
    .schema(schema) \
    .load("../../resources/combined_data.csv")

def repair_label(label):
    if label is None:
        return 0
    if label >= 1:
        return 1
    else:
        return 0
repair_label_udf = F.udf(repair_label, IntegerType())
df = df.withColumn("label", repair_label_udf(F.col("label")))

In [48]:
df.printSchema()
df.show(10)

root
 |-- label: integer (nullable = true)
 |-- email: string (nullable = true)



25/06/26 10:08:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv


+-----+--------------------+
|label|               email|
+-----+--------------------+
|    1|ounce feather bow...|
|    1|wulvob get your m...|
|    0| computer connect...|
|    1|university degree...|
|    0|thanks for all yo...|
|    0|larry king live a...|
|    0|michael pobega wr...|
|    0|hi i have this er...|
|    1|works gateway wor...|
|    1|upon this account...|
+-----+--------------------+
only showing top 10 rows


                                                                                

In [None]:
local_data = new_df.collect()
model = {}

for row in local_data:
    for word in row["processed_email"]:
        if word not in model:
            model[word] = {"spam": 0, "ham": 0}
        if row["label"] == 1:
            model[word]["spam"] += 1
        else:
            model[word]["ham"] += 1

In [24]:
# model.get("lottery")
# model.get("sale")

In [25]:
# def predict_bayes(word):
#     word = word.lower()
#     num_spam_with_word = model[word]['spam']
#     num_ham_with_word = model[word]['ham']
#     return 1.0*num_spam_with_word/(num_spam_with_word + num_ham_with_word)

In [26]:
# predict_bayes("sale")

In [27]:
# print(type(local_data))

In [28]:
# emails = new_df

# def predict_naive_bayes(email):
#     total = emails.count()
#     num_spam = emails.where("label=1").count()
#     num_ham = total - num_spam
    
#     # Исправление опечатки
#     words = set(email.strip().split())
    
#     log_spam_prob = log(num_spam / total)  # Логарифм P(спам)
#     log_ham_prob = log(num_ham / total)    # Логарифм P(не спам)
    
#     alpha = 1e-6  # Параметр сглаживания для новых слов
    
#     for word in words:
#         if word in model:
#             # Правильное вычисление логарифмов вероятностей
#             log_spam_prob += log(model[word]['spam'] / num_spam)
#             log_ham_prob += log(model[word]['ham'] / num_ham)
#         else:
#             # Сглаживание для новых слов
#             log_spam_prob += log(alpha)
#             log_ham_prob += log(alpha)
    
#     # Преобразуем обратно в вероятности с защитой от переполнения
#     log_ratio = log_spam_prob - log_ham_prob
#     probability = 1 / (1 + exp(-log_ratio))
    
#     return probability

In [29]:
# predict_naive_bayes("meet me at")

In [30]:
# predict_naive_bayes("lottery sale")

In [31]:
# predict_naive_bayes("hi mom how are you")

In [32]:
# predict_naive_bayes("a")

In [33]:
# predict_naive_bayes("hi dad")

In [34]:
# predict_naive_bayes("meet me at the lobby of the hotel at nine am")

In [35]:
# small_df = spark.read.csv("emails.csv", header=True)
# small_df.printSchema()
# small_df.show(20)

In [36]:
# model.get("mom")

In [37]:
# model.get("hi")

In [43]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

df = df.filter(df.email.isNotNull())
# 1. Токенизация
tokenizer = Tokenizer(inputCol="email", outputCol="words")
# 2. Удаление стоп-слов
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
# 3. Векторизация
vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",vocabSize=50000, minDF=5)
# 4. Классификатор
nb = NaiveBayes(featuresCol="features", labelCol="label", modelType="multinomial")

# 5. Пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, nb])

# 6. Разделение на train/test
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 7. Обучение
bayes_model = pipeline.fit(train_data)

# 8. Предсказания
predictions = bayes_model.transform(test_data)

# 9. Оценка качества
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
bayes_accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {bayes_accuracy:.4f}")


CountVectorizer_6ea776332d1c


25/06/26 09:51:33 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
25/06/26 09:52:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
25/06/26 09:52:29 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
ERROR:root:KeyboardInterrupt while sending command.====>          (26 + 2) / 32]
Traceback (most recent call last):
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^

KeyboardInterrupt: 

25/06/26 09:52:49 ERROR Executor: Exception in task 29.0 in stage 24.0 (TID 386)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1968, in main
    split_index = read_int(infile)
                  ^^^^^^^^^^^^^^^^
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    length = stream.read(4)
             ^^^^^^^^^^^^^^
KeyboardInterrupt

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:581)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:107)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:90)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at o

In [None]:
from pyspark.sql import Row

# 1. Создаём DataFrame с новым сообщением
new_text = "hi mom how are you?"
new_df = spark.createDataFrame([Row(email=new_text)])

# 2. Прогоняем через модель
prediction = bayes_model.transform(new_df)

# 3. Смотрим результат
prediction.select("email", "prediction").show()



+-------------------+----------+
|              email|prediction|
+-------------------+----------+
|hi mom how are you?|       0.0|
+-------------------+----------+



                                                                                

In [None]:
bayes_model.write().overwrite().save("models/spam_model_bayes")

                                                                                

In [None]:
from pyspark.ml.pipeline import PipelineModel

loaded_model = PipelineModel.load("models/spam_model_bayes")
new_text = "hi mom how are you?"
new_df = spark.createDataFrame([Row(email=new_text)])

# 2. Прогоняем через модель
prediction = loaded_model.transform(new_df)

# 3. Смотрим результат
prediction.select("email", "prediction").show()



+-------------------+----------+
|              email|prediction|
+-------------------+----------+
|hi mom how are you?|       0.0|
+-------------------+----------+



                                                                                

In [None]:
new_text = "hi mom how are you"
new_df = spark.createDataFrame([Row(email=new_text)])

# 2. Прогоняем через модель
prediction = loaded_model.transform(new_df)

# 3. Смотрим результат
prediction.select("email", "prediction").show()



+------------------+----------+
|             email|prediction|
+------------------+----------+
|hi mom how are you|       0.0|
+------------------+----------+



                                                                                

In [46]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

df = df.filter(df.email.isNotNull())
# 1. Токенизация
tokenizer = Tokenizer(inputCol="email", outputCol="words")
# 2. Удаление стоп-слов
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
# 3. Векторизация
vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",vocabSize=50000, minDF=1)
# 4. Классификатор
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 5. Пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, lr])

# 6. Разделение на train/test
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 7. Обучение
lr_model = pipeline.fit(train_data)

# 8. Предсказания
predictions = lr_model.transform(test_data)

# 9. Оценка качества
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {lr_accuracy:.4f}")

25/06/26 09:59:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
ERROR:root:KeyboardInterrupt while sending command.                (4 + 2) / 32]
Traceback (most recent call last):
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

KeyboardInterrupt: 

25/06/26 09:59:40 ERROR Executor: Exception in task 7.0 in stage 26.0 (TID 395)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1968, in main
    split_index = read_int(infile)
                  ^^^^^^^^^^^^^^^^
  File "/home/odmen/SpamFilter/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    length = stream.read(4)
             ^^^^^^^^^^^^^^
KeyboardInterrupt

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:581)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:107)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:90)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at or

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType
import pandas as pd

# Загрузка данных и подготовка (предполагаем, что df уже существует)
df = df.filter(df.email.isNotNull())

# 1. Создаем этапы пайплайна
tokenizer = Tokenizer(inputCol="email", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
vectorizer = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=50000, minDF=1)
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 2. Собираем пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, lr])

# 3. Разделяем данные
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 4. Обучаем модель (теперь переменная lr_model будет определена)
lr_model = pipeline.fit(train_data)  # <-- Это ключевая строка!

# 5. Извлекаем векторную модель
vectorizer_model = lr_model.stages[2]  # Теперь ошибки не будет
vocabulary = vectorizer_model.vocabulary

# 6. Преобразуем вектора в массивы
def vector_to_array(v):
    return v.toArray().tolist()

vector_to_array_udf = udf(vector_to_array, ArrayType(FloatType()))

# 7. Добавляем features в предсказания
predictions = lr_model.transform(test_data)
result = predictions.withColumn("features_array", vector_to_array_udf(col("features")))

# 8. Сохраняем в CSV
result.select("email", "label", "prediction", "features_array") \
     .toPandas() \
     .to_csv("lr_features_results.csv", index=False)

# 9. Сохраняем словарь
pd.DataFrame({"term": vocabulary}).to_csv("lr_vocabulary.csv", index=False)

print("Данные успешно сохранены!")

25/06/26 10:30:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
25/06/26 10:31:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
25/06/26 10:31:51 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: label, email
Expected: email but found: text
CSV file: file:///home/odmen/SpamFilter/resources/combined_data.csv
  [PACKAGE_NOT_INSTALLED] PyArrow >= 11.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
25/06/26 11:30:29 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: label, text
 Schema: lab

In [45]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType, FloatType
import pandas as pd

# 1. Извлекаем CountVectorizerModel из обученного пайплайна
vectorizer_model = lr_model.stages[2]  # Индекс 2, так как vectorizer — третий этап в pipeline
vocabulary = vectorizer_model.vocabulary  # Получаем словарь терминов

# 2. Создаем UDF для преобразования SparseVector в массив чисел
def vector_to_array(v):
    return v.toArray().tolist()  # Преобразуем SparseVector в list

# Регистрируем UDF
vector_to_array_udf = udf(vector_to_array, ArrayType(FloatType()))

# 3. Добавляем столбец с признаками в виде массива
predictions_with_features = predictions.withColumn(
    "features_array",
    vector_to_array_udf(col("features"))
)

# 4. Собираем данные в Pandas DataFrame (если данные небольшие)
pdf = predictions_with_features.select(
    "email", "label", "prediction", "features_array"
).toPandas()

# 5. Сохраняем признаки и метки в CSV
pdf.to_csv("output/spark_lr_features.csv", index=False)

# 6. Дополнительно сохраняем словарь (vocabulary) в CSV
pd.DataFrame({"term": vocabulary}).to_csv(
    "output/spark_lr_vocabulary.csv",
    index=False
)

print("Данные успешно сохранены в CSV!")

NameError: name 'lr_model' is not defined

In [None]:
print(df.count())
print(df.where("label>0").count())
print(df.where("label=1").count())
print(df.where("label=0").count())

                                                                                

249688


                                                                                

43910


                                                                                

43910




205778


                                                                                

In [None]:
lr_model.write().overwrite().save("models/spam_model_lr")

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

df = df.filter(df.email.isNotNull())
# 1. Токенизация
tokenizer = Tokenizer(inputCol="email", outputCol="words")
# 2. Удаление стоп-слов
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
# 3. Векторизация
vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",vocabSize=50000, minDF=1)
# 4. Классификатор
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# 5. Пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, dt])

# 6. Разделение на train/test
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 7. Обучение
dt_model = pipeline.fit(train_data)

# 8. Предсказания
predictions = dt_model.transform(test_data)

# 9. Оценка качества
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {dt_accuracy:.4f}")

25/06/23 11:01:24 WARN DAGScheduler: Broadcasting large task binary with size 1300.3 KiB
25/06/23 11:01:44 WARN MemoryStore: Not enough space to cache rdd_777_2 in memory! (computed 969.6 MiB so far)
25/06/23 11:01:44 WARN BlockManager: Persisting block rdd_777_2 to disk instead.
25/06/23 11:01:45 WARN MemoryStore: Not enough space to cache rdd_777_3 in memory! (computed 969.6 MiB so far)
25/06/23 11:01:45 WARN BlockManager: Persisting block rdd_777_3 to disk instead.
25/06/23 11:01:50 WARN MemoryStore: Not enough space to cache rdd_777_3 in memory! (computed 969.7 MiB so far)
25/06/23 11:01:55 WARN MemoryStore: Not enough space to cache rdd_777_5 in memory! (computed 419.8 MiB so far)
25/06/23 11:01:55 WARN BlockManager: Persisting block rdd_777_5 to disk instead.
25/06/23 11:02:00 WARN MemoryStore: Not enough space to cache rdd_777_4 in memory! (computed 969.6 MiB so far)
25/06/23 11:02:00 WARN BlockManager: Persisting block rdd_777_4 to disk instead.
25/06/23 11:02:12 WARN MemorySto

Accuracy: 0.9326


                                                                                

In [None]:
dt_model.write().overwrite().save("models/spam_model_dt")

                                                                                

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

df = df.filter(df.email.isNotNull())
# 1. Токенизация
tokenizer = Tokenizer(inputCol="email", outputCol="words")
# 2. Удаление стоп-слов
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
# 3. Векторизация
vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",vocabSize=50000, minDF=1)
# 4. Классификатор
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50)

# 5. Пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, rf])

# 6. Разделение на train/test
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 7. Обучение
rf_model = pipeline.fit(train_data)

# 8. Предсказания
predictions = rf_model.transform(test_data)

# 9. Оценка качества
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rf_accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {rf_accuracy:.4f}")

25/06/23 11:10:31 WARN DAGScheduler: Broadcasting large task binary with size 1324.4 KiB
25/06/23 11:10:49 WARN MemoryStore: Not enough space to cache rdd_881_2 in memory! (computed 970.6 MiB so far)
25/06/23 11:10:49 WARN BlockManager: Persisting block rdd_881_2 to disk instead.
25/06/23 11:10:49 WARN MemoryStore: Not enough space to cache rdd_881_3 in memory! (computed 970.6 MiB so far)
25/06/23 11:10:49 WARN BlockManager: Persisting block rdd_881_3 to disk instead.
25/06/23 11:10:55 WARN MemoryStore: Not enough space to cache rdd_881_3 in memory! (computed 970.6 MiB so far)
25/06/23 11:10:59 WARN MemoryStore: Not enough space to cache rdd_881_5 in memory! (computed 420.3 MiB so far)
25/06/23 11:10:59 WARN BlockManager: Persisting block rdd_881_5 to disk instead.
25/06/23 11:11:03 WARN MemoryStore: Not enough space to cache rdd_881_4 in memory! (computed 970.6 MiB so far)
25/06/23 11:11:03 WARN BlockManager: Persisting block rdd_881_4 to disk instead.
25/06/23 11:11:05 WARN MemorySto

Accuracy: 0.8298


                                                                                

In [None]:
rf_model.write().overwrite().save("models/spam_model_rf")

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LinearSVC  # Импорт SVM
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Фильтрация пустых значений
df = df.filter(df.email.isNotNull())

# 1. Токенизация
tokenizer = Tokenizer(inputCol="email", outputCol="words")

# 2. Удаление стоп-слов
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# 3. Векторизация
vectorizer = CountVectorizer(
    inputCol="filtered", 
    outputCol="features",
    vocabSize=50000, 
    minDF=1
)

# 4. Классификатор SVM (LinearSVC)
svm = LinearSVC(
    featuresCol="features", 
    labelCol="label",
    maxIter=10,          # Количество итераций
    regParam=0.1         # Параметр регуляризации
)

# 5. Пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, svm])

# 6. Разделение на train/test
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 7. Обучение модели
svm_model = pipeline.fit(train_data)

# 8. Предсказания
predictions = svm_model.transform(test_data)

# 9. Оценка качества
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
svm_accuracy = evaluator.evaluate(predictions)
print(f"SVM Accuracy: {svm_accuracy:.4f}")

# Дополнительные метрики (F1, Precision, Recall)
evaluator.setMetricName("f1").evaluate(predictions)
evaluator.setMetricName("weightedPrecision").evaluate(predictions)
evaluator.setMetricName("weightedRecall").evaluate(predictions)

                                                                                

SVM Accuracy: 0.9485


                                                                                

0.9485006083131676

In [None]:
svm_model.write().overwrite().save("models/spam_model_svm")

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import GBTClassifier  # Импорт GBT
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Фильтрация пустых значений
df = df.filter(df.email.isNotNull())

# 1. Токенизация
tokenizer = Tokenizer(inputCol="email", outputCol="words")

# 2. Удаление стоп-слов
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# 3. Векторизация
vectorizer = CountVectorizer(
    inputCol="filtered", 
    outputCol="features",
    vocabSize=50000, 
    minDF=1
)

# 4. Классификатор GBT (Gradient-Boosted Trees)
gbt = GBTClassifier(
    featuresCol="features", 
    labelCol="label",
    maxIter=50,           # Количество деревьев (аналог n_estimators)
    maxDepth=5,           # Глубина дерева
    stepSize=0.1,         # Скорость обучения (learning rate)
    subsamplingRate=0.8,  # Доля данных для каждого дерева
    seed=42
)

# 5. Пайплайн
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, gbt])

# 6. Разделение на train/test
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# 7. Обучение модели
gbt_model = pipeline.fit(train_data)

# 8. Предсказания
predictions = gbt_model.transform(test_data)

# 9. Оценка качества
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
gbt_accuracy = evaluator.evaluate(predictions)
print(f"GBT Accuracy: {gbt_accuracy:.4f}")

# Дополнительные метрики
print(f"F1-score: {evaluator.setMetricName('f1').evaluate(predictions):.4f}")
print(f"Precision: {evaluator.setMetricName('weightedPrecision').evaluate(predictions):.4f}")
print(f"Recall: {evaluator.setMetricName('weightedRecall').evaluate(predictions):.4f}")

25/06/23 11:27:16 WARN DAGScheduler: Broadcasting large task binary with size 1300.6 KiB
25/06/23 11:27:36 WARN MemoryStore: Not enough space to cache rdd_1221_2 in memory! (computed 969.4 MiB so far)
25/06/23 11:27:36 WARN BlockManager: Persisting block rdd_1221_2 to disk instead.
25/06/23 11:27:36 WARN MemoryStore: Not enough space to cache rdd_1221_3 in memory! (computed 969.4 MiB so far)
25/06/23 11:27:36 WARN BlockManager: Persisting block rdd_1221_3 to disk instead.
25/06/23 11:27:41 WARN MemoryStore: Not enough space to cache rdd_1221_3 in memory! (computed 969.4 MiB so far)
25/06/23 11:27:42 WARN MemoryStore: Not enough space to cache rdd_1221_3 in memory! (computed 419.7 MiB so far)
25/06/23 11:27:48 WARN MemoryStore: Not enough space to cache rdd_1221_5 in memory! (computed 273.8 MiB so far)
25/06/23 11:27:48 WARN BlockManager: Persisting block rdd_1221_5 to disk instead.
25/06/23 11:27:51 WARN MemoryStore: Not enough space to cache rdd_1221_4 in memory! (computed 969.4 MiB s

GBT Accuracy: 0.9613


                                                                                

F1-score: 0.9619


                                                                                

Precision: 0.9631


                                                                                

Recall: 0.9613


In [None]:
gbt_model.write().overwrite().save("models/spam_model_gbt")

                                                                                

In [None]:
print("Bayes:",bayes_accuracy)
print("logical regression:",lr_accuracy)
print("random forest:",rf_accuracy)
print("decision_tree",dt_accuracy)
print("SVM:", svm_accuracy)
print("gbt_accuracy", gbt_accuracy)

Bayes: 0.9544366752677247
logical regression: 0.9760284503389174
random forest: 0.8298193777825313
decision_tree 0.9325641402730056
SVM: 0.9485006083131676
gbt_accuracy 0.9612685000735324
