In [5]:
from xgboost.spark import SparkXGBRegressor

In [6]:
times=[]
import time
from pyspark.ml.evaluation import RegressionEvaluator

In [7]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("data.csv") \
    .getOrCreate()

# Read the CSV file into a Spark DataFrame
file_path = "/home/ubuntu/data.csv"  # Replace this with the path to your CSV file
train_spark_dataframe = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the schema and the first few rows of the DataFrame
train_spark_dataframe.printSchema()
train_spark_dataframe.show(5)



root
 |-- trip_distance: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)

+--------------------+---------------+------------+------------+--------------------+-----------+
|       trip_distance|passenger_count|PULocationID|DOLocationID|          tip_amount|pickup_hour|
+--------------------+---------------+------------+------------+--------------------+-----------+
| -0.3661467419730907|              1|         151|         239|-0.01412124292201...|          0|
|-0.07707474953506906|              1|         239|         246|-0.03583707603552034|          0|
|  -0.760335822570393|              3|         236|         236|-0.06924605005629836|         13|
|  -0.760335822570393|              5|         193|         193|-0.06924605005629836|         15|
|  -0.760335822570393|              5| 

                                                                                

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Start Spark session if it's not already started
# Uncomment the next two lines if the Spark session isn't initialized
# spark = SparkSession.builder \
#     .appName("Tip Prediction Model") \
#     .getOrCreate()

# Define the assembler with the input columns that will be features
assembler = VectorAssembler(
    inputCols=["trip_distance", "passenger_count", "PULocationID", "DOLocationID", "pickup_hour"],
    outputCol="features"
)

# Assuming 'train_spark_dataframe' is already loaded and defined
# Transform the data to create feature vectors
transformed_dataframe = assembler.transform(train_spark_dataframe)

# Select only the necessary columns, including the newly created 'features' and the label 'tip_amount'
final_dataframe = transformed_dataframe.select("features", "tip_amount")

# Split the data approximately into 70% training and 30% testing
train_data, test_data = final_dataframe.randomSplit([0.7, 0.3], seed=42)

# 10 Workers

In [9]:
times = []

In [10]:
for i in range(5):
    t0 = time.time()
# Create an instance of SparkXGBRegressor
    xgb_regressor = SparkXGBRegressor(
    features_col="features",
    label_col="tip_amount",
    prediction_col="predicted_tip_amount",
    objective="reg:squarederror",
    numWorkers=10,
    eta=0.1
    )
    t1 = time.time()
    times.append(t1-t0)
print("Training times for each run:", times)

Training times for each run: [0.0011959075927734375, 0.0007948875427246094, 0.0007452964782714844, 0.0007405281066894531, 0.0007779598236083984]


In [11]:
times = []

In [12]:
for i in range(5):
    t0 = time.time()
    xgb_regressor_model = xgb_regressor.fit(train_data)
    t1 = time.time()
    times.append(t1-t0)
print("Training times for each run:", times)

2024-04-21 21:31:37,519 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'numWorkers': 10, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[21:34:04] task 0 got new rank 0                                    (0 + 1) / 1]
Parameters: { "numWorkers" } are not used.

2024-04-21 21:36:11,902 INFO XGBoost-PySpark: _fit Finished xgboost training!   
2024-04-21 21:36:12,093 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'numWorkers': 10, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[21:38:34] task 0 got new rank 0                                    (0 + 1) / 1]
Parameters: { "numWorkers" } are not used.

2024-04-21 21:40:37

Training times for each run: [275.36387848854065, 266.0058059692383, 262.55772495269775, 266.4080424308777, 263.52869868278503]


In [13]:
times = []

In [14]:
for i in range(5):
    t0 = time.time()

# Assuming 'xgb_regressor_model' has already been trained and 'test_data' prepared
    predictions = xgb_regressor_model.transform(test_data)
    t1 = time.time()
    times.append(t1-t0)
# Evaluate the model using the correct parameter names
evaluator = RegressionEvaluator(
    labelCol="tip_amount",  # Correct parameter name is 'labelCol' not 'label_col'
    predictionCol="predicted_tip_amount",  # Correct parameter name is 'predictionCol'
    metricName="rmse"  # This is correct; other options could be "mae" or "r2"
)
print("Training times for each run:", times)
# Calculate RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Training times for each run: [0.13955903053283691, 0.08032870292663574, 0.08795332908630371, 0.08109807968139648, 0.07780265808105469]


INFO:XGBoost-PySpark:Do the inference on the CPUs                   (0 + 2) / 9]
2024-04-21 21:53:54,718 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (2 + 2) / 9]
2024-04-21 21:54:28,194 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-04-21 21:55:02,838 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (4 + 2) / 9]
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (6 + 2) / 9]
2024-04-21 21:55:37,420 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-04-21 21:56:11,764 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs


Root Mean Squared Error (RMSE) on test data = 1.82001


                                                                                

# 6 Workers

In [15]:
times = []

In [16]:
for i in range(5):
    t0 = time.time()
# Create an instance of SparkXGBRegressor
    xgb_regressor = SparkXGBRegressor(
    features_col="features",
    label_col="tip_amount",
    prediction_col="predicted_tip_amount",
    objective="reg:squarederror",
    numWorkers=6,
    eta=0.1
    )
    t1 = time.time()
    times.append(t1-t0)
