In [1]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pyspark
import pandas as pd
import tensorflow as tf
from google.colab import files


from pyspark.sql import SparkSession

from pyspark import SparkContext


In [3]:
import warnings
warnings.filterwarnings('ignore')

In [4]:
spark_context_sc = SparkContext(master='local[3]')
spark_session = SparkSession.builder.appName("Categorizer").getOrCreate()

In [5]:
from google.colab import files
upload = files.upload()

In [6]:
df = spark_session.read.json("CategorizedNews.json")
df.head()

Row(authors='Melissa Jeltsen', category='CRIME', date='2018-05-26', headline='There Were 2 Mass Shootings In Texas Last Week, But Only 1 On TV', link='https://www.huffingtonpost.com/entry/texas-amanda-painter-mass-shooting_us_5b081ab4e4b0802d69caad89', short_description='She left her husband. He killed their children. Just another day in America.')

In [7]:
df.count()

200853

In [8]:
df = df.limit(10000)
df.count()

10000

In [9]:
df.groupby('category').count().sort('count', ascending=False).show()


+-------------+-----+
|     category|count|
+-------------+-----+
|     POLITICS| 3604|
|ENTERTAINMENT| 1906|
|   WORLD NEWS|  683|
| QUEER VOICES|  512|
|       COMEDY|  495|
| BLACK VOICES|  443|
|       SPORTS|  382|
|        MEDIA|  329|
|        WOMEN|  283|
|   WEIRD NEWS|  242|
|        CRIME|  201|
|     BUSINESS|  112|
|LATINO VOICES|  105|
|       IMPACT|   86|
|       TRAVEL|   76|
|     RELIGION|   76|
|        STYLE|   70|
|      PARENTS|   66|
|        GREEN|   66|
|         TECH|   65|
+-------------+-----+
only showing top 20 rows



In [10]:
from pyspark.sql import functions as fun
df = df.withColumn('description', fun.concat(fun.col('headline'),fun.lit(' '), fun.col('short_description')))

In [11]:
df = df.drop("headline", "link", "authors", "short_description", "date")
df.show()

+-------------+--------------------+
|     category|         description|
+-------------+--------------------+
|        CRIME|There Were 2 Mass...|
|ENTERTAINMENT|Will Smith Joins ...|
|ENTERTAINMENT|Hugh Grant Marrie...|
|ENTERTAINMENT|Jim Carrey Blasts...|
|ENTERTAINMENT|Julianna Margulie...|
|ENTERTAINMENT|Morgan Freeman 'D...|
|ENTERTAINMENT|Donald Trump Is L...|
|ENTERTAINMENT|What To Watch On ...|
|ENTERTAINMENT|Mike Myers Reveal...|
|ENTERTAINMENT|What To Watch On ...|
|ENTERTAINMENT|Justin Timberlake...|
|   WORLD NEWS|South Korean Pres...|
|       IMPACT|With Its Way Of L...|
|     POLITICS|Trump's Crackdown...|
|     POLITICS|'Trump's Son Shou...|
|     POLITICS|Edward Snowden: T...|
|     POLITICS|Booyah: Obama Pho...|
|     POLITICS|Ireland Votes To ...|
|     POLITICS|Ryan Zinke Looks ...|
|     POLITICS|Trump's Scottish ...|
+-------------+--------------------+
only showing top 20 rows



In [12]:
from pyspark.ml.feature import IDF
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover


df.columns

['category', 'description']

In [13]:
import pyspark.sql.functions as check

df = df.where(check.col('description').isNotNull())
df.show()

