In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import re

# --- Bronze (first pass): parse the raw newsgroup dump into one record per post ---
# NOTE(review): a later cell repeats this parse with every line stripped and blank
# lines skipped, rebinding the same names and writing to /content/ETL/bronze_clean.
spark = SparkSession.builder.appName("NewsETL").getOrCreate()

with open(r"/content/talk.politics.mideast.txt", "r", errors="ignore") as f:
    raw_text = f.read()

# Re-join attributions that were wrapped so that "writes:" sits on its own line
# after a closing ")"; the per-line scan below then sees them as a single line.
raw_text = re.sub(r"\)\s*\n\s*writes:", ") writes:", raw_text, flags=re.IGNORECASE)
lines = raw_text.splitlines()

# Line prefix -> key under which the header value is stored in the record dict.
HEADER_KEYS = {
    "Newsgroup:": "Newsgroup",
    "document_id:": "document_id",
    "From:": "From",
    "Subject:": "Subject",
}
# Applied to the lower-cased line: optional ">" quote markers followed by
# "in article" or "in <".
ATTRIBUTION_RE = re.compile(r"(^>*\s*in article|^>*\s*in\s*<)")
ATTRIBUTION_ENDINGS = ("writes:", "writes...")

# (record key, DataFrame column) pairs in output column order. The record key
# "In article" contains a space while the column is named "In_article", so rows
# are built by explicit lookup instead of relying on dict insertion order.
COLUMN_SOURCES = [
    ("Newsgroup", "Newsgroup"),
    ("document_id", "document_id"),
    ("From", "From"),
    ("Subject", "Subject"),
    ("In article", "In_article"),
    ("Text", "Text"),
]

records = []
record = {}
body_lines = []
collect_body = False  # stays False until the first header/attribution line is seen

for line in lines:
    header = next((prefix for prefix in HEADER_KEYS if line.startswith(prefix)), None)

    if header is not None:
        # A "Newsgroup:" line opens a new post: flush the one collected so far.
        if header == "Newsgroup:" and record:
            record["Text"] = " ".join(body_lines).strip()
            records.append(record)
            record, body_lines = {}, []
        # Slice off only the leading prefix; str.replace would also delete any
        # later occurrence of e.g. "Subject:" inside the header value.
        # NOTE(review): a later line in the same post that starts with one of
        # these prefixes overwrites the stored header — confirm this is intended.
        record[HEADER_KEYS[header]] = line[len(header):].strip()
        collect_body = True

    elif ATTRIBUTION_RE.search(line.lower()) or line.endswith(ATTRIBUTION_ENDINGS):
        # Accumulate every attribution line of the post into one space-joined string.
        previous = record.get("In article")
        record["In article"] = f"{previous} {line.strip()}" if previous else line.strip()
        collect_body = True

    elif collect_body:
        body_lines.append(line)

# Flush the last post (no trailing "Newsgroup:" line follows it).
if record:
    record["Text"] = " ".join(body_lines).strip()
    records.append(record)

# Create DataFrame with explicit schema (all columns nullable strings).
schema = StructType([
    StructField(column_name, StringType(), True) for _, column_name in COLUMN_SOURCES
])

# Missing fields become None; values are placed by name, in schema order.
rows = [tuple(rec.get(key) for key, _ in COLUMN_SOURCES) for rec in records]
df_clean = spark.createDataFrame(rows, schema=schema)

#print("Clean DataFrame schema:")
#df_clean.printSchema()
#print("Clean DataFrame sample:")
#df_clean.show(2, truncate=False)

df_clean.write.format("parquet").mode("overwrite").save("/content/bronze_clean")

df_bronze_clean = spark.read.format("parquet").load("/content/bronze_clean")

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): none of the paths used in the visible cells are under
# /content/drive — confirm this mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import re

# --- Bronze: parse the raw newsgroup dump into one record per post ---
# Every line is stripped and blank lines are skipped before classification.
spark = SparkSession.builder.appName("NewsETL").getOrCreate()

with open(r"/content/talk.politics.mideast.txt", "r", errors="ignore") as f:
    raw_text = f.read()

# Re-join attributions that were wrapped so that "writes:" sits on its own line
# after a closing ")"; the per-line scan below then sees them as a single line.
raw_text = re.sub(r"\)\s*\n\s*writes:", ") writes:", raw_text, flags=re.IGNORECASE)
lines = raw_text.splitlines()

# Line prefix -> key under which the header value is stored in the record dict.
HEADER_KEYS = {
    "Newsgroup:": "Newsgroup",
    "document_id:": "document_id",
    "From:": "From",
    "Subject:": "Subject",
}
# Applied to the lower-cased line: optional ">" quote markers followed by
# "in article" or "in <".
ATTRIBUTION_RE = re.compile(r"(^>*\s*in article|^>*\s*in\s*<)")
ATTRIBUTION_ENDINGS = ("writes:", "writes...")

