### Simple Text Processing and Classification with Apache Spark
---
The aim of this notebook is to practise basic text processing with Apache Spark, using the toxic comment text classification dataset. The machine learning and text processing used here are rudimentary. The main goal was to convert the column `comment_text` into a column of sparse vectors for use in a classification algorithm from the Spark `ml` library.

The `pyspark.ml` library is used for machine learning with Spark DataFrames. For machine learning with Spark RDDs use the `pyspark.mllib` library. 

In [96]:
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
# from pyspark import SparkContext
# sc =SparkContext()
# sqlContext = SQLContext(sc)
from nltk.corpus import stopwords

In [97]:
# Build a Spark session (the entry point for DataFrame operations)
hc = (SparkSession.builder
                  .appName('Toxic Comment Classification')
                  .enableHiveSupport()
                  .config("spark.executor.memory", "4G")
                  .config("spark.driver.memory","18G")
                  .config("spark.executor.cores","7")
                  .config("spark.python.worker.memory","4G")
                  .config("spark.driver.maxResultSize","0")
                  .config("spark.sql.crossJoin.enabled", "true")
                  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                  .config("spark.default.parallelism","2")
                  .getOrCreate())

In [98]:
hc.sparkContext.setLogLevel('INFO')

In [99]:
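hc.version
# SQLContext expects a SparkContext as its first argument; pass the session separately
sqlContext = SQLContext(hc.sparkContext, sparkSession=hc)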

Unfortunately, as much as I love the CSV reader added in Spark 2+ and the Databricks spark-csv package, I was unable to use them to parse a multi-line, multi-character quoted record in a CSV file. As a result, I loaded the data into a DataFrame using pandas and then converted the pandas DataFrame to a Spark DataFrame.
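The kind of record that causes trouble can be reproduced with Python's standard `csv` module. This is only a small sketch on made-up data (the column names `id` and `comment_text` and the two rows are invented for illustration); it shows a quoted field that contains both a line break and a comma being parsed as a single value.

```python
import csv
import io

# Made-up CSV text: the second record has a quoted field spanning two lines
raw = 'id,comment_text\n1,"first line\nsecond line, with a comma"\n2,"plain"\n'

# csv.reader keeps reading past the line break because the quote is still open
rows = list(csv.reader(io.StringIO(raw)))

for row in rows:
    print(row)
```

A reader that splits the input on line breaks before handling quotes would see four lines here instead of three records.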

In [100]:
def to_spark_df(fin):
    """
    Parse a filepath to a spark dataframe using the pandas api.
    
    Parameters
    ----------
    fin : str
        The path to the file on the local filesystem that contains the csv data.
        
    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame
        A spark DataFrame containing the parsed csv data.
    """
    df = pd.read_csv(fin)
    df.fillna("", inplace=True)
    df = hc.createDataFrame(df)
    return df

# Load the train-test sets
# train = to_spark_df("../input/train.csv")
# test = to_spark_df("../input/test.csv")
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('articles-train.csv')
drop_list = ['Dates', 'Topic', 'Page']
train = train.select([column for column in train.columns if column not in drop_list])
train.show(5)

+--------+--------------------+
|Category|                Body|
+--------+--------------------+
|business|SEOUL  WITH A FAL...|
|business|JEFFERSON CITY  M...|
|business|WASHINGTON  THE T...|
|business|REUTERS    METLIF...|
|  sports|DALLAS  WHEN DALL...|
+--------+--------------------+
only showing top 5 rows



In [101]:
out_cols = [i for i in train.columns if i not in ["Category", "Body"]]

In [102]:
# Sadly the output is not as pretty as the pandas.head() function
train.show(5)

+--------+--------------------+
|Category|                Body|
+--------+--------------------+
|business|SEOUL  WITH A FAL...|
|business|JEFFERSON CITY  M...|
|business|WASHINGTON  THE T...|
|business|REUTERS    METLIF...|
|  sports|DALLAS  WHEN DALL...|
+--------+--------------------+
only showing top 5 rows



In [103]:
# View some toxic comments
# train.filter(F.col('toxic') == 1).show(5)

In [104]:
# Basic word tokenizer: lowercases the text and splits it on whitespace
tokenizer = Tokenizer(inputCol="Body", outputCol="words")
wordsData = tokenizer.transform(train)
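As a rough illustration of what `Tokenizer` does to each `Body` value, the sketch below lowercases a string and splits it on every single whitespace character in plain Python. The helper `simple_tokenize` is hypothetical and not part of Spark, and it only approximates the real behaviour. It does show why empty tokens appear in the `words` column wherever the text contains consecutive spaces.

```python
import re

def simple_tokenize(text):
    """Lowercase the text and split on every single whitespace character.

    Hypothetical stand-in for pyspark.ml.feature.Tokenizer, for illustration only.
    """
    return re.split(r"\s", text.lower())

# Two spaces after "SEOUL" produce an empty token between "seoul" and "with"
tokens = simple_tokenize("SEOUL  WITH A")
print(tokens)  # ['seoul', '', 'with', 'a']
```

If the empty tokens are unwanted, `RegexTokenizer` from `pyspark.ml.feature` with a pattern such as `\\s+` is one option.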

In [105]:
# Hash each token to a vector index and count the term frequencies per document
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
tf = hashingTF.transform(wordsData)
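The hashing trick behind `HashingTF` can be sketched in a few lines of plain Python. This is an approximation under stated assumptions: MD5 is used only as a deterministic stand-in hash (Spark uses MurmurHash3, so the indices will not match Spark's), and the helper name `hashed_term_frequencies` is hypothetical. The vector size 262144 (2**18) is the one visible in the `SparseVector` output of this notebook.

```python
import hashlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the vector size shown in the notebook output

def hashed_term_frequencies(tokens, num_features=NUM_FEATURES):
    """Map each token to a bucket index and count occurrences per bucket.

    Assumption: MD5 is a stand-in hash for illustration; Spark's HashingTF
    uses MurmurHash3, so real indices differ.
    """
    counts = Counter()
    for token in tokens:
        digest = hashlib.md5(token.encode("utf-8")).hexdigest()
        index = int(digest, 16) % num_features
        counts[index] += 1.0
    # Return a sparse {index: count} mapping ordered by index
    return dict(sorted(counts.items()))

tf_vector = hashed_term_frequencies(["the", "cat", "saw", "the", "dog"])
print(tf_vector)
```

Because the vocabulary is never stored, two different tokens can collide in the same bucket; a larger `num_features` makes that less likely.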

In [106]:
tf.select('rawFeatures').take(2)

[Row(rawFeatures=SparseVector(262144, {619: 1.0, 4869: 1.0, 5377: 1.0, 5381: 1.0, 6051: 1.0, 8630: 1.0, 9639: 6.0, 10625: 1.0, 11689: 3.0, 15664: 1.0, 15773: 1.0, 15889: 1.0, 16332: 4.0, 17349: 1.0, 19153: 1.0, 19208: 1.0, 19862: 1.0, 22939: 2.0, 24145: 1.0, 24176: 1.0, 24417: 1.0, 24661: 1.0, 24980: 1.0, 25000: 1.0, 25217: 2.0, 25551: 1.0, 25570: 2.0, 29945: 1.0, 30905: 1.0, 31463: 1.0, 33182: 1.0, 34140: 1.0, 35584: 1.0, 42438: 1.0, 44548: 1.0, 47484: 1.0, 48448: 1.0, 49213: 1.0, 49936: 1.0, 50293: 1.0, 50940: 4.0, 52206: 1.0, 52805: 1.0, 52914: 1.0, 53049: 1.0, 56614: 2.0, 59853: 1.0, 61951: 1.0, 63295: 1.0, 66458: 1.0, 66980: 1.0, 71002: 1.0, 71524: 1.0, 71826: 1.0, 72609: 1.0, 74383: 1.0, 77113: 1.0, 77275: 1.0, 80689: 1.0, 80848: 1.0, 81566: 1.0, 82111: 2.0, 87478: 1.0, 89188: 1.0, 90138: 1.0, 90757: 2.0, 91028: 1.0, 91677: 4.0, 91878: 1.0, 94533: 3.0, 95402: 1.0, 95477: 1.0, 95975: 1.0, 96638: 1.0, 96822: 1.0, 99500: 1.0, 100258: 2.0, 100604: 1.0, 101160: 1.0, 101169: 1.0, 10383

In [107]:
# Build the idf model and transform the original token frequencies into their tf-idf counterparts
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(tf) 
tfidf = idfModel.transform(tf)
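The rescaling applied by the IDF model can be illustrated on toy data. The sketch below assumes the inverse document frequency formula given in the Spark MLlib documentation, idf = ln((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the feature. The helpers `fit_idf` and `apply_idf` and the three sparse vectors are made up for illustration.

```python
import math

def fit_idf(doc_vectors):
    """Compute idf = ln((m + 1) / (df + 1)) for every feature index seen."""
    m = len(doc_vectors)
    doc_freq = {}
    for vec in doc_vectors:
        for index in vec:
            doc_freq[index] = doc_freq.get(index, 0) + 1
    return {i: math.log((m + 1) / (df + 1)) for i, df in doc_freq.items()}

def apply_idf(vec, idf_weights):
    """Scale each term frequency by its idf weight (indices seen in fitting only)."""
    return {i: tf * idf_weights[i] for i, tf in vec.items()}

# Toy sparse term-frequency vectors: {feature index: count}
docs = [{0: 2.0, 3: 1.0}, {0: 1.0, 5: 4.0}, {0: 1.0}]
weights = fit_idf(docs)
scaled = apply_idf(docs[0], weights)
print(weights)
print(scaled)
```

Feature 0 occurs in every document, so its weight is ln(4/4) = 0 and it is scaled to zero, while rarer features keep a positive weight. This matches the pattern in the notebook output, where frequently occurring indices end up with small tf-idf values.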

In [108]:
tfidf.select("features").first()

Row(features=SparseVector(262144, {619: 2.2713, 4869: 3.1033, 5377: 3.5849, 5381: 0.5466, 6051: 2.1258, 8630: 1.9813, 9639: 0.1014, 10625: 5.9544, 11689: 14.7254, 15664: 0.6228, 15773: 3.2413, 15889: 0.1185, 16332: 0.2657, 17349: 4.7352, 19153: 2.0897, 19208: 0.7155, 19862: 2.8669, 22939: 11.6227, 24145: 1.4903, 24176: 4.4589, 24417: 0.7848, 24661: 3.792, 24980: 0.6546, 25000: 1.3235, 25217: 2.3727, 25551: 0.9175, 25570: 0.4592, 29945: 0.5483, 30905: 2.614, 31463: 1.894, 33182: 1.2852, 34140: 2.195, 35584: 3.2413, 42438: 4.3147, 44548: 6.5735, 47484: 3.9043, 48448: 0.0994, 49213: 4.3763, 49936: 7.1331, 50293: 3.5706, 50940: 0.6777, 52206: 1.8927, 52805: 2.3709, 52914: 1.141, 53049: 4.494, 56614: 8.0615, 59853: 0.4661, 61951: 2.0501, 63295: 4.4589, 66458: 5.575, 66980: 0.5346, 71002: 2.8495, 71524: 1.638, 71826: 2.9136, 72609: 2.0062, 74383: 1.9222, 77113: 5.6862, 77275: 6.2168, 80689: 4.7817, 80848: 3.438, 81566: 0.7914, 82111: 1.2569, 87478: 1.2095, 89188: 3.4318, 90138: 3.5565, 90757

Do a test first to practise with the LogisticRegression class. I like to create instances of objects first to check their methods and docstrings and figure out how to access data.

Build a logistic regression model for the `Category` column.
Use the `features` column (the tf-idf values) as the input vectors, `X`, and the indexed `label` column as the output vector, `y`.

In [109]:
REG = 0.1
label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")
pipeline = Pipeline(stages=[label_stringIdx])
pipelineFit = pipeline.fit(tfidf)
dataset = pipelineFit.transform(tfidf)
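With its default settings, `StringIndexer` orders labels by descending frequency, so the most common category receives index 0.0. The plain-Python sketch below mimics that ordering on made-up label lists; the helper `fit_label_index` is hypothetical, and breaking ties alphabetically is an assumption made here for determinism.

```python
from collections import Counter

def fit_label_index(labels):
    """Assign 0.0 to the most frequent label, 1.0 to the next, and so on.

    Ties are broken alphabetically (an assumption for this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up label distributions for two different data splits
split_a = ["business"] * 3 + ["sports"] * 2 + ["entertainment"]
split_b = ["sports"] * 3 + ["business"]

mapping_a = fit_label_index(split_a)
mapping_b = fit_label_index(split_b)
print(mapping_a)  # {'business': 0.0, 'sports': 1.0, 'entertainment': 2.0}
print(mapping_b)  # {'sports': 0.0, 'business': 1.0}
```

Because the mapping depends on the label counts, an indexer fitted on one split can assign different numbers than one fitted on another split, which is a reason to fit it once and reuse the fitted model.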

In [110]:
lr = LogisticRegression(featuresCol="features", labelCol='label', regParam=REG)


In [111]:
dataset.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+-----+
|Category|                Body|               words|         rawFeatures|            features|label|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
|business|SEOUL  WITH A FAL...|[seoul, , with, a...|(262144,[619,4869...|(262144,[619,4869...|  0.0|
|business|JEFFERSON CITY  M...|[jefferson, city,...|(262144,[2325,316...|(262144,[2325,316...|  0.0|
|business|WASHINGTON  THE T...|[washington, , th...|(262144,[1598,176...|(262144,[1598,176...|  0.0|
|business|REUTERS    METLIF...|[reuters, , , , m...|(262144,[571,1846...|(262144,[571,1846...|  0.0|
|  sports|DALLAS  WHEN DALL...|[dallas, , when, ...|(262144,[8804,963...|(262144,[8804,963...|  2.0|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [112]:
lrModel = lr.fit(dataset.limit(5000))

In [113]:
res_train = lrModel.transform(dataset)

In [114]:
res_train.select("Body","Category","probability","label","prediction").show(20)

+--------------------+-------------+--------------------+-----+----------+
|                Body|     Category|         probability|label|prediction|
+--------------------+-------------+--------------------+-----+----------+
|SEOUL  WITH A FAL...|     business|[0.98191136542019...|  0.0|       0.0|
|JEFFERSON CITY  M...|     business|[0.98055222969936...|  0.0|       0.0|
|WASHINGTON  THE T...|     business|[0.59419377887003...|  0.0|       0.0|
|REUTERS    METLIF...|     business|[0.98217426198793...|  0.0|       0.0|
|DALLAS  WHEN DALL...|       sports|[0.06673936904185...|  2.0|       2.0|
|SOFIA  BULGARIA C...|     business|[0.98353054921268...|  0.0|       0.0|
|PHOENIX  WHEN FOR...|entertainment|[0.16686579619068...|  3.0|       1.0|
|SARA MELINDA BEES...|entertainment|[0.08246612234435...|  3.0|       3.0|
|ON A WARM SATURDA...|       sports|[0.00477047382362...|  2.0|       2.0|
|FEW WOULD DISPUTE...|       sports|[0.00582462073688...|  2.0|       2.0|
|REMARKS BY PRESID...|   

In [116]:
res_train.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|Category|                Body|               words|         rawFeatures|            features|label|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|business|SEOUL  WITH A FAL...|[seoul, , with, a...|(262144,[619,4869...|(262144,[619,4869...|  0.0|[3.85147051851027...|[0.98191136542019...|       0.0|
|business|JEFFERSON CITY  M...|[jefferson, city,...|(262144,[2325,316...|(262144,[2325,316...|  0.0|[5.32715364628057...|[0.98055222969936...|       0.0|
|business|WASHINGTON  THE T...|[washington, , th...|(262144,[1598,176...|(262144,[1598,176...|  0.0|[2.97982138154850...|[0.59419377887003...|       0.0|
|business|REUTERS    METLIF...|[reuters, , , , m...|(262144,[571,1846...|(26

#### Select the probability column
---
Create a user-defined function (UDF) that selects the second element (index 1) of the probability vector in each row.

In [117]:
extract_prob = F.udf(lambda x: float(x[1]), T.FloatType())
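To make clear what `x[1]` picks out, here is a small plain-Python sketch with made-up scores. It assumes the multiclass case, where the probability vector has one entry per label index and is obtained from the raw scores with a softmax; the `softmax` helper and the numbers are illustrative only.

```python
import math

def softmax(raw_scores):
    """Turn raw class scores into probabilities that sum to 1."""
    shift = max(raw_scores)  # subtract the maximum for numerical stability
    exps = [math.exp(s - shift) for s in raw_scores]
    total = sum(exps)
    return [e / total for e in exps]

raw_prediction = [3.0, 1.0, -1.0, -3.0]  # made-up scores, one per label index
probability = softmax(raw_prediction)

# What extract_prob returns: the probability of label 1.0
proba_label_1 = float(probability[1])
# The predicted label is the index with the highest probability
prediction = float(probability.index(max(probability)))

print(proba_label_1, prediction)
```

With more than two classes, element 1 is the probability of label 1.0 rather than the probability of the predicted class, so a row predicted as 0.0 can show a small `proba` value.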

In [118]:
(res_train.withColumn("proba", extract_prob("probability"))
 .select("proba", "prediction")
 .show())

+------------+----------+
|       proba|prediction|
+------------+----------+
| 0.008070086|       0.0|
|  0.01911029|       0.0|
|  0.40206984|       0.0|
| 0.009203206|       0.0|
| 0.048210133|       2.0|
| 0.008524606|       0.0|
|    0.501839|       1.0|
|  0.08413483|       3.0|
|0.0030365665|       2.0|
|0.0034108225|       2.0|
|  0.32620102|       0.0|
|  0.49998915|       1.0|
| 0.007310296|       0.0|
|  0.33218947|       0.0|
| 0.006979774|       0.0|
| 0.016729414|       3.0|
|  0.97994304|       1.0|
|  0.95325756|       1.0|
|0.0010521865|       0.0|
|  0.64051425|       1.0|
+------------+----------+
only showing top 20 rows



### Create the results DataFrame
---
Convert the test text

In [119]:
test = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('articles-test.csv')
drop_list = ['Dates', 'Topic', 'Page']
test = test.select([column for column in test.columns if column not in drop_list])
# NOTE: this displays the training rows; call test.show(5) to inspect the test set instead
train.show(5)

+--------+--------------------+
|Category|                Body|
+--------+--------------------+
|business|SEOUL  WITH A FAL...|
|business|JEFFERSON CITY  M...|
|business|WASHINGTON  THE T...|
|business|REUTERS    METLIF...|
|  sports|DALLAS  WHEN DALL...|
+--------+--------------------+
only showing top 5 rows



In [120]:
test_tokens = tokenizer.transform(test)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)
# Reuse the StringIndexer fitted on the training data (In [109]) so that the
# test labels map to the same indices as the training labels
dataset = pipelineFit.transform(test_tfidf)

Initialize the results DataFrame with the `Category` column

In [122]:
test_res = dataset.select('Category')
test_res.head()

Row(Category='business')

Make predictions for each class

In [123]:
test_probs = []
# The label columns were collected in `out_cols` (In [101]); `outputCol` was never defined.
# This loop assumes one binary label column per class and an `id` column; with only
# `Category` and `Body` loaded, `out_cols` is empty and the loop body does not run.
for col in out_cols:
    print(col)
    lr = LogisticRegression(featuresCol="features", labelCol=col, regParam=REG)
    print("...fitting")
    lrModel = lr.fit(tfidf)
    print("...predicting")
    res = lrModel.transform(test_tfidf)
    print("...appending result")
    test_res = test_res.join(res.select('id', 'probability'), on="id")
    print("...extracting probability")
    test_res = test_res.withColumn(col, extract_prob('probability')).drop("probability")
    test_res.show(5)

In [26]:
test_res.show(5)

In [27]:
test_res.coalesce(1).write.csv('./results/spark_lr.csv', mode='overwrite', header=True)

The output is actually a directory, not a single CSV file. The directory contains one or more CSV part files, which together make up the complete results. I used the `cat` command to concatenate these part files.

In [28]:
!cat results/spark_lr.csv/part*.csv > spark_lr.csv
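The same concatenation can be sketched in Python. One difference from plain `cat` is that every part file written with `header=True` starts with its own header line, so the hypothetical helper `merge_part_files` below keeps only the first one. It assumes the header fits on a single line; the demo files and their contents are made up.

```python
import glob
import os
import tempfile

def merge_part_files(part_dir, out_path):
    """Concatenate part*.csv files, writing the header line only once."""
    part_paths = sorted(glob.glob(os.path.join(part_dir, "part*.csv")))
    with open(out_path, "w", encoding="utf-8") as out:
        for n, path in enumerate(part_paths):
            with open(path, encoding="utf-8") as part:
                header = part.readline()
                if n == 0:
                    out.write(header)  # keep the header from the first part only
                out.write(part.read())
    return len(part_paths)

# Self-contained demo with two made-up part files in a temporary directory
demo_dir = tempfile.mkdtemp()
for name, rows in [("part-00000.csv", "business,0.9\n"), ("part-00001.csv", "sports,0.1\n")]:
    with open(os.path.join(demo_dir, name), "w", encoding="utf-8") as f:
        f.write("Category,proba\n" + rows)

merged_path = os.path.join(demo_dir, "merged.csv")
n_parts = merge_part_files(demo_dir, merged_path)
with open(merged_path, encoding="utf-8") as f:
    merged_text = f.read()
print(merged_text)
```

With `coalesce(1)` there is only a single part file, so `cat` and this helper produce the same result.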

In [29]:
ls

This submission scores 0.8797 on the public leaderboard.