In [47]:
from pyspark.sql import SparkSession
import re
import json
from pyspark.sql import Row

#### Create a local Spark session with 4 worker threads (`local[4]`)

In [48]:
# Build (or reuse) the notebook's Spark session; "local[4]" runs Spark
# in-process with four worker threads.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("RDD Single Entity Article Pair")
    .getOrCreate()
)


#### Define the keywords as a constant list


In [49]:
# Entity names to look for. Every entry is lowercase because article text is
# lowercased before it is compared against this list.
keywords = [
    "modi",
    "rahul",
    "jaitley",
    "sonia",
    "lalu",
    "nitish",
    "farooq",
    "sushma",
    "tharoor",
    "smriti",
    "mamata",
    "karunanidhi",
    "kejriwal",
    "sidhu",
    "yogi",
    "mayawati",
    "akhilesh",
    "chandrababu",
    "chidambaram",
    "fadnavis",
    "uddhav",
    "pawar",
]

#### Function to clean and tokenize text


In [50]:
def tokenize(text):
    """Split ``text`` into lowercase, purely alphabetic word tokens.

    Every character outside ``a-z`` (after lowercasing) acts as a separator,
    so digits, punctuation and non-ASCII letters never appear in a token.

    Args:
        text: The string to tokenize.

    Returns:
        A list of lowercase tokens in order of appearance; empty if ``text``
        contains no ASCII letters.
    """
    lowered = text.lower()
    # Anything that is not a lowercase ASCII letter becomes a space.
    letters_only = re.sub(r'[^a-z]', ' ', lowered)
    # Collapse the runs of spaces left behind by the substitution above.
    single_spaced = re.sub(r'( )+', ' ', letters_only)
    return single_spaced.strip().split()

#### Function to extract (keyword, article_id) pairs from a JSON article

In [51]:
def _field_text(value):
    """Coerce one JSON field value to text so it can be joined and tokenized."""
    if value is None:
        return ''
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        return " ".join(_field_text(item) for item in value)
    return str(value)


def extract_entity_article_pairs(json_content):
    """Return a ``(keyword, article_id)`` pair for every keyword hit in one article.

    The article's textual fields are concatenated, tokenized with
    ``tokenize`` and each token is checked against the module-level
    ``keywords``. A pair is emitted per matching token, so a keyword that
    occurs several times yields several identical pairs.

    Malformed input never raises: invalid JSON, JSON whose top level is not
    an object, and fields that are ``null`` or not strings are all tolerated,
    so one bad file cannot abort a whole Spark job.

    Args:
        json_content: The raw JSON text of a single article.

    Returns:
        A list of ``(keyword, article_id)`` tuples in order of appearance.
        ``article_id`` is the article's id as a string, or ``'unknown'`` when
        the field is missing or null. The list is empty when nothing matches
        or the input cannot be parsed.
    """
    try:
        article = json.loads(json_content)
    except (json.JSONDecodeError, TypeError):
        return []  # Skip if JSON is invalid (or the content is not text at all)

    if not isinstance(article, dict):
        return []  # Valid JSON, but not an article object (e.g. a bare list)

    raw_id = article.get('article_id')
    article_id = 'unknown' if raw_id is None else str(raw_id)

    # image_details is expected to be a list of objects; tolerate a single
    # object or anything else by normalising to a list of dicts.
    image_details = article.get('image_details')
    if isinstance(image_details, dict):
        image_details = [image_details]
    elif not isinstance(image_details, (list, tuple)):
        image_details = []
    image_details = [img for img in image_details if isinstance(img, dict)]

    # Combine relevant fields into a single text for tokenization. No explicit
    # lowercasing is needed here because tokenize() lowercases its input.
    combined_text = " ".join([
        _field_text(article.get('source_article_link')),
        _field_text(article.get('title')),
        _field_text(article.get('date_published')),
        _field_text(article.get('source_type')),
        _field_text(article.get('source_name')),
        _field_text(article.get('image_link')),  # str or list of str
        _field_text(article.get('article_body')),
        _field_text(article.get('description')),
        _field_text(article.get('time_scraped')),
        article_id,
        _field_text(article.get('article_category')),
        " ".join(_field_text(img.get('url')) for img in image_details),
        " ".join(_field_text(img.get('path')) for img in image_details),
        " ".join(_field_text(img.get('checksum')) for img in image_details),
    ])

    # Set membership keeps the per-token check constant-time.
    keyword_set = set(keywords)
    return [
        (token, article_id)
        for token in tokenize(combined_text)
        if token in keyword_set
    ]

