In [1]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
#!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
#!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [5]:
#!pip install -q findspark

In [6]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [35]:
data = spark.read.csv('/content/drive/MyDrive/mlbd/train.csv', header=True, multiLine=True, inferSchema=True, escape='"', sep=',', encoding = "utf8")

In [16]:
data.show(5)

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|Explanation
Why t...|    0|           0|      0|     0|     0|            0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|
|0001b41b1c6bb37e|"
More
I can't ma...|    0|           0|      0|     0|     0|            0|
|0001d958c54c6e35|You, sir, are my ...|    0|           0|      0|     0|     0|            0|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
only showing top 5 rows



### 1. HashingTF и IDF

In [38]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
words = tokenizer.transform(data)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures") 
tf = hashingTF.transform(words)

idf = IDF(inputCol="rawFeatures", outputCol="features")

idfModel = idf.fit(tf) 
tfidf = idfModel.transform(tf)

In [42]:
# split train/val data
train, val = tfidf.randomSplit([0.8, 0.2], seed = 42)

In [44]:
# check shape
print('Train shape:')
print((train.count(), len(train.columns)))
print('Val shape:')
print((val.count(), len(val.columns)))

Train shape:
(127502, 11)
Val shape:
(32069, 11)


##### Logistic regression

In [45]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol='toxic')
lrModel = lr.fit(train)
predictions = lrModel.transform(val)

In [53]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='toxic')
evaluator.evaluate(predictions)

0.9086863668360863

In [59]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

extract_prob = F.udf(lambda x: float(x[1]), T.FloatType())
(predictions.withColumn("proba", extract_prob("probability"))
 .select("proba", "prediction")
 .show())

+-------------+----------+
|        proba|prediction|
+-------------+----------+
|3.0304389E-13|       0.0|
|          1.0|       1.0|
|          1.0|       1.0|
|3.8015295E-30|       0.0|
|1.6330913E-23|       0.0|
|3.5991588E-20|       0.0|
|1.1098904E-27|       0.0|
| 2.8516146E-4|       0.0|
|2.6165605E-18|       0.0|
| 9.940433E-12|       0.0|
|2.0163649E-11|       0.0|
| 6.2884395E-9|       0.0|
|          1.0|       1.0|
|1.4364526E-23|       0.0|
|5.0249637E-15|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
| 2.379466E-29|       0.0|
| 1.2177003E-9|       0.0|
| 9.9396406E-5|       0.0|
+-------------+----------+
only showing top 20 rows



### Test prediction

In [60]:
# test data
test = spark.read.csv('/content/drive/MyDrive/mlbd/test.csv', header=True, multiLine=True, inferSchema=True, escape='"', sep=',', encoding = "utf8")
test_tokens = tokenizer.transform(test)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)

In [65]:
labels = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
for label in labels:
    print('Start with label: ',label)
    lr = LogisticRegression(featuresCol="features", labelCol=label)
    lrModel = lr.fit(tfidf)
    res = lrModel.transform(test_tfidf)
    test_res = test_res.join(res.select('id', 'probability'), on="id")
    test_res = test_res.withColumn(label, extract_prob('probability')).drop("probability")
    print('Prediction added')
test_res.show(5)

Start with label:  toxic
Prediction added
Start with label:  severe_toxic
Prediction added
Start with label:  obscene
Prediction added
Start with label:  threat
Prediction added
Start with label:  insult
Prediction added
Start with label:  identity_hate
Prediction added
+----------------+------------+------------+-------------+-------------+-------------+-------------+
|              id|       toxic|severe_toxic|      obscene|       threat|       insult|identity_hate|
+----------------+------------+------------+-------------+-------------+-------------+-------------+
|000968ce11f5ee34|1.8862696E-9|5.568692E-10| 0.0026673032|1.8312777E-11| 6.9151815E-7|1.5654224E-11|
|00491682330fdd1d|         0.0|         0.0|          0.0|          0.0|          0.0|          0.0|
|008eb47c4684d190|         1.0|         0.0|          0.0|          0.0|          0.0|          0.0|
|00d251f47486b6d2|1.5149961E-8| 1.391695E-9|1.9929523E-11|2.7775447E-12|  2.732317E-4| 1.5385221E-7|
|0114ae82c53101a9|   0

### 2. Word2Vec   
не сделано