# Unit Testing and Debuggin (Random Forest Model)

In [132]:
import pandas as pd
import numpy as np
import unittest
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import time
from joblib import Parallel, delayed


In [133]:
# Load your dataset
df = pd.read_csv('./csv/2020_out_heading_matched_out.csv')

In [134]:
df.head()

Unnamed: 0,state_Alabama,state_Alaska,state_Arizona,state_Arkansas,state_California,state_Colorado,state_Connecticut,state_Delaware,state_Florida,state_Georgia,...,GROSSRentOccupiedUnitsPayingRentMedianGrossRent_Dollars_,COMPUTERSAndInternetUseTotalHouseholds,COMPUTERSAndInternetUseTotalHouseholdsWithAComputerInPercent,COMPUTERSAndInternetUseTotalHouseholdsWithABroadbandInternetSubscriptionInPercent,LastFourthElection_Republican,LastThirdElection_Republican,LastSecondElection_Republican,LastElection_Republican,Target,Poll
0,1,0,0,0,0,0,0,0,0,0,...,-0.937159,-0.216342,-1.577553,-1.336384,1,1,1,1,1,1
1,0,1,0,0,0,0,0,0,0,0,...,0.775301,-0.860445,1.384752,0.643917,1,1,1,1,1,1
2,0,0,1,0,0,0,0,0,0,0,...,0.340667,0.086206,1.033292,0.452275,1,0,0,1,0,0
3,0,0,0,1,0,0,0,0,0,0,...,-1.219672,-0.503647,-1.527345,-1.91131,1,1,1,1,1,1
4,0,0,0,0,1,0,0,0,0,0,...,2.570342,4.191641,1.284335,1.282724,0,0,0,0,0,0


In [135]:
print(df.columns)

Index(['state_Alabama', 'state_Alaska', 'state_Arizona', 'state_Arkansas',
       'state_California', 'state_Colorado', 'state_Connecticut',
       'state_Delaware', 'state_Florida', 'state_Georgia',
       ...
       'GROSSRentOccupiedUnitsPayingRentMedianGrossRent_Dollars_',
       'COMPUTERSAndInternetUseTotalHouseholds',
       'COMPUTERSAndInternetUseTotalHouseholdsWithAComputerInPercent',
       'COMPUTERSAndInternetUseTotalHouseholdsWithABroadbandInternetSubscriptionInPercent',
       'LastFourthElection_Republican', 'LastThirdElection_Republican',
       'LastSecondElection_Republican', 'LastElection_Republican', 'Target',
       'Poll'],
      dtype='object', length=364)


In [136]:
# Assuming `df` is your main dataset and 'target' is the label column
X = df.drop('Target', axis=1)
y = df['Target']

In [137]:
#Spliting the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [138]:
# Seting up a RandomForest model
model = RandomForestClassifier(random_state=42)

In [139]:
import logging

In [140]:
# Training the model
logging.info("Training the RandomForest model...")
model.fit(X_train, y_train)

In [141]:
# predictions
logging.info("Making predictions...")
predictions = model.predict(X_test)

In [142]:
def preprocess_data(data):
    data = data.fillna(0)
    data = data.replace([np.inf, -np.inf], 0)
    return data

### Unit Test

In [161]:
class TestRandomForestModel(unittest.TestCase):
    
    def test_data_shape(self):
        """Test correct data shape and no missing values."""
        self.assertEqual(X.shape[1], df.shape[1] - 1)  # Checks features minus Target column
        self.assertFalse(X.isnull().any().any(), "Data contains missing values")

    def test_model_training(self):
        """Ensure model can train and achieve accuracy."""
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        self.assertGreaterEqual(accuracy, 0.6, "Model accuracy is below threshold")

    def test_predictions_shape(self):
        """Check prediction shape matches test labels."""
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        self.assertEqual(len(predictions), len(y_test), "Prediction length mismatch")

    def test_no_nan_predictions(self):
        """Ensure no NaNs in predictions."""
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        self.assertFalse(np.isnan(predictions).any(), "NaN values found in predictions")


In [162]:
if __name__ == '__main__':
    unittest.main(argv=['', 'TestRandomForestModel'], exit=False)

....
----------------------------------------------------------------------
Ran 4 tests in 0.464s

OK


### Edge Case Test

In [163]:
class EdgeCaseTests(unittest.TestCase):
    
    def setUp(self):
        """Set up test data for edge cases using preprocessed dataset."""
        # Assuming the preprocessed data is loaded as X and y
        self.X = pd.read_csv('./csv/2020_out_heading_matched_out.csv')
        self.y = self.X['Target']
        self.X = self.X.drop(columns=['Target'])
        
        self.model = RandomForestClassifier()

    def test_empty_data(self):
        """Test model behavior with empty dataset."""
        X_empty, y_empty = pd.DataFrame(), pd.Series()
        with self.assertRaises(ValueError):
            self.model.fit(X_empty, y_empty)

    def test_single_row(self):
        """Test model with a single row of data."""
        X_single = self.X.iloc[[0]]  # Extract the first row of X
        y_single = self.y.iloc[[0]]  # Extract the corresponding label
        self.model.fit(X_single, y_single)  # Train the model
        prediction = self.model.predict(X_single)  # Making prediction for the single row
        self.assertEqual(len(prediction), 1, "Model failed with single row input")

    def test_large_feature_values(self):
        """Test model with extremely large feature values."""
        X_large = self.X.copy() * 1e6  # Multiply all feature values by 1 million to simulate large values
        self.model.fit(X_large, self.y)  # Train model with the large feature values
        prediction = self.model.predict(X_large.head(1))  # Making prediction for the first sample
        self.assertTrue(np.isfinite(prediction).all(), "Model failed with large feature values")