+-------------+--------------------+
|     category|         description|
+-------------+--------------------+
|        CRIME|There Were 2 Mass...|
|ENTERTAINMENT|Will Smith Joins ...|
|ENTERTAINMENT|Hugh Grant Marrie...|
|ENTERTAINMENT|Jim Carrey Blasts...|
|ENTERTAINMENT|Julianna Margulie...|
|ENTERTAINMENT|Morgan Freeman 'D...|
|ENTERTAINMENT|Donald Trump Is L...|
|ENTERTAINMENT|What To Watch On ...|
|ENTERTAINMENT|Mike Myers Reveal...|
|ENTERTAINMENT|What To Watch On ...|
|ENTERTAINMENT|Justin Timberlake...|
|   WORLD NEWS|South Korean Pres...|
|       IMPACT|With Its Way Of L...|
|     POLITICS|Trump's Crackdown...|
|     POLITICS|'Trump's Son Shou...|
|     POLITICS|Edward Snowden: T...|
|     POLITICS|Booyah: Obama Pho...|
|     POLITICS|Ireland Votes To ...|
|     POLITICS|Ryan Zinke Looks ...|
|     POLITICS|Trump's Scottish ...|
+-------------+--------------------+
only showing top 20 rows



In [14]:
token_create = Tokenizer(inputCol='description',outputCol='tokens')
stopwordRemove = StopWordsRemover(inputCol='tokens',outputCol='removed_stopwords')
vector_create = CountVectorizer(inputCol='removed_stopwords',outputCol='extracted_features')
idf = IDF(inputCol='extracted_features',outputCol='vectorizedFeatures')

In [15]:
label_encoding = StringIndexer(inputCol='category',outputCol='category_number').fit(df)
label_encoding.transform(df).show(10)

+-------------+--------------------+---------------+
|     category|         description|category_number|
+-------------+--------------------+---------------+
|        CRIME|There Were 2 Mass...|           10.0|
|ENTERTAINMENT|Will Smith Joins ...|            1.0|
|ENTERTAINMENT|Hugh Grant Marrie...|            1.0|
|ENTERTAINMENT|Jim Carrey Blasts...|            1.0|
|ENTERTAINMENT|Julianna Margulie...|            1.0|
|ENTERTAINMENT|Morgan Freeman 'D...|            1.0|
|ENTERTAINMENT|Donald Trump Is L...|            1.0|
|ENTERTAINMENT|What To Watch On ...|            1.0|
|ENTERTAINMENT|Mike Myers Reveal...|            1.0|
|ENTERTAINMENT|What To Watch On ...|            1.0|
+-------------+--------------------+---------------+
only showing top 10 rows



In [16]:
# Assume label_encoding is your StringIndexer object
transformed_df = label_encoding.transform(df)

# Get unique categories
unique_categories = transformed_df.select('category_number', 'category').distinct().collect()
#name = transformed_df.select('category').distinct().collect()

# Print the unique categories
for row in unique_categories:
    print("Category Number", row.category_number, " Category Name", row.category)

Category Number 10.0  Category Name CRIME
Category Number 1.0  Category Name ENTERTAINMENT
Category Number 2.0  Category Name WORLD NEWS
Category Number 13.0  Category Name IMPACT
Category Number 0.0  Category Name POLITICS
Category Number 9.0  Category Name WEIRD NEWS
Category Number 5.0  Category Name BLACK VOICES
Category Number 8.0  Category Name WOMEN
Category Number 4.0  Category Name COMEDY
Category Number 3.0  Category Name QUEER VOICES
Category Number 6.0  Category Name SPORTS
Category Number 11.0  Category Name BUSINESS
Category Number 15.0  Category Name TRAVEL
Category Number 7.0  Category Name MEDIA
Category Number 19.0  Category Name TECH
Category Number 14.0  Category Name RELIGION
Category Number 21.0  Category Name SCIENCE
Category Number 12.0  Category Name LATINO VOICES
Category Number 22.0  Category Name EDUCATION
Category Number 25.0  Category Name COLLEGE
Category Number 18.0  Category Name PARENTS
Category Number 24.0  Category Name ARTS & CULTURE
Category Number

In [17]:
label_encoding.labels


