In [None]:
from pyspark.sql import SparkSession
import time
import os
from dataclasses import dataclass
from functools import wraps

In [None]:
@dataclass
class TestCompression:
    """
    Benchmark harness for compression and decompression in Apache Spark.

    The DataFrame ``df`` is written to ``output/<format>_<compression>`` with
    the requested codec and read back again; on-disk size and wall-clock
    timings are reported for each run.

    Attributes:
        file_name: Path of the original input file. Only used by
            ``get_original_file_size``.
        df: DataFrame that the benchmarks write to disk.
    """
    file_name: str
    df: "pyspark.sql.DataFrame"

    @staticmethod
    def measure_execution_time(func):
        """
        Decorator to measure function execution time.

        The wrapped callable prints ``"<name>: <seconds> seconds"`` and
        returns ``(result, execution_time)`` instead of the bare result.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter() is monotonic and high resolution; time.time()
            # can jump when the system clock is adjusted.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            execution_time = time.perf_counter() - start_time
            print(f"{func.__name__}: {execution_time:.4f} seconds")
            return result, execution_time
        return wrapper

    @staticmethod
    def _directory_size_mb(path: str) -> float:
        """
        Return the total size in MB (rounded to 2 decimals) of every regular
        file below ``path``, including nested directories.

        A missing path yields ``0.0``.
        """
        total_bytes = 0
        for root, _dirs, names in os.walk(path):
            for name in names:
                full_path = os.path.join(root, name)
                if os.path.isfile(full_path):
                    total_bytes += os.path.getsize(full_path)
        return round(total_bytes / (1024 * 1024), 2)  # Convert to MB

    # Inside the class body the name is still bound to the staticmethod
    # object, which is only callable on Python 3.10+; ``__func__`` is the
    # plain function and works as a decorator on every version.
    @measure_execution_time.__func__
    def test_compression(self, format: str, compression: str) -> float:
        """
        Test compression by writing the DataFrame to disk.

        Args:
            format: Spark data source format name (e.g. ``"parquet"``).
            compression: Value passed as the writer's ``compression`` option.

        Returns:
            The body returns the output size in MB; because of the timing
            decorator, callers receive ``(size_mb, execution_time)``.
        """
        import shutil  # local import: only needed for the cleanup below

        output_path = f"output/{format}_{compression}"

        # Start from a clean slate. rmtree also copes with sub-directories,
        # on which a plain os.remove() would fail.
        if os.path.isdir(output_path):
            shutil.rmtree(output_path)

        # Write DataFrame
        self.df.write.mode("overwrite").format(format).option("compression", compression).save(output_path)

        return self._directory_size_mb(output_path)

    @measure_execution_time.__func__
    def test_decompression(self, format: str, compression: str) -> None:
        """
        Test decompression by reading the previously written data from disk.

        The session is taken from ``self.df`` rather than from a module-level
        ``spark`` variable, so the method also works when this class is used
        outside the ``__main__`` script.

        Args:
            format: Spark data source format name (e.g. ``"parquet"``).
            compression: Codec name used when the data was written.

        Returns:
            The body returns ``None``; because of the timing decorator,
            callers receive ``(None, execution_time)``.
        """
        input_path = f"output/{format}_{compression}"

        session = getattr(self.df, "sparkSession", None)
        if session is None:
            # Fallback for PySpark releases that predate DataFrame.sparkSession.
            session = self.df.sql_ctx.sparkSession

        # count() forces the files to be read.
        session.read.format(format).load(input_path).count()

    def test_compression_benchmarks(self, codecs=("zstd", "snappy", "lz4")):
        """
        Run compression and decompression benchmarks for Parquet.

        Args:
            codecs: Compression codec names to benchmark. Defaults to Zstd,
                Snappy, and LZ4.

        Returns:
            Dict keyed by ``"parquet_<codec>"``; each value holds ``size_mb``,
            ``compression_time_seconds`` and ``decompression_time_seconds``.
        """
        results = {}

        for codec in codecs:
            # Test compression
            size, compression_time = self.test_compression("parquet", codec)

            # Test decompression
            _, decompression_time = self.test_decompression("parquet", codec)

            results[f"parquet_{codec}"] = {
                "size_mb": size,
                "compression_time_seconds": compression_time,
                "decompression_time_seconds": decompression_time,
            }

        return results

    def get_original_file_size(self) -> float:
        """
        Get the size in MB (rounded to 2 decimals) of the file at
        ``file_name``, or ``0.0`` when that path does not exist.
        """
        if not os.path.exists(self.file_name):
            return 0.0
        file_size = os.path.getsize(self.file_name)
        return round(file_size / (1024 * 1024), 2)  # Convert to MB

In [None]:
if __name__ == "__main__":
    # Script entry point: load the CSV, run the Parquet codec benchmarks and
    # print a summary.

    # Initialize Spark session (kept under the module-level name ``spark``)
    spark = SparkSession.builder.appName("CompressionBenchmark").getOrCreate()

    # try/finally guarantees the session is stopped even if a benchmark fails.
    try:
        # Path to CSV file
        FILE_PATH = "/workspaces/high-performance-pyspark-advanced-strategies-for-optimal-data-processing-3919191/data/test_data.csv"

        # Read CSV into DataFrame using the first line as header. No schema or
        # inferSchema option is set, so column types are whatever the CSV
        # reader assigns by default.
        # NOTE(review): confirm whether typed columns were intended here.
        df = spark.read.option("header", "true").csv(FILE_PATH)

        # Ensure DataFrame is not empty. take(1) stops after the first row
        # instead of counting the whole file.
        if not df.take(1):
            print("❌ ERROR: DataFrame is empty. Please check the input CSV file.")
            # Non-zero status signals the failure; the finally block below
            # still stops the session.
            raise SystemExit(1)

        # Get original file size before compression
        test_compression = TestCompression(file_name=FILE_PATH, df=df)
        original_size_mb = test_compression.get_original_file_size()

        # Run compression and decompression benchmarks
        results = test_compression.test_compression_benchmarks()

        # Print original file size and benchmark results
        print(f"\nOriginal CSV file size: {original_size_mb} MB\n")
        print("🔥 Compression and Decompression Benchmark Results 🔥 \n")
        for algorithm, metrics in results.items():
            print(f"{algorithm}: \n ")
            print(f"  Compression Time: {metrics['compression_time_seconds']:.2f}s")
            print(f"  Decompression Time: {metrics['decompression_time_seconds']:.2f}s")
            print(f"  Size: {metrics['size_mb']} MB\n")
    finally:
        # Stop Spark session
        spark.stop()