In [1]:
import logging
import os
import sys
from datetime import datetime, timedelta, timezone

from modules import get_latest_partition_datetime
# Logging setup: every message goes both to a per-run file under `logs/` and to stdout.
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

# Log-file names carry local wall-clock time at a fixed UTC+7 offset
# (presumably the team's timezone — TODO confirm / make configurable).
# Same result as the former `utcnow() + 7h`, without the deprecated utcnow().
LOCAL_TZ = timezone(timedelta(hours=7))
timestamp = datetime.now(LOCAL_TZ).strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(log_dir, f"Comments_{timestamp}.log")

# force=True: basicConfig is a silent no-op once the root logger already has
# handlers, so without it re-running this cell (or a library configuring logging
# first) would keep writing to the previous run's file, or to no file at all.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: logged titles/comments may contain non-ASCII text.
        logging.FileHandler(log_file, mode='a', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
    force=True,
)
# Root logger, shared by the cells below.
logger = logging.getLogger()

✅ Latest timestamp found: 2025-07-19 04:36:58
2025-07-19 04:36:58


In [None]:
logger.info("------------------COMMENTS DATA QUALITY------------------")
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if one exists.
# The s3a/MinIO endpoint and credentials are not set here; presumably they come
# from the environment's Spark defaults (TODO confirm).
session_builder = SparkSession.builder.appName("Read from MinIO via s3a")
spark = session_builder.getOrCreate()

# No explicit schema is given, so Spark infers it from the JSON files.
commentsdf = spark.read.json("s3a://raw/comments/")

# Compact schema string into the log file; tree view inline in the notebook.
inferred_schema = commentsdf.schema
logger.info(inferred_schema.simpleString())
commentsdf.printSchema()

2025-07-20 03:40:14,450 - INFO - ------------------NEWS DATA QUALITY------------------
2025-07-20 03:40:25,390 - INFO - struct<author:string,published:string,source:string,text:string,title:string,url:string>
root
 |-- author: string (nullable = true)
 |-- published: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [3]:
#Checking null
from pyspark.sql.functions import col, count, when

# Column -> predicate that flags a "missing" value. Only `title` also treats the
# empty string as missing; the other columns are checked for NULL only.
# NOTE(review): the schema output saved in this notebook lists no `id`,
# `selftext` or `score` columns (it looks like news data). If the comments
# dataset lacks any of them, the aggregation below raises an AnalysisException.
# TODO confirm.
null_predicates = {
    "title": col("title").isNull() | (col("title") == ""),
    "url": col("url").isNull(),
    "id": col("id").isNull(),
    "selftext": col("selftext").isNull(),
    "score": col("score").isNull(),
}

# One aggregation job instead of one full scan per column. count(when(p, 1))
# counts exactly the rows where p is true (a NULL predicate is not counted),
# which is what filter(p).count() returned.
null_counts = commentsdf.agg(
    *[count(when(predicate, 1)).alias(f"null_{name}") for name, predicate in null_predicates.items()]
).collect()[0]

num_of_null_title = null_counts["null_title"]
logger.info(f"Title columns WHICH IS NULL:{num_of_null_title}")

num_of_null_url = null_counts["null_url"]
logger.info(f"Url columns WHICH IS NULL:{num_of_null_url}")

num_of_null_id = null_counts["null_id"]
logger.info(f"ID columns WHICH IS NULL:{num_of_null_id}")

num_of_null_selftext = null_counts["null_selftext"]
logger.info(f"Selftext columns WHICH IS NULL:{num_of_null_selftext}")

num_of_null_score = null_counts["null_score"]
logger.info(f"Score columns WHICH IS NULL:{num_of_null_score}")

2025-07-20 03:37:31,455 - INFO - Text columns WHICH IS NULL:137
2025-07-20 03:37:31,887 - INFO - Published columns WHICH IS NULL:0


In [4]:
#Checking row count
# Log through the notebook's `logger` (the root logger), like the other cells.
num_of_row = commentsdf.count()
logger.info(f"ROW COUNT:{num_of_row}")

2025-07-20 03:37:35,213 - INFO - ROW COUNT:733


In [None]:
#Checking duplicated comments (rows sharing the same `id`)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Number the rows within each `id`; every row numbered > 1 is an extra copy.
# NOTE(review): ordering by the partition key itself means all rows in a partition
# tie, so which copy survives (row_number == 1) is arbitrary and may differ between
# runs. Add a deterministic tie-breaker column if that matters.
# NOTE(review): rows with a NULL `id` fall into one partition, so all but one of
# them are also reported and dropped as "duplicates". Confirm that is intended.
spec = Window.partitionBy("id").orderBy("id")
commentsdf_with_duplicated = commentsdf.withColumn("duplicated_id", row_number().over(spec))
record_duplicated = commentsdf_with_duplicated.filter(col("duplicated_id") > 1).select(col('id'), col('title'))

# Collect once and count the local list. A separate record_duplicated.count()
# would re-run the whole window job (and, unless cached, re-read the source).
record_duplicated_select = record_duplicated.collect()
num_record_duplicated = len(record_duplicated_select)  # extra copies, not distinct ids
logger.info(f"NUMBER of DUPLICATED COMMENTS: {num_record_duplicated}")
logger.info("DUPLICATED COMMENTS:")
for row in record_duplicated_select:
    logger.info(f"{str(row['id'])} - {str(row['title'])}")

# New df without duplicated ids (keeps the row numbered 1 for each id)
commentsdf = commentsdf_with_duplicated.filter(col("duplicated_id") == 1).drop("duplicated_id")

