# Unit Tests for Assessment Notebook Functions

This notebook contains comprehensive unit tests for all functions defined in the assessment notebook.

In [48]:
# --- Logging and SparkSession bootstrap shared by every test cell below ---
import logging

import findspark

# INFO-and-above records are emitted with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

logger.info("Creating SparkSession for unit tests")

# findspark.init() is called before pyspark is imported, as in the original cell order.
findspark.init()
from pyspark.sql import SparkSession

# Parenthesised builder chain instead of backslash continuations; same app name and master.
spark = (
    SparkSession.builder
    .appName("unit_tests")
    .master("local")
    .getOrCreate()
)
logger.info("SparkSession created for testing")

2026-02-08 15:16:11,408 - __main__ - INFO - Creating SparkSession for unit tests
2026-02-08 15:16:11,424 - __main__ - INFO - SparkSession created for testing


In [49]:
import unittest

import json
import tempfile
import os
import shutil
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime
from user_functions import *

## Test remove_special_characters Function

In [50]:
logger.info("Test 1 : Testing remove_special_characters function")

# Single string column; the third value contains '-' and ',' which the assertions
# below expect to come back unchanged.
test_schema1 = StructType([StructField("text", StringType(), True)])
test_data1 = [("hello@world!",), ("test#data$",), ("abc-123,456",)]
df_test1 = spark.createDataFrame(test_data1, test_schema1)

df_result1 = remove_special_characters(df_test1, "text")
result_values = df_result1.collect()

# Check every collected row, by position, against the cleaned text expected for it.
for row_index1, expected_text1 in [(0, 'helloworld'), (1, 'testdata'), (2, 'abc-123,456')]:
    actual_text1 = result_values[row_index1]['text']
    assert actual_text1 == expected_text1, f"Expected '{expected_text1}', got '{actual_text1}'"

logger.info("Test 1 passed: remove_special_characters works correctly")
print("\nTest 1 passed!\n")

2026-02-08 15:16:11,494 - __main__ - INFO - Test 1 : Testing remove_special_characters function


2026-02-08 15:16:11,677 - __main__ - INFO - Test 1 passed: remove_special_characters works correctly



Test 1 passed!



## Test convert_to_numeric Function

In [51]:
logger.info("Test 2.A : Testing convert_to_numeric function (int conversion)")

# Both sub-tests feed a single string column named "amount".
test_schema2 = StructType([StructField("amount", StringType(), True)])

# --- 2.A: integer conversion (to_double=False) ---
test_data2 = [("$100",), ("200",), ("$300abc",)]
df_test2 = spark.createDataFrame(test_data2, test_schema2)

df_result2 = convert_to_numeric(df_test2, "amount", to_double=False)
result_values2 = df_result2.collect()

# Only rows 0 and 1 are asserted; the third input row ("$300abc") has no check.
for row_index2, expected_int2 in [(0, 100), (1, 200)]:
    actual_int2 = result_values2[row_index2]['amount']
    assert actual_int2 == expected_int2, f"Expected {expected_int2}, got {actual_int2}"

logger.info("Test 2.A passed: convert_to_numeric (int) works correctly")
print("Test 2.A passed!\n")

# --- 2.B: double conversion (to_double=True) ---
logger.info("Test 2.B: Testing convert_to_numeric function (double conversion)")
test_data2b = [("$100.50",), ("200.75",), ("$300.99abc",)]
df_test2b = spark.createDataFrame(test_data2b, test_schema2)

df_result2b = convert_to_numeric(df_test2b, "amount", to_double=True)
result_values2b = df_result2b.collect()

# The label carries the literal text used in the failure message (e.g. "100.50"),
# since formatting the float itself would drop the trailing zero.
# As in 2.A, the third input row ("$300.99abc") has no check.
for row_index2b, expected_double2b, expected_label2b in [(0, 100.50, "100.50"), (1, 200.75, "200.75")]:
    actual_double2b = result_values2b[row_index2b]['amount']
    assert actual_double2b == expected_double2b, f"Expected {expected_label2b}, got {actual_double2b}"

logger.info("Test 2.B passed: convert_to_numeric (double) works correctly")
print("Test 2.B passed!\n")

