In [1]:
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
try:
    # create SparkContext on all CPUs available: in my case I have 1 CPUs on my laptop
    # https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
    conf = ps.SparkConf().setAppName("App")
    conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '16G')
        .set('spark.driver.memory', '8G')
        .set('spark.driver.maxResultSize', '1 G'))
    sc = ps.SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


In [3]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('../datasets/news.csv')
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
df.show(100)

+---+-------+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
| id|id_news|               title|   publication|              author|      date|  year|month| url|             content|
+---+-------+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
|  0|  17283|House Republicans...|New York Times|          Carl Hulse|2016-12-31|2016.0| 12.0|null|WASHINGTON  —   C...|
|  1|  17284|Rift Between Offi...|New York Times|Benjamin Mueller ...|2017-06-19|2017.0|  6.0|null|After the bullet ...|
|  2|  17285|Tyrus Wong, ‘Bamb...|New York Times|        Margalit Fox|2017-01-06|2017.0|  1.0|null|When Walt Disney’...|
|  3|  17286|Among Deaths in 2...|New York Times|    William McDonald|2017-04-10|2017.0|  4.0|null|Death may be the ...|
|  4|  17287|Kim Jong-un Says ...|New York Times|       Choe Sang-Hun|2017-01-02|2017.0|  1.0|null|SEOUL, South Kore...|
|  5|  17288|Sick With a Cold,..

In [5]:
df.count()

185186

In [6]:
df = df.dropna()
df.count()

79911

In [7]:
(train_set, val_set, test_set) = df.randomSplit([0.7, 0.2, 0.1], seed = 2000)

In [8]:
tokenizer = Tokenizer(inputCol="content", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "publication", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|                  id|             id_news|               title|         publication|              author|                date|                year|               month|                 url|             content|               words|                  tf|            features|  label|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
| hateful rhetoric...|                2017|  We are a single...| we are an ocean#...|  —   Bianca I. M...|                2017|  Twitter users w...| wi

In [9]:
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

Py4JJavaError: An error occurred while calling o269.fit.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.linalg.DenseMatrix$.zeros(Matrices.scala:492)
	at org.apache.spark.ml.linalg.Matrices$.zeros(Matrices.scala:1067)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:706)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:494)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:489)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:279)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [None]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy