# 1.0 Data Collection (Web Crawling)

###### Author: Terence Tiu Chuan Jie 
###### Last Edited: 9/4/2025 (9 April 2025)


## 1. Newsdata.io

In [2]:
from pyspark.sql import SparkSession

import sys

import os

# Extra module search directory, presumably so `newsdataapi` (imported below) can be found
# from a local copy — confirm. Override with DATA_COLLECTED_DIR on other machines instead
# of editing this hard-coded absolute path.
sys.path.append(os.environ.get("DATA_COLLECTED_DIR", "/home/student/data_collected"))


from newsdataapi import NewsDataApiClient
import pickle
import csv  

# Start (or reuse) the Spark session used by the DataFrame cells in this notebook.
spark = (
    SparkSession.builder
    .appName('Newsdata io')
    .getOrCreate()
)

25/04/09 22:59:27 WARN Utils: Your hostname, tiu. resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/09 22:59:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/09 22:59:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/09 22:59:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:

import os

# Read the Newsdata.io key from the environment instead of committing it with the notebook.
# NOTE(review): the key that used to be hard-coded here is exposed in history — rotate it.
api = NewsDataApiClient(apikey=os.environ["NEWSDATA_API_KEY"])


In [4]:
import csv

class NewsFetcher:
    """Collect articles from the Newsdata.io ``news`` endpoint by following ``nextPage``.

    Args:
        api: Client exposing ``news_api(country=..., category=..., language=..., page=...)``
            that returns a dict with a ``results`` list and an optional ``nextPage`` cursor.
        country: Country filter forwarded to ``api.news_api``.
        category: Category filter forwarded to ``api.news_api``.
        language: Language filter forwarded to ``api.news_api``.
    """

    def __init__(self, api, country='us', category='crime', language='en'):
        self.api = api
        self.country = country
        self.category = category
        self.language = language
        self.news_list = []
        # Exception that ended the most recent fetch_all_news() run, or None.
        self.last_error = None

    def fetch_all_news(self, max_pages=None):
        """Append articles from successive pages to ``self.news_list``.

        Stops when a response has no ``nextPage``, when ``max_pages`` requests have been
        made, or when a request fails. Failures (e.g. exhausted API credits) are printed
        and stored in ``self.last_error`` instead of raised, so articles fetched before the
        failure are kept. Calling this again appends to the same list.

        Args:
            max_pages: Optional cap on the number of API requests (each request presumably
                consumes plan credits). ``None`` keeps going until the pages run out.
        """
        page = None
        requests_made = 0
        self.last_error = None
        while True:
            if max_pages is not None and requests_made >= max_pages:
                print(f"Reached page limit ({max_pages}).")
                break
            try:
                response = self.api.news_api(
                    country=self.country,
                    category=self.category,
                    language=self.language,
                    page=page
                )
                requests_made += 1
                # `results` may be missing or null; treat both as an empty page.
                self.news_list.extend(response.get('results') or [])

                page = response.get('nextPage', None)
                if not page:
                    print("No more pages available.")
                    break
            except Exception as e:
                # Deliberate best-effort: keep what was fetched, but remember why we stopped.
                self.last_error = e
                print(f"Error: {e}")
                break

    def get_news(self):
        """Return the collected article dicts (the live list, not a copy)."""
        return self.news_list


class NewsAnalyzer:
    """Small inspection helpers over a list of article dicts."""

    def __init__(self, news_list):
        self.news_list = news_list

    def display_sample_news(self, count=5):
        """Print the first ``count`` articles, each prefixed with its position."""
        sample = self.news_list[:count]
        for line in (f"News {idx}: {item}" for idx, item in enumerate(sample)):
            print(line)

    def get_all_keys(self):
        """Return the set of every field name used by at least one article."""
        return set().union(*(item.keys() for item in self.news_list))


