Hồi quy (Regression)

Sử dụng tập dữ liệu California Housing (file đính kèm).

In [None]:
from pyspark.sql import SparkSession
# Build the Spark session used by every later cell (getOrCreate reuses an
# already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("DataSplitExample")
    .getOrCreate()
)

In [None]:
# Local directory holding the California Housing CSV data.
# NOTE(review): machine-specific absolute path — adjust per environment.
file_path = 'D:\\Nam4\\HK1\\Cloud_ML\\ThucHanh\\Buoi3\\src\\data'

In [None]:
# Load the CSV data: first row is the header, column types are inferred.
df = spark.read.options(header=True, inferSchema=True).csv(file_path)

In [3]:
# Split the data into train / validation / test (roughly 80 / 10 / 10).
# randomSplit only approximates the weights, so the actual counts are printed.
train_df, holdout_df = df.randomSplit([0.8, 0.2], seed=42)
validation_df, test_df = holdout_df.randomSplit([0.5, 0.5], seed=42)
print(f'Training set: {train_df.count()} samples')
print(f'validation set: {validation_df.count()} samples')
print(f'Test set: {test_df.count()} samples')

# Show basic summary statistics for every split, validation included.
print('\n Thống kê tập train:')
display(train_df.describe())

print('\n Thống kê tập validation:')
display(validation_df.describe())

print('\n Thống kê tập test:')
display(test_df.describe())

AttributeError: 'str' object has no attribute 'randomSplit'

In [None]:
#Loại bỏ outlier
from pyspark.sql.functions import col, mean, stddev

def remove_outliers_zscore(df, columns, threshold=3, target_col='MedHouseVal'):
    """Drop rows whose value in any checked column is more than `threshold` std devs from its mean.

    The mean and standard deviation of each column are computed on the input
    `df` (not on the progressively filtered result), in a single Spark
    aggregation pass. Bounds are inclusive.

    Args:
        df: Spark DataFrame to filter.
        columns: names of the columns to test; `target_col` is never filtered on.
        threshold: z-score cut-off.
        target_col: column to leave untouched (the regression label).

    Returns:
        The filtered DataFrame (lazily evaluated). A column whose mean or
        stddev is null (e.g. non-numeric, all-null, or fewer than two values)
        is skipped, because its bounds are undefined.
    """
    check_cols = [name for name in columns if name != target_col]
    if not check_cols:
        return df

    # One job for every column instead of one collect() per column.
    aggregations = []
    for i, name in enumerate(check_cols):
        aggregations.append(mean(col(name)).alias(f'mean_{i}'))
        aggregations.append(stddev(col(name)).alias(f'std_{i}'))
    stats = df.select(*aggregations).collect()[0]

    df_clean = df
    for i, name in enumerate(check_cols):
        mean_val = stats[f'mean_{i}']
        std_val = stats[f'std_{i}']
        # Null statistics would make the bound arithmetic raise TypeError.
        if mean_val is None or std_val is None:
            continue

        lower_bound = mean_val - threshold * std_val
        upper_bound = mean_val + threshold * std_val
        df_clean = df_clean.filter((col(name) >= lower_bound) & (col(name) <= upper_bound))

    return df_clean

# Outlier filtering is applied to the training split only.
train_df_clean = remove_outliers_zscore(
    train_df,
    columns=train_df.columns,
)
print(f'Train samples sau khi xử lý outliers: {train_df_clean.count()}')


In [None]:
#Chuẩn hóa và chuyển thành vector 
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Every column except the regression target becomes a feature. The target name
# must match exactly (no stray whitespace), otherwise the label leaks into the
# feature vector.
feature_columns = [name for name in train_df.columns if name != 'MedHouseVal']

# VectorAssembler takes `outputCol` (singular) and Pipeline takes `stages`;
# PySpark's keyword-only constructors reject misspelled keywords with TypeError.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol='raw_feature',
)
scaler = StandardScaler(
    inputCol='raw_feature',
    outputCol='features',
    withStd=True,
    withMean=True,
)
preprocessing_pipeline = Pipeline(stages=[assembler, scaler])