2026-02-08 15:16:11,713 - __main__ - INFO - Test 2.A : Testing convert_to_numeric function (int conversion)
2026-02-08 15:16:11,891 - __main__ - INFO - Test 2.A passed: convert_to_numeric (int) works correctly
2026-02-08 15:16:11,893 - __main__ - INFO - Test 2.B: Testing convert_to_numeric function (double conversion)
2026-02-08 15:16:12,058 - __main__ - INFO - Test 2.B passed: convert_to_numeric (double) works correctly


Test 2.A passed!

Test 2.B passed!



## Test convert_to_datetime Function

In [52]:
logger.info("Test 3: Testing convert_to_datetime function")

# Two timestamp-like strings in a single string column.
test_schema3 = StructType([StructField("date_str", StringType(), True)])
test_data3 = [("2020-01-15T10:30:45.000",), ("2021-06-20T14:45:30.500",)]
df_test3 = spark.createDataFrame(test_data3, test_schema3)

df_result3 = convert_to_datetime(df_test3, "date_str")

# Schema-level check only: the converted values themselves are not inspected.
converted_type_name3 = df_result3.schema['date_str'].dataType.typeName()
assert converted_type_name3 == 'timestamp', "Column should be timestamp type"

logger.info("Test 3 passed: convert_to_datetime works correctly")
print("Test 3 passed!\n")

2026-02-08 15:16:12,088 - __main__ - INFO - Test 3: Testing convert_to_datetime function
2026-02-08 15:16:12,145 - __main__ - INFO - Test 3 passed: convert_to_datetime works correctly


Test 3 passed!



## Test convert_to_tilecase Function

In [54]:
logger.info("Test 4: Testing convert_to_tilecase function")

# Lower-case, upper-case and whitespace-padded inputs.
test_schema4 = StructType([StructField("name", StringType(), True)])
test_data4 = [("hello world",), ("PYSPARK CODE",), ("  python programming  ",)]
df_test4 = spark.createDataFrame(test_data4, test_schema4)

df_result4 = convert_to_tilecase(df_test4, "name")
result_values4 = df_result4.collect()

# Row position -> expected title-cased text (the padded input is expected back trimmed).
for row_index4, expected_name4 in [(0, 'Hello World'), (1, 'Pyspark Code'), (2, 'Python Programming')]:
    actual_name4 = result_values4[row_index4]['name']
    assert actual_name4 == expected_name4, f"Expected '{expected_name4}', got '{actual_name4}'"

logger.info("Test 4 passed: convert_to_tilecase works correctly")
print("Test 4 passed!\n")

2026-02-08 15:16:20,751 - __main__ - INFO - Test 4: Testing convert_to_tilecase function
2026-02-08 15:16:21,004 - __main__ - INFO - Test 4 passed: convert_to_tilecase works correctly


Test 4 passed!



## Test remove_duplicates Function

In [None]:
logger.info("Test 5: Testing remove_duplicates function")

# Two rows per id with different dates; the assertions expect one row per id to remain.
test_schema5 = StructType([
    StructField("id", IntegerType(), True),
    StructField("date", StringType(), True),
])
test_data5 = [
    (1, "2020-01-01"),
    (1, "2020-01-02"),
    (2, "2020-01-01"),
    (2, "2020-01-03"),
]
df_test5 = spark.createDataFrame(test_data5, test_schema5)

df_result5 = remove_duplicates(df_test5, dedup_grain=['id'], order_grain=['date'], is_desc=True)

deduped_row_count5 = df_result5.count()
assert deduped_row_count5 == 2, f"Expected 2 rows after dedup, got {deduped_row_count5}"

# Index the surviving rows by id so the checks do not depend on row order.
result_values5 = df_result5.collect()
dates_by_id = dict((kept_row['id'], kept_row['date']) for kept_row in result_values5)

for record_id5, latest_date5 in [(1, '2020-01-02'), (2, '2020-01-03')]:
    assert dates_by_id[record_id5] == latest_date5, f"Expected latest date for id={record_id5} to be {latest_date5}, got {dates_by_id[record_id5]}"

logger.info("Test 5 passed: remove_duplicates works correctly")
print("Test 5 passed!\n")

