In [1]:
import time
import os
import psutil # You might need to install this: pip install psutil

# This SparkSession is already initialized when launching with pyspark command
from pyspark.sql import SparkSession

In [2]:
# Build the SparkSession explicitly; getOrCreate() hands back the session that
# already exists (e.g. one started by the pyspark launcher) instead of a second one.
spark = (
    SparkSession.builder
    .appName("ClassificationExampleWithMetrics")
    .getOrCreate()
)

# Smoke test: a live session can report its engine version
print(f"Spark version: {spark.version}")

Spark version: 4.0.0-preview2


In [3]:
# Load data.csv and create a regression model to predict fare_amount
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression  # Change import to LinearRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DoubleType
import time
import psutil
import os

# Read the CSV into a DataFrame: the first row is the header, column types are
# inferred from the data, and the two-character token \N is read as null.
print("Loading data...")
data_path = "data.csv"
read_options = {"header": True, "inferSchema": True, "nullValue": "\\N"}
df = spark.read.csv(data_path, **read_options)

# Summarise what was loaded: size, column names, schema and a 5-row preview
row_count = df.count()
column_names = df.columns
print(f"Dataset has {row_count} rows and {len(column_names)} columns")
print("Column names:", column_names)
print("Schema:")
df.printSchema()
df.show(5)

# Preprocessing: clean the rows, pick the feature columns, and build the
# feature-engineering stages of the pipeline (`stages`, ending with the assembler).

# Drop any rows with null values
df = df.na.drop()

# Explicitly cast the target column to DoubleType for regression
df = df.withColumn("fare_amount", df["fare_amount"].cast(DoubleType()))

# Target and columns that must not be used as predictors
exclude_columns = ["vendor_id", "pickup_time", "dropoff_time", "flag", "fare_amount"]
feature_cols = [name for name in df.columns if name not in exclude_columns]
# NOTE(review): total_amount stays in the feature set. If it is derived from
# fare_amount (presumably fare plus surcharges/tip), this is target leakage and
# the reported R² will be inflated — confirm and exclude it if so.

print("Selected features for model:", feature_cols)

# Detect string-typed features BEFORE casting anything. Casting every feature to
# double first (as this cell used to) leaves no string column to detect, so the
# indexer/encoder stages were never created and a string feature would have been
# nulled by the cast (or failed it under ANSI mode) and then skipped by the assembler.
string_columns = [field.name for field in df.schema.fields
                  if field.dataType.simpleString().startswith("string")
                  and field.name in feature_cols]

# Everything that is not a string is treated as numeric; order follows df.columns
numeric_cols = [name for name in feature_cols if name not in string_columns]

# Cast only the numeric features to double so the assembler gets a uniform type
for name in numeric_cols:
    df = df.withColumn(name, df[name].cast(DoubleType()))

# Create pipeline stages for preprocessing
stages = []

# Each string feature is indexed and then one-hot encoded inside the pipeline
for name in string_columns:
    # "keep" puts labels unseen during fitting into an extra index instead of failing
    indexer = StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    encoder = OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_encoded")
    stages.extend([indexer, encoder])

# Inputs to the assembler: numeric columns first, then the encoded string columns
final_features = numeric_cols + [f"{name}_encoded" for name in string_columns]
if not final_features:
    raise ValueError("No features available for training after preprocessing")

# Keep feature_cols in sync with what the model actually consumes
feature_cols = list(final_features)

print("Final features:", final_features)

# Add vector assembler to pipeline stages; rows with invalid (null/NaN) inputs are skipped
assembler = VectorAssembler(inputCols=final_features, outputCol="features", handleInvalid="skip")
stages.append(assembler)

# Random 80/20 partition into training and held-out test rows; the fixed seed
# keeps the split repeatable between runs.
split_weights = [0.8, 0.2]
train_data, test_data = df.randomSplit(split_weights, seed=42)

# Each count() is a Spark action that runs a job over its split
train_rows = train_data.count()
test_rows = test_data.count()
print(f"Training set: {train_rows} rows")
print(f"Test set: {test_rows} rows")

# Linear Regression estimator reading the assembled "features" vector and
# predicting "fare_amount".
lr_params = {
    "featuresCol": "features",
    "labelCol": "fare_amount",
    "maxIter": 10,           # maximum number of optimisation iterations
    "regParam": 0.1,         # regularisation strength
    "elasticNetParam": 0.0,  # ElasticNet mixing parameter (0 = Ridge, 1 = Lasso)
}
lr = LinearRegression(**lr_params)

# The regressor runs last, after every preprocessing stage collected so far
stages += [lr]

# Chain all stages into a single estimator
pipeline = Pipeline(stages=stages)

