In [1]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 3 pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
spark

## Toxic Comment Classification Challenge

In [22]:
from pyspark.sql.types import *

In [23]:
schema = StructType([
    StructField("id", StringType()),
    StructField("comment_text", StringType()),
    StructField("toxic", IntegerType()),
    StructField("severe_toxic", IntegerType()),
    StructField("obscene", IntegerType()),
    StructField("threat", IntegerType()),
    StructField("insult", IntegerType()),
    StructField("identity_hate", IntegerType())
])

In [25]:
!hdfs dfs -head /data/train.csv

"id","comment_text","toxic","severe_toxic","obscene","threat","insult","identity_hate"
"0000997932d777bf","Explanation
Why the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27",0,0,0,0,0,0
"000103f0d9cfb60f","D'aww! He matches this background colour I'm seemingly stuck with. Thanks.  (talk) 21:51, January 11, 2016 (UTC)",0,0,0,0,0,0
"000113f07ec002fd","Hey man, I'm really not trying to edit war. It's just that this guy is constantly removing relevant information and talking to me through edits instead of my talk page. He seems to care more about the formatting than the actual info.",0,0,0,0,0,0
"0001b41b1c6bb37e","""
More
I can't make any real suggestions on improvement - I wondered if the section statistics should be later on, or a subsection of """"types of accidents""""  -I think the refere

In [31]:
dataset = spark.read.csv("/data/train.csv",
                         schema=schema, header=True, multiLine=True, escape='"')

In [32]:
dataset.select("id").show(10)

+----------------+
|              id|
+----------------+
|0000997932d777bf|
|000103f0d9cfb60f|
|000113f07ec002fd|
|0001b41b1c6bb37e|
|0001d958c54c6e35|
|00025465d4725e87|
|0002bcb3da6cb337|
|00031b1e95af7921|
|00037261f536c51d|
|00040093b2687caa|
+----------------+
only showing top 10 rows



In [33]:
dataset.show(2, vertical=True, truncate=False)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id            | 0000997932d777bf                                                                                                                                                                                                                                                         
 comment_text  | Explanation
Why the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27 
 toxic         | 0                                                                                                                                     

In [34]:
dataset.rdd.getNumPartitions()

1

In [35]:
dataset.count()

159571

In [36]:
dataset = dataset.repartition(4)

In [37]:
dataset.count()

159571

## Set target (toxic/non-toxic)

In [41]:
from pyspark.sql import functions as f

In [42]:
target = f.when(
    (dataset.toxic == 0) &
    (dataset.severe_toxic == 0) &
    (dataset.obscene == 0) &
    (dataset.threat == 0) &
    (dataset.insult == 0) &
    (dataset.identity_hate == 0),
    0
).otherwise(1)

In [43]:
dataset = dataset.withColumn("target", target)

In [44]:
dataset.select("id", "target").show(20)

+----------------+------+
|              id|target|
+----------------+------+
|d7dda6819645adde|     0|
|b63771a6dd0bc63c|     0|
|50d5173d2859f0f6|     0|
|9b37a5f022dbf83d|     0|
|1b3a6936a549fb69|     0|
|a5500eb5e6fd55e1|     0|
|b2b0261c102976ea|     0|
|63d49f7a1ffb31e8|     0|
|65900680c3e6830c|     0|
|30b38494a74d3b93|     0|
|ce1e5dea4c45912d|     0|
|a82c1a417bc9a127|     0|
|cb8c35fe680e2922|     0|
|94691a813924fe0f|     0|
|f6123cf9383dc725|     1|
|80f80c982361b92d|     1|
|034f7ba2351c5a38|     0|
|6c8ed3b4acadab3c|     0|
|d341af87710d4e0c|     0|
|ba87d5df3f47429e|     0|
+----------------+------+
only showing top 20 rows



In [45]:
dataset.filter(f.col("id") == "901d7783bca3d63f").show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------
 id            | 901d7783bca3d63f                                     
 comment_text  | Use one that doesn't round numbers up then, asshole. 
 toxic         | 1                                                    
 severe_toxic  | 0                                                    
 obscene       | 1                                                    
 threat        | 0                                                    
 insult        | 1                                                    
 identity_hate | 0                                                    
 target        | 1                                                    



In [46]:
dataset.groupBy("target").count().collect()

[Row(target=1, count=16225), Row(target=0, count=143346)]

In [47]:
16225 / (16225 + 143346)

0.10167887648758234

In [48]:
dataset = dataset.drop("toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate").cache()

In [49]:
dataset.count()

159571

In [50]:
dataset.show(2, False, True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id           | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                                                                    
 comment_text | "

""Katara""
I've removed the section entirely. I don't care if you like to pretend that Katara and Zuko are meant for each other. It's still not case, and there has been no indication whatsoever. Thus, there's little point to actually have the section and exempt Toph and Sokka beyond the insane delu

In [52]:
dataset.write.parquet("/user/stanislav.volkov/dataset", mode="overwrite")

## train logistic regression 

In [53]:
from pyspark.ml.feature import *

## NLP in pyspark

In [54]:
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")

In [55]:
dataset2 = tokenizer.transform(dataset)

In [56]:
dataset2.select("id", "words").show(2, False, True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id    | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                                                                                                                               
 words | [", , ""katara"", i've, removed, the, section, entirely., i, don't, care, if, you, like, to, pretend, that, katara, and, zuko, are, meant, for, each, other., it's, still, not, case,, and, there, has, been,

In [57]:
dataset2.take(1)

[Row(id='6fdb7b6734f8bf40', comment_text='"\n\n""Katara""\nI\'ve removed the section entirely. I don\'t care if you like to pretend that Katara and Zuko are meant for each other. It\'s still not case, and there has been no indication whatsoever. Thus, there\'s little point to actually have the section and exempt Toph and Sokka beyond the insane delusions of shippers.  "', target=0, words=['"', '', '""katara""', "i've", 'removed', 'the', 'section', 'entirely.', 'i', "don't", 'care', 'if', 'you', 'like', 'to', 'pretend', 'that', 'katara', 'and', 'zuko', 'are', 'meant', 'for', 'each', 'other.', "it's", 'still', 'not', 'case,', 'and', 'there', 'has', 'been', 'no', 'indication', 'whatsoever.', 'thus,', "there's", 'little', 'point', 'to', 'actually', 'have', 'the', 'section', 'and', 'exempt', 'toph', 'and', 'sokka', 'beyond', 'the', 'insane', 'delusions', 'of', 'shippers.', '', '"'])]

In [58]:
type(dataset2.take(1))

list

In [59]:
type(dataset2.take(1)[0])

pyspark.sql.types.Row

In [60]:
type(dataset2.take(1)[0].words)

list

In [61]:
dataset2.take(1)[0].words

['"',
 '',
 '""katara""',
 "i've",
 'removed',
 'the',
 'section',
 'entirely.',
 'i',
 "don't",
 'care',
 'if',
 'you',
 'like',
 'to',
 'pretend',
 'that',
 'katara',
 'and',
 'zuko',
 'are',
 'meant',
 'for',
 'each',
 'other.',
 "it's",
 'still',
 'not',
 'case,',
 'and',
 'there',
 'has',
 'been',
 'no',
 'indication',
 'whatsoever.',
 'thus,',
 "there's",
 'little',
 'point',
 'to',
 'actually',
 'have',
 'the',
 'section',
 'and',
 'exempt',
 'toph',
 'and',
 'sokka',
 'beyond',
 'the',
 'insane',
 'delusions',
 'of',
 'shippers.',
 '',
 '"']

## hashing trick

In [62]:
hasher = HashingTF(numFeatures=100, binary=True, inputCol=tokenizer.getOutputCol(), outputCol="word_vector")
dataset2 = hasher.transform(dataset2)

In [63]:
dataset2.select("id", "word_vector").show(2, False, True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id          | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                 
 word_vector | (100,[0,3,5,10,12,13,18,19,20,25,26,28,29,30,31,33,35,36,37,38,42,43,46,47,58,59,60,63,70,72,80,82,85,88,91,96,98,99],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) 
-RECORD 1----------------------------------------------------------------------------------------------------------------------------

In [65]:
hasher_freq = HashingTF(numFeatures=100, binary=False, inputCol=tokenizer.getOutputCol(), outputCol="word_vector_freq")
dataset2_freq = hasher_freq.transform(dataset2)

In [66]:
dataset2_freq.select("id", "word_vector_freq").show(2, False, True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id               | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                 
 word_vector_freq | (100,[0,3,5,10,12,13,18,19,20,25,26,28,29,30,31,33,35,36,37,38,42,43,46,47,58,59,60,63,70,72,80,82,85,88,91,96,98,99],[2.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,4.0,1.0,1.0,2.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,2.0]) 
-RECORD 1-------------------------------------------------------------------------------------------------------------

## Train Test split dataset

In [67]:
train = dataset2.sampleBy("target", fractions={0: 0.8, 1: 0.8}, seed=5757)

In [68]:
train.groupby("target").count().collect()

[Row(target=1, count=12906), Row(target=0, count=114769)]

In [69]:
12956 / (12956 + 114769)

0.10143668036797808

In [70]:
test = dataset2.join(train, on="id", how="leftanti")

In [71]:
test.groupby("target").count().collect()

[Row(target=1, count=3319), Row(target=0, count=28577)]

In [72]:
3319 / (3319 + 28577)

0.10405693503887635

In [73]:
train.rdd.getNumPartitions()

4

In [74]:
test.rdd.getNumPartitions()

200

In [75]:
train

DataFrame[id: string, comment_text: string, target: int, words: array<string>, word_vector: vector]

In [76]:
train = train.drop("comment_text", "words").cache()

In [77]:
test = test.drop("comment_text", "words").coalesce(4).cache()

### logistic regression

In [78]:
from pyspark.ml.classification import LogisticRegression

In [79]:
lr = LogisticRegression(featuresCol=hasher.getOutputCol(), labelCol="target", maxIter=15)

In [80]:
lr_model = lr.fit(train)

In [81]:
lr_model

LogisticRegressionModel: uid = LogisticRegression_43aa6ddceed6, numClasses = 2, numFeatures = 100

In [82]:
predictions = lr_model.transform(test)

In [83]:
predictions

DataFrame[id: string, target: int, word_vector: vector, rawPrediction: vector, probability: vector, prediction: double]

In [84]:
predictions.select("id", "target", "prediction", "probability", "rawPrediction").show(5, False, True)

-RECORD 0--------------------------------------------------
 id            | 00b4fb897de56d6d                          
 target        | 0                                         
 prediction    | 0.0                                       
 probability   | [0.8219752022441963,0.17802479775580368]  
 rawPrediction | [1.5297873731912435,-1.5297873731912435]  
-RECORD 1--------------------------------------------------
 id            | 016f929ca882f152                          
 target        | 0                                         
 prediction    | 0.0                                       
 probability   | [0.9682786314416439,0.031721368558356056] 
 rawPrediction | [3.4185293474532754,-3.4185293474532754]  
-RECORD 2--------------------------------------------------
 id            | 01f29c98adb73328                          
 target        | 0                                         
 prediction    | 0.0                                       
 probability   | [0.9283264668215503,0.0

In [85]:
predictions.select("target", f.col("prediction").cast("int")).filter("target == prediction").count()

28612

In [86]:
predictions.count()

31896

In [87]:
print("Accuracy is {}".format(28612 / 31896))

Accuracy is 0.8970403812390269


In [88]:
predictions.select("target", f.col("prediction").cast("int"))\
           .filter((f.col("target") == 1) & (f.col("prediction") == f.col("target")))\
           .count()

123

In [89]:
predictions_pd = predictions.select("target", f.col("prediction").cast("int")).toPandas()

In [90]:
predictions_pd.head()

Unnamed: 0,target,prediction
0,0,0
1,0,0
2,0,0
3,0,0
4,1,0


In [91]:
lr.getOrDefault("threshold")

0.5

In [92]:
from sklearn.metrics import classification_report, precision_score

In [93]:
print(classification_report(predictions_pd.target, predictions_pd.prediction))

              precision    recall  f1-score   support

           0       0.90      1.00      0.95     28577
           1       0.58      0.04      0.07      3319

    accuracy                           0.90     31896
   macro avg       0.74      0.52      0.51     31896
weighted avg       0.87      0.90      0.85     31896



In [99]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [100]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="target", metricName="accuracy")

In [101]:
evaluator.evaluate(predictions)

0.8970403812390269

In [102]:
evaluator = evaluator.setMetricName("weightedPrecision")

In [103]:
evaluator.evaluate(predictions)

0.8662299348750517

In [104]:
evaluator = evaluator.setMetricName("weightedRecall")

In [105]:
evaluator.evaluate(predictions)

0.8970403812390269

## Wrap on pipeline

In [106]:
dataset = spark.read.parquet("/data/dataset")

In [107]:
dataset

DataFrame[id: string, comment_text: string, target: int]

In [108]:
dataset.rdd.getNumPartitions()

4

In [109]:
from pyspark.ml import Pipeline

In [110]:
pipeline = Pipeline(stages=[
    tokenizer,
    hasher,
    lr
])

In [111]:
train = dataset.sampleBy("target", fractions={0: 0.8, 1: 0.8}).cache()

In [112]:
test = dataset.join(train, on="id", how="leftanti").cache()

In [113]:
pipeline_model = pipeline.fit(train)

In [114]:
pipeline_model

PipelineModel_1c3cdd58b818

In [115]:
predictions = pipeline_model.transform(test)

In [116]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="target", metricName='areaUnderROC')

In [117]:
evaluator.evaluate(predictions)

0.517563489329928

## Train GBTClassifier

In [118]:
from pyspark.ml.classification import GBTClassifier

In [119]:
gbt = GBTClassifier(featuresCol=hasher.getOutputCol(), labelCol="target", maxIter=15)

In [120]:
pipeline = Pipeline(stages=[
    tokenizer,
    hasher,
    gbt
])

In [121]:
pipeline_model = pipeline.fit(train)

In [122]:
predictions = pipeline_model.transform(test)

In [123]:
evaluator.evaluate(predictions)

0.503875266050001

## Drop stopwords

In [129]:
stop_words = StopWordsRemover.loadDefaultStopWords("english")

In [130]:
stop_words

['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 'her',
 'hers',
 'herself',
 'it',
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under',
 'again',
 'further',
 'then',
 'once',
 'here',
 'there',
 'when',
 'where',
 'why',
 'how',
 'all',
 'any',
 'both',
 'each',
 'few',
 'more',
 'most',
 'other',
 'some',
 'such',
 'no',
 'nor',
 '

In [132]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words_filtered", stopWords=stop_words)

In [133]:
hasher = HashingTF(numFeatures=1000, binary=True, inputCol=swr.getOutputCol(), outputCol="word_vector")

In [134]:
pipeline = Pipeline(stages=[
    tokenizer,
    swr,
    hasher,
    lr
])

In [135]:
pipeline_model = pipeline.fit(train)

In [136]:
pipeline_model.stages

[Tokenizer_1754c9eb1a31,
 StopWordsRemover_aff2fe31c015,
 HashingTF_e8c73cda5907,
 LogisticRegressionModel: uid = LogisticRegression_43aa6ddceed6, numClasses = 2, numFeatures = 1000]

In [137]:
predictions = pipeline_model.transform(test)

In [138]:
evaluator.evaluate(predictions)

0.6421312094465557

## Featute Engineering

In [139]:
import pyspark.sql.functions as f

In [140]:
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- target: integer (nullable = true)



In [141]:
dataset = dataset.withColumn("comment_length", f.length(dataset.comment_text))

In [142]:
train = dataset.sampleBy("target", fractions={0: 0.8, 1: 0.8}).cache()
test = dataset.join(train, on="id", how="leftanti").cache()

In [143]:
train

DataFrame[id: string, comment_text: string, target: int, comment_length: int]

In [144]:
assembler = VectorAssembler(inputCols=[hasher.getOutputCol(), "comment_length"], outputCol="features")

In [145]:
lr = LogisticRegression(labelCol="target", maxIter=15)

In [146]:
pipeline = Pipeline(stages=[
    tokenizer,
    swr,
    hasher,
    assembler,
    lr
])

In [147]:
pipeline_model = pipeline.fit(train)

In [148]:
pipeline_model.stages

[Tokenizer_1754c9eb1a31,
 StopWordsRemover_aff2fe31c015,
 HashingTF_e8c73cda5907,
 VectorAssembler_c0d450754c81,
 LogisticRegressionModel: uid = LogisticRegression_3f0d96417684, numClasses = 2, numFeatures = 1001]

In [149]:
predictions = pipeline_model.transform(test)

In [150]:
evaluator.evaluate(predictions)

0.6483890430396612

In [151]:
pipeline_model.stages[-1].coefficients[-1]

0.0007232870033029963

## В любом случае, у нас еще не иссякли идеи)

In [152]:
count_vectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="word_vector", binary=True)

In [153]:
assembler = VectorAssembler(inputCols=[count_vectorizer.getOutputCol(), "comment_length"], outputCol="features")

In [154]:
pipeline = Pipeline(stages=[
    tokenizer,
    swr,
    count_vectorizer,
    assembler,
    lr
])

In [155]:
pipeline_model = pipeline.fit(train)

In [156]:
predictions = pipeline_model.transform(test)

In [157]:
evaluator.evaluate(predictions)

0.8408287713365735

## Hyperparameter tuning

In [159]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [160]:
paramGrid = ParamGridBuilder().addGrid(count_vectorizer.vocabSize, [100, 500])\
                              .addGrid(lr.regParam, [0.01, 0.05])\
                              .build()

In [161]:
paramGrid

[{Param(parent='CountVectorizer_f03f497725c8', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 100,
  Param(parent='LogisticRegression_3f0d96417684', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
 {Param(parent='CountVectorizer_f03f497725c8', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 100,
  Param(parent='LogisticRegression_3f0d96417684', name='regParam', doc='regularization parameter (>= 0).'): 0.05},
 {Param(parent='CountVectorizer_f03f497725c8', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 500,
  Param(parent='LogisticRegression_3f0d96417684', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
 {Param(parent='CountVectorizer_f03f497725c8', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 500,
  Param(parent='LogisticRegression_3f0d96417684', name='regParam', doc='regularization parameter (>= 0).'): 0.05}]

In [162]:
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                          evaluator=evaluator, numFolds=3, parallelism=4)

In [163]:
cv_model = crossval.fit(train)

In [164]:
cv_model.avgMetrics

[0.5752475340153035, 0.5707769791166641, 0.67927483923572, 0.662005513278917]

In [165]:
cv_model.bestModel

PipelineModel_2c47c95b27cc

In [166]:
predictions = cv_model.transform(test)

In [167]:
evaluator.evaluate(predictions)

0.6849586581399666

In [168]:
spark.stop()