2026-02-08 15:14:19,862 - __main__ - INFO - Test 5: Testing remove_duplicates function
2026-02-08 15:14:19,924 - root - INFO - Removing duplicates with dedup_grain=['id'], order_grain=['date'], is_desc=True
2026-02-08 15:14:22,094 - root - INFO - Deduplication complete: 4 rows reduced to 2 rows
2026-02-08 15:14:25,628 - __main__ - INFO - Test 5 passed: remove_duplicates works correctly


Test 5 passed!



## Test col_rename_with_mapping Function

In [55]:
logger.info("Test 6: Testing col_rename_with_mapping function")

# Old-name -> new-name mapping; written to a JSON file because the function is given a path.
mapping_dict = {"old_col1": "new_col1", "old_col2": "new_col2"}

# delete=False keeps the file on disk once the handle closes; the finally clause removes it.
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as mapping_handle:
    temp_mapping_file = mapping_handle.name
    json.dump(mapping_dict, mapping_handle)

try:
    test_schema6 = StructType([
        StructField("old_col1", IntegerType(), True),
        StructField("old_col2", StringType(), True),
    ])
    test_data6 = [(1, 'John'), (2, 'Jane')]
    df_test6 = spark.createDataFrame(test_data6, test_schema6)

    df_result6 = col_rename_with_mapping(df_test6, temp_mapping_file)

    # New names must be present ...
    for renamed_column6 in ("new_col1", "new_col2"):
        assert renamed_column6 in df_result6.columns, f"{renamed_column6} should exist after rename"
    # ... and the old names must be gone.
    for stale_column6 in ("old_col1", "old_col2"):
        assert stale_column6 not in df_result6.columns, f"{stale_column6} should not exist after rename"

    logger.info("Test 6 passed: col_rename_with_mapping works correctly")
    print("Test 6 passed!\n")

finally:
    # Runs whether or not the assertions passed, so the mapping file never lingers.
    os.remove(temp_mapping_file)

2026-02-08 15:16:59,448 - __main__ - INFO - Test 6: Testing col_rename_with_mapping function
2026-02-08 15:16:59,495 - root - INFO - Checking if the input is Pyspark DataFrame or not
2026-02-08 15:16:59,497 - root - INFO - Input is a Spark DataFrame. Proceeding with column renaming.
2026-02-08 15:16:59,499 - root - INFO - Loading column mapping from path : /tmp/tmpisco0wj0.json
2026-02-08 15:16:59,500 - root - INFO - Column mapping loaded successfully , Proceeding with column renaming
2026-02-08 15:16:59,520 - root - INFO - Columns rename completed as per mapping
2026-02-08 15:16:59,523 - __main__ - INFO - Test 6 passed: col_rename_with_mapping works correctly


Test 6 passed!



## Test drop_columns Function

In [57]:
logger.info("Test 7: Testing drop_columns function")

# One row, four columns; two of them are dropped below.
test_schema7 = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("title", StringType(), True),
    StructField("salary", IntegerType(), True),
])
test_data7 = [(1, 'Vamsi', 'Engineer', 50000)]
df_test7 = spark.createDataFrame(test_data7, test_schema7)

columns_to_drop = ['title', 'salary']
df_result7 = drop_columns(df_test7, columns_to_drop)

# Columns that were not requested for removal must survive ...
for kept_column7 in ('id', 'name'):
    assert kept_column7 in df_result7.columns, f"{kept_column7} column should still exist"
# ... the requested ones must be gone ...
for dropped_column7 in ('title', 'salary'):
    assert dropped_column7 not in df_result7.columns, f"{dropped_column7} column should be dropped"
# ... and nothing else may have been added or removed.
remaining_column_count7 = len(df_result7.columns)
assert remaining_column_count7 == 2, f"Expected 2 columns, got {remaining_column_count7}"

logger.info("Test 7 passed: drop_columns works correctly")
print("Test 7 passed!\n")

2026-02-08 15:17:09,543 - __main__ - INFO - Test 7: Testing drop_columns function
2026-02-08 15:17:09,597 - root - INFO - Dropping 2 columns: ['title', 'salary']
2026-02-08 15:17:09,611 - __main__ - INFO - Test 7 passed: drop_columns works correctly