def _resolve_coefficient_names(features_metadata, num_coefficients, fallback_names):
    """Return one display name per model coefficient.

    Names are taken from the ML attribute metadata of the assembled feature
    vector, which has one entry per vector slot. This keeps names aligned with
    coefficients even when an input column expands to several slots (e.g. a
    one-hot encoded column).

    Args:
        features_metadata: metadata dict of the features column (may be empty/None).
        num_coefficients: number of coefficients in the fitted model.
        fallback_names: input column names, used when metadata is unusable and
            their count matches the coefficient count.

    Returns:
        A list of ``num_coefficients`` names; generic ``feature_<i>`` names are
        used when neither metadata nor ``fallback_names`` line up.
    """
    attr_groups = (features_metadata or {}).get("ml_attr", {}).get("attrs", {})
    slot_names = {}
    for group in attr_groups.values():
        for attr in group:
            if "idx" in attr:
                slot_names[attr["idx"]] = attr.get("name", f"feature_{attr['idx']}")

    if len(slot_names) == num_coefficients and all(i in slot_names for i in range(num_coefficients)):
        return [slot_names[i] for i in range(num_coefficients)]
    if len(fallback_names) == num_coefficients:
        return list(fallback_names)
    return [f"feature_{i}" for i in range(num_coefficients)]


# Fit, score and evaluate the pipeline. A failure is reported with basic
# diagnostics instead of aborting, so later cells can still run.
try:
    # --- Measure Training Time ---
    start_train_time = time.time()
    model = pipeline.fit(train_data)
    end_train_time = time.time()
    training_time = end_train_time - start_train_time
    print(f"Training Time: {training_time:.4f} seconds")

    # --- Measure Prediction Time ---
    # transform() is lazy and only builds a query plan, so timing it alone measures
    # almost nothing. Cache the result and run an action so the timed region covers
    # the actual scoring; the cache is then reused by show() and both evaluate() calls.
    start_pred_time = time.time()
    predictions = model.transform(test_data).cache()
    predictions.count()
    end_pred_time = time.time()
    prediction_time = end_pred_time - start_pred_time
    print(f"Prediction Time: {prediction_time:.4f} seconds")
    # --- End Prediction Time Measurement ---

    # Select example rows to display
    print("Sample Predictions:")
    predictions.select("fare_amount", "prediction").show(5)

    # --- Evaluate Model ---
    evaluator = RegressionEvaluator(
        labelCol="fare_amount",
        predictionCol="prediction",
        metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.4f}")

    # Also calculate R2 (coefficient of determination)
    evaluator.setMetricName("r2")
    r2 = evaluator.evaluate(predictions)
    print(f"R² on test data = {r2:.4f}")
    # --- End Evaluation ---

    # --- Feature Importance ---
    # The fitted regressor is the last pipeline stage
    lr_model = model.stages[-1]
    coefficients = lr_model.coefficients
    intercept = lr_model.intercept
    print("\nLinear Regression Model:")
    print(f"Intercept: {intercept:.4f}")

    # Pair every coefficient with the name of its slot in the feature vector.
    # Zipping with the input column names would misalign as soon as one input
    # column contributes more than one slot.
    coefficient_values = coefficients.toArray()
    coefficient_names = _resolve_coefficient_names(
        predictions.schema["features"].metadata,
        len(coefficient_values),
        final_features,
    )
    features_with_importance = list(zip(coefficient_names, coefficient_values))
    sorted_features = sorted(features_with_importance, key=lambda x: abs(x[1]), reverse=True)

    print("\nFeature Coefficients (absolute value sorted):")
    for feature, coef in sorted_features[:10]:  # Top 10 features
        print(f"{feature}: {coef:.4f}")
    # --- End Feature Importance ---

    # --- Measure Memory Usage (Driver Process) ---
    # RSS of this Python process only; JVM-side memory is not included
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    memory_usage_mb = memory_info.rss / (1024 * 1024)  # Resident Set Size in MB
    print(f"Driver Memory Usage: {memory_usage_mb:.2f} MB")
    # --- End Memory Usage Measurement ---
except Exception as e:
    print(f"Error during model training: {str(e)}")

    # Try to provide more diagnostic information
    print("\nDiagnostic information:")
    print(f"Number of features: {len(final_features)}")

    # Check for nulls in features. Columns produced inside the pipeline
    # (e.g. *_encoded) do not exist on df and are skipped, so the handler
    # itself cannot fail on them.
    for col in final_features:
        if col not in df.columns:
            continue
        null_count = df.filter(df[col].isNull()).count()
        if null_count > 0:
            print(f"Column '{col}' has {null_count} null values")

