In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/jigsaw-toxic-comment-classification-challenge/sample_submission.csv
/kaggle/input/jigsaw-toxic-comment-classification-challenge/test_labels.csv
/kaggle/input/jigsaw-toxic-comment-classification-challenge/train.csv
/kaggle/input/jigsaw-toxic-comment-classification-challenge/test.csv


### Set configurations and libraries

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m18.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=18a114435ac5ea0192dca177a95d35676190ebd331982cc58dd1c2852e3ccbdf
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
  

In [4]:
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

In [5]:
configs = (SparkSession.builder
                  .appName('Toxic Comment Classification')
                  .enableHiveSupport()
                  .config("spark.executor.memory", "4G")
                  .config("spark.driver.memory","18G")
                  .config("spark.executor.cores","7")
                  .config("spark.python.worker.memory","4G")
                  .config("spark.driver.maxResultSize","0")
                  .config("spark.sql.crossJoin.enabled", "true")
                  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                  .config("spark.default.parallelism","2")
                  .getOrCreate())

configs.sparkContext.setLogLevel('ERROR')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/04 16:31:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read dataframe to spark

In [6]:
pd_test_answear = pd.DataFrame()
def to_spark_df(fin):
    df = pd.read_csv(fin)
    df.fillna("", inplace=True)
    df_train = df.sample(frac=0.7)
    df_test = df.loc[~df.index.isin(df_train.index)]
    global pd_test_answear
    pd_test_answear = df_test.iloc[:,2:].copy()
    df_train = configs.createDataFrame(df_train)
    df_test = configs.createDataFrame(df_test.iloc[:,:2])
    return(df_train, df_test)

def convert_to_spark_df(fin):
    df = pd.read_csv(fin)
    df.fillna("", inplace=True)
    df = configs.createDataFrame(df)
    return df


train, test = to_spark_df("/kaggle/input/jigsaw-toxic-comment-classification-challenge/train.csv")
test_o = convert_to_spark_df("/kaggle/input/jigsaw-toxic-comment-classification-challenge/test.csv")

### TFIDF tokenization

In [178]:
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
wordsData = tokenizer.transform(train)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
tf = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(tf) 
tfidf = idfModel.transform(tf)


                                                                                

### Logistic regression

In [179]:
lr = LogisticRegression(featuresCol="features", labelCol='toxic', regParam=0.1)
lrModel = lr.fit(tfidf)
res_train = lrModel.transform(tfidf)

                                                                                

In [180]:
test_tokens = tokenizer.transform(test)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)
test_res_log = test.select('id')
extract_prob = F.udf(lambda x: float(x[1]), T.FloatType())
# extract_prob = F.map(f)
focus_cols = list(filter(lambda x: x not in ["id", "comment_text"], train.columns))
for col in focus_cols:
    lr = LogisticRegression(featuresCol="features", labelCol=col, regParam=0.1)
    lrModel = lr.fit(tfidf)
    res = lrModel.transform(test_tfidf)
    test_res_log = test_res_log.join(res.select('id', 'probability'), on="id")
    test_res_log = test_res_log.withColumn(col, extract_prob('probability')).drop("probability")
    

                                                                                

### Linear regression

In [182]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol='toxic', regParam=0.1)
lrModel = lr.fit(tfidf)
res_train = lrModel.transform(tfidf)

                                                                                

In [183]:
test_tokens = tokenizer.transform(test)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)
test_res_lin = test.select('id')
extract_prob = F.udf(lambda x: float(x[1]), T.FloatType())
focus_cols = list(filter(lambda x: x not in ["id", "comment_text"], train.columns))
for col in focus_cols:
    lr = LinearRegression(featuresCol="features", labelCol=col, regParam=0.1)
    lrModel = lr.fit(tfidf)
    res = lrModel.transform(test_tfidf)
    test_res_lin = test_res_lin.join(res.select('id', 'prediction'), on="id")
    test_res_lin = test_res_lin.withColumnRenamed('prediction', col)

                                                                                

### Metrics TfIdf

numFeatures=20000

In [177]:
from sklearn.metrics import accuracy_score
y_true = pd_test_answear.toxic
f = lambda x: 1 if x > 0.5 else 0
df_test_log = test_res_log.toPandas()
print('Logistic accuracy', accuracy_score(y_true, df_test_log.toxic.map(f)))
df_test_lin = test_res_lin.toPandas()
print('Linear accuracy', accuracy_score(y_true, df_test_lin.toxic.map(f)))

                                                                                

Logistic accuracy 0.8907689415303629
Linear accuracy 0.8902049257379206


numFeatures=1000

In [184]:
df_test_log = test_res_log.toPandas()
print('Logistic accuracy', accuracy_score(y_true, df_test_log.toxic.map(f)))
df_test_lin = test_res_lin.toPandas()
print('Linear accuracy', accuracy_score(y_true, df_test_lin.toxic.map(f)))

                                                                                

Logistic accuracy 0.9020701468530008


                                                                                

Linear accuracy 0.9021119258005891


Given the nature of the algorithm, if numFeatures is less than the actual number of distinct words/tokens in the DataFrame you are guaranteed to have an 'incorrect' frequency for at least 1 token (i.e. different tokens will hash to the same bucket). Even with numFeatures >= vocabularySize collisions 'might' still happen.

### Word2vec approach

In [7]:
from pyspark.ml.feature import Word2Vec

In [8]:
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
wordsData = tokenizer.transform(train)
w2v_tokenizer = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="rawFeatures")
w2v = w2v_tokenizer.fit(wordsData)
w2v_df = w2v.transform(wordsData)

                                                                                

In [9]:
lr = LogisticRegression(featuresCol="rawFeatures", labelCol='toxic', regParam=0.1)
lrModel = lr.fit(w2v_df)
res_train = lrModel.transform(w2v_df)


                                                                                

In [15]:
test_tokens = tokenizer.transform(test)
test_w2v = w2v.transform(test_tokens)
test_res_log = test.select('id')
focus_cols = list(filter(lambda x: x not in ["id", "comment_text"], train.columns))
for col in focus_cols:
    lr = LogisticRegression(featuresCol="rawFeatures", labelCol=col, regParam=0.1)
    lrModel = lr.fit(w2v_df)
    res = lrModel.transform(test_w2v)
    test_res_log = test_res_log.join(res.select('id', 'prediction'), on="id")
    test_res_log = test_res_log.withColumnRenamed('prediction', col)

                                                                                

### Metrics Word2Vec

In [18]:
df_test_log_w2v = test_res_log.toPandas()
print('Logistic accuracy w2v', accuracy_score(y_true, df_test_log_w2v.toxic.map(f)))

Logistic accuracy w2v 0.9038666415992981