['POLITICS',
 'ENTERTAINMENT',
 'WORLD NEWS',
 'QUEER VOICES',
 'COMEDY',
 'BLACK VOICES',
 'SPORTS',
 'MEDIA',
 'WOMEN',
 'WEIRD NEWS',
 'CRIME',
 'BUSINESS',
 'LATINO VOICES',
 'IMPACT',
 'RELIGION',
 'TRAVEL',
 'STYLE',
 'GREEN',
 'PARENTS',
 'TECH',
 'HEALTHY LIVING',
 'SCIENCE',
 'EDUCATION',
 'TASTE',
 'ARTS & CULTURE',
 'COLLEGE']

In [18]:
label_dict = {
    0.0: 'POLITICS',
    1.0: 'ENTERTAINMENT',
    2.0: 'WORLD NEWS',
    3.0: 'QUEER VOICES',
    4.0: 'COMEDY',
    5.0: 'BLACK VOICES',
    6.0: 'SPORTS',
    13.0: 'MEDIA',
    8.0: 'WOMEN',
    10.0: 'CRIME',
    11.0: 'BUSINESS',
    12.0: 'LATINO VOICES',
    13.0: 'IMPACT',
    14.0: 'RELIGION',
    15.0: 'TRAVEL',
    16.0: 'STYLE',
    18.0: 'PARENTS',
    19.0: 'TECH',
    20.0: 'HEALTHY LIVING',
    22.0: 'EDUCATION',
    23.0: 'TASTE',
    25.0: 'COLLEGE',
    17.0: 'GREEN',
    9.0: 'WEIRD NEWS',
    24.0: 'ARTS $ CULTURE',
    21.0: 'SCIENCE',
    7.0: 'MEDIA'
}


In [19]:
df = label_encoding.transform(df)
df.show(100)


+-------------+--------------------+---------------+
|     category|         description|category_number|
+-------------+--------------------+---------------+
|        CRIME|There Were 2 Mass...|           10.0|
|ENTERTAINMENT|Will Smith Joins ...|            1.0|
|ENTERTAINMENT|Hugh Grant Marrie...|            1.0|
|ENTERTAINMENT|Jim Carrey Blasts...|            1.0|
|ENTERTAINMENT|Julianna Margulie...|            1.0|
|ENTERTAINMENT|Morgan Freeman 'D...|            1.0|
|ENTERTAINMENT|Donald Trump Is L...|            1.0|
|ENTERTAINMENT|What To Watch On ...|            1.0|
|ENTERTAINMENT|Mike Myers Reveal...|            1.0|
|ENTERTAINMENT|What To Watch On ...|            1.0|
|ENTERTAINMENT|Justin Timberlake...|            1.0|
|   WORLD NEWS|South Korean Pres...|            2.0|
|       IMPACT|With Its Way Of L...|           13.0|
|     POLITICS|Trump's Crackdown...|            0.0|
|     POLITICS|'Trump's Son Shou...|            0.0|
|     POLITICS|Edward Snowden: T...|          

In [20]:
from pyspark.sql.functions import rand

df = df.orderBy(rand(seed=21))
train_df = df.limit(int(df.count() * 0.70))
test_df = df.exceptAll(train_df)


In [21]:
#from pyspark.ml.classification import DecisionTreeClassifier
#dt = DecisionTreeClassifier(featuresCol='vectorizedFeatures',labelCol='category_number')

from pyspark.ml.classification import LogisticRegression
logistic_reg = LogisticRegression(featuresCol='vectorizedFeatures',labelCol='category_number')


In [22]:
from pyspark.ml import Pipeline
cat_pipeline = Pipeline(stages = [token_create, stopwordRemove, vector_create,idf,logistic_reg])

In [23]:
cat_pipeline.stages


Param(parent='Pipeline_7e1f569c1829', name='stages', doc='a list of pipeline stages')

