In [1]:
import os
import pandas as pd
import findspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, lower
from pyspark.sql.functions import regexp_replace
from datasets import load_dataset
import logging

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
hadoop_home = os.environ.get("HADOOP_HOME")
print(hadoop_home)

C:\hadoop


In [3]:

def init_spark():
    """Initialize a Spark session"""
    findspark.init()
    spark = SparkSession.builder \
    .appName("AGNewsProcessing") \
    .config("spark.hadoop.fs.permissions.umask-mode", "007") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .config("spark.hadoop.hadoop.home.dir", hadoop_home) \
    .getOrCreate()

    return spark

def load_data():
    """Load AG News dataset into a PySpark DataFrame"""
    # Load dataset as a list of dictionaries
    dataset = load_dataset("sh0416/ag_news", split="test")

    # Convert dataset to Pandas DataFrame
    df = pd.DataFrame(dataset)

    output_dir = "../data"
    os.makedirs(output_dir, exist_ok=True)
    csv_path = os.path.join(output_dir, "ag_news_test.csv")
    df.to_csv(csv_path, index=False)

    print("Test data CSV file successfully written to 'data/' directory.")


In [4]:
spark = init_spark()

In [5]:
load_data()

Test data CSV file successfully written to 'data/' directory.


In [None]:
df_spark = spark.read.csv("../data/ag_news_test.csv", header=True, inferSchema=True)

In [7]:
df_spark.printSchema()

root
 |-- label: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)



In [8]:
df_spark.createOrReplaceTempView("news")

In [9]:
spark.sql("select * from news limit 5").show()

+-----+--------------------+--------------------+
|label|               title|         description|
+-----+--------------------+--------------------+
|    3|Fears for T N pen...|Unions representi...|
|    4|The Race is On: S...|SPACE.com - TORON...|
|    4|Ky. Company Wins ...|AP - A company fo...|
|    4|Prediction Unit H...|AP - It's barely ...|
|    4|Calif. Aims to Li...|AP - Southern Cal...|
+-----+--------------------+--------------------+



In [10]:
spark.sql("select * from news limit 10").show()

+-----+--------------------+--------------------+
|label|               title|         description|
+-----+--------------------+--------------------+
|    3|Fears for T N pen...|Unions representi...|
|    4|The Race is On: S...|SPACE.com - TORON...|
|    4|Ky. Company Wins ...|AP - A company fo...|
|    4|Prediction Unit H...|AP - It's barely ...|
|    4|Calif. Aims to Li...|AP - Southern Cal...|
|    4|Open Letter Again...|"The British Depa...|
|    4|Loosing the War o...|    "\""Sven Jaschan|
|    4|FOAFKey: FOAF, PG...|\\FOAF/LOAF  and ...|
|    4|E-mail scam targe...|"Wiltshire Police...|
|    4|Card fraud unit n...|In its first two ...|
+-----+--------------------+--------------------+



In [11]:
df_spark.select('description').limit(1).show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------+
|description                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------+
|Unions representing workers at Turner   Newall say they are 'disappointed' after talks with stricken parent firm Federal Mogul.|
+-------------------------------------------------------------------------------------------------------------------------------+



In [12]:
# df_spark.filter(df_spark.label==2).show(5)

In [13]:
def count_specific_words(df, words):
    """Counts occurrences of given words in news descriptions"""

    for word in words:
        count = df.filter(F.col("description").contains(word)).count()
        print(f"Word '{word}' appears {count} times.")



In [14]:

# Define target words
target_words = ["president", "the", "asia"]

# Clean text and split into words
words_df = df_spark.select(
    explode(
        split(
            lower(regexp_replace(df_spark["description"], "[^a-zA-Z\s]", "")), "\s+"
        )
    ).alias("word")
)


  lower(regexp_replace(df_spark["description"], "[^a-zA-Z\s]", "")), "\s+"
  lower(regexp_replace(df_spark["description"], "[^a-zA-Z\s]", "")), "\s+"


In [15]:


# Convert to lowercase and remove empty/null rows
cleaned_words_df = words_df.filter(col("word").isNotNull()) \
                           .filter(col("word") != "") \
                           .filter(col("word") != " ")

# Show the cleaned DataFrame
cleaned_words_df.show()


