In [1]:
 %%html
<style>
/* Keep long text output lines (e.g. DataFrame.show() tables) on one line instead of wrapping them */
div.output_area pre {
    white-space: pre;
}
</style>

In [2]:
# Import from the public IPython.display module; importing display helpers from
# IPython.core.display is deprecated. (clear_output was imported but never used.)
from IPython.display import HTML, display

# Widen the notebook container, widen the In/Out prompts, and set the base font size.
display(HTML("<style>.container { width:85% !important; }</style>"))
display(HTML("<style>.prompt { min-width:10ex !important; }</style>"))
display(HTML("<style>div#notebook { font-size:14px !important; }</style>"))

In [3]:
import pandas as pd
import numpy as np

import os
import pyspark

In [4]:
import pyspark
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *

In [5]:
# Spark configuration: pin the Spark UI to port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Build the session through the builder so the cell is safe to re-run:
# constructing pyspark.SparkContext directly a second time fails because a
# context is already active, whereas getOrCreate() reuses the running one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Keep the `sc` name available for any code that expects it.
sc = spark.sparkContext

In [6]:
# Display the active SparkSession.
spark

### Reading data

In [7]:
# Training comments. Quoted fields may contain newlines (multiLine) and use ""
# for an embedded quote (quote == escape). With inferSchema disabled, every
# column, labels included, is read as a string.
train = (
    spark.read
    .options(
        header=True,
        multiLine=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        encoding="utf-8",
        sep=",",
        quote='"',
        escape='"',
        inferSchema=False,
    )
    .csv('train.csv')
)

In [9]:
# Test comments, read with the same CSV options as the training file.
test = (
    spark.read
    .options(
        header=True,
        multiLine=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        encoding="utf-8",
        sep=",",
        quote='"',
        escape='"',
        inferSchema=False,
    )
    .csv('test.csv')
)

In [11]:
# Labels for the test comments, keyed by `id`; read with the same CSV options.
test_labels = (
    spark.read
    .options(
        header=True,
        multiLine=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        encoding="utf-8",
        sep=",",
        quote='"',
        escape='"',
        inferSchema=False,
    )
    .csv('test_labels.csv')
)

### Data overview

In [14]:
# Number of rows in the training set.
train.count()

159571

In [17]:
# Summary statistics for every column except the free-text comment.
# NOTE: the columns are still strings at this point (inferSchema=False), and
# `id` is a hex hash, so its mean/stddev in the output (Infinity/NaN) mean nothing.
(
    train
    .drop('comment_text')
    .describe()
    .show(n=20, truncate=False)
)

+-------+----------------+-------------------+--------------------+-----------------+--------------------+-------------------+-------------------+
|summary|id              |toxic              |severe_toxic        |obscene          |threat              |insult             |identity_hate      |
+-------+----------------+-------------------+--------------------+-----------------+--------------------+-------------------+-------------------+
|count  |159571          |159571             |159571              |159571           |159571              |159571             |159571             |
|mean   |Infinity        |0.09584448302009764|0.009995550569965721|0.052948217407925|0.002995531769557125|0.04936360616904074|0.00880485802558109|
|stddev |NaN             |0.2943787715999705 |0.09947714085748408 |0.223930832915411|0.05464958623142267 |0.2166267172768179 |0.09342048594149767|
|min    |0000997932d777bf|0                  |0                   |0                |0                   |0           

In [80]:
# Attach labels to the test comments and drop rows whose `toxic` label is -1.
# NOTE(review): -1 presumably marks rows that were not scored; confirm against
# the dataset description.
all_test = test.join(test_labels, on='id', how='inner').filter(F.col('toxic') != -1)

In [82]:
# Number of labelled test rows left after dropping the -1 rows.
all_test.count()

63978

### HashingTF with numFeatures = 100

In [83]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import LogisticRegression

In [84]:
# Split each comment on non-word characters (\W is used as the delimiter
# pattern, assuming RegexTokenizer's default gaps=True).
regexTokenizer = RegexTokenizer(
    inputCol="comment_text",
    outputCol="words",
    pattern=r"\W",
)

# Stop words.
# NOTE(review): despite the name `add_stopwords`, this list REPLACES the
# remover's default stop-word list rather than extending it; confirm intended.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

In [85]:
# Pipeline lives in pyspark.ml and is not provided by `from pyspark.sql import *`;
# without this import the cell raises NameError on a fresh kernel.
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF

# Bag-of-words features: hash the filtered tokens into a fixed-size (100)
# term-frequency vector, then reweight it by inverse document frequency.
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=100)
# minDocFreq: terms that appear in fewer documents than this get zero IDF
# weight, which suppresses very rare terms.
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf])

In [86]:
# Per-run containers filled by the training loop:
#   all_models: label -> fitted LogisticRegression model
#   predicted:  label -> test DataFrame after that label's model was applied
all_models = dict()
predicted = dict()

# Binary target columns, one classifier each, in training order.
label_columns = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

In [87]:
# The CSV was read with inferSchema=False, so the label columns arrive as
# strings; cast each one to an integer so it can serve as a classifier label.
for label in label_columns:
    as_int = F.col(label).cast(IntegerType())
    train = train.withColumn(label, as_int)

In [89]:
# Same string -> integer cast for the labelled test set.
for label in label_columns:
    as_int = F.col(label).cast(IntegerType())
    all_test = all_test.withColumn(label, as_int)

In [97]:
# Fit the feature stages on the training comments only, then featurise them.
pipelineFit = pipeline.fit(dataset=train)
train_df = pipelineFit.transform(dataset=train)

In [99]:
# Featurise the labelled test rows with the stages fitted on train.
test_df = pipelineFit.transform(dataset=all_test)

In [101]:
for label in label_columns:
    print(f'Processing {label}')
    # One independent binary logistic regression per label. Output columns are
    # named after the label, so chaining transform() on test_df keeps every
    # label's predictions side by side.
    # NOTE(review): regParam=0.3 looks strong; the *_pred columns come out
    # almost all 0.0 in the describe() output. The evaluation uses the raw
    # scores, so ROC AUC is not affected by the decision threshold.
    classifier = LogisticRegression(
        featuresCol='features',
        labelCol=label,
        predictionCol=f'{label}_pred',
        rawPredictionCol=f'raw_{label}_pred',
        probabilityCol=f'{label}_prob',
        maxIter=20,
        regParam=0.3,
        elasticNetParam=0,
    )
    fitted = classifier.fit(train_df)
    all_models[label] = fitted
    test_df = fitted.transform(test_df)
    predicted[label] = test_df

Processing toxic
Processing severe_toxic
Processing obscene
Processing threat
Processing insult
Processing identity_hate


In [102]:
# Prediction columns follow the `{label}_pred` naming set in the training loop.
# Derive them from label_columns instead of substring-matching test_df.columns,
# which would pick up any column containing "pred" and skip any label whose
# name contains "raw".
pred_cols = [f'{label}_pred' for label in label_columns]
assert set(pred_cols) <= set(test_df.columns), 'run the training loop before this cell'

In [103]:
# Predictions are 0.0/1.0, so each column's mean is the share of test rows
# predicted positive for that label.
(
    test_df
    .select(*pred_cols)
    .describe()
    .show(n=20, truncate=False)
)

+-------+---------------------+---------------------+--------------------+-----------+--------------------+------------------+
|summary|toxic_pred           |severe_toxic_pred    |obscene_pred        |threat_pred|insult_pred         |identity_hate_pred|
+-------+---------------------+---------------------+--------------------+-----------+--------------------+------------------+
|count  |63978                |63978                |63978               |63978      |63978               |63978             |
|mean   |4.5328081528025257E-4|4.6891118822095096E-5|1.71934102347682E-4 |0.0        |1.406733564662853E-4|0.0               |
|stddev |0.02128573310060443  |0.006847601985208643 |0.013111339671681809|0.0        |0.011859838362100266|0.0               |
|min    |0.0                  |0.0                  |0.0                 |0.0        |0.0                 |0.0               |
|max    |1.0                  |1.0                  |1.0                 |0.0        |1.0                 |0.0 

In [104]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [105]:
metrics = dict()  # label -> evaluator score on the labelled test set, filled by the next cell

In [108]:
for label in label_columns:
    # ROC AUC from each label's raw (pre-threshold) scores; areaUnderROC is
    # also the evaluator's default metric, spelled out here for clarity.
    metrics[label] = BinaryClassificationEvaluator(
        rawPredictionCol=f'raw_{label}_pred',
        labelCol=label,
        metricName='areaUnderROC',
    ).evaluate(test_df)

In [109]:
# Per-label test score for the HashingTF(numFeatures=100) features.
metrics

{'toxic': 0.7541931801265525,
 'severe_toxic': 0.8575037811781477,
 'obscene': 0.7582951685942847,
 'threat': 0.8149224698894523,
 'insult': 0.754035473968106,
 'identity_hate': 0.7485780454524604}

### HashingTF with numFeatures = 1000

In [110]:
# Same tokenise -> hash -> IDF pipeline, with a 10x larger hash space.
hashingTF = HashingTF(
    numFeatures=1000,
    inputCol="filtered",
    outputCol="rawFeatures",
)
# minDocFreq: terms found in fewer documents than this get zero IDF weight.
idf = IDF(minDocFreq=5, inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(
    stages=[regexTokenizer, stopwordsRemover, hashingTF, idf],
)

In [111]:
# Fresh per-run containers for the HashingTF(1000) experiment.
# label_columns is reused from the setup section instead of being redefined
# by copy-paste, so every run trains on the same label list that was cast to
# integers.
all_models = {}
predicted = {}

In [112]:
# Refit the (now 1000-feature) stages on train and featurise the training set.
pipelineFit = pipeline.fit(dataset=train)
train_df = pipelineFit.transform(dataset=train)

In [113]:
# Featurise the labelled test rows with the stages fitted on train.
test_df = pipelineFit.transform(dataset=all_test)

In [114]:
for label in label_columns:
    print(f'Processing {label}')
    # One independent binary logistic regression per label, with the same
    # hyper-parameters as the 100-feature run so only the features differ.
    classifier = LogisticRegression(
        featuresCol='features',
        labelCol=label,
        predictionCol=f'{label}_pred',
        rawPredictionCol=f'raw_{label}_pred',
        probabilityCol=f'{label}_prob',
        maxIter=20,
        regParam=0.3,
        elasticNetParam=0,
    )
    fitted = classifier.fit(train_df)
    all_models[label] = fitted
    test_df = fitted.transform(test_df)
    predicted[label] = test_df

Processing toxic
Processing severe_toxic
Processing obscene
Processing threat
Processing insult
Processing identity_hate


In [115]:
metrics = dict()  # label -> evaluator score for the 1000-feature run, filled by the next cell

In [116]:
for label in label_columns:
    # ROC AUC from each label's raw scores (areaUnderROC is the evaluator default).
    metrics[label] = BinaryClassificationEvaluator(
        rawPredictionCol=f'raw_{label}_pred',
        labelCol=label,
        metricName='areaUnderROC',
    ).evaluate(test_df)

In [117]:
# Per-label test score for the HashingTF(numFeatures=1000) features.
metrics

{'toxic': 0.8289137931601808,
 'severe_toxic': 0.8579595272474665,
 'obscene': 0.8361701492190003,
 'threat': 0.9138637279663775,
 'insult': 0.8342956917328159,
 'identity_hate': 0.8491797229781038}

In [118]:
# Share of test rows predicted positive per label (mean of a 0.0/1.0 column).
(
    test_df
    .select(*pred_cols)
    .describe()
    .show(n=20, truncate=False)
)

+-------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|toxic_pred           |severe_toxic_pred   |obscene_pred        |threat_pred         |insult_pred         |identity_hate_pred  |
+-------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|count  |63978                |63978               |63978               |63978               |63978               |63978               |
|mean   |0.0014848854293663446|1.71934102347682E-4 |8.440401387977117E-4|3.126074588139673E-5|7.502579011535215E-4|9.378223764419019E-5|
|stddev |0.03850589201460077  |0.013111339671681745|0.029040332587194247|0.005591087305403376|0.027380773042978507|0.009683744537774713|
|min    |0.0                  |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |
|max    |1.0                  |1.0       

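##### Вывод: при увеличении numFeatures со 100 до 1000 ROC AUC вырос по всем меткам (меньше коллизий хеширования — больше различимых признаков), то есть в рассмотренном диапазоне качество модели растёт с numFeatures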

### Word2Vec

In [119]:
from pyspark.ml.feature import Word2Vec

In [120]:
# Dense comment features: Word2Vec learns 200-dimensional word vectors and
# (per the Spark docs) represents each comment as the average of its words' vectors.
# NOTE(review): no explicit seed is set; pass seed=... if run-to-run
# reproducibility of the embeddings matters.
w2v = Word2Vec(
    vectorSize=200,
    inputCol="filtered",
    outputCol="features",
)

In [121]:
# Word2Vec replaces the HashingTF + IDF stages; tokenisation and stop-word removal are unchanged.
pipeline = Pipeline(
    stages=[regexTokenizer, stopwordsRemover, w2v],
)

In [122]:
# Fresh per-run containers for the Word2Vec experiment.
# label_columns is reused from the setup section instead of being redefined
# by copy-paste, so every run trains on the same label list that was cast to
# integers.
all_models = {}
predicted = {}

In [123]:
# Train the Word2Vec embedding on the training comments, then featurise them.
pipelineFit = pipeline.fit(dataset=train)
train_df = pipelineFit.transform(dataset=train)

In [124]:
# Embed the labelled test rows with the Word2Vec model fitted on train.
test_df = pipelineFit.transform(dataset=all_test)

In [125]:
for label in label_columns:
    print(f'Processing {label}')
    # One independent binary logistic regression per label, with the same
    # hyper-parameters as the HashingTF runs so only the features differ.
    classifier = LogisticRegression(
        featuresCol='features',
        labelCol=label,
        predictionCol=f'{label}_pred',
        rawPredictionCol=f'raw_{label}_pred',
        probabilityCol=f'{label}_prob',
        maxIter=20,
        regParam=0.3,
        elasticNetParam=0,
    )
    fitted = classifier.fit(train_df)
    all_models[label] = fitted
    test_df = fitted.transform(test_df)
    predicted[label] = test_df

Processing toxic
Processing severe_toxic
Processing obscene
Processing threat
Processing insult
Processing identity_hate


In [126]:
metrics = dict()  # label -> evaluator score for the Word2Vec run, filled by the next cell

In [127]:
for label in label_columns:
    # ROC AUC from each label's raw scores (areaUnderROC is the evaluator default).
    metrics[label] = BinaryClassificationEvaluator(
        rawPredictionCol=f'raw_{label}_pred',
        labelCol=label,
        metricName='areaUnderROC',
    ).evaluate(test_df)

In [128]:
# Per-label test score for the Word2Vec(vectorSize=200) features.
metrics

{'toxic': 0.9325592478108421,
 'severe_toxic': 0.9704686227858818,
 'obscene': 0.9481559391987501,
 'threat': 0.9521529692258611,
 'insult': 0.9411520293236308,
 'identity_hate': 0.9506073895416445}

In [129]:
# Shut down the Spark session now that all experiments are done.
spark.stop()

### Comparison (test-set ROC AUC per label)

##### HashingTF (numFeatures = 100)
'toxic': 0.7541931801265525    
'severe_toxic': 0.8575037811781477    
'obscene': 0.7582951685942847    
'threat': 0.8149224698894523    
'insult': 0.754035473968106    
'identity_hate': 0.7485780454524604    

##### HashingTF (numFeatures = 1000)
'toxic': 0.8289137931601808    
'severe_toxic': 0.8579595272474665    
'obscene': 0.8361701492190003    
'threat': 0.9138637279663775    
'insult': 0.8342956917328159    
'identity_hate': 0.8491797229781038    

##### Word2Vec (vectorSize = 200)
'toxic': 0.9325592478108421   
'severe_toxic': 0.9704686227858818  
'obscene': 0.9481559391987501  
'threat': 0.9521529692258611  
'insult': 0.9411520293236308   
'identity_hate': 0.9506073895416445  

Используя Word2Vec, удалось достичь более высокого качества (ROC AUC 0.93–0.97 против 0.83–0.91 у HashingTF с numFeatures = 1000), хотя обучение заняло больше времени. Возможно, при большем numFeatures и подборе других параметров HashingTF удалось бы получить сопоставимый результат.