class NewsExporter:
    """Write a list of article dicts to a CSV file, one column per field.

    Args:
        news_list: Article dicts to export.
        all_keys: Column names. When given as an unordered ``set``/``frozenset`` (as returned
            by ``NewsAnalyzer.get_all_keys``), columns are ordered by first appearance in
            ``news_list`` with any remaining keys appended sorted. String-set iteration order
            can change between interpreter runs (hash randomization), so this keeps the
            header stable. Any other iterable keeps its own order.
    """

    def __init__(self, news_list, all_keys):
        self.news_list = news_list
        if isinstance(all_keys, (set, frozenset)):
            self.all_keys = self._stable_key_order(news_list, all_keys)
        else:
            self.all_keys = list(all_keys)

    @staticmethod
    def _stable_key_order(news_list, keys):
        """Order ``keys`` by first appearance across ``news_list``; unseen keys go last, sorted."""
        ordered = list(dict.fromkeys(key for news in news_list for key in news if key in keys))
        seen = set(ordered)
        ordered.extend(sorted((key for key in keys if key not in seen), key=str))
        return ordered

    def export_to_csv(self, filename='newsdata_io_14_4_test.csv'):
        """Write all articles to ``filename`` (overwritten), printing success or the failure.

        Errors are reported and not raised (best-effort export). Because the file is opened
        before rows are written, a failure part-way through can leave a partial file.
        """
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=self.all_keys)
                writer.writeheader()
                for news in self.news_list:
                    writer.writerow(news)
            print(f"Exported {len(self.news_list)} articles to {filename}")
        except Exception as e:
            print(f"Failed to write CSV: {e}")


# === Usage ===
# Depends on `api` (the Newsdata.io client) created in an earlier cell.

# 1) Pull every available page using NewsFetcher's default filters.
fetcher = NewsFetcher(api=api)
fetcher.fetch_all_news()
news_list = fetcher.get_news()
print(f"Total articles retrieved: {len(news_list)}")

# 2) Peek at a few articles and collect the union of their field names.
analyzer = NewsAnalyzer(news_list=news_list)
analyzer.display_sample_news()
all_keys = analyzer.get_all_keys()
print("All keys found in articles:", all_keys)

