In [19]:
import os
import warnings
import multiprocessing

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ByteType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, Word2Vec
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.pipeline import Pipeline
from tqdm import tqdm

In [2]:
warnings.filterwarnings("ignore")

In [3]:
# Locations of the train/test CSV files, relative to the working directory.
TRAIN_PATH, TEST_PATH = (
    os.path.join("data", file_name) for file_name in ("train.csv", "test.csv")
)
# CPU count reported by multiprocessing; used as the CrossValidator parallelism level.
PROC_CNT = multiprocessing.cpu_count()

In [4]:
# Build (or reuse) the Spark session for this notebook, applying each resource
# setting to the builder in turn.
# NOTE(review): spark.driver.maxResultSize (12g) is larger than
# spark.driver.memory (8g) — confirm this is intended.
session_builder = SparkSession.builder.appName("task_5")
for conf_key, conf_value in (
    ("spark.driver.memory", "8g"),
    ("spark.driver.cores", "4"),
    ("spark.driver.maxResultSize", "12g"),
    ("spark.executor.memory", "8g"),
    ("spark.executor.cores", "4"),
):
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Restrict Spark logging to the ERROR level from here on.
spark.sparkContext.setLogLevel("ERROR")

22/12/10 21:25:37 WARN Utils: Your hostname, pc resolves to a loopback address: 127.0.1.1; using 192.168.0.101 instead (on interface eno1)
22/12/10 21:25:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/10 21:25:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Explicit schema: two string columns followed by six byte-typed label columns.
schema = StructType(
    [StructField(text_col, StringType()) for text_col in ("id", "comment_text")]
    + [
        StructField(label_col, ByteType())
        for label_col in (
            "toxic",
            "severe_toxic",
            "obscene",
            "threat",
            "insult",
            "identity_hate",
        )
    ]
)

# Read the training CSV with the multiline option enabled; a double quote is
# used both as the quote character and as the escape character.
csv_reader = spark.read.option("multiline", True)
train_sdf = csv_reader.csv(
    TRAIN_PATH,
    schema=schema,
    header=True,
    sep=",",
    quote='"',
    escape='"',
)
train_sdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: byte (nullable = true)
 |-- severe_toxic: byte (nullable = true)
 |-- obscene: byte (nullable = true)
 |-- threat: byte (nullable = true)
 |-- insult: byte (nullable = true)
 |-- identity_hate: byte (nullable = true)



In [6]:
# Preview the loaded training data as a text table.
train_sdf.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|Explanation\nWhy ...|    0|           0|      0|     0|     0|            0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|
|0001b41b1c6bb37e|"\nMore\nI can't ...|    0|           0|      0|     0|     0|            0|
|0001d958c54c6e35|You, sir, are my ...|    0|           0|      0|     0|     0|            0|
|00025465d4725e87|"\n\nCongratulati...|    0|           0|      0|     0|     0|            0|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|    1|           1|      1|     0|     1|            0|
|00031b1e95af7921|Your vandalism to...|    0|     

In [7]:
# Normalise the comment text: lower-case it, replace newlines and double quotes
# with a space, then trim surrounding spaces.
normalized_text = F.lower(F.col("comment_text"))
normalized_text = F.regexp_replace(normalized_text, "[\n\"]", " ")
normalized_text = F.trim(normalized_text)

train_sdf = train_sdf.withColumn("comment_text", normalized_text)
train_sdf.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|explanation why t...|    0|           0|      0|     0|     0|            0|
|000103f0d9cfb60f|d'aww! he matches...|    0|           0|      0|     0|     0|            0|
|000113f07ec002fd|hey man, i'm real...|    0|           0|      0|     0|     0|            0|
|0001b41b1c6bb37e|more i can't make...|    0|           0|      0|     0|     0|            0|
|0001d958c54c6e35|you, sir, are my ...|    0|           0|      0|     0|     0|            0|
|00025465d4725e87|congratulations f...|    0|           0|      0|     0|     0|            0|
|0002bcb3da6cb337|cocksucker before...|    1|           1|      1|     0|     1|            0|
|00031b1e95af7921|your vandalism to...|    0|     

## 1. HashingTF и IDF

In [12]:
def get_cv_model(col_name: str) -> CrossValidator:
    """Build an unfitted cross-validated TF-IDF + logistic-regression pipeline.

    The pipeline tokenizes ``comment_text``, removes English stop words,
    hashes the tokens into term frequencies, rescales them with IDF and fits a
    logistic regression on the result.

    Args:
        col_name: Name of the label column the classifier is trained on and
            the evaluator scores against.

    Returns:
        A ``CrossValidator`` (not yet fitted) that searches over ``regParam``,
        ``elasticNetParam`` and ``numFeatures`` with ``maxIter`` fixed at 100,
        scored by ``BinaryClassificationEvaluator`` with its default metric.
    """
    tokenizer = Tokenizer(inputCol="comment_text", outputCol="tokenizer_out")
    stop_words_remover = StopWordsRemover(
        inputCol="tokenizer_out",
        outputCol="remover_out",
        stopWords=StopWordsRemover.loadDefaultStopWords("english"),
    )
    term_freq = HashingTF(inputCol="remover_out", outputCol="tf_out")
    inverse_doc_freq = IDF(inputCol="tf_out", outputCol="idf_out")
    classifier = LogisticRegression(featuresCol="idf_out", labelCol=col_name)

    pipeline = Pipeline(
        stages=[tokenizer, stop_words_remover, term_freq, inverse_doc_freq, classifier]
    )

    # maxIter is pinned; the remaining parameters are searched exhaustively.
    grid_builder = ParamGridBuilder().baseOn({classifier.maxIter: 100})
    for param, candidates in (
        (classifier.regParam, [0.03, 0.1, 1.0]),
        (classifier.elasticNetParam, [0.05, 0.1, 1.0]),
        (term_freq.numFeatures, [1000, 10_000, 20_000]),
    ):
        grid_builder = grid_builder.addGrid(param, candidates)
    param_grid = grid_builder.build()

    return CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=BinaryClassificationEvaluator(labelCol=col_name),
        parallelism=PROC_CNT,
    )

In [13]:
%%time
# Every column except the id and the text is treated as a target label.
target_cols = [name for name in train_sdf.columns if name not in ("id", "comment_text")]
results = {}

pbar = tqdm(target_cols)
for target_col in pbar:
    # Show the label currently being fitted next to the progress bar.
    pbar.set_postfix_str(target_col)
    # Clear Spark's cache before each fit to free memory.
    spark.catalog.clearCache()
    model = get_cv_model(target_col).fit(train_sdf)
    # Keep the fitted model together with the highest average metric over the grid.
    results[target_col] = {
        "model": model,
        "score": max(model.avgMetrics),
    }

100%|██████████████████████████████████████████████████████| 6/6 [30:26<00:00, 304.38s/it, identity_hate]

CPU times: user 57.5 s, sys: 17 s, total: 1min 14s
Wall time: 30min 26s





In [14]:
# Print the stored score for every label.
for col, entry in results.items():
    print(f"Label: {col}\nScore: {entry['score']}\n")

Label: toxic
Score: 0.8937447870122099

Label: severe_toxic
Score: 0.9247631422657824

Label: obscene
Score: 0.9053453986836191

Label: threat
Score: 0.7556043304167565

Label: insult
Score: 0.9071599438837219

Label: identity_hate
Score: 0.8330782953044618



In [33]:
# For every label, report the numFeatures value from the parameter map that
# achieved the highest average cross-validation metric.
for col in results:
    # Look up this label's own fitted model; reading a fixed label here would
    # print the same value for every row regardless of the actual search result.
    cv_model = results[col]['model']
    best_index = int(np.argmax(cv_model.avgMetrics))
    params = cv_model.getEstimatorParamMaps()[best_index]

    for param, value in params.items():
        if param.name == 'numFeatures':
            print(f"For {col} numFeatures: {value}")

For toxic numFeatures: 20000
For severe_toxic numFeatures: 20000
For obscene numFeatures: 20000
For threat numFeatures: 20000
For insult numFeatures: 20000
For identity_hate numFeatures: 20000


In [34]:
# Stop the Spark session after the HashingTF/IDF experiment.
# NOTE(review): later cells use `spark` and `train_sdf` again, so on a
# top-to-bottom run the session and data must be recreated after this point —
# confirm this is intended.
spark.stop()

Вывод: numFeatures лучше выбирать большим. Попытки задать значение больше 20 000 приводили к нехватке памяти.

## 2. Word2Vec

In [8]:
def get_cv_model(col_name: str) -> CrossValidator:
    """Build an unfitted cross-validated Word2Vec + logistic-regression pipeline.

    The pipeline tokenizes ``comment_text``, removes English stop words,
    embeds the remaining tokens with Word2Vec and fits a logistic regression
    on the resulting vectors.

    NOTE(review): this redefines ``get_cv_model`` declared earlier in the
    notebook; whichever definition ran last is the one in effect.

    Args:
        col_name: Name of the label column the classifier is trained on and
            the evaluator scores against.

    Returns:
        A ``CrossValidator`` (not yet fitted) that searches over ``regParam``,
        ``windowSize`` and ``vectorSize`` with ``maxIter`` fixed at 100,
        scored by ``BinaryClassificationEvaluator`` with its default metric.
    """
    tokenizer = Tokenizer(inputCol="comment_text", outputCol="tokenizer_out")
    stop_words_remover = StopWordsRemover(
        inputCol="tokenizer_out",
        outputCol="remover_out",
        stopWords=StopWordsRemover.loadDefaultStopWords("english"),
    )
    embedder = Word2Vec(inputCol="remover_out", outputCol="w2v")
    classifier = LogisticRegression(featuresCol="w2v", labelCol=col_name)

    pipeline = Pipeline(stages=[tokenizer, stop_words_remover, embedder, classifier])

    # maxIter is pinned; the remaining parameters are searched exhaustively.
    grid_builder = ParamGridBuilder().baseOn({classifier.maxIter: 100})
    for param, candidates in (
        (classifier.regParam, [0.03, 0.1]),
        (embedder.windowSize, [3, 5]),
        (embedder.vectorSize, [20, 30, 50]),
    ):
        grid_builder = grid_builder.addGrid(param, candidates)
    param_grid = grid_builder.build()

    return CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=BinaryClassificationEvaluator(labelCol=col_name),
        parallelism=PROC_CNT,
    )

In [9]:
%%time
# Every column except the id and the text is treated as a target label.
target_cols = [name for name in train_sdf.columns if name not in ("id", "comment_text")]
results = {}

pbar = tqdm(target_cols)
for target_col in pbar:
    # Show the label currently being fitted next to the progress bar.
    pbar.set_postfix_str(target_col)
    # Clear Spark's cache before each fit to free memory.
    spark.catalog.clearCache()
    model = get_cv_model(target_col).fit(train_sdf)
    # Keep the fitted model together with the highest average metric over the grid.
    results[target_col] = {
        "model": model,
        "score": max(model.avgMetrics),
    }

100%|██████████████████████████████████████████████████████| 6/6 [58:43<00:00, 587.22s/it, identity_hate]

CPU times: user 29.1 s, sys: 9.14 s, total: 38.2 s
Wall time: 58min 43s





In [10]:
# Print the stored score for every label.
for col, entry in results.items():
    print(f"Label: {col}\nScore: {entry['score']}\n")

Label: toxic
Score: 0.9314225553711087

Label: severe_toxic
Score: 0.9763162168260412

Label: obscene
Score: 0.9539944046334209

Label: threat
Score: 0.9387554924296667

Label: insult
Score: 0.942575134980553

Label: identity_hate
Score: 0.9438553193847002



In [12]:
# Stop the Spark session after the Word2Vec experiment.
spark.stop()

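Вывод: по сравнению с предыдущим подходом результаты улучшились — примерно на 3,5–5 процентных пунктов для toxic, severe_toxic, obscene и insult и на 11–18 процентных пунктов для identity_hate и threat, несмотря на то, что для экономии ресурсов использовалось кодирование w2v в векторы достаточно небольшой длины.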