#### Create an RDD by reading the JSON files

In [52]:
# Directory holding the article JSON files, given relative to the current
# working directory.
input_path = "./given/Resources/newsdata/"

# wholeTextFiles produces one (file_path, file_content) record per file, so
# each file's full text is available as a single string for JSON parsing.
rdd = spark.sparkContext.wholeTextFiles(input_path)

#### Process each JSON file and extract entity-article pairs

In [53]:
# Drop the file path from each (path, content) record, then flatten the
# per-file lists of (keyword, article_id) pairs into a single RDD.
entity_article_rdd = rdd.values().flatMap(extract_entity_article_pairs)

#### Remove duplicates

In [54]:
# Keep one copy of each (keyword, article_id) pair. The result is cached
# because several later actions reuse it; without caching each action would
# re-read and re-parse every input file and repeat the shuffle.
distinct_entity_article_pairs = entity_article_rdd.distinct().cache()

#### Show the result

In [55]:
# Bring every distinct pair back to the driver and print one per line.
collected_pairs = distinct_entity_article_pairs.collect()
for entity_pair in collected_pairs:
    print(entity_pair)



('jaitley', '64511576')
('tharoor', '5039501')
('rahul', '64221926')
('modi', '64221926')
('modi', '4982051')
('modi', '24693568')
('smriti', '4982291')
('modi', '4982291')
('yogi', '4996051')
('modi', '64638187')
('rahul', '5037681')
('modi', '5037681')
('sushma', '5037681')
('sonia', '65352506')
('modi', '65231395')
('akhilesh', '64498585')
('mamata', '4996291')
('modi', '4504521')
('modi', '65415324')
('modi', '64510931')
('modi', '24698263')
('sushma', '64510603')
('mayawati', '24698318')
('modi', '64647908')
('fadnavis', '4981941')
('akhilesh', '4981941')
('modi', '65231652')
('nitish', '65242507')
('modi', '64794910')
('modi', '4970971')
('sushma', '64991025')
('sonia', '4974611')
('rahul', '64486731')
('modi', '64486731')
('modi', '65304355')
('rahul', '65304355')
('modi', '64172937')
('rahul', '64172937')
('mamata', '64172937')
('modi', '64907468')
('mamata', '64172658')
('rahul', '64161441')
('modi', '64492632')
('sidhu', '65233934')
('smriti', '24719616')
('modi', '24719616')

                                                                                

<h4 style="color: red;"> Convert the RDD to a DataFrame </h4>

In [56]:
# Step 1: Build the DataFrame with an explicit schema. Unlike Row-based
# inference, this works when the RDD is empty and pins the column order to
# (entity, article_id) regardless of Spark version.
entity_article_df = spark.createDataFrame(
    distinct_entity_article_pairs,
    schema="entity: string, article_id: string",
)

# Step 2: Show every row of the resulting DataFrame without truncating values
row_count = entity_article_df.count()
entity_article_df.show(n=row_count, truncate=False)

                                                                                

+-----------+----------+
|entity     |article_id|
+-----------+----------+
|jaitley    |64511576  |
|tharoor    |5039501   |
|rahul      |64221926  |
|modi       |64221926  |
|modi       |4982051   |
|modi       |24693568  |
|smriti     |4982291   |
|modi       |4982291   |
|yogi       |4996051   |
|modi       |64638187  |
|rahul      |5037681   |
|modi       |5037681   |
|sushma     |5037681   |
|sonia      |65352506  |
|modi       |65231395  |
|akhilesh   |64498585  |
|mamata     |4996291   |
|modi       |4504521   |
|modi       |65415324  |
|modi       |64510931  |
|modi       |24698263  |
|sushma     |64510603  |
|mayawati   |24698318  |
|modi       |64647908  |
|fadnavis   |4981941   |
|akhilesh   |4981941   |
|modi       |65231652  |
|nitish     |65242507  |
|modi       |64794910  |
|modi       |4970971   |
|sushma     |64991025  |
|sonia      |4974611   |
|rahul      |64486731  |
|modi       |64486731  |
|modi       |65304355  |
|rahul      |65304355  |
|modi       |64172937  |


#### Stop the Spark session

In [57]:
# Shut down the Spark session created at the top of the notebook; `spark`
# cannot be used for further jobs after this call.
spark.stop()