# 3) Write everything to CSV, one column per field.
exporter = NewsExporter(news_list=news_list, all_keys=all_keys)
exporter.export_to_csv()


  response = self.api.news_api(


Error: {'status': 'error', 'results': {'message': 'You exceeded your assigned API credits, please check your plan and billing details.', 'code': 'ApiLimitExceeded'}}
Total articles retrieved: 190
News 0: {'article_id': '7cf5fadd91226f2920449067099412b0', 'title': 'Public Safety Logs — Monday, April 7, 2025', 'link': 'https://www.romesentinel.com/news/public-safety/public-safety-logs-monday-april-7-2025/article_fe6ead05-7304-4aa5-bc11-ba6926b5a761.html', 'keywords': None, 'creator': None, 'description': 'Rome Fire Department log', 'content': 'ONLY AVAILABLE IN PAID PLANS', 'pubDate': '2025-04-09 02:00:00', 'pubDateTZ': 'UTC', 'image_url': None, 'video_url': None, 'source_id': 'romesentinel', 'source_name': 'Romesentinel', 'source_priority': 38985, 'source_url': 'https://www.romesentinel.com', 'source_icon': 'https://i.bytvi.com/domain_icons/romesentinel.jpg', 'language': 'english', 'country': ['united states of america'], 'category': ['crime'], 'sentiment': 'ONLY AVAILABLE IN PROFESSION

In [5]:
# Release the Spark session created at the top of the notebook.
# NOTE: cells further down call `spark.createDataFrame`, which needs a live session, so they
# must re-obtain one (SparkSession.builder...getOrCreate()) after this has run.
spark.stop()

In [None]:
# Optional pre-processing: flatten list/dict fields into plain strings so every value fits a
# flat string column (e.g. the StringType fields of the Spark schema below, or a CSV cell).
import json

for news in news_list:
    # Reassigning existing keys while iterating is safe: the dict's size never changes.
    for key, value in news.items():
        if isinstance(value, list):
            # Drop None items and str() the rest so join() cannot fail on non-string
            # entries; an empty (or all-None) list becomes None.
            parts = [str(item) for item in value if item is not None]
            news[key] = ', '.join(parts) if parts else None
        elif isinstance(value, dict):
            news[key] = json.dumps(value)  # Keep nested objects as JSON text.


In [None]:
# Collect the union of field names across all articles, then back-fill the missing ones
# with None so every record exposes the same set of keys.
all_keys = set().union(*(news.keys() for news in news_list))

for news in news_list:
    for key in all_keys:
        news.setdefault(key, None)  # Only inserts when the key is absent.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType

# `spark.stop()` runs earlier in this notebook; getOrCreate() returns the active session or
# starts a new one, so this cell works on a top-to-bottom run.
spark = SparkSession.builder.appName('Newsdata io').getOrCreate()

# One nullable column per article field. createDataFrame verifies every row against the
# schema by default, so each type must match the Python value. This assumes list/dict
# fields were already flattened to strings by the pre-processing cell above.
schema = StructType([
    StructField("article_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("link", StringType(), True),
    StructField("keywords", StringType(), True),
    StructField("creator", StringType(), True),
    StructField("video_url", StringType(), True),
    StructField("description", StringType(), True),
    StructField("content", StringType(), True),
    StructField("pubDate", StringType(), True),
    StructField("pubDateTZ", StringType(), True),
    StructField("image_url", StringType(), True),
    StructField("source_id", StringType(), True),
    # The API returns an integer here (e.g. 38985); StringType would fail verification.
    StructField("source_priority", LongType(), True),
    StructField("source_name", StringType(), True),
    StructField("source_url", StringType(), True),
    StructField("source_icon", StringType(), True),
    StructField("language", StringType(), True),
    StructField("country", StringType(), True),
    StructField("category", StringType(), True),
    StructField("ai_tag", StringType(), True),
    StructField("sentiment", StringType(), True),
    StructField("sentiment_stats", StringType(), True),
    StructField("ai_region", StringType(), True),
    StructField("ai_org", StringType(), True),
    StructField("duplicate", BooleanType(), True)
])

df = spark.createDataFrame(news_list, schema=schema)
df.show()


## Kafka Producer (news_io_producer.py)

In [6]:
from kafka import KafkaProducer
import json
import time
from newsdataapi import NewsDataApiClient

import os

# Read the Newsdata.io key from the environment instead of committing it with the notebook.
# NOTE(review): the key that used to be hard-coded here is exposed in history — rotate it.
api = NewsDataApiClient(apikey=os.environ["NEWSDATA_API_KEY"])

# Kafka producer for the local broker; each message value is JSON-encoded to UTF-8 bytes.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    bootstrap_servers='localhost:9092',
)

def fetch_and_publish_news():
    """Continuously poll Newsdata.io crime news and publish trimmed articles to Kafka.

    Follows ``nextPage`` through the results. Once the pages run out, it waits 60 s and polls
    the first page again. Articles already published during this run (matched on
    ``article_id``, falling back to ``link``) are skipped, so re-polling does not resend them.
    Runs until interrupted; buffered messages are flushed on exit.

    NOTE(review): this publishes to 'news_topic', while ArticleConsumer reads 'news_io_topic'.
    """
    page = None
    published = set()
    try:
        while True:
            response = api.news_api(
                category='crime',
                language='en',
                page=page
            )
            for news in response.get('results') or []:
                article_key = news.get("article_id") or news.get("link")
                if article_key is not None:
                    if article_key in published:
                        continue
                    published.add(article_key)

                # "content" carries the description, presumably because the API's own
                # `content` field is 'ONLY AVAILABLE IN PAID PLANS' on this plan.
                new_news = {
                    "title": news.get("title"),
                    "content": news.get("description"),
                    "pubDate": news.get("pubDate"),
                    "link": news.get("link"),
                    "category": news.get("category")
                }
                producer.send('news_topic', new_news)
                print(f"Sent: {new_news.get('title')}")

            # Default to None (not 1): a missing cursor means there are no more pages.
            page = response.get('nextPage', None)
            if not page:
                print("No more pages. Waiting for new updates...")
                time.sleep(60)
            else:
                time.sleep(2)
    finally:
        # Deliver anything still buffered, e.g. when the loop is stopped by KeyboardInterrupt.
        producer.flush()


# Start the endless polling loop (interrupt the kernel to stop it). This call must be at
# module level: inside the function body, after `while True`, it would never run.
fetch_and_publish_news()


  response = api.news_api(


Sent: Differing opinions on proposed police training facility
Sent: RIAD Corp. reveals AI-powered hotel group booking platform Ria at ITB Berlin 2025
Sent: CNOOC Brings On-stream Wenchang 9-7 Oilfield Development Project
Sent: Commission launches fresh appeal to help find Disappeared victim Seamus Maguire
Sent: 6 Killed in Upstate NY Plane Crash, Including 2022 NCAA Woman of the Year and Family Members
Sent: Missing 6-year-old South MS girl found dead, Harrison County sheriff confirms
Sent: Temple Police Department investigates shooting; 2 dead
Sent: Police Reports: Multiple assaults, possession reports in Killeen
Sent: Murder, rape of girl (5) shakes Okahandja
Sent: The White House’s 8 Big Tariff FlipFlops Since ‘Liberation Day’
Sent: Alberta government grants funding to Edmonton, Calgary groups to tackle antisemitism
Sent: Norwegian Cruise Line Holdings Announces Development Plans for Private Island Destination, Great Stirrup Cay
Sent: Welnax BioClear Reviews [Urgent Update]: Do Not 

KeyboardInterrupt: 

## Test version: the producer stops after `max_messages` messages (set in the call below)

In [1]:
from kafka import KafkaProducer
import json
import time
from newsdataapi import NewsDataApiClient

import os

# Read the Newsdata.io key from the environment instead of committing it with the notebook.
# NOTE(review): the key that used to be hard-coded here is exposed in history — rotate it.
api = NewsDataApiClient(apikey=os.environ["NEWSDATA_API_KEY"])

# Kafka producer for the local broker; each message value is JSON-encoded to UTF-8 bytes.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    bootstrap_servers='localhost:9092',
)


def fetch_and_publish_news(max_messages=None, topic='news_io_topic', api_client=None, kafka_producer=None):
    """Poll Newsdata.io crime news and publish trimmed articles to Kafka.

    Follows ``nextPage`` through the results. Once the pages run out, it waits 60 s and polls
    the first page again. Articles already published during this call (matched on
    ``article_id``, falling back to ``link``) are skipped, so re-polling does not resend them.
    The producer is flushed on every exit path, including KeyboardInterrupt.

    Args:
        max_messages: Stop after publishing this many messages. ``None`` or ``0`` means no
            limit (run until interrupted).
        topic: Kafka topic to publish to.
        api_client: Object with ``news_api(category=..., language=..., page=...)``;
            defaults to the module-level ``api``.
        kafka_producer: Object with ``send(topic, value)`` and ``flush()``; defaults to
            the module-level ``producer``.
    """
    client = api if api_client is None else api_client
    sink = producer if kafka_producer is None else kafka_producer
    page = None
    message_count = 0
    published = set()

    try:
        while True:
            response = client.news_api(
                category='crime',
                language='en',
                page=page
            )
            for news in response.get('results') or []:
                article_key = news.get("article_id") or news.get("link")
                if article_key is not None:
                    if article_key in published:
                        continue
                    published.add(article_key)

                # "content" carries the description, presumably because the API's own
                # `content` field is 'ONLY AVAILABLE IN PAID PLANS' on this plan.
                new_news = {
                    "title": news.get("title"),
                    "content": news.get("description"),
                    "pubDate": news.get("pubDate"),
                    "link": news.get("link"),
                    "category": news.get("category")
                }
                sink.send(topic, new_news)
                print(f"Sent: {new_news.get('title')}")
                message_count += 1

                # Check if we've reached max_messages (if set); the finally block flushes.
                if max_messages and message_count >= max_messages:
                    print(f"Reached limit of {max_messages} messages. Stopping.")
                    return

            page = response.get('nextPage', None)
            if not page:
                print("No more pages. Waiting for new updates...")
                time.sleep(60)
            else:
                time.sleep(2)
    finally:
        sink.flush()

if __name__ == "__main__":
    fetch_and_publish_news(max_messages=12)  # Set limit here (e.g., 20)






  response = api.news_api(


Sent: Unity Grammar School Earns Honorable Mention in Otis' Asia Pacific Regional Made to Move CommunitiesTM Challenge
Sent: Globe joins international telecom leaders in pioneering Green Network study from GSMA Intelligence
Sent: Liberty Gold Provides Update on Spin-Out of Goldstrike/Antimony Ridge Project, Southwest Utah into Specialty American Metals Inc.
Sent: Pioneering Portable Power: BLUETTI Launches the Premium 200 V2 in Australia
Sent: Police Logs 4/14/25
Sent: Differing opinions on proposed police training facility
Sent: RIAD Corp. reveals AI-powered hotel group booking platform Ria at ITB Berlin 2025
Sent: CNOOC Brings On-stream Wenchang 9-7 Oilfield Development Project
Sent: Commission launches fresh appeal to help find Disappeared victim Seamus Maguire
Sent: UPDATE: Missing Hilo Woman and Toddler Found Safe as Hawai'i Island Police Credit Community Efforts
Sent: 6 Killed in Upstate NY Plane Crash, Including 2022 NCAA Woman of the Year and Family Members
Sent: Missing 6-year

In [2]:
from kafka import KafkaConsumer
import json
import csv

class ArticleConsumer:
    """Consume JSON article messages from Kafka and write them to a CSV file.

    Args:
        kafka_server: Bootstrap server address passed to ``KafkaConsumer``.
        topic: Topic to read from.
        group_id: Optional Kafka consumer group. With ``None`` (the kafka-python default),
            offsets are not committed, so every run starts again from the earliest offset
            and re-saves messages seen in earlier runs. Pass a group id to resume instead.
    """

    def __init__(self, kafka_server='localhost:9092', topic='news_io_topic', group_id=None):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=kafka_server,
            group_id=group_id,
            # Without a committed offset, start from the oldest retained message
            # ('latest' would only see messages produced after the consumer starts).
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda v: json.loads(v.decode('utf-8'))
        )

    def consume_and_save(self, output_file='kafka_new_io_articles_output.csv', max_messages=None):
        """Write consumed articles to ``output_file`` (overwritten) as CSV rows.

        Iterates the consumer, so with ``max_messages=None`` it blocks until interrupted.
        Rows are flushed as they are written, so the file is usable while it runs.

        Args:
            output_file: Destination CSV path, written as UTF-8 with a BOM ('utf-8-sig').
            max_messages: Optional number of messages to read before returning
                (non-article messages count toward it but are not written).

        Returns:
            Number of rows written.
        """
        import itertools

        print("Consuming messages...")
        fieldnames = ['title', 'content', 'pubDate', 'link', 'category']
        messages = self.consumer if max_messages is None else itertools.islice(self.consumer, max_messages)
        written = 0

        with open(output_file, 'w', encoding='utf-8-sig', newline='') as csvfile:
            # Ignore unexpected keys instead of aborting the whole export with ValueError.
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()

            for message in messages:
                article = message.value
                if not isinstance(article, dict):
                    print(f"Skipped non-article message: {article!r:.60}")
                    continue
                writer.writerow(article)
                csvfile.flush()
                written += 1
                # Titles can be missing or None; slicing None would raise TypeError.
                title = str(article.get('title') or '')
                print(f"Saved: {title[:60]}")

        return written

# Example usage: reads `news_io_topic` and keeps writing rows until the cell is interrupted.
consumer = ArticleConsumer(kafka_server='localhost:9092', topic='news_io_topic')
consumer.consume_and_save(output_file='kafka_new_io_articles_output.csv')


Consuming messages...
Saved: Pioneering Portable Power: BLUETTI Launches the Premium 200 
Saved: Police Logs 4/14/25
Saved: Differing opinions on proposed police training facility
Saved: RIAD Corp. reveals AI-powered hotel group booking platform R
Saved: CNOOC Brings On-stream Wenchang 9-7 Oilfield Development Pro
Saved: Commission launches fresh appeal to help find Disappeared vi
Saved: UPDATE: Missing Hilo Woman and Toddler Found Safe as Hawai'i
Saved: 6 Killed in Upstate NY Plane Crash, Including 2022 NCAA Woma
Saved: Missing 6-year-old South MS girl found dead, Harrison County
Saved: Temple Police Department investigates shooting; 2 dead
Saved: Pioneering Portable Power: BLUETTI Launches the Premium 200 
Saved: Police Logs 4/14/25
Saved: Differing opinions on proposed police training facility
Saved: RIAD Corp. reveals AI-powered hotel group booking platform R
Saved: CNOOC Brings On-stream Wenchang 9-7 Oilfield Development Pro
Saved: Commission launches fresh appeal to help find Dis

KeyboardInterrupt: 