# Fit on the outlier-cleaned training data only, then apply that same fitted
# transformation to the validation split.
preprocessing_model = preprocessing_pipeline.fit(train_df_clean)

train_df_processed = preprocessing_model.transform(train_df_clean)
validation_df_processed = preprocessing_model.transform(validation_df)

display(train_df_processed.select('features', 'MedHouseVal').limit(5))



In [None]:
# Compare data distributions before and after outlier removal.
def compare_distributions(original_df, cleaned_df, column_name):
    """Print mean and standard deviation of one column for two DataFrames.

    Args:
        original_df: DataFrame before outlier removal.
        cleaned_df: DataFrame after outlier removal.
        column_name: column to summarise.
    """
    target = col(column_name)
    before, after = [
        frame.select(mean(target), stddev(target)).collect()[0]
        for frame in (original_df, cleaned_df)
    ]

    print(f'{column_name}:')
    print(f'Original - Mean: {before[0]:.4f}, Std: {before[1]:.4f}')
    print(f'Cleaned - Mean: {after[0]:.4f}, Std: {after[1]:.4f}')


for feature in ('MedInc', 'AveRooms', 'HouseAge'):
    compare_distributions(train_df, train_df_clean, feature)


In [None]:
# Training and evaluation
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Alias kept so cells that still use the misspelled name keep working.
RandomForesRegressor = RandomForestRegressor

# Shared evaluator; its configured metric stays 'rmse'. Other metrics are
# requested through per-call param overrides instead of setMetricName().
evaluator = RegressionEvaluator(
    labelCol='MedHouseVal',
    predictionCol='prediction',
    metricName='rmse'
)


def train_and_evaluate_model(model, train_data, val_data, model_name):
    """Fit an estimator and print its regression metrics on a validation set.

    Args:
        model: an unfitted Spark ML estimator (anything with ``fit``).
        train_data: DataFrame used for fitting.
        val_data: DataFrame the fitted model is scored on.
        model_name: label used in the printed report.

    Returns:
        (trained_model, predictions, metrics) where metrics maps 'RMSE',
        'MAE' and 'R2' to floats.
    """
    trained_model = model.fit(train_data)
    predictions = trained_model.transform(val_data)

    # Pass metricName as an override instead of mutating the shared evaluator:
    # setMetricName() left it on 'r2', so the next call's "RMSE" was really R2
    # and later users of `evaluator` (e.g. CrossValidator) silently changed metric.
    rmse, mae, r2 = (
        evaluator.evaluate(predictions, {evaluator.metricName: metric})
        for metric in ('rmse', 'mae', 'r2')
    )

    print(f'{model_name} - Validation Metrics:')
    print(f'RMSE: {rmse:.4f}')
    print(f'MAE: {mae:.4f}')
    print(f'R^2:{r2:.4f}')
    print('-' * 40)

    return trained_model, predictions, {'RMSE': rmse, 'MAE': mae, 'R2': r2}

In [None]:
# Imported here as well so this cell does not depend on the spelling used in
# the import cell.
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor

# Spark ML estimators take `featuresCol` (not `featureCol`), and the
# preprocessing pipeline writes its output to the 'features' column.
lr = LinearRegression(featuresCol='features', labelCol='MedHouseVal')
lr_model, lr_predictions, lr_metrics = train_and_evaluate_model(
    lr, train_df_processed, validation_df_processed, 'Linear Regression'
)

rf = RandomForestRegressor(featuresCol='features', labelCol='MedHouseVal', seed=42)
rf_model, rf_predictions, rf_metrics = train_and_evaluate_model(
    rf, train_df_processed, validation_df_processed, 'Random Forest'
)

gbt = GBTRegressor(featuresCol='features', labelCol='MedHouseVal', seed=42)
gbt_model, gbt_predictions, gbt_metrics = train_and_evaluate_model(
    gbt, train_df_processed, validation_df_processed, 'Gradient Boosting'
)


In [None]:
#Tuning
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import os


# Grid over numTrees x maxDepth for the random forest, tuned with 3-fold CV.
param_grid = (ParamGridBuilder()
              .addGrid(rf.numTrees, [50, 100])
              .addGrid(rf.maxDepth, [5, 10])
              .build())