print("Training times for each run:", times)

Training times for each run: [0.0011239051818847656, 0.0011930465698242188, 0.0009562969207763672, 0.0010235309600830078, 0.0008661746978759766]


In [17]:
times = []

In [18]:
for i in range(5):
    t0 = time.time()
    xgb_regressor_model = xgb_regressor.fit(train_data)
    t1 = time.time()
    times.append(t1-t0)
print("Training times for each run:", times)

2024-04-21 21:59:40,887 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'numWorkers': 6, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[22:02:06] task 0 got new rank 0                                    (0 + 1) / 1]
Parameters: { "numWorkers" } are not used.

2024-04-21 22:04:11,617 INFO XGBoost-PySpark: _fit Finished xgboost training!   
2024-04-21 22:04:11,782 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'numWorkers': 6, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[22:06:35] task 0 got new rank 0                                    (0 + 1) / 1]
Parameters: { "numWorkers" } are not used.

2024-04-21 22:08:39,7

Training times for each run: [270.94072675704956, 268.09159541130066, 266.82062911987305, 268.5338923931122, 270.7846043109894]


In [19]:
times = []

In [20]:
for i in range(5):
    t0 = time.time()

# Assuming 'xgb_regressor_model' has already been trained and 'test_data' prepared
    predictions = xgb_regressor_model.transform(test_data)
    t1 = time.time()
    times.append(t1-t0)
# Evaluate the model using the correct parameter names
evaluator = RegressionEvaluator(
    labelCol="tip_amount",  # Correct parameter name is 'labelCol' not 'label_col'
    predictionCol="predicted_tip_amount",  # Correct parameter name is 'predictionCol'
    metricName="rmse"  # This is correct; other options could be "mae" or "r2"
)
print("Training times for each run:", times)
# Calculate RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Training times for each run: [0.07748293876647949, 0.07015180587768555, 0.07546353340148926, 0.05991053581237793, 0.0815427303314209]


INFO:XGBoost-PySpark:Do the inference on the CPUs
2024-04-21 22:22:09,300 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2024-04-21 22:22:41,785 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (2 + 2) / 9]
2024-04-21 22:23:15,899 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (4 + 2) / 9]
2024-04-21 22:23:50,339 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (6 + 2) / 9]
2024-04-21 22:24:24,804 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs

Root Mean Squared Error (RMSE) on test data = 1.82001


                                                                                

# 2 Workers

In [21]:
times = []

In [22]:
for i in range(5):
    t0 = time.time()
# Create an instance of SparkXGBRegressor
    xgb_regressor = SparkXGBRegressor(
    features_col="features",
    label_col="tip_amount",
    prediction_col="predicted_tip_amount",
    objective="reg:squarederror",
    numWorkers=2,
    eta=0.1
    )
    t1 = time.time()
    times.append(t1-t0)
print("Training times for each run:", times)

Training times for each run: [0.0010111331939697266, 0.0008254051208496094, 0.0008337497711181641, 0.0007421970367431641, 0.0007581710815429688]


In [23]:
times = []

In [24]:
for i in range(5):
    t0 = time.time()
    xgb_regressor_model = xgb_regressor.fit(train_data)
    t1 = time.time()
    times.append(t1-t0)
print("Training times for each run:", times)

2024-04-21 22:25:54,217 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'numWorkers': 2, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[22:28:19] task 0 got new rank 0                                    (0 + 1) / 1]
Parameters: { "numWorkers" } are not used.

2024-04-21 22:30:23,564 INFO XGBoost-PySpark: _fit Finished xgboost training!   
2024-04-21 22:30:23,707 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'numWorkers': 2, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[22:32:48] task 0 got new rank 0                                    (0 + 1) / 1]
Parameters: { "numWorkers" } are not used.

2024-04-21 22:34:49,0

Training times for each run: [269.49249601364136, 265.5281836986542, 262.73722672462463, 265.38063073158264, 268.6342601776123]


In [25]:
times = []

In [26]:
for i in range(5):
    t0 = time.time()

# Assuming 'xgb_regressor_model' has already been trained and 'test_data' prepared
    predictions = xgb_regressor_model.transform(test_data)
    t1 = time.time()
    times.append(t1-t0)
# Evaluate the model using the correct parameter names
evaluator = RegressionEvaluator(
    labelCol="tip_amount",  # Correct parameter name is 'labelCol' not 'label_col'
    predictionCol="predicted_tip_amount",  # Correct parameter name is 'predictionCol'
    metricName="rmse"  # This is correct; other options could be "mae" or "r2"
)
print("Training times for each run:", times)
# Calculate RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Training times for each run: [0.07062339782714844, 0.0824432373046875, 0.09829831123352051, 0.08804798126220703, 0.060906171798706055]


INFO:XGBoost-PySpark:Do the inference on the CPUs
2024-04-21 22:48:09,025 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (1 + 2) / 9]
2024-04-21 22:48:42,489 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (3 + 2) / 9]
2024-04-21 22:49:16,788 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs                   (6 + 2) / 9]
2024-04-21 22:49:50,976 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs


Root Mean Squared Error (RMSE) on test data = 1.82001


                                                                                