2025-07-20 03:37:37,968 - INFO - NUMBER of DUPLICATED NEWS: 3
2025-07-20 03:37:37,971 - INFO - DUPLICATED NEWS:
2025-07-20 03:37:37,974 - INFO - https://www.bbc.com/news/articles/c0k7enxkxndo - What is an Isa and how might the rules change?
2025-07-20 03:37:37,983 - INFO - https://www.bbc.com/news/articles/clyndp097gro - Package holidays to Spain, Cyprus and Turkey soar in price
2025-07-20 03:37:37,986 - INFO - https://www.bbc.com/news/articles/cq6mvn699v9o - When to book and where to stay: Six ways to save money on your summer holiday


In [None]:
#Checking distinct value
def _count_distinct_values(frame, column_name):
    """Return how many distinct values `column_name` holds in `frame`.

    Runs one Spark job. distinct() treats NULLs as equal, so NULL counts
    as one value of its own.
    """
    return frame.select(col(column_name)).distinct().count()


for column in commentsdf.columns:
    n_unique = _count_distinct_values(commentsdf, column)
    logger.info(f"UNIQUE VALUE of {column}: {n_unique}")

2025-07-20 03:37:40,899 - INFO - UNIQUE VALUE of author: 109
2025-07-20 03:37:41,342 - INFO - UNIQUE VALUE of published: 608
2025-07-20 03:37:41,761 - INFO - UNIQUE VALUE of source: 13
2025-07-20 03:37:42,173 - INFO - UNIQUE VALUE of text: 588
2025-07-20 03:37:42,589 - INFO - UNIQUE VALUE of title: 719
2025-07-20 03:37:42,906 - INFO - UNIQUE VALUE of url: 730


In [None]:
#Checking datatypes
# `dtypes` is a list of (column name, Spark SQL type string) pairs read from the schema.
schema_types = commentsdf.dtypes
for column, dtype in schema_types:
    message = f"DATATYPE Column {column}: {dtype}"
    logger.info(message)

2025-07-20 03:37:44,160 - INFO - DATATYPE Column author: string
2025-07-20 03:37:44,167 - INFO - DATATYPE Column published: string
2025-07-20 03:37:44,172 - INFO - DATATYPE Column source: string
2025-07-20 03:37:44,180 - INFO - DATATYPE Column text: string
2025-07-20 03:37:44,183 - INFO - DATATYPE Column title: string
2025-07-20 03:37:44,186 - INFO - DATATYPE Column url: string


In [None]:
#checking date
from pyspark.sql.functions import to_timestamp, to_date, min, max, udf
from pyspark.sql.types import TimestampType
from datetime import datetime, timedelta, timezone

# NOTE: the import above rebinds `min`/`max` to PySpark column functions for the
# rest of the notebook, shadowing the Python builtins of the same name.

@udf(returnType=TimestampType())
def clean_and_parse_published(published):
    """Parse an epoch-seconds value (the `created_utc` column) into a timestamp.

    Accepts ints, floats and numeric strings (e.g. "1721234567.0").
    Returns None for None or for any value that cannot be parsed.
    """
    try:
        if published is None:
            return None
        # Timezone-aware on purpose. PySpark converts a *naive* datetime using the
        # worker's local timezone, which would shift a naive UTC value whenever that
        # timezone is not UTC. (datetime.utcfromtimestamp is also deprecated.)
        return datetime.fromtimestamp(int(float(published)), tz=timezone.utc)
    except Exception as e:
        # NOTE(review): this runs inside Spark's Python workers, so it presumably
        # does not reach the driver's log file / handlers configured earlier.
        logger.warning(f"Failed to parse: {published} | Error: {e}")
        return None

# Parse only while `created_utc` is still raw. Re-running this cell on an
# already-converted column would feed datetimes to int() and null out every row.
if dict(commentsdf.dtypes).get("created_utc") != "timestamp":
    commentsdf = commentsdf.withColumn("created_utc", clean_and_parse_published(col("created_utc")))
commentsdf.show(20)

# Both date bounds in a single aggregation (one Spark job instead of two).
date_bounds = commentsdf.agg(
    min(to_date(col("created_utc"))).alias("min_date"),
    max(to_date(col("created_utc"))).alias("max_date"),
).collect()[0]

min_date = date_bounds["min_date"]
logger.info(f"MIN DATE: {min_date}")

max_date = date_bounds["max_date"]
logger.info(f"MAX DATE: {max_date}")

# #Eliminate old published from nearest scraped timestamp to now
# latest_update = get_latest_partition_datetime("raw", "comments")
# today = datetime.today()
# print(today)

# commentsdf = commentsdf.filter((col('created_utc') > latest_update) & (col('created_utc') <= today))
# sumcount = commentsdf.count()
# logger.info(f"SUM OF RECORD after filtering TIMESTAMP: {sumcount}")

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|              author|           published|              source|                text|               title|                 url|       published_ts|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|Lauren Aratani in...|Thu, 10 Jul 2025 ...|https://www.thegu...|As temperatures s...|US shoppers feel ...|https://www.thegu...|2025-07-10 09:00:15|
|Dan Milmo Global ...|Thu, 10 Jul 2025 ...|https://www.thegu...|Four people inclu...|Four people arres...|https://www.thegu...|2025-07-10 11:13:06|
|    Joanna Partridge|Thu, 10 Jul 2025 ...|https://www.thegu...|Bad management of...|Government inheri...|https://www.thegu...|2025-07-10 23:01:51|
|        Nils Pratley|Thu, 10 Jul 2025 ...|https://www.thegu...|The chief executi...|Zonal pricing is ...|https:

In [None]:
# Build model reflecting which content is related to domain topics.