In [1]:
from pyspark.sql import SparkSession

# Cluster layout: 5 machines x (8 cores, 16gb each) = 40 cores in total.
# Each executor is configured with 4 cores; dynamic allocation and the external
# shuffle service are both switched off.
# (An executor idle timeout, "spark.dynamicAllocation.executorIdleTimeout",
# would only be relevant if dynamic allocation were switched back on.)
# NOTE(review): spark.executor.memory is not set here, so Spark's default
# applies — with 16gb machines, consider setting it explicitly.

# DataFrame / ML entry point ("new API").
builder = (
    SparkSession.builder
    .master("spark://192.168.2.75:7077")
    .appName("egemen_reddit")
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.executor.cores", 4)
)
spark_session = builder.getOrCreate()

# RDD entry point ("old API").
spark_context = spark_session.sparkContext

In [2]:
# Re-binds the same SparkContext the first cell already stored in
# spark_context; redundant, and re-running it has no further effect.
spark_context = spark_session.sparkContext

In [3]:
# Raw input: JSON records on HDFS; Spark infers the schema (printed below).
RAW_COMMENTS_PATH = 'hdfs://192.168.2.75:9000/data1'
df1 = spark_session.read.json(RAW_COMMENTS_PATH)

In [4]:
# Preview the first 20 rows, with long cell values truncated.
df1.show(n=20, truncate=True)

+----------+----------------------+-----------------+--------------------+----------------+-----------+-------------+------+------+---+--------+---------+------------+-----+--------+----------+------------+---+
|    author|author_flair_css_class|author_flair_text|                body|controversiality|created_utc|distinguished|edited|gilded| id| link_id|parent_id|retrieved_on|score|stickied| subreddit|subreddit_id|ups|
+----------+----------------------+-----------------+--------------------+----------------+-----------+-------------+------+------+---+--------+---------+------------+-----+--------+----------+------------+---+
|      frjo|                  null|             null|A look at Vietnam...|               0| 1134365188|         null| false|     0|c13|t3_17863| t3_17863|  1473738411|    2|   false|reddit.com|        t5_6|  2|
|   zse7zse|                  null|             null|The site states "...|               0| 1134365725|         null| false|     0|c14|t3_17866| t3_17866|  

