In [1]:
import findspark
findspark.init('/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('regresion_logistica').getOrCreate()

In [3]:
from pyspark.sql.types import *   # Importamos los tipos de datos para definir el esquema
%matplotlib inline 
import matplotlib.pyplot as plt, numpy as np
import pandas

## El dataset sólo tiene dos columnas, el mensaje SMS (texto) 
## y una etiqueta que indica si fué spam o no 
spam_schema = StructType([
                         StructField("spam", StringType(), True), 
                         StructField("message", StringType(), True)
              ])


ds = spark.read.csv("/user/carloslopez/datasets/spam", sep="\t", schema=spam_schema)
ds.show()
ds.show(truncate=False)
ds.printSchema()

+----+--------------------+
|spam|             message|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|spam|message                                                                                     

In [4]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="spam", outputCol="label")
indexed = indexer.fit(ds).transform(ds)
indexed.show()

from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="message", outputCol="tokens")
tokenized = tokenizer.transform(indexed)
tokenized.show()

from pyspark.ml.feature import HashingTF, IDF, VectorAssembler
hashingTF = HashingTF(inputCol="tokens", outputCol="tf")
tf_data = hashingTF.transform(tokenized)
tf_data.show()

idf = IDF(inputCol="tf", outputCol="idf")
idfModel = idf.fit(tf_data)
idf_data = idfModel.transform(tf_data)
idf_data.show()

assembler = VectorAssembler(inputCols=["idf"], outputCol="features")
assembled_data = assembler.transform(idf_data)
assembled_data.show()

+----+--------------------+-----+
|spam|             message|label|
+----+--------------------+-----+
| ham|Go until jurong p...|  0.0|
| ham|Ok lar... Joking ...|  0.0|
|spam|Free entry in 2 a...|  1.0|
| ham|U dun say so earl...|  0.0|
| ham|Nah I don't think...|  0.0|
|spam|FreeMsg Hey there...|  1.0|
| ham|Even my brother i...|  0.0|
| ham|As per your reque...|  0.0|
|spam|WINNER!! As a val...|  1.0|
|spam|Had your mobile 1...|  1.0|
| ham|I'm gonna be home...|  0.0|
|spam|SIX chances to wi...|  1.0|
|spam|URGENT! You have ...|  1.0|
| ham|I've been searchi...|  0.0|
| ham|I HAVE A DATE ON ...|  0.0|
|spam|XXXMobileMovieClu...|  1.0|
| ham|Oh k...i'm watchi...|  0.0|
| ham|Eh u remember how...|  0.0|
| ham|Fine if thats th...|  0.0|
|spam|England v Macedon...|  1.0|
+----+--------------------+-----+
only showing top 20 rows

+----+--------------------+-----+--------------------+
|spam|             message|label|              tokens|
+----+--------------------+-----+---------------

In [4]:
assembled_data.head(2)

[Row(spam=u'ham', message=u'Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...', label=0.0, tokens=[u'go', u'until', u'jurong', u'point,', u'crazy..', u'available', u'only', u'in', u'bugis', u'n', u'great', u'world', u'la', u'e', u'buffet...', u'cine', u'there', u'got', u'amore', u'wat...'], tf=SparseVector(262144, {3168: 1.0, 4081: 1.0, 17222: 1.0, 26042: 1.0, 31463: 1.0, 59729: 1.0, 65510: 1.0, 95595: 1.0, 100743: 1.0, 120767: 1.0, 138356: 1.0, 172477: 1.0, 180535: 1.0, 181635: 1.0, 181726: 1.0, 184181: 1.0, 201474: 1.0, 222453: 1.0, 231671: 1.0, 238163: 1.0}), idf=SparseVector(262144, {3168: 7.9329, 4081: 5.2938, 17222: 4.322, 26042: 6.8343, 31463: 3.2055, 59729: 7.5274, 65510: 7.9329, 95595: 6.6801, 100743: 3.822, 120767: 7.9329, 138356: 4.2072, 172477: 3.1126, 180535: 3.8058, 181635: 3.4443, 181726: 5.4072, 184181: 7.9329, 201474: 6.1411, 222453: 1.9706, 231671: 7.9329, 238163: 5.918}), features=SparseVector(262144, {316

In [5]:
## Esto no habría que hacerlo, deberíamos usar una especie de magic loop, cross-validation, etc
training_data, test_data = assembled_data.randomSplit(weights=[0.7, 0.3], 
                                                      seed=12345)

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol="features")
lrModel = lr.fit(training_data)


In [6]:
training_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 3350|
|  1.0|  530|
+-----+-----+



In [7]:
test_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 1477|
|  1.0|  217|
+-----+-----+



In [11]:
predictions = lrModel.transform(test_data)
predictions.createOrReplaceTempView("tablita")

In [14]:
spark.sql("""
            select label, prediction, count(1)
              from tablita
            group by label, prediction
            order by 1, 2 asc
          """).show()

+-----+----------+--------+
|label|prediction|count(1)|
+-----+----------+--------+
|  0.0|       0.0|    1473|
|  0.0|       1.0|       4|
|  1.0|       0.0|      51|
|  1.0|       1.0|     166|
+-----+----------+--------+



In [10]:
predictions.head(50)

[Row(spam=u'ham', message=u' came to look at the flat, seems ok, in his 50s? * Is away alot wiv work. Got woman coming at 6.30 too.', label=0.0, tokens=[u'', u'came', u'to', u'look', u'at', u'the', u'flat,', u'seems', u'ok,', u'in', u'his', u'50s?', u'*', u'is', u'away', u'alot', u'wiv', u'work.', u'got', u'woman', u'coming', u'at', u'6.30', u'too.'], tf=SparseVector(262144, {9129: 1.0, 13471: 1.0, 15889: 1.0, 31463: 1.0, 53777: 1.0, 61193: 1.0, 65844: 1.0, 66980: 1.0, 73366: 1.0, 80458: 1.0, 103838: 1.0, 121808: 1.0, 148195: 1.0, 158129: 1.0, 170186: 1.0, 176964: 2.0, 197150: 1.0, 205044: 1.0, 205408: 1.0, 206331: 1.0, 222453: 1.0, 223763: 1.0, 249180: 1.0}), idf=SparseVector(262144, {9129: 5.5815, 13471: 6.6801, 15889: 2.0344, 31463: 3.2055, 53777: 5.7928, 61193: 6.5466, 65844: 5.3302, 66980: 4.5152, 73366: 4.8419, 80458: 5.1295, 103838: 1.6868, 121808: 7.5274, 148195: 7.9329, 158129: 6.1411, 170186: 7.9329, 176964: 5.4089, 197150: 7.5274, 205044: 1.1983, 205408: 5.987, 206331: 5.630

In [22]:
predict = lrModel.transform(test_data)
predict.select("spam", "probability", "prediction", "label").show(truncate=False)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator().setRawPredictionCol("prediction")
accuracy = evaluator.evaluate(predict)

"Test error: {}".format(1.0 - accuracy)


+----+-------------------------------------------+----------+-----+
|spam|probability                                |prediction|label|
+----+-------------------------------------------+----------+-----+
|ham |[0.9999999999999993,5.893336188403968E-16] |0.0       |0.0  |
|ham |[0.9999999999987219,1.278068203353027E-12] |0.0       |0.0  |
|ham |[1.0,4.158197333982635E-24]                |0.0       |0.0  |
|ham |[0.9999999999698668,3.013324135712556E-11] |0.0       |0.0  |
|ham |[0.9999998977488163,1.022511836279667E-7]  |0.0       |0.0  |
|ham |[0.9999999999954396,4.560253881374639E-12] |0.0       |0.0  |
|ham |[0.9999999998690658,1.3093417959804142E-10]|0.0       |0.0  |
|ham |[1.0,1.6841575527438628E-33]               |0.0       |0.0  |
|ham |[0.9999998607896353,1.392103646489647E-7]  |0.0       |0.0  |
|ham |[1.0,5.908301262149189E-17]                |0.0       |0.0  |
|ham |[0.9999975171362772,2.48286372273592E-6]   |0.0       |0.0  |
|ham |[1.0,3.9952354694499075E-21]              

'Test error: 0.118865616878'

In [23]:
summary = lrModel.summary
summary

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x31a6790>

In [25]:
print("ROC",summary.areaUnderROC)
print("vars:",summary.roc)

('ROC', 0.9998856660095748)
('vars:', DataFrame[FPR: double, TPR: double])
('vars:', DataFrame[recall: double, precision: double])


In [26]:
spark.stop()