Test 7 passed!



## Test annualize_salary Function

In [58]:
logger.info("Test 8: Testing annualize_salary function")

# One row per salary frequency exercised by this test.
test_schema8a = StructType([
    StructField("salary_min_range", IntegerType(), True),
    StructField("salary_max_range", IntegerType(), True),
    StructField("salary_frequency", StringType(), True),
])
test_data8a = [
    (1000, 2000, "Annual"),
    (10, 15, "Hourly"),
    (100, 150, "Daily"),
]
df_test8a = spark.createDataFrame(test_data8a, test_schema8a)

df_result8a = annualize_salary(df_test8a)
result_values8a = df_result8a.collect()

# (row position, expected annualized minimum, failure message).
# Only annualized_salary_min_range is checked here.
annualized_min_expectations8a = [
    (0, 1000, "Annual salary should not be changed"),
    (1, 20800, "Hourly salary should be multiplied by 2080"),
    (2, 26000, "Daily salary should be multiplied by 260"),
]
for row_index8a, expected_min8a, failure_note8a in annualized_min_expectations8a:
    assert result_values8a[row_index8a]['annualized_salary_min_range'] == expected_min8a, failure_note8a

logger.info("Test 8 passed: annualize_salary works correctly")
print("Test 8 passed!\n")



2026-02-08 15:17:13,867 - __main__ - INFO - Test 8: Testing annualize_salary function
2026-02-08 15:17:13,914 - root - INFO - Starting salary annualization
2026-02-08 15:17:14,166 - root - INFO - Salary annualization completed
2026-02-08 15:17:14,288 - __main__ - INFO - Test 8 passed: annualize_salary works correctly


Test 8 passed!



## Test create_qualification_indicator Function

In [59]:

logger.info("Test 9: Testing create_qualification_indicator function")

# Two requirement texts that mention a degree and one that does not.
test_schema9 = StructType([StructField("min_qualify_requirements", StringType(), True)])
test_data9 = [
    ("Bachelor's degree required",),
    ("High school diploma",),
    ("Master's degree preferred",),
]
df_test9 = spark.createDataFrame(test_data9, test_schema9)

df_result9 = create_qualification_indicator(df_test9)
result_values9 = df_result9.collect()

# (row position, expected is_degree_req flag, failure message)
degree_flag_expectations9 = [
    (0, 1, "Should detect degree requirement"),
    (1, 0, "Should not detect degree requirement"),
    (2, 1, "Should detect master's degree"),
]
for row_index9, expected_flag9, failure_note9 in degree_flag_expectations9:
    assert result_values9[row_index9]['is_degree_req'] == expected_flag9, failure_note9

logger.info("✓ Test 9 passed: create_qualification_indicator works correctly")
print("Test 9 passed!\n")

2026-02-08 15:17:17,626 - __main__ - INFO - Test 9: Testing create_qualification_indicator function
2026-02-08 15:17:17,665 - root - INFO - Creating qualification indicator column
2026-02-08 15:17:17,706 - root - INFO - Qualification indicator column created
2026-02-08 15:17:17,805 - __main__ - INFO - ✓ Test 9 passed: create_qualification_indicator works correctly


Test 9 passed!



## Test display Function

In [61]:
logger.info("Test 10: Testing display function")

# Fixture for the display test. It uses its own variable names so that Test 9's
# test_data9 / test_schema9 / df_test9 are no longer overwritten by this cell.
test_data_display = [(1, 'Vamsi', 100), (2, 'Krishna', 200), (3, 'Virat', 300)]
test_schema_display = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("value", IntegerType(), True)
])
df_test_display = spark.createDataFrame(test_data_display, test_schema_display)

logger.info("Testing display function:")

# Actually invoke the function under test. Previously this cell never called
# display, so the assertions below only checked the fixture and passed vacuously.
# NOTE(review): assumes display accepts the DataFrame as its only required
# argument — confirm against the definition in user_functions.
display(df_test_display)

# The input frame should still have its original shape after being displayed.
assert df_test_display.count() == 3, f"Expected 3 rows, got {df_test_display.count()}"
assert len(df_test_display.columns) == 3, f"Expected 3 columns, got {len(df_test_display.columns)}"