In [5]:
# Print the schema Spark inferred from the JSON records.
df1.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: boolean (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



# CLASSIFICATION

In [45]:
# Classification inputs: the numeric columns ups and score, plus the target
# column controversiality.
CLASSIFICATION_COLS = ['ups', 'score', 'controversiality']
preprocessed_contro = df1.select(*CLASSIFICATION_COLS)

In [49]:
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
# Stage 1: pack the numeric columns into a single "features" vector column.
assembler = VectorAssembler(outputCol="features", inputCols=["ups", "score"])
# Stage 2: map controversiality values to a numeric "label" index column.
# NOTE(review): StringIndexer assigns indices by value frequency (most frequent
# -> 0.0), so label 1.0 means "controversial" only while 0 stays the majority.
labelIndexer = StringIndexer(outputCol="label", inputCol='controversiality')
# Stage list; the cells below spell the stages out instead of using it.
stg = list((assembler, labelIndexer))

In [11]:
# 80/20 train/test split. Without a seed, every run (and every re-run of this
# cell) draws a different split, so counts and accuracy are not reproducible.
CLASSIFICATION_SPLIT_SEED = 42
train_cdf, test_cdf = preprocessed_contro.randomSplit(
    [0.8, 0.2], seed=CLASSIFICATION_SPLIT_SEED)

In [12]:
# Number of rows that landed in the training split (weight 0.8 above).
train_cdf.count()

869

In [58]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression on the assembled features, capped at 10 iterations.
lr = LogisticRegression(maxIter=10, labelCol="label", featuresCol="features")

In [59]:
# Pipeline is otherwise imported only in the REGRESSION section further down,
# so on a fresh kernel (Restart & Run All) this cell would raise NameError.
from pyspark.ml import Pipeline

# Classification pipeline: assemble features -> index label -> logistic regression.
ppl = Pipeline(stages=[assembler, labelIndexer, lr])

In [60]:
# Fit every pipeline stage on the training split only.
model = ppl.fit(dataset=train_cdf)

In [61]:
# Score the held-out split with the fitted classification pipeline.
prediction_cdf = model.transform(dataset=test_cdf)

In [63]:
# Preview the first 20 scored rows (long vectors truncated).
prediction_cdf.show(n=20, truncate=True)

+---+-----+----------------+-------------+-----+--------------------+--------------------+----------+
|ups|score|controversiality|     features|label|       rawPrediction|         probability|prediction|
+---+-----+----------------+-------------+-----+--------------------+--------------------+----------+
|-16|  -16|               0|[-16.0,-16.0]|  0.0|[0.50416186186452...|[0.62343688455997...|       0.0|
| -9|   -9|               0|  [-9.0,-9.0]|  0.0|[1.56039474723286...|[0.82640998934167...|       0.0|
| -6|   -6|               0|  [-6.0,-6.0]|  0.0|[2.01306598381929...|[0.88216211133835...|       0.0|
| -6|   -6|               1|  [-6.0,-6.0]|  1.0|[2.01306598381929...|[0.88216211133835...|       0.0|
| -5|   -5|               0|  [-5.0,-5.0]|  0.0|[2.16395639601477...|[0.89696576614953...|       0.0|
| -4|   -4|               0|  [-4.0,-4.0]|  0.0|[2.31484680821024...|[0.91009920358905...|       0.0|
| -4|   -4|               0|  [-4.0,-4.0]|  0.0|[2.31484680821024...|[0.9100992035

In [67]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compare "prediction" against the true "label" on the test split and report
# accuracy (fraction of correctly classified rows) as a percentage.
# NOTE(review): in the rows shown above most controversiality values are 0;
# compare this number against the majority-class rate before trusting it.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
accuracy = evaluator.evaluate(prediction_cdf)
accuracy_pct = accuracy * 100
print("Accuracy = {:g} ".format(accuracy_pct))


Accuracy = 94.6602 


In [82]:
def classification(df, seed=None):
    """Train and evaluate a controversiality classifier on ``df``.

    Selects ``ups``, ``score`` and ``controversiality``, splits the rows 80/20,
    fits VectorAssembler -> StringIndexer -> LogisticRegression on the training
    part, then prints the accuracy measured on the held-out part.

    Args:
        df: Spark DataFrame containing the columns ``ups``, ``score`` and
            ``controversiality``.
        seed: Optional seed for ``randomSplit``. ``None`` (the default) keeps
            the previous behaviour of drawing a fresh split on every call.

    Returns:
        The test-set accuracy as a fraction (it is printed as a percentage).
    """
    # Local imports keep the function self-contained; Pipeline in particular
    # is otherwise imported only later in the notebook.
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.feature import StringIndexer, VectorAssembler

    preprocessed_contro = df.select('ups', 'score', 'controversiality')
    assembler = VectorAssembler(inputCols=["ups", "score"], outputCol="features")
    labelIndexer = StringIndexer(inputCol='controversiality', outputCol="label")
    lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
    ppl = Pipeline(stages=[assembler, labelIndexer, lr])
    train_cdf, test_cdf = preprocessed_contro.randomSplit([0.8, 0.2], seed=seed)
    model = ppl.fit(train_cdf)
    # Score this call's own test split. Previously this step was missing and
    # the notebook-global prediction_cdf was evaluated instead, so the printed
    # accuracy did not depend on ``df`` at all.
    prediction_cdf = model.transform(test_cdf)
    # Select (prediction, true label) and compute test accuracy.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(prediction_cdf)
    print("Accuracy = %g " % (accuracy * 100))
    return accuracy


In [85]:
# Stopping the SparkContext here makes every Spark call in the REGRESSION
# section below fail on Restart & Run All, so stopping is opt-in.
STOP_SPARK_AFTER_CLASSIFICATION = False
if STOP_SPARK_AFTER_CLASSIFICATION:
    spark_context.stop()

# REGRESSION

In [68]:
# Regression inputs: the comment text and its score (the value to predict).
REGRESSION_COLS = ['body', 'score']
preprocessed_df = df1.select(*REGRESSION_COLS)

In [69]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer

In [70]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import DecisionTreeClassifier

In [71]:
# Transformer 1: Tokenizer (splits up words)
tokenizer = Tokenizer(inputCol="body", outputCol="words")
tokenized_df = tokenizer.transform(preprocessed_df)
tokenized_df.show()


+--------------------+-----+--------------------+
|                body|score|               words|
+--------------------+-----+--------------------+
|A look at Vietnam...|    2|[a, look, at, vie...|
|The site states "...|    1|[the, site, state...|
|Jython related to...|    0|[jython, related,...|
|           [deleted]|    1|         [[deleted]]|
|Saft is by far th...|    1|[saft, is, by, fa...|
|           [deleted]|   -6|         [[deleted]]|
|How to take panor...|    1|[how, to, take, p...|
|I donât know wh...|    1|[i, donât, know...|
|LinkIt by Marc, a...|    1|[linkit, by, marc...|
|Making websites r...|    1|[making, websites...|
|On the bright sid...|    1|[on, the, bright,...|
|Like a lot of peo...|    8|[like, a, lot, of...|
|This is comment t...|  -16|[this, is, commen...|
|           [deleted]|    0|         [[deleted]]|
|           [deleted]|   -2|         [[deleted]]|
|           [deleted]|   -6|         [[deleted]]|
|           [deleted]|   -4|         [[deleted]]|


In [72]:
# Transformer 2: Convert Words into word frequencies (TF = "Term Frequency")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
freq_df = hashingTF.transform(tokenized_df)
freq_df.take(3)

[Row(body='A look at Vietnam and Mexico exposes the myth of market liberalisation.', score=2, words=['a', 'look', 'at', 'vietnam', 'and', 'mexico', 'exposes', 'the', 'myth', 'of', 'market', 'liberalisation.'], features=SparseVector(262144, {2624: 1.0, 9639: 1.0, 91677: 1.0, 103838: 1.0, 114232: 1.0, 135373: 1.0, 138730: 1.0, 170698: 1.0, 176964: 1.0, 223763: 1.0, 227410: 1.0, 237314: 1.0})),
 Row(body='The site states "What can I use it for? Meeting notes, Reports, technical specs Sign-up sheets, proposals and much more...", just like any other new breeed of sites that want us to store everything we have on the web. And they even guarantee multiple levels of security and encryption etc. But what prevents these web site operators fom accessing and/or stealing Meeting notes, Reports, technical specs Sign-up sheets, proposals and much more, for competitive or personal gains...? I am pretty sure that most of them are honest, but what\'s there to prevent me from setting up a good useful sit

In [76]:
# Model: Random Forest Regression
dt = DecisionTreeRegressor(featuresCol="features", labelCol="score")

# Put them together as a pipeline.
pipeline = Pipeline(stages=[tokenizer, hashingTF, dt])

In [77]:
# 80/20 train/test split with a fixed seed so re-runs use the same rows.
REGRESSION_SPLIT_SEED = 42
train_rdf, test_rdf = preprocessed_df.randomSplit(
    [0.8, 0.2], seed=REGRESSION_SPLIT_SEED)

In [79]:
# Preview the first 20 (body, score) rows, long text truncated.
preprocessed_df.show(n=20, truncate=True)

+--------------------+-----+
|                body|score|
+--------------------+-----+
|A look at Vietnam...|    2|
|The site states "...|    1|
|Jython related to...|    0|
|           [deleted]|    1|
|Saft is by far th...|    1|
|           [deleted]|   -6|
|How to take panor...|    1|
|I donât know wh...|    1|
|LinkIt by Marc, a...|    1|
|Making websites r...|    1|
|On the bright sid...|    1|
|Like a lot of peo...|    8|
|This is comment t...|  -16|
|           [deleted]|    0|
|           [deleted]|   -2|
|           [deleted]|   -6|
|           [deleted]|   -4|
|It's a New York T...|    3|
|[Here's the copy ...|   12|
|The best thing ab...|    5|
+--------------------+-----+
only showing top 20 rows



In [78]:
# Fit tokenizer -> HashingTF -> decision tree on the training split.
# NOTE(review): the saved output shows this fit aborting with
# MetadataFetchFailedException (missing shuffle output); the hash-space size
# and executor memory are the first things to check.
modelreg = pipeline.fit(dataset=train_rdf)

Py4JJavaError: An error occurred while calling o819.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 87 (collectAsMap at RandomForest.scala:928) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 19 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697) 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693) 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693) 	at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147) 	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49) 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 	at org.apache.spark.scheduler.Task.run(Task.scala:108) 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 	at 
java.lang.Thread.run(Thread.java:748) 
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1310)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:746)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:745)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:745)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplitsBySorting(RandomForest.scala:928)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplits(RandomForest.scala:906)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:118)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:111)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
# Score the held-out split with the regression pipeline fitted above.
# (``model`` is the classification PipelineModel, which expects the
# ups/score/controversiality columns and cannot score test_rdf.)
prediction_rdf = modelreg.transform(test_rdf)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root mean squared error of the predicted score on the held-out split.
# The regression target column is "score" (this pipeline has no "label"
# column), and the scored rows are in prediction_rdf.
evaluator = RegressionEvaluator(
    labelCol="score", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction_rdf)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)