In [1]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import subprocess
import sys
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Starting")

# HDFS namenode URI. It is used for Spark's default filesystem AND to fully
# qualify the path handed to the `hdfs` CLI below. Without that, the CLI would
# resolve a bare path against its own client configuration, which may not be
# the filesystem Spark reads from.
default_fs = "hdfs://localhost:9000"

# Create SparkSession
spark = SparkSession.builder \
    .appName("DataCompactionJob") \
    .config("spark.hadoop.fs.defaultFS", default_fs) \
    .getOrCreate()

# Specify the input and output paths
input_path = "/data/spark_project/streamingNew1"
output_path = "/data/spark_project/streamingNew1_compacted"

# Number of output partitions, i.e. the number of files the write produces.
num_partitions = 2

# Everything after session creation runs inside try/finally so the
# SparkSession is stopped on every exit path: the sys.exit(1) calls below
# (SystemExit still runs `finally`) and any read/write exception.
try:
    logger.info(f"Compacting data from {input_path} to {output_path}")

    # The read is lazy, and mode("overwrite") clears the target before the
    # data is actually read. Compacting a directory onto itself would
    # therefore delete the source.
    if input_path.rstrip("/") == output_path.rstrip("/"):
        logger.error("Input and output paths must differ. Exiting.")
        sys.exit(1)

    # Check if the input path exists. Arguments are passed as a list (no
    # shell), so the path is never interpreted by a shell.
    check_path_command = [
        "hdfs", "dfs", "-test", "-d", f"{default_fs}{input_path}",
    ]
    try:
        check_result = subprocess.run(check_path_command)
    except FileNotFoundError:
        # Without a shell, a missing `hdfs` binary raises instead of
        # returning a nonzero code; report it rather than blaming the path.
        logger.error("'hdfs' command not found on PATH. Exiting.")
        sys.exit(1)
    if check_result.returncode != 0:
        logger.error(f"Input path {input_path} does not exist. Exiting.")
        sys.exit(1)

    # Read the data from the specified path
    df = spark.read.format("parquet").load(input_path)

    logger.info("Data read successfully")

    # Repartition (full shuffle) so the output has num_partitions
    # evenly sized files.
    compacted_df = df.repartition(num_partitions)

    logger.info(f"Repartitioned the data into {num_partitions} partition(s)")

    # Write the data back to the output path
    compacted_df.write.mode("overwrite").format("parquet").save(output_path)

    logger.info(f"Compacted data written to {output_path}")
finally:
    # Stop SparkSession
    spark.stop()

logger.info("Data compaction job completed successfully")


INFO:__main__:Starting
INFO:__main__:Compacting data from /data/spark_project/streamingNew1 to /data/spark_project/streamingNew1_compacted
INFO:__main__:Data read successfully
INFO:__main__:Repartitioned the data into 2 partition(s)
INFO:__main__:Compacted data written to /data/spark_project/streamingNew1_compacted
INFO:__main__:Data compaction job completed successfully