# Pin model selection to RMSE explicitly, independent of whatever metric the
# shared `evaluator` may have been switched to by earlier cells.
rmse_evaluator = evaluator.copy({evaluator.metricName: 'rmse'})

cross_val = CrossValidator(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=rmse_evaluator,
    numFolds=3,
    parallelism=4,
    seed=42,
)

cv_model = cross_val.fit(train_df_processed)
best_rf_model = cv_model.bestModel
# On tree-ensemble models `getNumTrees` is a property; `getMaxDepth()` is a method.
print(f'Best parameters: numTrees= {best_rf_model.getNumTrees}, maxDepth={best_rf_model.getMaxDepth()}')

cv_predictions = best_rf_model.transform(validation_df_processed)
cv_rmse = rmse_evaluator.evaluate(cv_predictions)
print(f'Best Model RMSE on validation: {cv_rmse:.4f}')


In [None]:
#Đánh giá trên test set
def final_evaluation(model, test_data, model_name):
    """Score a fitted model on the held-out test set.

    Intended to be run only once per model, after all tuning has been done on
    the validation split.

    Args:
        model: a fitted Spark ML model (anything with ``transform``).
        test_data: preprocessed test DataFrame.
        model_name: label used in the printed report.

    Returns:
        (test_predictions, metrics) where metrics maps 'RMSE', 'MAE' and 'R2'
        to floats.
    """
    test_predictions = model.transform(test_data)

    # Override metricName per call rather than calling setMetricName(), which
    # would leave the shared evaluator configured for 'r2' afterwards.
    def score(metric_name):
        return evaluator.evaluate(test_predictions, {evaluator.metricName: metric_name})

    rmse = score('rmse')
    mae = score('mae')
    r2 = score('r2')

    print(f'{model_name} - FINAL TEST METRICS:')
    print(f'RMSE: {rmse:.4f}')
    print(f'MAE:{mae:.4f}')
    print(f'R2: {r2:.4f}')

    return test_predictions, {'RMSE': rmse, 'MAE': mae, 'R2': r2}

# Evaluate every model on the test set. The test split is passed through the
# same preprocessing model that was fitted on the training data only.
test_df_processed = preprocessing_model.transform(test_df)

print('Đánh giá cuối cùng trên tập test:')
print('=' * 50)

lr_test_predictions, lr_test_metrics = final_evaluation(lr_model, test_df_processed, ' Linear Regression')
rf_test_predictions, rf_test_metrics = final_evaluation(rf_model, test_df_processed, 'Random Forest')
gbt_test_predictions, gbt_test_metrics = final_evaluation(gbt_model, test_df_processed, 'Gradient Boosting')
cv_test_predictions, cv_test_metrics = final_evaluation(best_rf_model, test_df_processed, 'Random Forest CV')



In [None]:
#Đánh giá hiệu suất và trực quan dữ liệu
from matplotlib import pyplot as plt 
import pandas as pd
# Compare all models on their test-set metrics.
models_comparison = {
    'Linear Regression': lr_test_metrics,
    'Random Forest': rf_test_metrics,
    'Gradient Boosting': gbt_test_metrics,
    'Random Forest CV': cv_test_metrics
}

# Comparison table: one row per model, one column per metric.
comparison_df = pd.DataFrame(models_comparison).T
print('\n Bảng so sánh hiệu suất:')
print(comparison_df)

# Visualization: two side-by-side bar charts (RMSE, then R2).
models = list(models_comparison.keys())
bar_colors = ['skyblue', 'lightgreen', 'lightcoral', 'gold']
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
rmse_scores = [metrics['RMSE'] for metrics in models_comparison.values()]
plt.bar(models, rmse_scores, color=bar_colors)
plt.title('RMSE Comparison on test set')
plt.xticks(rotation=45)

# A 1x2 grid has subplot indices 1 and 2 only; index 3 raises ValueError.
plt.subplot(1, 2, 2)
r2_scores = [metrics['R2'] for metrics in models_comparison.values()]
plt.bar(models, r2_scores, color=bar_colors)
plt.title('R2 Comparison on Test set')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()
