News Aggregator dataset from UCI: https://archive.ics.uci.edu/ml/datasets/News+Aggregator

Somayeh Shami - somayeh.shami@studio.unibo.it

Install libraries

In [1]:
!pip install pyspark



In [2]:
!pip install findspark



Import required libraries

In [3]:
import os
import findspark

# Raw strings keep the Windows backslashes literal (no accidental escape sequences)
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-19"
os.environ["SPARK_HOME"] = r"C:\Users\Lenovo\Desktop\BDAVM\spark"
findspark.init()  # locate the Spark installation from SPARK_HOME
from pyspark.sql import SparkSession

In [4]:
from pyspark.ml import Pipeline 
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Initialize default configs using SparkSession

In [5]:
# Build the SparkSession
spark = SparkSession.builder.master("local").appName("NewsAggregator").config("spark.executor.memory", "1gb").getOrCreate()

In [6]:
sc = spark.sparkContext

In [7]:
s3_bucket_path = 'News/uci-news-aggregator.csv'

Load the dataset from the local machine as a CSV file

In [8]:
news_data = spark.read.csv(s3_bucket_path, header=True, inferSchema=True)

Display top 20 rows

In [9]:
news_data.show()

+---+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+
| ID|               TITLE|                 URL|           PUBLISHER|CATEGORY|               STORY|            HOSTNAME|    TIMESTAMP|
+---+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+
|  1|Fed official says...|http://www.latime...|   Los Angeles Times|       b|ddUyU0VZz0BRneMio...|     www.latimes.com|1394470370698|
|  2|Fed's Charles Plo...|http://www.livemi...|            Livemint|       b|ddUyU0VZz0BRneMio...|    www.livemint.com|1394470371207|
|  3|US open: Stocks f...|http://www.ifamag...|        IFA Magazine|       b|ddUyU0VZz0BRneMio...| www.ifamagazine.com|1394470371550|
|  4|Fed risks falling...|http://www.ifamag...|        IFA Magazine|       b|ddUyU0VZz0BRneMio...| www.ifamagazine.com|1394470371793|
|  5|Fed's Plosser: Na...|http://www.moneyn...|           Mone

In [11]:
news_data.count()

422937

Select 'TITLE' and 'CATEGORY' columns from dataframe

In [12]:
title_category = news_data.select("TITLE","CATEGORY")

Display top 20 rows from selected columns

In [13]:
title_category.show()

+--------------------+--------+
|               TITLE|CATEGORY|
+--------------------+--------+
|Fed official says...|       b|
|Fed's Charles Plo...|       b|
|US open: Stocks f...|       b|
|Fed risks falling...|       b|
|Fed's Plosser: Na...|       b|
|Plosser: Fed May ...|       b|
|Fed's Plosser: Ta...|       b|
|Fed's Plosser exp...|       b|
|US jobs growth la...|       b|
|ECB unlikely to e...|       b|
|ECB unlikely to e...|       b|
|EU's half-baked b...|       b|
|Europe reaches cr...|       b|
|ECB FOCUS-Stronge...|       b|
|EU aims for deal ...|       b|
|Forex - Pound dro...|       b|
|Noyer Says Strong...|       b|
|EU Week Ahead Mar...|       b|
|ECB member Noyer ...|       b|
|Euro Anxieties Wa...|       b|
+--------------------+--------+
only showing top 20 rows



Define a function that counts the null values in each column of a dataframe

In [14]:
def null_value_count(df):
    # Return (column, null_count) pairs for every column that contains nulls
    null_columns_counts = []
    for k in df.columns:
        null_rows = df.where(col(k).isNull()).count()
        if null_rows > 0:
            null_columns_counts.append((k, null_rows))
    return null_columns_counts

In [15]:
null_columns_count_list = null_value_count(title_category)

Display the number of null values in the 'TITLE' and 'CATEGORY' columns.
There are 389 null values in 'TITLE' and 516 in 'CATEGORY'.

In [16]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                 TITLE|              389|
|              CATEGORY|              516|
+----------------------+-----------------+



Drop rows with null values from the dataframe

In [17]:
title_category = title_category.dropna()

Count the number of rows after removing null values

In [18]:
title_category.count()

422421
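
The counts reported so far allow a quick consistency check. The following is a minimal sketch in plain Python that uses only the numbers printed in this notebook; it does not touch the Spark dataframe.

```python
# Row counts reported by the notebook before and after dropna().
rows_before = 422937
rows_after = 422421

# Null counts reported per column.
null_title = 389
null_category = 516

dropped = rows_before - rows_after
print(dropped)  # 516

# dropna() removes a row if ANY selected column is null, so the number of
# dropped rows is the size of the union of the two null sets. Because it
# equals the CATEGORY null count, every row with a null TITLE must also
# have a null CATEGORY.
overlap = null_title + null_category - dropped
print(overlap)  # 389
```

In other words, the 389 rows without a title are a subset of the 516 rows without a category.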

Display the top 20 rows after removing null values

In [19]:
title_category.show(truncate=False)

+---------------------------------------------------------------------------+--------+
|TITLE                                                                      |CATEGORY|
+---------------------------------------------------------------------------+--------+
|Fed official says weak data caused by weather, should not slow taper       |b       |
|Fed's Charles Plosser sees high bar for change in pace of tapering         |b       |
|US open: Stocks fall after Fed official hints at accelerated tapering      |b       |
|Fed risks falling 'behind the curve', Charles Plosser says                 |b       |
|Fed's Plosser: Nasty Weather Has Curbed Job Growth                         |b       |
|Plosser: Fed May Have to Accelerate Tapering Pace                          |b       |
|Fed's Plosser: Taper pace may be too slow                                  |b       |
|Fed's Plosser expects US unemployment to fall to 6.2% by the end of 2014   |b       |
|US jobs growth last month hit by weather:F

Count the number of distinct values in the 'CATEGORY' column

In [20]:
title_category.select("Category").distinct().count()

265

Group the dataframe by the 'CATEGORY' column, count the rows in each group, and display the groups in descending order of count.

In [21]:
title_category.groupBy("Category").count().orderBy(col("count").desc()).show(truncate=False)

+--------------------+------+
|Category            |count |
+--------------------+------+
|e                   |152127|
|b                   |115935|
|t                   |108237|
|m                   |45616 |
|Us Magazine         |31    |
|GossipCop           |20    |
|Contactmusic.com    |20    |
|Complex.com         |12    |
|CBS News            |12    |
|The Hollywood Gossip|11    |
|HipHopDX            |11    |
|HeadlinePlanet.com  |10    |
|We Got This Covered |10    |
|Gamepur             |8     |
|TooFab.com          |7     |
|Wetpaint            |7     |
|WorstPreviews.com   |7     |
|Consequence of Sound|7     |
|The Escapist        |6     |
|Reality TV World    |5     |
+--------------------+------+
only showing top 20 rows
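
The UCI description of this dataset documents only four category codes: b (business), t (science and technology), e (entertainment) and m (health). The publisher names in the table above therefore look like rows whose columns were shifted while the CSV was parsed; a likely cause, which this notebook does not verify, is titles containing quotes or commas. The sketch below, in plain Python, shows how many rows are affected according to the reported counts and how such rows could be filtered out. `VALID_CATEGORIES`, `keep_valid` and the sample rows are hypothetical and only for illustration.

```python
# The four category codes documented for this dataset.
VALID_CATEGORIES = {"b", "t", "e", "m"}  # hypothetical helper name

# Counts reported in the table above, and the row count after dropna().
reported = {"e": 152127, "b": 115935, "t": 108237, "m": 45616}
rows_after_dropna = 422421

rows_with_other_labels = rows_after_dropna - sum(reported.values())
print(rows_with_other_labels)  # 506


def keep_valid(rows):
    """Keep only (title, category) pairs whose category is a documented code."""
    return [(title, cat) for title, cat in rows if cat in VALID_CATEGORIES]


# Tiny illustrative sample, not real rows from the dataset.
sample = [
    ("Fed official says weak data caused by weather", "b"),
    ("Some celebrity headline", "e"),
    ("A misparsed row", "Us Magazine"),
    ("Another misparsed row", "GossipCop"),
]
clean = keep_valid(sample)
print(len(clean))  # 2
```

On the Spark dataframe, the equivalent filter would be `title_category.filter(col("CATEGORY").isin("b", "t", "e", "m"))`. The notebook as written keeps all 265 labels, so the models below are trained on the unfiltered data.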



As in the previous step, group the dataframe by the 'TITLE' column, count the rows in each group, and display the groups in descending order of count.

In [22]:
title_category.groupBy("TITLE").count().orderBy(col("count").desc()).show(truncate=False)

+----------------------------------------------------------------------------------+-----+
|TITLE                                                                             |count|
+----------------------------------------------------------------------------------+-----+
|The article requested cannot be found! Please refresh your browser or go back  ...|145  |
|Business Highlights                                                               |59   |
|Posted by Parvez Jabri                                                            |59   |
|Posted by Imaduddin                                                               |53   |
|Posted by Shoaib-ur-Rehman Siddiqui                                               |52   |
|(click the phrases to see a list)                                                 |51   |
|Business Wire                                                                     |41   |
|PR Newswire                                                                       |38   |

### Data Cleaning

Apply a regular expression to the 'TITLE' column to remove numbers from the titles, and store the result in a new 'only_str' column

In [23]:
title_category = title_category.withColumn("only_str", regexp_replace(col('TITLE'), r'\d+', ''))
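
To see what this replacement does to a single title, here is a minimal sketch in plain Python using the standard `re` module and the same pattern. `strip_digits` is a hypothetical helper name; the title is one of the rows displayed earlier.

```python
import re


def strip_digits(title):
    # Remove every run of consecutive digits, leaving all other characters.
    return re.sub(r"\d+", "", title)


title = "Fed's Plosser expects US unemployment to fall to 6.2% by the end of 2014"
cleaned = strip_digits(title)
print(repr(cleaned))
# "Fed's Plosser expects US unemployment to fall to .% by the end of "
```

Punctuation that surrounded the digits (here `.%`) and the trailing space are left behind; they are discarded later when the text is split on non-word characters.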

Display the 'TITLE' and 'only_str' columns after the regular expression replacement

In [24]:
title_category.select("TITLE","only_str").show(truncate=False)

+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|TITLE                                                                      |only_str                                                                   |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Fed official says weak data caused by weather, should not slow taper       |Fed official says weak data caused by weather, should not slow taper       |
|Fed's Charles Plosser sees high bar for change in pace of tapering         |Fed's Charles Plosser sees high bar for change in pace of tapering         |
|US open: Stocks fall after Fed official hints at accelerated tapering      |US open: Stocks fall after Fed official hints at accelerated tapering      |
|Fed risks falling 'behind the curve', Charles Plosser says                 

Split the text into words with RegexTokenizer

In [25]:
regex_tokenizer = RegexTokenizer(inputCol="only_str", outputCol="words", pattern=r"\W")
raw_words = regex_tokenizer.transform(title_category)
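
With its default settings, RegexTokenizer lowercases the text and treats the pattern as a separator (`gaps=True`), dropping empty tokens. The following plain-Python sketch approximates that behaviour for one title; `regex_tokenize` is a hypothetical name, and Python's `\W` is Unicode-aware while Java's is ASCII-only by default, so results may differ on non-ASCII titles.

```python
import re


def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    # Lowercase, split on the separator pattern, and drop empty/short tokens.
    tokens = re.split(pattern, text.lower())
    return [t for t in tokens if len(t) >= min_token_length]


tokens = regex_tokenize(
    "Fed's Charles Plosser sees high bar for change in pace of tapering"
)
print(tokens)
# ['fed', 's', 'charles', 'plosser', 'sees', 'high', 'bar', 'for',
#  'change', 'in', 'pace', 'of', 'tapering']
```

Note how the apostrophe in "Fed's" splits the word into `fed` and `s`.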

In [26]:
raw_words.show()

+--------------------+--------+--------------------+--------------------+
|               TITLE|CATEGORY|            only_str|               words|
+--------------------+--------+--------------------+--------------------+
|Fed official says...|       b|Fed official says...|[fed, official, s...|
|Fed's Charles Plo...|       b|Fed's Charles Plo...|[fed, s, charles,...|
|US open: Stocks f...|       b|US open: Stocks f...|[us, open, stocks...|
|Fed risks falling...|       b|Fed risks falling...|[fed, risks, fall...|
|Fed's Plosser: Na...|       b|Fed's Plosser: Na...|[fed, s, plosser,...|
|Plosser: Fed May ...|       b|Plosser: Fed May ...|[plosser, fed, ma...|
|Fed's Plosser: Ta...|       b|Fed's Plosser: Ta...|[fed, s, plosser,...|
|Fed's Plosser exp...|       b|Fed's Plosser exp...|[fed, s, plosser,...|
|US jobs growth la...|       b|US jobs growth la...|[us, jobs, growth...|
|ECB unlikely to e...|       b|ECB unlikely to e...|[ecb, unlikely, t...|
|ECB unlikely to e...|       b|ECB unl

Remove stop words

In [27]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
words_df = remover.transform(raw_words)
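
Conceptually, stop-word removal is a case-insensitive filter against a fixed word list. The sketch below uses a small illustrative subset of words, chosen only because the notebook's output shows them being removed; it is not Spark's full default English list. `STOP_WORDS` and `remove_stop_words` are hypothetical names.

```python
# Illustrative subset only; Spark's default English list is much longer.
STOP_WORDS = {"by", "should", "not", "for", "in", "of", "after", "at", "s"}


def remove_stop_words(words, stop_words=STOP_WORDS):
    # Case-insensitive match that preserves the original token order.
    return [w for w in words if w.lower() not in stop_words]


words = ["fed", "official", "says", "weak", "data", "caused", "by",
         "weather", "should", "not", "slow", "taper"]
filtered = remove_stop_words(words)
print(filtered)
# ['fed', 'official', 'says', 'weak', 'data', 'caused', 'weather', 'slow', 'taper']
```

One consequence worth noticing: negations such as "not" are removed as well, so "should not slow taper" and "should slow taper" produce the same tokens.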

In [28]:
words_df.select("words","filtered").show(truncate=False)

+-------------------------------------------------------------------------------------+-------------------------------------------------------------------------------+
|words                                                                                |filtered                                                                       |
+-------------------------------------------------------------------------------------+-------------------------------------------------------------------------------+
|[fed, official, says, weak, data, caused, by, weather, should, not, slow, taper]     |[fed, official, says, weak, data, caused, weather, slow, taper]                |
|[fed, s, charles, plosser, sees, high, bar, for, change, in, pace, of, tapering]     |[fed, charles, plosser, sees, high, bar, change, pace, tapering]               |
|[us, open, stocks, fall, after, fed, official, hints, at, accelerated, tapering]     |[us, open, stocks, fall, fed, official, hints, accelerated, tapering]    

Encode the 'CATEGORY' column as a column of numeric category indices

In [29]:
indexer = StringIndexer(inputCol="CATEGORY", outputCol="categoryIndex")
feature_data = indexer.fit(words_df).transform(words_df)

In [30]:
feature_data = feature_data.withColumnRenamed("CATEGORY","label")

In [31]:
feature_data

DataFrame[TITLE: string, label: string, only_str: string, words: array<string>, filtered: array<string>, categoryIndex: double]

In [32]:
feature_data.select("label","categoryIndex").show()

+-----+-------------+
|label|categoryIndex|
+-----+-------------+
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
|    b|          1.0|
+-----+-------------+
only showing top 20 rows
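
By default, StringIndexer assigns indices in descending order of label frequency, so the most frequent label gets 0.0. That explains why `b` maps to 1.0 here: it is the second most frequent category. A minimal plain-Python sketch of this ordering, using the four largest counts reported earlier (`fit_string_indexer` is a hypothetical name):

```python
def fit_string_indexer(label_counts):
    # Sort by descending frequency, breaking ties alphabetically; index from 0.
    ordered = sorted(label_counts, key=lambda label: (-label_counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


counts = {"e": 152127, "b": 115935, "t": 108237, "m": 45616}
mapping = fit_string_indexer(counts)
print(mapping)  # {'e': 0.0, 'b': 1.0, 't': 2.0, 'm': 3.0}
```

The remaining rare labels in the column would receive indices 4.0 and above in the same way.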



Convert text into vectors of token counts

In [33]:
cv = CountVectorizer(inputCol="filtered", outputCol="features")
model = cv.fit(feature_data)
countVectorizer_feateures = model.transform(feature_data)
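
CountVectorizer builds a vocabulary ordered by term frequency across the corpus and turns each document into a sparse vector of counts. The plain-Python sketch below illustrates the idea on two toy documents; the alphabetical tie-breaking and the names `fit_vocabulary` and `to_sparse` are assumptions for illustration.

```python
from collections import Counter


def fit_vocabulary(docs):
    # Rank terms by total count across the corpus (ties broken alphabetically here).
    totals = Counter(token for doc in docs for token in doc)
    ordered = sorted(totals, key=lambda term: (-totals[term], term))
    return {term: idx for idx, term in enumerate(ordered)}


def to_sparse(doc, vocab):
    # Return (vocabulary size, sorted term indices, counts for those indices).
    counts = Counter(vocab[t] for t in doc if t in vocab)
    indices = sorted(counts)
    return len(vocab), indices, [float(counts[i]) for i in indices]


docs = [["fed", "official", "says"], ["fed", "plosser", "says", "fed"]]
vocab = fit_vocabulary(docs)
print(vocab)                      # {'fed': 0, 'says': 1, 'official': 2, 'plosser': 3}
sparse = to_sparse(docs[1], vocab)
print(sparse)                     # (4, [0, 1, 3], [2.0, 1.0, 1.0])
```

The triple has the same shape as the `features` column shown later, for example `(49043,[520,585,6...`: the first number is the vocabulary size, followed by the indices of the terms present in the title.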

### Modelling

Create Training & Test datasets

In [34]:
(trainingData, testData) = countVectorizer_feateures.randomSplit([0.8, 0.2],seed = 11)

In [35]:
trainingData

DataFrame[TITLE: string, label: string, only_str: string, words: array<string>, filtered: array<string>, categoryIndex: double, features: vector]

### Naive bayes method

Train the NaiveBayes model

In [47]:
nb = NaiveBayes(modelType="multinomial",labelCol="categoryIndex", featuresCol="features")
nbModel = nb.fit(trainingData)
nb_predictions = nbModel.transform(testData)
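
For intuition, a multinomial Naive Bayes classifier scores each class by its log prior plus the sum of smoothed log likelihoods of the tokens in the document. The following is a simplified plain-Python sketch on a toy corpus, assuming additive smoothing of 1.0 on the likelihoods; it is not Spark's implementation, and all names and documents in it are hypothetical.

```python
import math
from collections import Counter, defaultdict


def train_multinomial_nb(docs, labels, smoothing=1.0):
    vocab = {t for doc in docs for t in doc}
    class_docs = Counter(labels)
    token_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        token_counts[label].update(doc)

    model = {}
    for label, n_docs in class_docs.items():
        # Smoothed denominator: tokens seen in this class plus one pseudo-count per term.
        denom = sum(token_counts[label].values()) + smoothing * len(vocab)
        log_prior = math.log(n_docs / len(docs))
        log_likelihood = {
            t: math.log((token_counts[label][t] + smoothing) / denom) for t in vocab
        }
        model[label] = (log_prior, log_likelihood)
    return model


def predict(model, doc):
    def score(label):
        log_prior, log_likelihood = model[label]
        # Tokens outside the training vocabulary are ignored.
        return log_prior + sum(log_likelihood[t] for t in doc if t in log_likelihood)

    return max(model, key=score)


docs = [["fed", "taper", "stocks"], ["ecb", "euro", "stocks"],
        ["film", "star", "album"], ["star", "tour", "film"]]
labels = ["b", "b", "e", "e"]
model = train_multinomial_nb(docs, labels)
print(predict(model, ["fed", "stocks"]))  # b
print(predict(model, ["film", "tour"]))   # e
```

Because the score is a simple sum over tokens, the method copes well with short, sparse bag-of-words inputs such as headlines.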

Display the predictions of the NaiveBayes model for the first 10 rows

In [48]:
nb_predictions.select("prediction", "categoryIndex", "features").show(10)

+----------+-------------+--------------------+
|prediction|categoryIndex|            features|
+----------+-------------+--------------------+
|       0.0|          0.0|(49043,[520,585,6...|
|       0.0|          0.0|(49043,[331,1026,...|
|       0.0|          0.0|(49043,[22,53,555...|
|       0.0|          0.0|(49043,[167,553,9...|
|       2.0|          2.0|(49043,[7,95,353,...|
|       0.0|          0.0|(49043,[4,942,109...|
|       0.0|          0.0|(49043,[17,287,10...|
|       0.0|          0.0|(49043,[6,21,22,5...|
|       0.0|          0.0|(49043,[21,22,50,...|
|       0.0|          0.0|(49043,[21,22,50,...|
+----------+-------------+--------------------+
only showing top 10 rows



Evaluate the performance of NaiveBayes model

In [37]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
nb_accuracy = evaluator.evaluate(nb_predictions)
print("Accuracy of NaiveBayes is = %g"% (nb_accuracy))
print("Test Error of NaiveBayes = %g " % (1.0 - nb_accuracy))

Accuracy of NaiveBayes is = 0.925271
Test Error of NaiveBayes = 0.074729 


### DecisionTreeClassifier

Train the DecisionTreeClassifier model

In [36]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(labelCol="categoryIndex", featuresCol="features")
dt_model = dt.fit(trainingData)

In [37]:
dt_predictions = dt_model.transform(testData)

Evaluate the performance of DecisionTreeClassifier model

In [38]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(dt_predictions)
print("Accuracy of DT is = %g"% (dt_accuracy))
print("Test Error of DT = %g " % (1.0 - dt_accuracy))

Accuracy of DT is = 0.451625
Test Error of DT = 0.548375 


Display the predictions of the DecisionTreeClassifier model for the first 10 rows

In [45]:
dt_predictions.select("prediction", "categoryIndex", "features").show(10)

+----------+-------------+--------------------+
|prediction|categoryIndex|            features|
+----------+-------------+--------------------+
|       0.0|          0.0|(49043,[520,585,6...|
|       0.0|          0.0|(49043,[331,1026,...|
|       0.0|          0.0|(49043,[22,53,555...|
|       0.0|          0.0|(49043,[167,553,9...|
|       2.0|          2.0|(49043,[7,95,353,...|
|       0.0|          0.0|(49043,[4,942,109...|
|       0.0|          0.0|(49043,[17,287,10...|
|       0.0|          0.0|(49043,[6,21,22,5...|
|       0.0|          0.0|(49043,[21,22,50,...|
|       0.0|          0.0|(49043,[21,22,50,...|
+----------+-------------+--------------------+
only showing top 10 rows



### RandomForestClassifier

Train the RandomForestClassifier model

In [40]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="categoryIndex", featuresCol="features", numTrees=10)
rf_model = rf.fit(trainingData)

In [41]:
rf_predictions = rf_model.transform(testData)

Evaluate the performance of RandomForestClassifier model

In [42]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
rf_accuracy = evaluator.evaluate(rf_predictions)
print("Accuracy of RF is = %g"% (rf_accuracy))
print("Test Error of RF = %g " % (1.0 - rf_accuracy))

Accuracy of RF is = 0.398267
Test Error of RF = 0.601733 


Display the predictions of the RandomForestClassifier model for the first 10 rows

In [46]:
rf_predictions.select("prediction", "categoryIndex", "features").show(10)

+----------+-------------+--------------------+
|prediction|categoryIndex|            features|
+----------+-------------+--------------------+
|       0.0|          0.0|(49043,[520,585,6...|
|       0.0|          0.0|(49043,[331,1026,...|
|       0.0|          0.0|(49043,[22,53,555...|
|       0.0|          0.0|(49043,[167,553,9...|
|       0.0|          2.0|(49043,[7,95,353,...|
|       0.0|          0.0|(49043,[4,942,109...|
|       0.0|          0.0|(49043,[17,287,10...|
|       0.0|          0.0|(49043,[6,21,22,5...|
|       0.0|          0.0|(49043,[21,22,50,...|
|       0.0|          0.0|(49043,[21,22,50,...|
+----------+-------------+--------------------+
only showing top 10 rows



The results show that the NaiveBayes model performs best on the test set, with an accuracy of 0.925, compared with 0.452 for the DecisionTreeClassifier and 0.398 for the RandomForestClassifier.
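
To put these accuracies in context, they can be compared with a majority-class baseline, that is, always predicting the most frequent category. The sketch below computes it in plain Python from the counts reported earlier. It uses the full cleaned dataset rather than the 20% test split, so it is only an approximation of the baseline on the test data.

```python
# Category counts reported earlier in this notebook (full cleaned dataset).
counts = {"e": 152127, "b": 115935, "t": 108237, "m": 45616}
total_rows = 422421  # rows after dropna(), including the rare extra labels

majority_baseline = max(counts.values()) / total_rows
print(round(majority_baseline, 3))  # 0.36

# Test accuracies printed by the notebook.
reported = {"NaiveBayes": 0.925271, "DecisionTree": 0.451625, "RandomForest": 0.398267}
for name, acc in sorted(reported.items(), key=lambda kv: -kv[1]):
    print(f"{name}: {acc:.3f} ({acc - majority_baseline:+.3f} vs. baseline)")
```

The two tree-based models are only modestly above this baseline. One possible explanation, which this notebook does not test, is that they were trained with default depth settings (Spark's default `maxDepth` is 5) on a sparse feature space of 49043 dimensions.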

In [None]:
spark.stop()