In [2]:
from pyspark.context import SparkContext
from math import sqrt, pow
from datetime import datetime
from pyspark.rdd import RDD


sc = SparkContext('local', 'test')

25/04/08 16:09:39 WARN Utils: Your hostname, lnvi-legion resolves to a loopback address: 127.0.1.1; using 192.168.100.212 instead (on interface wlp4s0)
25/04/08 16:09:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/08 16:09:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Change the path to the CSV file as needed
# Load the CSV file as a text file and filter out the header
lines = sc.textFile("train.csv")
header = lines.first()
data_rdd = lines.filter(lambda line: line != header)

def process_trip_line(line):
    fields = line.split(",")
    pickup_datetime = datetime.strptime(fields[2], "%Y-%m-%d %H:%M:%S") # pickup_datetime
    passenger_count = int(fields[4]) # passenger_count
    pickup_longitude = float(fields[5]) # pickup_longitude
    pickup_latitude = float(fields[6]) # pickup_latitude
    dropoff_longitude = float(fields[7]) # dropoff_longitude
    dropoff_latitude = float(fields[8]) # dropoff_latitude
    trip_duration = int(fields[10]) # trip_duration

    pickup_minutes = pickup_datetime.hour * 60 + pickup_datetime.minute
    pickup_dayofweek = pickup_datetime.weekday() + 1
    pickup_month = pickup_datetime.month
    distance = sqrt(pow((pickup_longitude - dropoff_longitude), 2) + pow((pickup_latitude - dropoff_latitude), 2))
    return [
        passenger_count,
        pickup_latitude,
        pickup_longitude,
        distance,
        pickup_minutes,
        pickup_dayofweek,
        pickup_month,
        trip_duration
    ]

data_rdd = data_rdd.map(process_trip_line)
data_rdd = data_rdd.filter(lambda x: x[0] > 0) \
                    .filter(lambda x: x[-1] < 22 * 3600) \
                    .filter(lambda x: x[3] > 0)

                                                                                

In [None]:
class DecisionTreeRegressor:
    def __init__(self, train_rdd : RDD, max_depth=5, minInstancesPerNode=1, maxBins=10):
        self.X = train_rdd.map(lambda row: row[:-1]).cache()
        self.y = train_rdd.map(lambda row: row[-1]).cache()
        self.max_depth = max_depth
        self.minInstancesPerNode = minInstancesPerNode
        self.maxBins = maxBins
        self.tree = self.build_tree(self.X, self.y)
    
    def build_tree(self, X, y, depth=0):
        n_samples, n_features = X.count(), len(X.first())
        
        y_mean = y.mean()
        
        variance = y.map(lambda x: (x - y_mean) ** 2).mean()
        
        if n_samples == 0 or depth >= self.max_depth or variance == 0:
            return y_mean
        
        best_split = None
        best_variance = float('inf')
        
        for feature_index in range(n_features):
            split_candidates = self.approx_quantile(X.map(lambda x: x[feature_index]), [i / self.maxBins for i in range(1, self.maxBins)])
            for candidate in split_candidates:
                left_mask = X.map(lambda x: x[feature_index] <= candidate)
                right_mask = X.map(lambda x: x[feature_index] > candidate)
                
                left_y = y.zip(left_mask).filter(lambda x: x[1]).map(lambda x: x[0]).persist()
                right_y = y.zip(right_mask).filter(lambda x: not x[1]).map(lambda x: x[0]).persist()

                if left_y.count() < self.minInstancesPerNode or right_y.count() < self.minInstancesPerNode:
                    continue
                
                left_y_count = left_y.count()
                right_y_count = right_y.count()
                left_y_mean = left_y.mean()
                right_y_mean = right_y.mean()

                left_variance = left_y.map(lambda x: (x - left_y_mean) ** 2).mean() if left_y_count > 0 else 0
                right_variance = right_y.map(lambda x: (x - right_y_mean) ** 2).mean() if right_y_count > 0 else 0
                
                weighted_variance = (left_variance * left_y_count + right_variance * right_y_count) / n_samples
                
                if weighted_variance < best_variance:
                    best_variance = weighted_variance
                    best_split = (feature_index, candidate)
            left_y.unpersist()
            right_y.unpersist()
        
        if best_split is None:
            return y_mean
        
        feature_index, candidate = best_split
        
        left_mask = X.map(lambda x: x[feature_index] <= candidate)
        right_mask = X.map(lambda x: x[feature_index] > candidate)
        
        left_X = X.zip(left_mask).filter(lambda x: x[1]).map(lambda x: x[0])
        right_X = X.zip(right_mask).filter(lambda x: not x[1]).map(lambda x: x[0])
        
        left_y = y.zip(left_mask).filter(lambda x: x[1]).map(lambda x: x[0]).cache()
        right_y = y.zip(right_mask).filter(lambda x: not x[1]).map(lambda x: x[0]).cache()
        left_tree = self.build_tree(left_X, left_y, depth + 1)
        right_tree = self.build_tree(right_X, right_y, depth + 1)
        return {
            'feature_index': feature_index,
            'threshold': candidate,
            'left': left_tree,
            'right': right_tree
        }
    
    @staticmethod
    def approx_quantile(rdd, quantiles, sample_fraction=0.1, seed=None):
        """
        Approximate quantiles of an RDD using sampling.

        :param rdd: Input RDD of numerical values.
        :param quantiles: List of desired quantiles (values between 0 and 1).
        :param sample_fraction: Fraction of data to sample (between 0 and 1).
        :param seed: Random seed for sampling.
        :return: List of approximate quantile values.
        """
        sample = rdd.sample(withReplacement=False, fraction=sample_fraction, seed=seed).collect()

        if not sample:
            raise ValueError("Sample is empty. Consider increasing sample_fraction.")

        sample.sort()

        n = len(sample)
        return [sample[min(int(q * n), n - 1)] for q in quantiles]

    def traverse_tree(self, X):
        if isinstance(self.tree, dict):
            if X[self.tree['feature_index']] <= self.tree['threshold']:
                return self.traverse_tree(self.tree['left'], X)
            else:
                return self.traverse_tree(self.tree['right'], X)
        else:
            return self.tree

    def predict(self, test_rdd):
        return test_rdd.map(lambda X: self.traverse_tree(self.tree, X))
        

In [5]:
train_rdd, test_rdd = data_rdd.randomSplit([0.8, 0.2], seed=42)

In [10]:
dt = DecisionTreeRegressor(train_rdd=train_rdd, max_depth=5, minInstancesPerNode=1)

ERROR:root:KeyboardInterrupt while sending command.                 (4 + 1) / 6]
Traceback (most recent call last):
  File "/home/lnvi/Code/lab03-spark-ml/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lnvi/Code/lab03-spark-ml/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
25/04/08 16:53:25 ERROR Executor: Exception in task 4.0 in stage 1828.0 (TID 10933)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/lnvi/Code/lab03-spark-ml/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py"

KeyboardInterrupt: 