+------------+
|        word|
+------------+
|      unions|
|representing|
|     workers|
|          at|
|      turner|
|      newall|
|         say|
|        they|
|         are|
|disappointed|
|       after|
|       talks|
|        with|
|    stricken|
|      parent|
|        firm|
|     federal|
|       mogul|
|    spacecom|
|     toronto|
+------------+
only showing top 20 rows



In [16]:
# Filter for target words
filtered_words = cleaned_words_df.filter(col("word").isin(target_words))

In [17]:
# Count occurrences
word_counts = filtered_words.groupBy("word").count().orderBy("count", ascending=False)

In [18]:
word_counts.show()


+---------+-----+
|     word|count|
+---------+-----+
|      the|12303|
|president|  333|
|     asia|   23|
+---------+-----+



In [19]:
import os
from datetime import datetime

# Generate date in YYYYMMDD format
date_str = datetime.now().strftime("%Y%m%d")

# Define output directory (use absolute path)
output_dir = os.path.abspath("../outputs")  # Convert relative to absolute path
os.makedirs(output_dir, exist_ok=True)  # Ensure directory exists

# Construct full file path
parquet_path = os.path.join(output_dir, f"word_count_{date_str}.parquet")
print(parquet_path)

# Save DataFrame as Parquet
word_counts.write.parquet(parquet_path)

print(f"Parquet file saved at: {parquet_path}")


c:\Users\kdkas\Desktop\DE task\code\outputs\word_count_20250304.parquet
Parquet file saved at: c:\Users\kdkas\Desktop\DE task\code\outputs\word_count_20250304.parquet


In [6]:
df = pd.read_parquet("..\..\outputs\word_count_20250313.parquet")

  df = pd.read_parquet("..\..\outputs\word_count_20250313.parquet")


In [7]:
df.head()

Unnamed: 0,word,count
0,the,12303
1,president,333
2,asia,23


In [28]:
all = cleaned_words_df.groupBy("word").count().orderBy("count", ascending=False)

In [29]:
all.show()

+----+-----+
|word|count|
+----+-----+
| the|12303|
|   a| 6474|
|  to| 5976|
|  of| 5522|
|  in| 4727|
| and| 4108|
|  on| 2877|
| for| 2356|
|that| 1765|
|   s| 1666|
|with| 1460|
| its| 1374|
|  as| 1361|
|  at| 1273|
|said| 1243|
|  is| 1241|
|  by| 1200|
| has| 1169|
|  it| 1168|
| new| 1053|
+----+-----+
only showing top 20 rows



In [31]:
# Generate date in YYYYMMDD format
from datetime import datetime
date_str = datetime.now().strftime("%Y%m%d")

# Define output directory (use absolute path)
output_dir = os.path.abspath("../outputs")  # Convert relative to absolute path
os.makedirs(output_dir, exist_ok=True)  # Ensure directory exists

# Construct full file path
parquet_path = os.path.join(output_dir, f"word_count_all_{date_str}.parquet")
print(parquet_path)

# Save DataFrame as Parquet
all.write.parquet(parquet_path)

print(f"Parquet file saved at: {parquet_path}")

c:\Users\kdkas\Desktop\DE task\code\outputs\word_count_all_20250304.parquet
Parquet file saved at: c:\Users\kdkas\Desktop\DE task\code\outputs\word_count_all_20250304.parquet


In [8]:
df_all = pd.read_parquet("..\..\outputs\word_count_all_20250313.parquet")

  df_all = pd.read_parquet("..\..\outputs\word_count_all_20250313.parquet")


In [9]:
df_all.head()

Unnamed: 0,word,count
0,the,12303
1,a,6474
2,to,5976
3,of,5522
4,in,4727


In [39]:

def save_to_parquet(df, filename_prefix):
    """Save Results DataFrame to Parquet file"""
    
    date_str = datetime.now().strftime("%Y%m%d")
    outputs_dir = os.path.abspath("../../outputs")

    # Ensure the directory exists
    os.makedirs(outputs_dir, exist_ok=True)

    filename = f"{filename_prefix}_{date_str}.parquet"
    parquet_path = os.path.join(outputs_dir, filename)

    df.write.parquet(parquet_path)
    print(f"File saved: {parquet_path}")


In [41]:
save_to_parquet(word_counts, 'word_count')

File saved: c:\Users\kdkas\Desktop\DE task\outputs\word_count_20250304.parquet


In [41]:
spark.stop()