logger.info("Test 10 passed: display function works correctly")
print("Test 10 passed!\n")

2026-02-08 15:17:38,776 - __main__ - INFO - Test 10: Testing display function
2026-02-08 15:17:38,837 - __main__ - INFO - Testing display function:
2026-02-08 15:17:38,964 - __main__ - INFO - Test 10 passed: display function works correctly


Test 10 passed!



## Test export_to_csv Function

In [62]:
# Start marker for Test 11. The rest of this cell defines a local export_to_csv
# and then exercises that definition against a temporary directory.
logger.info("Test 11: Testing export_to_csv function")

def export_to_csv(df, output_path, file_name):
    """Write ``df`` as one headered CSV file named ``file_name`` inside ``output_path``.

    The frame is coalesced to a single partition and written to a scratch
    sub-directory (``temp_output_folder``) of ``output_path``. The first
    ``.csv`` part file found there, in sorted order, is moved to the final
    name, and the scratch directory is removed whether or not the export
    succeeded.

    NOTE(review): this definition rebinds ``export_to_csv`` after the
    notebook's ``from user_functions import *``. If that module also provides
    an ``export_to_csv``, the test below exercises this local copy instead —
    confirm that is intended.

    Args:
        df: DataFrame to export; only ``coalesce(1).write.option(...).csv(...)``
            is used on it.
        output_path: Directory that receives the final file.
        file_name: Name of the final CSV file inside ``output_path``.

    Raises:
        FileNotFoundError: If the write produced no ``.csv`` part file.
    """
    logger.info(f"Starting CSV export: {output_path}/{file_name}")

    temp_output_dir = os.path.join(output_path, "temp_output_folder")
    final_filename = os.path.join(output_path, file_name)

    try:
        df.coalesce(1).write.option("header", "true").csv(temp_output_dir, mode="overwrite")

        for entry in sorted(os.listdir(temp_output_dir)):
            if entry.endswith(".csv"):
                # os.replace overwrites an existing target on every platform,
                # matching the "overwrite" mode used for the write itself.
                os.replace(os.path.join(temp_output_dir, entry), final_filename)
                break
        else:
            # Previously this case fell through and logged "completed" even
            # though no output file had been created.
            raise FileNotFoundError(f"No .csv part file found in {temp_output_dir}")
    finally:
        # Always remove the scratch directory; ignore_errors keeps a cleanup
        # problem from masking the original exception.
        shutil.rmtree(temp_output_dir, ignore_errors=True)

    logger.info(f"CSV export completed: {final_filename}")


# Three-row fixture, so the exported file is expected to hold a header plus three data lines.
test_schema10 = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])
test_data10 = [(1, 'Vamsi'), (2, 'Krishna'), (3, 'Mahesh')]
df_test10 = spark.createDataFrame(test_data10, test_schema10)


# The temporary directory (and the exported file in it) is removed when the block exits.
with tempfile.TemporaryDirectory() as temp_dir:
    logger.info(f"Exporting to temporary directory: {temp_dir}")

    export_to_csv(df_test10, temp_dir, "test_output.csv")

    expected_file = os.path.join(temp_dir, "test_output.csv")
    assert os.path.exists(expected_file), f"Expected file {expected_file} not found"

    # Read the whole file first, then assert on the captured lines.
    with open(expected_file, 'r') as csv_handle:
        lines = csv_handle.readlines()

    exported_line_count10 = len(lines)
    assert exported_line_count10 == 4, f"Expected 4 lines (header + 3 data rows), got {exported_line_count10}"

    header_line10 = lines[0]
    for header_name10 in ('id', 'name'):
        assert header_name10 in header_line10, f"CSV header should contain '{header_name10}'"

    logger.info("Test 11 passed: export_to_csv works correctly")
    print("Test 11 passed!\n")

2026-02-08 15:17:54,083 - __main__ - INFO - Test 11: Testing export_to_csv function
2026-02-08 15:17:54,139 - __main__ - INFO - Exporting to temporary directory: /tmp/tmpx1guj9r7
2026-02-08 15:17:54,143 - __main__ - INFO - Starting CSV export: /tmp/tmpx1guj9r7/test_output.csv
2026-02-08 15:17:54,525 - __main__ - INFO - CSV export completed: /tmp/tmpx1guj9r7/test_output.csv
2026-02-08 15:17:54,527 - __main__ - INFO - Test 11 passed: export_to_csv works correctly