In [164]:
if __name__ == '__main__':
    unittest.main(argv=['', 'EdgeCaseTests'], exit=False)

...
----------------------------------------------------------------------
Ran 3 tests in 0.320s

OK


### Scalability Test

In [165]:
class ScalabilityTests(unittest.TestCase):
    def test_scalability_large_data(self):
        """Test model's performance on a large dataset."""
        X_large = pd.concat([X] * 50, ignore_index=True)  # 100x original dataset size
        y_large = pd.concat([y] * 50, ignore_index=True)
        
        
        print(f"Shape of X_large: {X_large.shape}")
        print(f"Shape of y_large: {y_large.shape}")
        
        start_time = time.time()
        model.fit(X_large, y_large)
        predictions = model.predict(X_large)
        end_time = time.time()
        
        self.assertEqual(len(predictions), len(y_large), "Mismatch in predictions for large data")
        print("Scalability Test Time:", end_time - start_time)


In [166]:
if __name__ == '__main__':
    unittest.main(argv=['', 'ScalabilityTests'], exit=False)

Shape of X_large: (2500, 363)
Shape of y_large: (2500,)


.
----------------------------------------------------------------------
Ran 1 test in 0.280s

OK


Scalability Test Time: 0.2683243751525879


### Load Test

In [167]:
class LoadTests(unittest.TestCase):
    def test_load(self):
        """Simulate load by making multiple predictions in a loop."""
        model.fit(X_train, y_train)
        start_time = time.time()
        
        for p in range(1000):
            p = model.predict(X_test)
        
        end_time = time.time()
        print("Load test")

        print("Load Test Time for 1000 predictions:", end_time - start_time)


In [168]:
if __name__ == '__main__':
    unittest.main(argv=['', 'LoadTests'], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 7.208s

OK


Load test
Load Test Time for 1000 predictions: 7.082529783248901


### Speed/Latency Test

In [169]:
class SpeedLatencyTests(unittest.TestCase):
    def test_prediction_speed(self):
        """Measure the speed of prediction for latency optimization."""
        model.fit(X_train, y_train)
        
        # Test prediction latency
        start_time = time.time()
        predictions = model.predict(X_test)
        end_time = time.time()
        
        latency = end_time - start_time
        print("Prediction Latency:", latency)
        self.assertLess(latency, 1, "Prediction latency is too high")


In [170]:
if __name__ == '__main__':
    unittest.main(argv=['', 'SpeedLatencyTests'], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.139s

OK


Prediction Latency: 0.006833314895629883


### Corrected Error

In [178]:

class CorrectedErrorsTests(unittest.TestCase):
    def test_corrected_null_handling(self):
        """Test that corrected error in handling nulls is fixed and does not reoccur."""
        # DataFrame with missing values:None represents NaN
        data = pd.DataFrame([[None, 2], [4, None]])
        
        # Preprocess the data to handle null values
        preprocessed_data = preprocess_data(data)
        self.assertFalse(preprocessed_data.isnull().any().any(), "Null handling error not corrected")


In [179]:
if __name__ == '__main__':
    unittest.main(argv=['', 'CorrectedErrorsTests'], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.003s

OK


### Engaged optimizations/improvements

In [180]:
import time
import pandas as pd
import unittest

class OptimizationTests(unittest.TestCase):
    def test_optimized_preprocessing(self):
        """Test if preprocessing optimization reduces execution time."""
        # assigning file path
        file_path = './csv/2020_out_heading_matched_out.csv'
        
        # Unoptimized timing: without optimization applied
        unoptimized_start = time.time()
        unoptimized_data = pd.read_csv(file_path)
        preprocess_data(unoptimized_data)  # unoptimized preprocessing
        unoptimized_time = time.time() - unoptimized_start

        # Optimized timing: after applying optimization in preprocess_data
        optimized_start = time.time()
        optimized_data = pd.read_csv(file_path)
        preprocess_data(optimized_data)  # optimized preprocessing
        optimized_time = time.time() - optimized_start

        print("Unoptimized Time:", unoptimized_time)
        print("Optimized Time:", optimized_time)

        self.assertLess(optimized_time, unoptimized_time, "Optimizations did not improve performance")

In [181]:
if __name__ == '__main__':
    unittest.main(argv=['', 'OptimizationTests'], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.040s

OK


Unoptimized Time: 0.01974344253540039
Optimized Time: 0.01813483238220215


In [182]:
if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)

......

Load test
Load Test Time for 1000 predictions: 7.176939487457275
Unoptimized Time: 0.01289057731628418
Optimized Time: 0.011537790298461914
Shape of X_large: (2500, 363)
Shape of y_large: (2500,)


...

Scalability Test Time: 0.34461450576782227
Prediction Latency: 0.006262779235839844


...
----------------------------------------------------------------------
Ran 12 tests in 8.631s

OK