Loading data...
Dataset has 83459 rows and 20 columns
Column names: ['vendor_id', 'pickup_time', 'dropoff_time', 'flag', 'rate_code', 'pu_location', 'do_location', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'trip_duration_minutes', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']
Schema:
root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- pu_location: integer (nullable = true)
 |-- do_location: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |--

In [4]:
# Get details about ML tasks and cluster resources

# Public handle on the application's configuration, reused by sections 3 and 4
spark_conf = spark.sparkContext.getConf()

# 1. Spark application information
print("===== SPARK APPLICATION INFORMATION =====")
print(f"Application Name: {spark.conf.get('spark.app.name')}")
print(f"Spark Version: {spark.version}")
print(f"Master: {spark.conf.get('spark.master')}")

# 2. Cluster resources
print("\n===== CLUSTER RESOURCES =====")
# PySpark has no public accessor for this, hence the private JVM handle.
# NOTE(review): the status map is keyed by block manager and appears to include
# the driver, so on a real cluster this may be executors + 1 — confirm.
print(f"Number of executors: {spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size()}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
# NOTE(review): this repeats defaultParallelism as a stand-in for the core count;
# the two are not guaranteed to match — confirm for the target cluster.
print(f"Available Cores: {spark.sparkContext.defaultParallelism}")

# 3. Spark configuration
print("\n===== SPARK CONFIGURATION =====")
# Values of keys that look like credentials are masked so they do not end up
# in saved notebook output.
sensitive_markers = ("secret", "password", "token", "access.key", "accesskey")
spark_config = spark_conf.getAll()
for key, value in sorted(spark_config, key=lambda x: x[0]):
    is_sensitive = any(marker in key.lower() for marker in sensitive_markers)
    print(f"{key}: {'*********(redacted)' if is_sensitive else value}")

# 4. Memory information
print("\n===== MEMORY INFORMATION =====")
# SparkConf.get(key, default) is the public equivalent of going through _jsc
driver_memory = spark_conf.get("spark.driver.memory", "Not set")
executor_memory = spark_conf.get("spark.executor.memory", "Not set")
print(f"Driver Memory: {driver_memory}")
print(f"Executor Memory: {executor_memory}")

# 5. ML pipeline information (if model exists in this scope)
# `model` is only defined when the training cell completed successfully.
if 'model' in locals():
    print("\n===== ML PIPELINE INFORMATION =====")
    # Use a separate name: rebinding `stages` would replace the list of unfitted
    # stages built by the training cell with fitted ones.
    fitted_stages = model.stages
    print(f"Pipeline has {len(fitted_stages)} stages:")
    for i, stage in enumerate(fitted_stages):
        print(f"  Stage {i}: {type(stage).__name__}")
        # For LinearRegressionModel, print more details
        if hasattr(stage, 'coefficients'):
            print(f"    Coefficients shape: {stage.coefficients.size}")
            print(f"    Intercept: {stage.intercept:.4f}")
            # A training summary is not always attached (e.g. a model loaded from
            # disk); accessing it without one raises, so check first.
            if getattr(stage, 'hasSummary', False):
                print(f"    Num iterations: {stage.summary.totalIterations}")
                print(f"    Training RMSE: {stage.summary.rootMeanSquaredError:.4f}")
                print(f"    Training R²: {stage.summary.r2:.4f}")

# 6. Detailed execution metrics (UI information)
print("\n===== EXECUTION METRICS =====")
# Ask the context where its UI is actually served: the port differs when 4040 is
# already taken, and uiWebUrl is None when the UI is disabled. Fall back to the
# conventional default address in that case.
spark_ui_url = spark.sparkContext.uiWebUrl or "http://localhost:4040"
print(f"Note: For more detailed metrics, view the Spark UI at {spark_ui_url}")

# 7. Process local metrics
# These describe the Python driver process only, not the JVM or any executors.
print("\n===== PROCESS METRICS =====")
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
# Blocks for one second to sample CPU utilisation over that interval
cpu_percent = process.cpu_percent(interval=1.0)
print(f"Process ID: {os.getpid()}")
print(f"CPU Usage: {cpu_percent}%")
print(f"Memory Usage (RSS): {memory_info.rss / (1024 * 1024):.2f} MB")
print(f"Memory Usage (VMS): {memory_info.vms / (1024 * 1024):.2f} MB")

# 8. Performance summary — each figure is printed only if the training cell
# got far enough to define it.
print("\n===== PERFORMANCE SUMMARY =====")
defined_names = locals()
if 'training_time' in defined_names:
    print(f"Model Training Time: {training_time:.4f} seconds")
if 'prediction_time' in defined_names:
    print(f"Prediction Time: {prediction_time:.4f} seconds")
# The two quality metrics are reported together or not at all
if {'rmse', 'r2'} <= defined_names.keys():
    print(f"RMSE: {rmse:.4f}")
    print(f"R²: {r2:.4f}")

# Shut the session down; cells that use `spark` need a new session afterwards
spark.stop()

===== SPARK APPLICATION INFORMATION =====
Application Name: ClassificationExampleWithMetrics
Spark Version: 4.0.0-preview2
Master: local[*]

===== CLUSTER RESOURCES =====
Number of executors: 1
Default Parallelism: 8
Available Cores: 8

===== SPARK CONFIGURATION =====
spark.app.id: local-1746844806252
spark.app.name: ClassificationExampleWithMetrics
spark.app.startTime: 1746844804731
spark.app.submitTime: 1746844802665
spark.driver.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add