<a href="https://colab.research.google.com/github/suriarasai/BEAD2026/blob/main/colab/03d_Functional_PySpark_UseCase.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Data Cleansing Tutorial: Singapore Manufacturing Equipment Data

This notebook walks through a data cleansing use case that demonstrates:
1. Core RDD functions (map, filter, reduce, flatMap, groupByKey, reduceByKey)
2. User-Defined Functions (UDFs)
3. Higher-Order Functions
4. Currying
5. Real-world data cleansing scenarios

## Context

Equipment sensor data from Singapore Manufacturing Plants.

This cell imports several essential modules for the PySpark data cleansing tutorial:

*   `from pyspark import SparkContext, SparkConf`: These are fundamental classes from the PySpark library. `SparkContext` is the entry point for Spark functionality, allowing you to create RDDs (Resilient Distributed Datasets) and interact with the Spark cluster. `SparkConf` is used to configure various Spark parameters, such as the application name and master URL.
*   `from datetime import datetime`: This imports the `datetime` class from Python's built-in `datetime` module, which is crucial for working with dates and times, including parsing, formatting, and performing calculations on date/time objects.
*   `import re`: This imports Python's built-in `re` module, which provides regular expression operations. It's often used for pattern matching and text manipulation, useful in data cleansing for tasks like extracting specific patterns or validating strings.
*   `from functools import reduce`: This imports the `reduce` function from the `functools` module. `reduce` applies a given function to the items of an iterable in a cumulative way, from left to right, to reduce the iterable to a single output. It's a powerful tool for aggregation.
*   `from typing import Callable, List, Tuple, Optional`: These are type hints from Python's `typing` module. They are used to specify the expected types of function arguments and return values, enhancing code readability, maintainability, and enabling static analysis tools to catch potential errors early.

In [1]:
from pyspark import SparkContext, SparkConf
from datetime import datetime
import re
from functools import reduce
from typing import Callable, List, Tuple, Optional
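
As a minimal sketch of the cumulative, left-to-right behaviour of `functools.reduce` described above, the snippet below folds a small list of readings. The `readings` values are hypothetical and are not taken from the dataset used later.

```python
from functools import reduce

# Hypothetical sample readings in degrees Celsius (illustration only).
readings = [85.5, 78.0, 92.3]

# reduce folds the list from left to right: ((85.5 + 78.0) + 92.3)
total = reduce(lambda acc, value: acc + value, readings)

# An optional third argument supplies the starting value, which also
# keeps the call safe when the list is empty.
largest = reduce(max, readings, float("-inf"))

print(round(total, 1))  # 255.8
print(largest)          # 92.3
```

The `reduce` method on an RDD follows the same idea, but it combines partial results from several partitions, so the function passed to it should be commutative and associative.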

## Spark Context



In [2]:
# Initialize Spark Context
conf = SparkConf().setAppName("SingaporeEquipmentDataCleansing").setMaster("local[*]")
sc = SparkContext(conf=conf)

## Raw equipment data with typical data quality issues:
- Inconsistent date formats
- Missing values
- Inconsistent location formats
- Temperature in different units (Celsius/Fahrenheit)
- Whitespace issues
- Invalid sensor readings


In [3]:
raw_equipment_data = [
    "EQ001|CNC-MILLING-001|Jurong Industrial Estate|25-12-2024 14:30|85.5|CELSIUS|NORMAL|  ",
    "EQ002|HYDRAULIC-PRESS-042|   Tuas South Ave 3   |2024/12/25 15:45|187.2|FAHRENHEIT|ALERT|Oil Pressure Low",
    "EQ003|ROBOTIC-ARM-123|Woodlands Industrial Park|25/12/2024 16:00|78.0|CELSIUS||Sensor offline",
    "EQ004|INJECTION-MOLD-099|Changi Business Park|25-12-2024 16:15|NULL|CELSIUS|NORMAL|",
    "EQ005|LATHE-MACHINE-067|Ang Mo Kio Industrial Park 2|2024-12-25 17:00|92.3|CELSIUS|WARNING|Vibration detected",
    "EQ006|CONVEYOR-SYSTEM-221|Jurong Industrial Estate|25/12/2024|75.5|FAHRENHEIT|NORMAL|",
    "EQ007|WELDING-ROBOT-456|Tuas South Ave 3|25-12-2024 18:30|-999|CELSIUS|ERROR|Temperature sensor fault",
    "EQ008||Woodlands Industrial Park|25-12-2024 19:00|88.1|CELSIUS|NORMAL|",
    "EQ009|CUTTING-MACHINE-789|Changi Business Park|2024/12/25 19:30|95.0|CELSIUS|WARNING|Blade wear detected",
    "EQ010|PACKAGING-UNIT-334|Clementi Loop|INVALID_DATE|82.5|CELSIUS|NORMAL|",
]

Create an RDD from the raw data.

In [4]:
equipment_rdd = sc.parallelize(raw_equipment_data)

print("\nRaw Equipment Data Sample:")
for record in equipment_rdd.take(5):
    print(record)


Raw Equipment Data Sample:
EQ001|CNC-MILLING-001|Jurong Industrial Estate|25-12-2024 14:30|85.5|CELSIUS|NORMAL|  
EQ002|HYDRAULIC-PRESS-042|   Tuas South Ave 3   |2024/12/25 15:45|187.2|FAHRENHEIT|ALERT|Oil Pressure Low
EQ003|ROBOTIC-ARM-123|Woodlands Industrial Park|25/12/2024 16:00|78.0|CELSIUS||Sensor offline
EQ004|INJECTION-MOLD-099|Changi Business Park|25-12-2024 16:15|NULL|CELSIUS|NORMAL|


## Understanding Core Functions

This section uses the `map`, `filter`, and `flatMap` transformations.

### Map and Tuples

This cell defines a function `parse_equipment_record` which takes a single string `record` (expected to be pipe-delimited) and splits it into individual fields. It then uses a generator expression `(field.strip() for field in fields)` to remove leading/trailing whitespace from each field and converts the result into a `tuple`.

The `equipment_rdd.map(parse_equipment_record)` line applies this `parse_equipment_record` function to every element in the `equipment_rdd`. The `map` transformation is a core Spark operation that applies a function to each element of an RDD, returning a new RDD. Here, it transforms each raw string record into a tuple of cleaned fields, creating the `parsed_rdd`.

In [5]:
def parse_equipment_record(record: str) -> tuple:
    """Parse pipe-delimited equipment record into tuple"""
    fields = record.split('|')
    return tuple(field.strip() for field in fields)

parsed_rdd = equipment_rdd.map(parse_equipment_record)

print("Parsed Records (first 3):")
for record in parsed_rdd.take(3):
    print(record)

Parsed Records (first 3):
('EQ001', 'CNC-MILLING-001', 'Jurong Industrial Estate', '25-12-2024 14:30', '85.5', 'CELSIUS', 'NORMAL', '')
('EQ002', 'HYDRAULIC-PRESS-042', 'Tuas South Ave 3', '2024/12/25 15:45', '187.2', 'FAHRENHEIT', 'ALERT', 'Oil Pressure Low')
('EQ003', 'ROBOTIC-ARM-123', 'Woodlands Industrial Park', '25/12/2024 16:00', '78.0', 'CELSIUS', '', 'Sensor offline')
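
The parsed tuples are read by position throughout the notebook (`record[2]` for location, `record[4]` for temperature, and so on). One possible way to make those positions self-describing is a `NamedTuple`. The sketch below is an assumption, not part of the original notebook: `EquipmentRecord` and `parse_named` are hypothetical names, and the field names for positions 4 to 7 are inferred from how the notebook uses them.

```python
from typing import NamedTuple

class EquipmentRecord(NamedTuple):
    """Hypothetical named view of the eight pipe-delimited fields."""
    equipment_id: str
    equipment_type: str
    location: str
    timestamp: str
    temperature: str
    unit: str
    status: str
    notes: str

def parse_named(record: str) -> EquipmentRecord:
    # Same split-and-strip logic as parse_equipment_record, but the result
    # can be read by field name as well as by position.
    fields = [field.strip() for field in record.split('|')]
    # Raises TypeError if the row does not have exactly eight fields.
    return EquipmentRecord(*fields)

sample = ("EQ002|HYDRAULIC-PRESS-042|   Tuas South Ave 3   |"
          "2024/12/25 15:45|187.2|FAHRENHEIT|ALERT|Oil Pressure Low")
parsed = parse_named(sample)

print(parsed.location)               # Tuas South Ave 3
print(parsed[2] == parsed.location)  # True
```

Because a `NamedTuple` is still a tuple, positional code such as `record[0:4]` would keep working on it.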


### Filters


This cell defines a function `is_valid_record` to check whether a parsed record meets basic validity criteria. It verifies that the record has all eight fields and that the essential fields (equipment_id, equipment_type, location) are not empty. It also rejects any record whose timestamp contains the text 'INVALID'.

The `valid_rdd = parsed_rdd.filter(is_valid_record)` line applies this validation function to every record in the `parsed_rdd`. The `filter` transformation is a core Spark operation that returns a new RDD containing only the elements for which the given function returns `True`. The cell then prints the counts of total, valid, and removed records to quantify the data quality improvement.

In [6]:
def is_valid_record(record: tuple) -> bool:
    """Check if equipment record has minimum required fields"""
    # Must have all 8 pipe-delimited fields (equipment_id through notes)
    if len(record) < 8:
        return False

    equipment_id, equipment_type, location, timestamp = record[0:4]

    # Check for essential fields
    if not equipment_id or not equipment_type or not location:
        return False

    # Check for invalid timestamp
    if 'INVALID' in timestamp.upper():
        return False

    return True

valid_rdd = parsed_rdd.filter(is_valid_record)

print(f"Total records: {parsed_rdd.count()}")
print(f"Valid records: {valid_rdd.count()}")
print(f"Invalid records removed: {parsed_rdd.count() - valid_rdd.count()}")

Total records: 10
Valid records: 8
Invalid records removed: 2
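
The filter reports how many records were dropped but not why. A small sketch of one way to surface the reason is shown below. `rejection_reason` is a hypothetical helper that mirrors the checks in `is_valid_record`, and the two tuples are the parsed forms of rows EQ008 and EQ010 from the raw data.

```python
from typing import Optional

def rejection_reason(record: tuple) -> Optional[str]:
    """Hypothetical helper: explain why a parsed record fails, or return None."""
    if len(record) < 8:
        return "too few fields"
    equipment_id, equipment_type, location, timestamp = record[0:4]
    if not equipment_id:
        return "missing equipment_id"
    if not equipment_type:
        return "missing equipment_type"
    if not location:
        return "missing location"
    if 'INVALID' in timestamp.upper():
        return "invalid timestamp"
    return None

# Parsed forms of two rows from raw_equipment_data
eq008 = ('EQ008', '', 'Woodlands Industrial Park', '25-12-2024 19:00',
         '88.1', 'CELSIUS', 'NORMAL', '')
eq010 = ('EQ010', 'PACKAGING-UNIT-334', 'Clementi Loop', 'INVALID_DATE',
         '82.5', 'CELSIUS', 'NORMAL', '')

print(rejection_reason(eq008))  # missing equipment_type
print(rejection_reason(eq010))  # invalid timestamp
```

A function like this could be passed to `parsed_rdd.map(...)` to tally rejection reasons before filtering.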


### FlatMap



This cell defines `extract_location_words`, which splits the location field of a record into words and keeps only those longer than three characters. It then uses `flatMap()` on `valid_rdd` so that each record's list of words is flattened into individual elements of a new RDD, followed by `distinct()` to find the unique location keywords.

In [7]:
def extract_location_words(record: tuple) -> List[str]:
    """Extract individual words from location field"""
    location = record[2]
    # Split on whitespace and keep only words longer than 3 characters
    words = location.split()
    return [word for word in words if len(word) > 3]

location_words_rdd = valid_rdd.flatMap(extract_location_words)

print("Unique location keywords:")
unique_locations = location_words_rdd.distinct().collect()
print(unique_locations)

Unique location keywords:
['Industrial', 'Tuas', 'Changi', 'Jurong', 'Estate', 'South', 'Woodlands', 'Park', 'Business']
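
To illustrate the difference between `map` and `flatMap` without a Spark cluster, the plain-Python sketch below applies the same word filter to two locations. `long_words` is a hypothetical stand-in for `extract_location_words` that works on a bare location string.

```python
from itertools import chain

locations = ["Jurong Industrial Estate", "Tuas South Ave 3"]

def long_words(location: str) -> list:
    # Same rule as extract_location_words: keep words longer than 3 characters.
    return [word for word in location.split() if len(word) > 3]

# map-style: one output element (a list) per input element
mapped = [long_words(location) for location in locations]

# flatMap-style: the per-element lists are concatenated into one sequence
flattened = list(chain.from_iterable(mapped))

print(mapped)     # [['Jurong', 'Industrial', 'Estate'], ['Tuas', 'South']]
print(flattened)  # ['Jurong', 'Industrial', 'Estate', 'Tuas', 'South']
```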


## User Defined Functions


### Data Normalization using UDF


This cell introduces two functions: `normalize_date` which converts various date string formats to a standard ISO format, and `apply_date_normalization` which applies this conversion to the timestamp field of each record. The `map()` transformation creates `normalized_date_rdd`, and examples are printed.

In [8]:
def normalize_date(date_str: str) -> Optional[str]:
    """
    Normalize various date formats to ISO format (YYYY-MM-DD HH:MM)
    Handles: DD-MM-YYYY, YYYY/MM/DD, DD/MM/YYYY, YYYY-MM-DD
    Date-only DD-MM-YYYY and DD/MM/YYYY inputs map to 00:00
    """
    if not date_str or 'INVALID' in date_str.upper():
        return None

    # Try different date formats
    formats = [
        '%d-%m-%Y %H:%M',      # 25-12-2024 14:30
        '%Y/%m/%d %H:%M',      # 2024/12/25 15:45
        '%d/%m/%Y %H:%M',      # 25/12/2024 16:00
        '%Y-%m-%d %H:%M',      # 2024-12-25 17:00
        '%d-%m-%Y',            # 25-12-2024 (no time)
        '%d/%m/%Y',            # 25/12/2024 (no time)
    ]

    for fmt in formats:
        try:
            dt = datetime.strptime(date_str.strip(), fmt)
            return dt.strftime('%Y-%m-%d %H:%M')
        except ValueError:
            continue

    return None

def apply_date_normalization(record: tuple) -> tuple:
    """Apply date normalization to record"""
    record_list = list(record)
    record_list[3] = normalize_date(record_list[3])
    return tuple(record_list)

normalized_date_rdd = valid_rdd.map(apply_date_normalization)

print("Date Normalization Examples:")
for record in normalized_date_rdd.take(3):
    print(f"Equipment: {record[0]}, Normalized Date: {record[3]}")


Date Normalization Examples:
Equipment: EQ001, Normalized Date: 2024-12-25 14:30
Equipment: EQ002, Normalized Date: 2024-12-25 15:45
Equipment: EQ003, Normalized Date: 2024-12-25 16:00
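
One consequence of the format list is worth seeing in isolation: a date-only value such as EQ006's `25/12/2024` fails every pattern that expects a time, then matches a date-only pattern and is given a midnight timestamp. The sketch below shows this with `datetime.strptime` directly.

```python
from datetime import datetime

# A pattern that expects a time rejects date-only input with ValueError.
try:
    datetime.strptime("25/12/2024", "%d/%m/%Y %H:%M")
    matched_with_time = True
except ValueError:
    matched_with_time = False

# The date-only pattern matches, and the missing time defaults to 00:00.
date_only = datetime.strptime("25/12/2024", "%d/%m/%Y")
normalized = date_only.strftime("%Y-%m-%d %H:%M")

print(matched_with_time)  # False
print(normalized)         # 2024-12-25 00:00
```

Downstream code cannot tell such a record apart from one genuinely logged at midnight, so a stricter pipeline might flag date-only inputs separately.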


### Temperature Conversion UDF



In [9]:
def fahrenheit_to_celsius(fahrenheit: float) -> float:
    """Convert Fahrenheit to Celsius"""
    return (fahrenheit - 32) * 5.0 / 9.0

def normalize_temperature(record: tuple) -> tuple:
    """
    Normalize temperature to Celsius and handle invalid readings
    Invalid readings: NULL, -999, or out of range
    """
    record_list = list(record)
    temp_str = record_list[4]
    unit = record_list[5]

    # Handle NULL or invalid values
    if temp_str == 'NULL' or temp_str == '-999':
        record_list[4] = None
        record_list[5] = 'CELSIUS'
        return tuple(record_list)

    try:
        temp = float(temp_str)

        # Convert Fahrenheit to Celsius
        if unit == 'FAHRENHEIT':
            temp = fahrenheit_to_celsius(temp)
            record_list[5] = 'CELSIUS'

        # Validate reasonable temperature range (0-150°C for machinery)
        if temp < 0 or temp > 150:
            record_list[4] = None
        else:
            record_list[4] = round(temp, 2)

    except ValueError:
        record_list[4] = None

    return tuple(record_list)

normalized_temp_rdd = normalized_date_rdd.map(normalize_temperature)

print("Temperature Normalization Examples:")
for record in normalized_temp_rdd.take(5):
    print(f"Equipment: {record[0]}, Temp: {record[4]}°C, Status: {record[6]}")


Temperature Normalization Examples:
Equipment: EQ001, Temp: 85.5°C, Status: NORMAL
Equipment: EQ002, Temp: 86.22°C, Status: ALERT
Equipment: EQ003, Temp: 78.0°C, Status: 
Equipment: EQ004, Temp: None°C, Status: NORMAL
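
As a worked check of the conversion formula, the sketch below applies it to the two Fahrenheit readings in the raw data (EQ002 and EQ006). `to_celsius` is a local copy of `fahrenheit_to_celsius` so that the snippet runs on its own.

```python
def to_celsius(fahrenheit: float) -> float:
    # Same formula as fahrenheit_to_celsius: C = (F - 32) * 5 / 9
    return (fahrenheit - 32) * 5.0 / 9.0

eq002_celsius = round(to_celsius(187.2), 2)  # (187.2 - 32) * 5 / 9
eq006_celsius = round(to_celsius(75.5), 2)   # (75.5 - 32) * 5 / 9

print(eq002_celsius)  # 86.22
print(eq006_celsius)  # 24.17

# Both converted values fall inside the 0-150 range check, so both are kept.
print(0 <= eq006_celsius <= 150)  # True
```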


### Location Standardization UDF

In [10]:
def standardize_location(location: str) -> str:
    """
    Standardize Singapore industrial location names
    """
    location = location.strip()

    # Standardization rules for common Singapore industrial areas
    standardization_map = {
        'Jurong Industrial Estate': 'JURONG_IE',
        'Tuas South Ave 3': 'TUAS_SOUTH',
        'Tuas South': 'TUAS_SOUTH',
        'Woodlands Industrial Park': 'WOODLANDS_IP',
        'Changi Business Park': 'CHANGI_BP',
        'Ang Mo Kio Industrial Park 2': 'AMK_IP2',
        'Clementi Loop': 'CLEMENTI',
    }

    for key, value in standardization_map.items():
        if key.lower() in location.lower():
            return value

    return location.upper().replace(' ', '_')

def apply_location_standardization(record: tuple) -> tuple:
    """Apply location standardization to record"""
    record_list = list(record)
    record_list[2] = standardize_location(record_list[2])
    return tuple(record_list)

standardized_rdd = normalized_temp_rdd.map(apply_location_standardization)

print("Location Standardization Examples:")
for record in standardized_rdd.take(5):
    print(f"Equipment: {record[0]}, Location: {record[2]}")


Location Standardization Examples:
Equipment: EQ001, Location: JURONG_IE
Equipment: EQ002, Location: TUAS_SOUTH
Equipment: EQ003, Location: WOODLANDS_IP
Equipment: EQ004, Location: CHANGI_BP
Equipment: EQ005, Location: AMK_IP2
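
`standardize_location` has two branches: a case-insensitive substring match against the lookup table, and a fallback that upper-cases the text and replaces spaces with underscores. The sketch below exercises both with a reduced table. `standardize` and the `'Pioneer Sector 1'` input are hypothetical and exist only for illustration.

```python
def standardize(location: str, mapping: dict) -> str:
    """Reduced, hypothetical version of standardize_location."""
    cleaned = location.strip()
    for key, code in mapping.items():
        # Case-insensitive substring match, as in the original function
        if key.lower() in cleaned.lower():
            return code
    # Fallback for locations that are not in the table
    return cleaned.upper().replace(' ', '_')

mapping = {'Tuas South': 'TUAS_SOUTH', 'Clementi Loop': 'CLEMENTI'}

print(standardize('  tuas south ave 3 ', mapping))  # TUAS_SOUTH
print(standardize('Pioneer Sector 1', mapping))     # PIONEER_SECTOR_1
```

Because the first matching key wins, the order of entries in the table matters when one key is a substring of another.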


## Higher Order Functions

### Function Composition

In [11]:
def compose(*functions):
    """
    Higher-order function that composes multiple functions
    compose(f, g, h)(x) = f(g(h(x)))
    """
    def inner(arg):
        result = arg
        for func in reversed(functions):
            result = func(result)
        return result
    return inner

# Individual transformation functions
def clean_whitespace(record: tuple) -> tuple:
    """Remove extra whitespace from all fields"""
    return tuple(field.strip() if isinstance(field, str) else field for field in record)

def uppercase_status(record: tuple) -> tuple:
    """Convert status field to uppercase"""
    record_list = list(record)
    if len(record_list) > 6:
        record_list[6] = record_list[6].upper()
    return tuple(record_list)

def add_processing_flag(record: tuple) -> tuple:
    """Add a processing flag to indicate data has been cleansed"""
    return record + ('CLEANSED',)

# Compose all transformation functions
full_transform = compose(add_processing_flag, uppercase_status, clean_whitespace)

# Apply composed transformation
composed_rdd = standardized_rdd.map(full_transform)

print("Composed Transformation Examples:")
for record in composed_rdd.take(3):
    print(f"Equipment: {record[0]}, Status: {record[6]}, Flag: {record[-1]}")


Composed Transformation Examples:
Equipment: EQ001, Status: NORMAL, Flag: CLEANSED
Equipment: EQ002, Status: ALERT, Flag: CLEANSED
Equipment: EQ003, Status: , Flag: CLEANSED
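
The loop inside `compose` is a fold over the list of functions, so it could also be written with `functools.reduce`. The sketch below is one such alternative, not the notebook's own implementation. `compose_with_reduce`, `add_one`, and `double` are hypothetical names.

```python
from functools import reduce

def compose_with_reduce(*functions):
    """compose_with_reduce(f, g, h)(x) == f(g(h(x)))"""
    def inner(arg):
        # Start from arg and apply the functions from last to first.
        return reduce(lambda result, func: func(result), reversed(functions), arg)
    return inner

def add_one(x: int) -> int:
    return x + 1

def double(x: int) -> int:
    return x * 2

# The rightmost function runs first, so the order of arguments matters.
print(compose_with_reduce(double, add_one)(3))  # double(add_one(3)) = 8
print(compose_with_reduce(add_one, double)(3))  # add_one(double(3)) = 7
```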


### Higher Order Filters with Predicates

In [12]:
def create_temperature_filter(min_temp: float, max_temp: float) -> Callable:
    """
    Higher-order function that returns a filter predicate
    for temperature range
    """
    def temperature_predicate(record: tuple) -> bool:
        temp = record[4]
        if temp is None:
            return False
        try:
            temp_float = float(temp)
            return min_temp <= temp_float <= max_temp
        except (ValueError, TypeError):
            return False
    return temperature_predicate

# Create different temperature filters
normal_temp_filter = create_temperature_filter(60.0, 90.0)
high_temp_filter = create_temperature_filter(90.0, 150.0)

normal_temp_equipment = composed_rdd.filter(normal_temp_filter)
high_temp_equipment = composed_rdd.filter(high_temp_filter)

print(f"Equipment in normal temperature range (60-90°C): {normal_temp_equipment.count()}")
print(f"Equipment in high temperature range (90-150°C): {high_temp_equipment.count()}")

print("\nHigh Temperature Equipment:")
for record in high_temp_equipment.take(3):
    print(f"  {record[0]} at {record[2]}: {record[4]}°C - Status: {record[6]}")


Equipment in normal temperature range (60-90°C): 3
Equipment in high temperature range (90-150°C): 2

High Temperature Equipment:
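
Both filters above use inclusive bounds, so a reading of exactly 90.0 would be counted in the normal range and in the high range. If non-overlapping bands are wanted, one option is a half-open interval. The sketch below assumes that design choice. `create_half_open_filter` is a hypothetical name, and for brevity it takes a bare temperature rather than a full record.

```python
from typing import Callable, Optional

def create_half_open_filter(min_temp: float, max_temp: float) -> Callable[[Optional[float]], bool]:
    """Return a predicate that is True when min_temp <= temp < max_temp."""
    def predicate(temp: Optional[float]) -> bool:
        return temp is not None and min_temp <= temp < max_temp
    return predicate

normal = create_half_open_filter(60.0, 90.0)
high = create_half_open_filter(90.0, 150.0)

# A boundary reading now belongs to exactly one band.
print(normal(90.0), high(90.0))  # False True
print(normal(None))              # False
```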


### Map with Transformer Generator

In [13]:
def create_field_transformer(field_index: int, transform_func: Callable) -> Callable:
    """
    Higher-order function that creates a transformer for specific field
    """
    def transformer(record: tuple) -> tuple:
        record_list = list(record)
        if field_index < len(record_list):
            record_list[field_index] = transform_func(record_list[field_index])
        return tuple(record_list)
    return transformer

# Create transformers for specific fields
equipment_type_transformer = create_field_transformer(
    1,
    lambda x: x.replace('-', '_').lower() if x else x
)

transformed_equipment_rdd = composed_rdd.map(equipment_type_transformer)

print("Equipment Type Transformation Examples:")
for record in transformed_equipment_rdd.take(3):
    print(f"Equipment ID: {record[0]}, Type: {record[1]}")


Equipment Type Transformation Examples:
Equipment ID: EQ001, Type: cnc_milling_001
Equipment ID: EQ002, Type: hydraulic_press_042
Equipment ID: EQ003, Type: robotic_arm_123


## Currying Functions

### Curried Alerts

In [14]:
def create_alert_checker(status_level: str) -> Callable:
    """
    Curried function: takes status level, returns function that checks location
    """
    def check_location(location: str) -> Callable:
        """
        Takes location, returns function that checks equipment record
        """
        def check_record(record: tuple) -> bool:
            """
            Checks if record matches status and location criteria
            """
            return (record[6] == status_level and
                    location.upper() in record[2].upper())
        return check_record
    return check_location

# Create curried alert checkers
warning_checker = create_alert_checker('WARNING')
jurong_warning_checker = warning_checker('JURONG')
tuas_warning_checker = warning_checker('TUAS')

# Apply filters
jurong_warnings = transformed_equipment_rdd.filter(jurong_warning_checker)
tuas_warnings = transformed_equipment_rdd.filter(tuas_warning_checker)

print(f"Warnings in Jurong area: {jurong_warnings.count()}")
print(f"Warnings in Tuas area: {tuas_warnings.count()}")

if jurong_warnings.count() > 0:
    print("\nJurong Warning Details:")
    for record in jurong_warnings.collect():
        print(f"  {record[0]} - {record[1]} - {record[7] if len(record) > 7 else 'N/A'}")
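
The nested closures above fix one argument at a time. A similar effect can be had with `functools.partial` applied to an ordinary three-argument function. The sketch below shows that alternative. `matches_alert` is a hypothetical name, and `sample` is shaped like the EQ009 record after the earlier transformation steps.

```python
from functools import partial

def matches_alert(status_level: str, location: str, record: tuple) -> bool:
    # Same test as check_record, written as a flat three-argument function
    return record[6] == status_level and location.upper() in record[2].upper()

# Fix the arguments one at a time, mirroring the curried calls
warning_checker = partial(matches_alert, 'WARNING')
changi_warning_checker = partial(warning_checker, 'CHANGI')
jurong_warning_checker = partial(warning_checker, 'JURONG')

sample = ('EQ009', 'cutting_machine_789', 'CHANGI_BP', '2024-12-25 19:30',
          95.0, 'CELSIUS', 'WARNING', 'Blade wear detected', 'CLEANSED')

print(changi_warning_checker(sample))  # True
print(jurong_warning_checker(sample))  # False
```

Strictly speaking this is partial application rather than currying, but the resulting predicates can be passed to `filter` in the same way.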




### Curried Temperature Analyser

In [15]:
def create_temp_analyzer(operation: str) -> Callable:
    """
    Curried function for temperature analysis
    operation: 'above' or 'below' (any other value matches no records)
    """
    def with_threshold(threshold: float) -> Callable:
        """
        Takes threshold value
        """
        def analyze(record: tuple) -> bool:
            """
            Analyzes temperature against threshold
            """
            temp = record[4]
            if temp is None:
                return False

            try:
                temp_float = float(temp)
                if operation == 'above':
                    return temp_float > threshold
                elif operation == 'below':
                    return temp_float < threshold
                return False
            except (ValueError, TypeError):
                return False
        return analyze
    return with_threshold

# Create curried temperature analyzers
above_analyzer = create_temp_analyzer('above')
above_85_checker = above_analyzer(85.0)

critical_temp_equipment = transformed_equipment_rdd.filter(above_85_checker)

print(f"Equipment with temperature above 85°C: {critical_temp_equipment.count()}")
print("\nCritical Temperature Equipment:")
for record in critical_temp_equipment.take(5):
    print(f"  {record[0]} at {record[2]}: {record[4]}°C")


Equipment with temperature above 85°C: 4

Critical Temperature Equipment:
  EQ001 at JURONG_IE: 85.5°C
  EQ002 at TUAS_SOUTH: 86.22°C
  EQ005 at AMK_IP2: 92.3°C
  EQ009 at CHANGI_BP: 95.0°C
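
In `create_temp_analyzer`, an unrecognised `operation` silently produces a predicate that rejects every record. A possible variation is to look the comparison up in a dictionary so that an unsupported name fails immediately. The sketch below assumes that design. `COMPARATORS` and `create_comparator` are hypothetical names, and the predicate takes a bare temperature for brevity.

```python
import operator
from typing import Callable, Optional

# Supported operations mapped to standard comparison functions
COMPARATORS = {'above': operator.gt, 'below': operator.lt}

def create_comparator(operation: str) -> Callable[[float], Callable[[Optional[float]], bool]]:
    compare = COMPARATORS[operation]  # raises KeyError for unsupported names

    def with_threshold(threshold: float) -> Callable[[Optional[float]], bool]:
        def analyze(temp: Optional[float]) -> bool:
            return temp is not None and compare(temp, threshold)
        return analyze
    return with_threshold

above_85 = create_comparator('above')(85.0)

# Normalized temperatures of the eight valid records (None marks invalid readings)
temps = [85.5, 86.22, 78.0, None, 92.3, 24.17, None, 95.0]
print([t for t in temps if above_85(t)])  # [85.5, 86.22, 92.3, 95.0]
```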


### Curried Data Quality Scores

In [16]:
def create_quality_scorer(weight_temp: float) -> Callable:
    """
    Curried function that creates quality scorer with temperature weight
    """
    def with_completeness_weight(weight_complete: float) -> Callable:
        """
        Takes completeness weight, returns scoring function
        """
        def score_record(record: tuple) -> Tuple[str, float]:
            """
            Calculate data quality score for record
            """
            equipment_id = record[0]
            score = 0.0

            # Temperature validity score
            if record[4] is not None:
                score += weight_temp

            # Completeness score (all fields present)
            if all(field for field in record[:7]):
                score += weight_complete

            # Date validity score
            if record[3] is not None:
                score += (100 - weight_temp - weight_complete)

            return (equipment_id, score)
        return score_record
    return with_completeness_weight

# Create quality scorer with different weightings
quality_scorer = create_quality_scorer(40.0)  # 40% weight for temperature
with_complete_weight = quality_scorer(30.0)    # 30% weight for completeness
# Remaining 30% for date validity

quality_scores_rdd = transformed_equipment_rdd.map(with_complete_weight)

print("Data Quality Scores:")
for eq_id, score in quality_scores_rdd.sortBy(lambda x: x[1], ascending=False).take(5):
    print(f"  {eq_id}: {score:.1f}%")


Data Quality Scores:
  EQ001: 100.0%
  EQ002: 100.0%
  EQ005: 100.0%
  EQ006: 100.0%
  EQ009: 100.0%
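
The scoring rule is a weighted sum of three yes/no checks: temperature validity, completeness of the first seven fields, and date validity, with the date weight being whatever remains of 100. The sketch below restates that arithmetic. `score` is a hypothetical helper, and the two examples are worked by hand from the sample rows: EQ003 has an empty status, and EQ004 has no valid temperature, which also makes it incomplete.

```python
def score(temp_valid: bool, complete: bool, date_valid: bool,
          weight_temp: float = 40.0, weight_complete: float = 30.0) -> float:
    # The date weight is the remainder, as in score_record.
    weight_date = 100 - weight_temp - weight_complete
    return (weight_temp * temp_valid
            + weight_complete * complete
            + weight_date * date_valid)

print(score(True, True, True))    # 100.0 (all checks pass)
print(score(True, False, True))   # 70.0  (EQ003: empty status field)
print(score(False, False, True))  # 30.0  (EQ004: temperature is None)
```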


## Grouping and Aggregates

In [17]:
location_equipment_rdd = transformed_equipment_rdd.map(
    lambda record: (record[2], record[0])  # (location, equipment_id)
)

grouped_by_location = location_equipment_rdd.groupByKey()

print("Equipment Count by Location:")
for location, equipment_ids in grouped_by_location.collect():
    eq_list = list(equipment_ids)
    print(f"  {location}: {len(eq_list)} equipment - {eq_list}")


def extract_location_temp(record: tuple) -> Optional[Tuple[str, Tuple[float, int]]]:
    """Extract location and temperature for aggregation"""
    location = record[2]
    temp = record[4]

    if temp is None:
        return None

    try:
        temp_float = float(temp)
        return (location, (temp_float, 1))
    except (ValueError, TypeError):
        return None

location_temp_rdd = transformed_equipment_rdd.map(extract_location_temp).filter(lambda x: x is not None)

# Reduce to calculate sum and count
temp_aggregates = location_temp_rdd.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# Calculate averages
avg_temp_by_location = temp_aggregates.mapValues(
    lambda x: round(x[0] / x[1], 2)
)

print("Average Temperature by Location:")
for location, avg_temp in avg_temp_by_location.sortBy(lambda x: x[1], ascending=False).collect():
    print(f"  {location}: {avg_temp}°C")

# Overall statistics across all valid temperature readings
valid_temps = transformed_equipment_rdd.map(
    lambda r: float(r[4]) if r[4] is not None else None
).filter(lambda x: x is not None)

if valid_temps.count() > 0:
    max_temp = valid_temps.reduce(lambda a, b: max(a, b))
    min_temp = valid_temps.reduce(lambda a, b: min(a, b))
    total_temp = valid_temps.reduce(lambda a, b: a + b)
    avg_temp = total_temp / valid_temps.count()

    print(f"Maximum Temperature: {max_temp}°C")
    print(f"Minimum Temperature: {min_temp}°C")
    print(f"Average Temperature: {avg_temp:.2f}°C")


Equipment Count by Location:
  TUAS_SOUTH: 2 equipment - ['EQ002', 'EQ007']
  WOODLANDS_IP: 1 equipment - ['EQ003']
  AMK_IP2: 1 equipment - ['EQ005']
  JURONG_IE: 2 equipment - ['EQ001', 'EQ006']
  CHANGI_BP: 2 equipment - ['EQ004', 'EQ009']
Average Temperature by Location:
  CHANGI_BP: 95.0°C
  AMK_IP2: 92.3°C
  TUAS_SOUTH: 86.22°C
  WOODLANDS_IP: 78.0°C
  JURONG_IE: 54.84°C
Maximum Temperature: 95.0°C
Minimum Temperature: 24.17°C
Average Temperature: 76.86°C
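
The `reduceByKey` step keeps a running `(sum, count)` pair per location and only divides at the end. The plain-Python sketch below mimics that per-key fold with a dictionary, as an illustration of the idea rather than of how Spark executes it. The three pairs are taken from the normalized values for EQ001, EQ002, and EQ006.

```python
from collections import defaultdict

# (location, temperature) pairs from three of the cleansed records
pairs = [('JURONG_IE', 85.5), ('TUAS_SOUTH', 86.22), ('JURONG_IE', 24.17)]

# Running (sum, count) per key, combined the same way as the reduceByKey lambda
totals = defaultdict(lambda: (0.0, 0))
for location, temp in pairs:
    running_sum, running_count = totals[location]
    totals[location] = (running_sum + temp, running_count + 1)

# Divide only once per key, as mapValues does
averages = {location: round(s / c, 2) for location, (s, c) in totals.items()}

print(totals['JURONG_IE'][1])  # 2
print(averages['TUAS_SOUTH'])  # 86.22
```

Carrying the count alongside the sum is what lets the combine step be applied in any order. An average of averages would not have that property.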


## Data Cleansing Pipeline

In [18]:
def complete_cleansing_pipeline(raw_data: str) -> Optional[dict]:
    """
    Complete data cleansing pipeline combining all techniques
    Returns cleansed record as dictionary or None if invalid
    """
    # Parse
    fields = raw_data.split('|')
    if len(fields) < 8:
        return None

    record = tuple(field.strip() for field in fields)

    # Validate
    if not record[0] or not record[1] or not record[2]:
        return None

    # Normalize date
    normalized_date = normalize_date(record[3])
    if normalized_date is None:
        return None

    # Normalize temperature
    temp = record[4]
    unit = record[5]
    normalized_temp = None

    if temp not in ('NULL', '-999', ''):
        try:
            temp_float = float(temp)
            if unit == 'FAHRENHEIT':
                temp_float = fahrenheit_to_celsius(temp_float)
            if 0 <= temp_float <= 150:
                normalized_temp = round(temp_float, 2)
        except ValueError:
            pass

    # Standardize location
    location = standardize_location(record[2])

    # Return cleansed record as dictionary
    return {
        'equipment_id': record[0],
        'equipment_type': record[1].replace('-', '_').lower(),
        'location': location,
        'timestamp': normalized_date,
        'temperature_celsius': normalized_temp,
        'status': record[6].upper(),
        'notes': record[7] if len(record) > 7 else '',
        'data_quality': 'HIGH' if normalized_temp is not None else 'MEDIUM'
    }

# Apply complete pipeline
cleansed_rdd = equipment_rdd.map(complete_cleansing_pipeline).filter(lambda x: x is not None)

print(f"\nTotal cleansed records: {cleansed_rdd.count()}")
print("\nCleansed Data Sample:")
print("-" * 80)

for record in cleansed_rdd.take(5):
    print(f"Equipment ID: {record['equipment_id']}")
    print(f"  Type: {record['equipment_type']}")
    print(f"  Location: {record['location']}")
    print(f"  Timestamp: {record['timestamp']}")
    print(f"  Temperature: {record['temperature_celsius']}°C")
    print(f"  Status: {record['status']}")
    print(f"  Data Quality: {record['data_quality']}")
    print()


Total cleansed records: 8

Cleansed Data Sample:
--------------------------------------------------------------------------------
Equipment ID: EQ001
  Type: cnc_milling_001
  Location: JURONG_IE
  Timestamp: 2024-12-25 14:30
  Temperature: 85.5°C
  Status: NORMAL
  Data Quality: HIGH

Equipment ID: EQ002
  Type: hydraulic_press_042
  Location: TUAS_SOUTH
  Timestamp: 2024-12-25 15:45
  Temperature: 86.22°C
  Status: ALERT
  Data Quality: HIGH

Equipment ID: EQ003
  Type: robotic_arm_123
  Location: WOODLANDS_IP
  Timestamp: 2024-12-25 16:00
  Temperature: 78.0°C
  Status: 
  Data Quality: HIGH

Equipment ID: EQ004
  Type: injection_mold_099
  Location: CHANGI_BP
  Timestamp: 2024-12-25 16:15
  Temperature: None°C
  Status: NORMAL
  Data Quality: MEDIUM

Equipment ID: EQ005
  Type: lathe_machine_067
  Location: AMK_IP2
  Timestamp: 2024-12-25 17:00
  Temperature: 92.3°C
  Status: WARNING
  Data Quality: HIGH