# (record key, DataFrame column) pairs in output column order. The record key
# "In article" contains a space while the column is named "In_article", so rows
# are built by explicit lookup instead of relying on dict insertion order.
COLUMN_SOURCES = [
    ("Newsgroup", "Newsgroup"),
    ("document_id", "document_id"),
    ("From", "From"),
    ("Subject", "Subject"),
    ("In article", "In_article"),
    ("Text", "Text"),
]

records = []
record = {}
body_lines = []
collect_body = False  # stays False until the first header/attribution line is seen

for raw_line in lines:
    line = raw_line.strip()
    if not line:
        continue

    header = next((prefix for prefix in HEADER_KEYS if line.startswith(prefix)), None)

    if header is not None:
        # A "Newsgroup:" line opens a new post: flush the one collected so far.
        if header == "Newsgroup:" and record:
            record["Text"] = " ".join(body_lines).strip()
            records.append(record)
            record, body_lines = {}, []
        # Slice off only the leading prefix; str.replace would also delete any
        # later occurrence of e.g. "Subject:" inside the header value.
        # NOTE(review): a later line in the same post that starts with one of
        # these prefixes overwrites the stored header — confirm this is intended.
        record[HEADER_KEYS[header]] = line[len(header):].strip()
        collect_body = True

    elif ATTRIBUTION_RE.search(line.lower()) or line.endswith(ATTRIBUTION_ENDINGS):
        # Accumulate every attribution line of the post into one space-joined string.
        previous = record.get("In article")
        record["In article"] = f"{previous} {line}" if previous else line
        collect_body = True

    elif collect_body:
        body_lines.append(line)

# Flush the last post (no trailing "Newsgroup:" line follows it).
if record:
    record["Text"] = " ".join(body_lines).strip()
    records.append(record)

# Explicit schema: all columns are nullable strings.
schema = StructType([
    StructField(column_name, StringType(), True) for _, column_name in COLUMN_SOURCES
])

# Missing fields become None; values are placed by name, in schema order.
rows = [tuple(rec.get(key) for key, _ in COLUMN_SOURCES) for rec in records]

# Create DataFrame
df_clean = spark.createDataFrame(rows, schema=schema)

#print("Clean DataFrame schema:")
#df_clean.printSchema()
#print("Clean DataFrame sample:")
#df_clean.show(2, truncate=False)

df_clean.write.format("parquet").mode("overwrite").save("/content/ETL/bronze_clean")

df_bronze_clean = spark.read.format("parquet").load("/content/ETL/bronze_clean")



In [None]:
# Import every function this cell calls explicitly, so it does not depend on a
# wildcard import executed in an earlier cell.
from pyspark.sql.functions import (
    coalesce,
    col,
    current_timestamp,
    lit,
    regexp_replace,
    trim,
)

# Tokens removed from the attribution column (case-insensitive).
# NOTE(review): alternatives are tried left to right, so `>+` always wins at a
# ">" and the later `>IN`, `>>IN article` and `>\|` alternatives can never be
# the one that matches — confirm whether they should be listed first.
IN_ARTICLE_NOISE = r"(?i)(>+|\|>|>IN|>>IN article|>\||#|\s*In article\s*|writes:|writes\.\.\.)"

# Tokens removed from the body text: quote markers, "^", "#" and runs of ":".
TEXT_NOISE = r"(>+|\s*>+\s*|\|>|>\||\^|#|:+)"

SILVER_PATH = "/content/ETL/silver_clean"
GOLD_PATH = "/content/ETL/Gold"

df_silver_clean = (
    df_bronze_clean
    # Posts without an attribution line get the placeholder "NA".
    .withColumn("In_article", coalesce(col("In_article"), lit("NA")))
    .withColumn(
        "In_articles_clean",
        trim(regexp_replace(col("In_article"), IN_ARTICLE_NOISE, "")),
    )
    .withColumn(
        "Text_clean",
        trim(regexp_replace(col("Text"), TEXT_NOISE, "")),
    )
    .withColumn("Data_ingested_date", current_timestamp())
)

df_silver_clean.write.format("parquet").mode("overwrite").save(SILVER_PATH)

#df_silver_clean.show(1,truncate=False)

# Gold is a copy of silver. Copy it from the persisted silver files instead of
# writing df_silver_clean a second time: a second write re-runs the lazy plan,
# which repeats the regex work and re-evaluates current_timestamp(), so Gold
# would carry a different Data_ingested_date than silver.
(
    spark.read.format("parquet").load(SILVER_PATH)
    .write.format("parquet").mode("overwrite").save(GOLD_PATH)
)



In [None]:
# Load the Gold parquet output back from disk and print its first row with
# column values shown in full (no truncation).
df_gold = spark.read.load("/content/ETL/Gold", format="parquet")
df_gold.show(n=1, truncate=False)

+---------------------+-----------+-----------------------+-----------------------------------------------------+-------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------