Test 11 passed!



## Test Summary and Conclusions

In [63]:
# NOTE(review): the results below are a static table; nothing here is derived
# from the assertions in the earlier cells, so the summary reports PASSED for
# every entry regardless of which cells were actually executed.
banner = "=" * 80

logger.info(banner)
logger.info("TEST EXECUTION SUMMARY")
logger.info(banner)

# (summary key, function label) pairs, expanded into the "✓ <label> - PASSED" strings.
test_results = {
    test_key: f"✓ {function_label} - PASSED"
    for test_key, function_label in [
        ("Test 1", "remove_special_characters"),
        ("Test 2a", "convert_to_numeric (int)"),
        ("Test 2b", "convert_to_numeric (double)"),
        ("Test 3", "convert_to_datetime"),
        ("Test 4", "convert_to_tilecase"),
        ("Test 5", "remove_duplicates"),
        ("Test 6", "col_rename_with_mapping"),
        ("Test 7", "drop_columns"),
        ("Test 8", "annualize_salary"),
        ("Test 9", "create_qualification_indicator"),
        ("Test 10", "display"),
        ("Test 11", "export_to_csv"),
    ]
}

print(f"\n{banner}")
print("UNIT TEST RESULTS")
print(banner)
# Each line goes to stdout and to the log.
for summary_key, summary_line in test_results.items():
    print(f"{summary_key}: {summary_line}")
    logger.info(f"{summary_key}: {summary_line}")

total_tests = len(test_results)
print(banner)
print(f"Total Tests: {total_tests}")
print(f"Passed: {total_tests}")
print("Failed: 0")
print(banner)

logger.info("All unit tests passed successfully!")
logger.info(banner)

2026-02-08 15:17:57,615 - __main__ - INFO - TEST EXECUTION SUMMARY
2026-02-08 15:17:57,622 - __main__ - INFO - Test 1: ✓ remove_special_characters - PASSED
2026-02-08 15:17:57,624 - __main__ - INFO - Test 2a: ✓ convert_to_numeric (int) - PASSED
2026-02-08 15:17:57,625 - __main__ - INFO - Test 2b: ✓ convert_to_numeric (double) - PASSED
2026-02-08 15:17:57,626 - __main__ - INFO - Test 3: ✓ convert_to_datetime - PASSED
2026-02-08 15:17:57,628 - __main__ - INFO - Test 4: ✓ convert_to_tilecase - PASSED
2026-02-08 15:17:57,629 - __main__ - INFO - Test 5: ✓ remove_duplicates - PASSED
2026-02-08 15:17:57,630 - __main__ - INFO - Test 6: ✓ col_rename_with_mapping - PASSED
2026-02-08 15:17:57,632 - __main__ - INFO - Test 7: ✓ drop_columns - PASSED
2026-02-08 15:17:57,633 - __main__ - INFO - Test 8: ✓ annualize_salary - PASSED
2026-02-08 15:17:57,636 - __main__ - INFO - Test 9: ✓ create_qualification_indicator - PASSED
2026-02-08 15:17:57,640 - __main__ - INFO - Test 10: ✓ display - PASSED
2026-02


UNIT TEST RESULTS
Test 1: ✓ remove_special_characters - PASSED
Test 2a: ✓ convert_to_numeric (int) - PASSED
Test 2b: ✓ convert_to_numeric (double) - PASSED
Test 3: ✓ convert_to_datetime - PASSED
Test 4: ✓ convert_to_tilecase - PASSED
Test 5: ✓ remove_duplicates - PASSED
Test 6: ✓ col_rename_with_mapping - PASSED
Test 7: ✓ drop_columns - PASSED
Test 8: ✓ annualize_salary - PASSED
Test 9: ✓ create_qualification_indicator - PASSED
Test 10: ✓ display - PASSED
Test 11: ✓ export_to_csv - PASSED
Total Tests: 12
Passed: 12
Failed: 0