In [24]:
logistic_model = cat_pipeline.fit(train_df)


In [25]:
logistic_model
preds = logistic_model.transform(test_df)

In [26]:
preds.columns

['category',
 'description',
 'category_number',
 'tokens',
 'removed_stopwords',
 'extracted_features',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [27]:
preds.select('description','rawPrediction', 'probability','category','category_number','prediction').show(50)

+--------------------+--------------------+--------------------+-------------+---------------+----------+
|         description|       rawPrediction|         probability|     category|category_number|prediction|
+--------------------+--------------------+--------------------+-------------+---------------+----------+
|Trump Claims With...|[8.35310736422872...|[0.36156315470111...|     POLITICS|            0.0|       1.0|
|94 Percent Of Hol...|[-0.5695281958545...|[0.00163890651875...|ENTERTAINMENT|            1.0|       1.0|
|Trump Threatens A...|[11.1176599704702...|[0.11389348926235...|   WORLD NEWS|            2.0|       2.0|
|This Ex-NFL Playe...|[1.93307018838857...|[0.01901316292647...|       SPORTS|            6.0|       1.0|
|Black Parkland St...|[5.80949539195597...|[0.13252920793733...| BLACK VOICES|            5.0|       5.0|
|Laura Ingraham An...|[-5.3165958182414...|[2.12902949132581...|        MEDIA|            7.0|       7.0|
|A Generation Of H...|[8.87330457160014...|[0.

In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='category_number')

evaluator.evaluate(preds)*100

59.93828170582442

In [29]:
from pyspark.mllib.evaluation import MulticlassMetrics
logistic_reg_metric = MulticlassMetrics(preds['category_number','prediction'].rdd)


In [30]:
print("Accuracy ", logistic_reg_metric.accuracy)
print("precision ", logistic_reg_metric.precision(1.0))
print("f1Score ", logistic_reg_metric.fMeasure(1.0))
print("recall ", logistic_reg_metric.recall(1.0))

Accuracy  0.628
precision  0.7455516014234875
f1Score  0.6736334405144694
recall  0.6143695014662757


We have built the model. Now using te model, we categorize the news from our dataset
**bold text**

In [31]:
df2 = spark_session.read.csv("new_dataset.csv")
df2.show(5)

+--------------------+--------------------+-----------------+--------------------+--------------------+----------+--------+
|                 _c0|                 _c1|              _c2|                 _c3|                 _c4|       _c5|     _c6|
+--------------------+--------------------+-----------------+--------------------+--------------------+----------+--------+
|                 url|               title|DomainCountryCode|            location|     contextual_text|state_code|category|
|https://thedailyr...|Is tipping gettin...|               US|Cornell Universit...|about choosing th...|        NY|    null|
|https://www.bicma...|U . S . natural g...|             null|North Dakota, Uni...|exports of lng wh...|        ND|    null|
|https://news.prai...|ND COVID - 19 : 5...|               US|North Dakota, Uni...|dakota health and...|        ND|    null|
|https://www.chesh...|cheshireherald . ...|               US|Times Square, New...|york city time sq...|        NY|    null|
+-------

In [32]:
from pyspark.sql.functions import *

# Read CSV file with custom column names
df2 = spark_session.read.csv("new_dataset.csv", header=True, inferSchema=True).toDF(*["_c0", "_c1", "_c2","_c3","_c4","_c5","_c6"])

# Rename columns to match Pandas DataFrame column names
df2 = df2.withColumnRenamed("_c0", "url")\
       .withColumnRenamed("_c1", "title")\
       .withColumnRenamed("_c2", "DomainCountryCode")\
       .withColumnRenamed("_c3", "location")\
       .withColumnRenamed("_c4", "contextual_text")\
       .withColumnRenamed("_c5", "state_code")\
       .withColumnRenamed("_c6", "category")


In [33]:
df2.show()

+--------------------+--------------------+-----------------+--------------------+--------------------+----------+--------+
|                 url|               title|DomainCountryCode|            location|     contextual_text|state_code|category|
+--------------------+--------------------+-----------------+--------------------+--------------------+----------+--------+
|https://thedailyr...|Is tipping gettin...|               US|Cornell Universit...|about choosing th...|        NY|    null|
|https://www.bicma...|U . S . natural g...|             null|North Dakota, Uni...|exports of lng wh...|        ND|    null|
|https://news.prai...|ND COVID - 19 : 5...|               US|North Dakota, Uni...|dakota health and...|        ND|    null|
|https://www.chesh...|cheshireherald . ...|               US|Times Square, New...|york city time sq...|        NY|    null|
|https://www.messe...|CVB gets $5 . 46 ...|               US|Owensboro, Kentuc...|county tourism ec...|        KY|    null|
|https:/

In [34]:
#df2 = df2.limit(1000)
#df2.count()

In [35]:
from pyspark.sql import functions as sf2
df2 = df2.withColumn('description', sf2.concat(sf2.col('title'),sf2.lit(' '), sf2.col('contextual_text')))

df2 = df2.select("description","category")
df2.show()

+--------------------+--------+
|         description|category|
+--------------------+--------+
|Is tipping gettin...|    null|
|U . S . natural g...|    null|
|ND COVID - 19 : 5...|    null|
|cheshireherald . ...|    null|
|CVB gets $5 . 46 ...|    null|
|Free Ride : Trans...|    null|
|Providence establ...|    null|
|Halberd Corporati...|    null|
|New Under the Sun...|    null|
|Headlines for Tue...|    null|
|OPEC+ likely to m...|    null|
|Three in custody ...|    null|
|COVID - 19 Sore T...|    null|
|Heath commissione...|    null|
|Minnesota county ...|    null|
|CHI Memorial clos...|    null|
|Nurses Strike New...|    null|
|Facing the challe...|    null|
|Booker calls on E...|    null|
|GoLocalProv | Tho...|    null|
+--------------------+--------+
only showing top 20 rows



In [36]:
prediction_data = logistic_model.transform(df2)

In [37]:
prediction_data.columns


['description',
 'category',
 'tokens',
 'removed_stopwords',
 'extracted_features',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [38]:
prediction_data.select('description','rawPrediction', 'probability','category','prediction').show(100)


+--------------------+--------------------+--------------------+--------+----------+
|         description|       rawPrediction|         probability|category|prediction|
+--------------------+--------------------+--------------------+--------+----------+
|Is tipping gettin...|[22.6029391945904...|[0.99999858811471...|    null|       0.0|
|U . S . natural g...|[-5.0522888037594...|[2.71276754279872...|    null|       2.0|
|ND COVID - 19 : 5...|[16.1275007382884...|[0.03037190412818...|    null|      10.0|
|cheshireherald . ...|[-6.6327148433153...|[2.23031363604884...|    null|      10.0|
|CVB gets $5 . 46 ...|[2.90814275652290...|[2.24853210044316...|    null|       2.0|
|Free Ride : Trans...|[23.5836248653638...|[0.99999955283581...|    null|       0.0|
|Providence establ...|[13.0958445069729...|[0.96900269601528...|    null|       0.0|
|Halberd Corporati...|[4.90768940942493...|[0.27746342822439...|    null|      20.0|
|New Under the Sun...|[10.2804373190422...|[0.63407430552902...| 

In [39]:
#predictions_data.write.format("csv").option("header", "true").mode("overwrite").save("predictions.csv")
prediction_data.write.format('json').mode("overwrite").save('path/to/output/file.json')

In [40]:
import pandas as pd

predictions_data_pd = prediction_data.select("prediction").toPandas()

predictions_data_pd.to_csv("path/to/output/prediction_cat.csv", index=False)

In [41]:
from google.colab import files
files.download('path/to/output/prediction_cat.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>