In [None]:
# Install Java, Spark, and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -xvzf spark-3.5.1-bin-hadoop3.tgz
!pip install -q pyspark

In [1]:
import os

# Point PySpark at the JDK installed via apt and the Spark distribution unpacked under /content.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.5.1-bin-hadoop3",
)

from pyspark.sql import SparkSession

def create_spark_session(
    app_name="WordCount",
    executor_memory="2g",
    driver_memory="2g",
    executor_cores="2",
    shuffle_partitions="2",
    local_dir="/content/spark-temp",
):
    """Build (or reuse) the SparkSession used by this notebook.

    All arguments default to the values this notebook was tuned with, so calling
    ``create_spark_session()`` with no arguments behaves as before.

    Args:
        app_name: Spark application name.
        executor_memory: Value for ``spark.executor.memory``.
        driver_memory: Value for ``spark.driver.memory``.
        executor_cores: Value for ``spark.executor.cores``.
        shuffle_partitions: Value for ``spark.sql.shuffle.partitions``.
        local_dir: Scratch directory for shuffle/spill files (``spark.local.dir``).

    Returns:
        The SparkSession, or ``None`` if creation failed (the error is printed).

    Note:
        If a session already exists, ``getOrCreate()`` returns it, and settings that
        need a fresh JVM (e.g. memory, local dir) are not re-applied. Restart the
        runtime to change them.
    """
    try:
        spark = (
            SparkSession.builder
            .appName(app_name)
            # NOTE(review): no .master() is set, so this presumably runs in local mode,
            # where the driver JVM also runs the tasks and executor settings have
            # little effect — confirm.
            .config("spark.executor.memory", executor_memory)
            .config("spark.driver.memory", driver_memory)
            .config("spark.executor.cores", executor_cores)
            # Only affects DataFrame/SQL shuffles (e.g. orderBy), not RDD reduceByKey.
            .config("spark.sql.shuffle.partitions", shuffle_partitions)
            .config("spark.local.dir", local_dir)
            .getOrCreate()
        )
        print("Spark session created successfully.")
        return spark
    except Exception as e:
        # Best-effort: report and return None; the caller decides whether to stop.
        print(f"Error creating Spark session: {e}")
        return None

spark = create_spark_session()
# create_spark_session() returns None on failure. Stop here with a clear error
# rather than an opaque AttributeError on `None.read` in a later cell.
if spark is None:
    raise RuntimeError("Spark session could not be created; see the error printed above.")

Spark session created successfully.


In [2]:
# Mount Google Drive so files under "My Drive" are readable at /content/drive.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Function to print RAM and CPU usage
import psutil
def print_usage():
    """Print a one-off snapshot of system memory and CPU utilisation.

    Memory values are byte counts divided by 1024**3 and printed with a "GB" label.
    CPU usage is sampled by ``psutil.cpu_percent`` over a 1-second interval, so this
    call blocks for about a second.
    """
    bytes_per_gb = 1024 ** 3
    snapshot = psutil.virtual_memory()

    total_gb = snapshot.total / bytes_per_gb
    free_gb = snapshot.available / bytes_per_gb
    # "Used" is derived as everything not reported as available.
    in_use_gb = total_gb - free_gb

    cpu_pct = psutil.cpu_percent(interval=1)

    print(f"Total RAM: {total_gb:.2f} GB")
    print(f"Used RAM: {in_use_gb:.2f} GB")
    print(f"Available RAM: {free_gb:.2f} GB")
    print(f"CPU Usage: {cpu_pct:.2f}%")

In [4]:
# Word count over a large text file, timing preprocessing, counting and sorting separately.
import re
import time

from pyspark.sql.types import LongType, StringType, StructField, StructType

file_path = '/content/drive/My Drive/DATASET/mergedfiles12GB.txt'

# Spark is lazy: transformations only build a plan, and work happens when an action
# (count, show, collect, ...) runs. Each timed stage below therefore ends with an
# action, so the reported time reflects real work rather than plan construction.

# --- Preprocessing -------------------------------------------------------------
preprocessing_start = time.perf_counter()

# Read the text file: one row per line, in a single string column named "value".
df = spark.read.text(file_path)

# Replace every character that is not an ASCII letter (digits, punctuation, whitespace,
# accented letters, ...) with a space, then lowercase the line.
non_alpha = re.compile(r'[^a-zA-Z]')
filtered_rdd = df.rdd.map(lambda row: (non_alpha.sub(' ', row.value).lower(),))

schema = StructType([StructField("value", StringType(), True)])

# Convert the filtered RDD back to a DataFrame (explicit schema: no inference job).
filtered_df = spark.createDataFrame(filtered_rdd, schema)

# Cache *before* the first action so that action populates the cache. count() scans
# the whole file, so the preprocessing time covers all of it, not just one partition.
filtered_df.cache()
filtered_line_count = filtered_df.count()

preprocessing_end = time.perf_counter()
preprocessing_time = preprocessing_end - preprocessing_start
print(f"Preprocessing Time: {preprocessing_time:.2f} seconds")

# Sample of the cleaned lines, served from the cache and kept outside the timers.
filtered_df.show(50)

# --- Word count (map-reduce) ---------------------------------------------------
word_count_start = time.perf_counter()

words = filtered_df.rdd.flatMap(lambda line: line.value.split())
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# count() forces the reduceByKey shuffle. Spark keeps shuffle output, so the sort
# below reuses it instead of recounting the whole input.
distinct_word_count = word_counts.count()

word_count_end = time.perf_counter()
word_count_time = word_count_end - word_count_start
print(f"Word Count Time: {word_count_time:.2f} seconds")

# --- Sorting ---------------------------------------------------------------------
sorting_start = time.perf_counter()

# An explicit schema avoids the extra Spark job toDF() would run to infer column types.
word_count_schema = StructType([
    StructField("word", StringType(), True),
    StructField("count", LongType(), True),
])
word_counts_df = word_counts.toDF(word_count_schema)
sorted_word_counts_df = word_counts_df.orderBy("count", ascending=False)

# show() is the action that actually executes the sort and displays the top words.
# NOTE(review): for orderBy followed by a small limit, Spark typically plans a top-k
# (TakeOrderedAndProject) rather than a full sort — time a full action (e.g. a write)
# if a full-sort benchmark is intended.
sorted_word_counts_df.show(50)

sorting_end = time.perf_counter()
sorting_time = sorting_end - sorting_start
print(f"Sorting Time: {sorting_time:.2f} seconds")

print_usage()

+--------------------+
|               value|
+--------------------+
|a rebel statement...|
|authorities last ...|
|at the first pan ...|
|mr  neigum  poker...|
|this  combined wi...|
|she told the post...|
|according to a st...|
|preston tisch    ...|
| we re dealing wi...|
|asked if he might...|
|he said muscovite...|
|changes to the va...|
| he came up to me...|
|the government tr...|
|but the official ...|
| my husband says ...|
|we look forward t...|
|but other major b...|
|so he joined hyat...|
|de klerk said wed...|
| the results are ...|
|mr  hoover expect...|
|the entree  like ...|
|under the first c...|
|in store s presid...|
|quaker oats co  t...|
|can we never see ...|
|in addition  unio...|
|while the results...|
| the item veto is...|
| this isn t a hos...|
|in the long term ...|
|some analysts bel...|
|the yield on    y...|
|the kit has an ad...|
|last year  the fd...|
|that would be qui...|
|no children in th...|
|as the first wine...|
|the sterling valu...|
|they have 