In [50]:
import os
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [34]:
# MinIO connection settings. Each value can be overridden through an
# environment variable so that real credentials never have to be committed
# together with the notebook; the fallbacks are the values this notebook
# originally hardcoded, so behavior is unchanged when nothing is exported.
# NOTE(review): the fallback keys look like MinIO's stock development
# credentials -- confirm they are never used outside a local setup.
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
minio_bucket_name = os.environ.get("MINIO_BUCKET_NAME", "spark-output")

In [35]:
# Spark/Hadoop settings that let the s3a:// filesystem talk to the MinIO
# endpoint configured above (path-style addressing, explicit S3A
# implementation) and that pull the hadoop-aws / AWS SDK jars at startup.
spark_settings = {
    "spark.hadoop.fs.s3a.endpoint": minio_endpoint,
    "spark.hadoop.fs.s3a.access.key": minio_access_key,
    "spark.hadoop.fs.s3a.secret.key": minio_secret_key,
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.367",
}

# Local-mode session using every available core.
session_builder = SparkSession.builder.appName("DataPreprocessingCPU").master("local[*]")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()


In [36]:
# Location of the raw input CSV, relative to the notebook's working directory.
csv_file_path = "./data/raw/cpu-with-vendor.csv"


In [37]:
# Load the raw CSV: the first row is the header and column types are inferred
# by Spark from the data.
csv_reader = spark.read.options(header=True, inferSchema=True)
df_spark = csv_reader.csv(csv_file_path)


In [38]:
# Print the inferred schema so the column names and types can be checked
# before any transformation is applied.
df_spark.printSchema()


root
 |-- vendor: string (nullable = true)
 |-- MYCT: double (nullable = true)
 |-- MMIN: double (nullable = true)
 |-- MMAX: double (nullable = true)
 |-- CACH: double (nullable = true)
 |-- CHMIN: double (nullable = true)
 |-- CHMAX: double (nullable = true)
 |-- class: double (nullable = true)



In [39]:
# Add an integer-typed helper column ("<name>_numeric") for each memory column;
# the helpers are what the filters below compare against.
for source_column in ("MMAX", "MMIN"):
    df_spark = df_spark.withColumn(
        f"{source_column}_numeric",
        col(source_column).cast("integer"),
    )


In [40]:
# Keep only rows whose integer memory values are within both upper bounds.
within_mmax_limit = col("MMAX_numeric") <= 30000
within_mmin_limit = col("MMIN_numeric") <= 10000
df_spark = df_spark.where(within_mmax_limit & within_mmin_limit)

In [41]:
# Guard against an empty result after filtering. head(1) fetches at most one
# row, so the emptiness check does not require counting the whole DataFrame.
if not df_spark.head(1):
    empty_result_message = "Nenhuma linha restou após a filtragem. Verifique os filtros ou os dados de entrada."
    print(empty_result_message)
    spark.stop()
    # Halt the notebook here: the session has just been stopped, so every
    # later cell that touches `spark` or `df_spark` would otherwise fail with
    # an unrelated-looking error.
    raise RuntimeError(empty_result_message)
        

In [42]:
# Columns that must not reach the output: the vendor label and the two helper
# columns that were only needed for filtering.
columns_to_drop = ["vendor", "MMAX_numeric", "MMIN_numeric"]

# Restrict the list to columns that are actually present in the DataFrame.
available_columns = set(df_spark.columns)
actual_columns_to_drop = [name for name in columns_to_drop if name in available_columns]

df_processed = df_spark.drop(*actual_columns_to_drop)

In [51]:
# Build a per-run object key from the current Unix time (whole seconds), then
# the full s3a:// URI inside the configured bucket.
run_timestamp = int(time.time())
parquet_object_name = f"clean_data/cpu-data-{run_timestamp}.parquet"
output_path_s3a = f"s3a://{minio_bucket_name}/{parquet_object_name}"

In [52]:
# Write the processed DataFrame to MinIO as parquet, replacing anything that
# already exists at the target path.
try:
    print(f"\nIniciando upload (escrita) para o MinIO em: {output_path_s3a}")
    df_processed.write.mode("overwrite").parquet(output_path_s3a)
    print(f"DataFrame 'df_processed' salvo com sucesso no MinIO em '{output_path_s3a}'")
except Exception as e:
    print(f"Erro ao salvar DataFrame no MinIO: {e}")
    # Re-raise so the notebook stops here: the following cells read back from
    # this path and must not run after a failed write.
    raise


Iniciando upload (escrita) para o MinIO em: s3a://spark-output/clean_data/cpu-data-1747883619.parquet
DataFrame 'df_processed' salvo com sucesso no MinIO em 's3a://spark-output/clean_data/cpu-data-1747883619.parquet'


In [55]:
# Read the parquet data back from MinIO, from the same path it was written to.
parquet_reader = spark.read.format("parquet")
df_from_minio = parquet_reader.load(output_path_s3a)


In [56]:
# Preview the first five rows of the data read back, without truncating cell
# contents.
df_from_minio.show(n=5, truncate=False)


+-----+------+-------+-----+-----+-----+-----+
|MYCT |MMIN  |MMAX   |CACH |CHMIN|CHMAX|class|
+-----+------+-------+-----+-----+-----+-----+
|125.0|256.0 |6000.0 |256.0|16.0 |128.0|199.0|
|29.0 |8000.0|16000.0|32.0 |8.0  |16.0 |132.0|
|400.0|1000.0|3000.0 |0.0  |1.0  |2.0  |23.0 |
|400.0|512.0 |3500.0 |4.0  |1.0  |6.0  |24.0 |
|60.0 |2000.0|8000.0 |65.0 |1.0  |8.0  |70.0 |
+-----+------+-------+-----+-----+-----+-----+
only showing top 5 rows

