![Lancaster University](https://www.lancaster.ac.uk/media/lancaster-university/content-assets/images/fst/logos/SCC-Logo.svg)

# SCC.454: Large Scale Platforms for AI and Data Analysis
## Lab 3: Data Preprocessing with Apache Spark

**Duration:** 2 hours

**Learning Objectives:**
- Understand the importance of data preprocessing in big data pipelines
- Master techniques for handling missing values in Spark DataFrames
- Learn data type conversions and schema management
- Apply text preprocessing techniques at scale
- Use regular expressions (regex) for pattern matching and data extraction
- Build complete data cleaning pipelines in PySpark

## Table of Contents

1. **Part 1: Introduction to Data Preprocessing** (15 minutes)
   - Why Data Preprocessing Matters
   - Setting up the Environment
   - Overview of Spark's Data Cleaning Capabilities

2. **Part 2: Handling Missing Values** (30 minutes)
   - Identifying Missing Data
   - Strategies for Missing Value Treatment
   - Dropping Null Values
   - Filling Missing Values (Imputation)
   - Advanced Imputation Techniques
   - Practical Exercise: Customer Data Cleaning

3. **Part 3: Data Type Handling and Schema Management** (20 minutes)
   - Understanding Spark Data Types
   - Type Casting and Conversion
   - Handling Date and Timestamp Fields
   - Schema Validation and Enforcement

4. **Part 4: Text Preprocessing** (25 minutes)
   - String Functions in Spark
   - Case Normalization and Trimming
   - Tokenization and Text Splitting
   - Stop Word Removal
   - Text Cleaning Pipeline
   - Practical Exercise: Product Review Cleaning

5. **Part 5: Regular Expressions in Spark** (30 minutes)
   - Introduction to Regex in PySpark
   - Pattern Matching with rlike
   - Extracting Data with regexp_extract
   - Replacing Patterns with regexp_replace
   - Complex Pattern Matching
   - Practical Exercise: Log File Parsing

6. **Part 6: Building Complete Preprocessing Pipelines** (15 minutes)
   - Combining Multiple Preprocessing Steps
   - Creating Reusable Cleaning Functions
   - Data Quality Reporting
   - Final Challenge: End-to-End Data Cleaning

---
# Part 1: Introduction to Data Preprocessing
---

## 1.1 Why Data Preprocessing Matters

Data preprocessing is the foundation of any successful data analysis or machine learning project. With big data it becomes even more critical, for the reasons outlined below.

**The Scale Challenge:**
- Real-world datasets often contain millions or billions of records
- Single-machine tools such as pandas hold the whole dataset in memory and may not scale to these volumes
- Distributed processing is essential for handling large volumes

**Common Data Quality Issues:**
- Missing values (null, NaN, empty strings)
- Inconsistent formatting (dates, currencies, units)
- Duplicate records
- Invalid or out-of-range values
- Inconsistent text data (case, whitespace, special characters)
- Unstructured or semi-structured data requiring parsing

**The Garbage In, Garbage Out Principle:**
```
Raw Data (Messy) --> Preprocessing Pipeline --> Clean Data (Analysis-Ready)
```

**Apache Spark's Advantages for Preprocessing:**
- Distributed processing across clusters
- Lazy evaluation for optimization
- Rich built-in functions for data manipulation
- SQL interface for familiar syntax
- Seamless integration with ML pipelines

## 1.2 Setting up the Environment

Let's set up PySpark in Google Colab. If you're running this on a different system, ensure Java is installed and JAVA_HOME is configured correctly.

In [None]:
# Install PySpark
!pip install pyspark==3.5.0 -q

# Install Java (Spark requires Java)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Set Java environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

print("PySpark and Java installed successfully!")

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Create a SparkSession configured for data preprocessing
spark = SparkSession.builder \
    .appName("SCC454-DataPreprocessing") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

# Get the underlying SparkContext
sc = spark.sparkContext

print(f"Spark Version: {spark.version}")
print(f"Application Name: {spark.sparkContext.appName}")
print(f"Master: {spark.sparkContext.master}")
print("\nSpark Session ready for data preprocessing!")

## 1.3 Overview of Spark's Data Cleaning Capabilities

Spark provides several modules and functions for data preprocessing:

**Key Modules:**
- `pyspark.sql.functions`: Built-in functions for transformations
- `pyspark.sql.types`: Data type definitions
- `pyspark.ml.feature`: Feature engineering tools

**Important Function Categories:**

| Category | Functions |
|----------|----------|
| Null Handling | `isNull()`, `isNotNull()`, `na.drop()`, `na.fill()` |
| String Functions | `lower()`, `upper()`, `trim()`, `split()`, `concat()` |
| Regex Functions | `regexp_extract()`, `regexp_replace()`, `rlike()` |
| Type Conversion | `cast()`, `to_date()`, `to_timestamp()` |
| Aggregation | `count()`, `avg()`, `sum()`, `mean()` |
| Conditional | `when()`, `otherwise()`, `coalesce()` |

In [None]:
# Import commonly used functions for data preprocessing
from pyspark.sql.functions import (
    col, lit, isnan, isnull, coalesce,
    lower, upper, trim, ltrim, rtrim, length,
    split, concat, concat_ws, substring, replace,
    regexp_extract, regexp_replace,
    to_date, to_timestamp,
    count, avg, sum as spark_sum, mean, stddev,
    min as spark_min, max as spark_max,
    when, expr,
    year, month, dayofmonth, datediff, date_format,
    monotonically_increasing_id, round as spark_round
)

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    FloatType, DoubleType, DateType, TimestampType, BooleanType
)

print("All preprocessing functions imported successfully!")

---
# Part 2: Handling Missing Values
---

## 2.1 Identifying Missing Data

Missing data is one of the most common data quality issues. In Spark, missing values can appear as:
- `null`: The absence of a value
- `NaN` (Not a Number): Result of undefined numerical operations; occurs only in float and double columns and is not the same as `null`
- Empty strings: For string columns
- Placeholder values (e.g., -999, "N/A", "Unknown")
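
To make these four categories concrete, here is a minimal plain-Python sketch (not Spark code) that classifies a single value. The `PLACEHOLDERS` set and the `classify_missing` helper are assumptions made for illustration; numeric sentinels such as -999 are deliberately left out because they depend on the dataset.

```python
import math

# Placeholder strings treated as missing; this set is an assumption for the sketch.
PLACEHOLDERS = {"", "n/a", "unknown"}

def classify_missing(value):
    """Return the kind of missing value, or None if the value is present."""
    if value is None:
        return "null"
    if isinstance(value, float) and math.isnan(value):
        return "nan"
    if isinstance(value, str) and value.strip().lower() in PLACEHOLDERS:
        return "empty" if value.strip() == "" else "placeholder"
    return None

samples = [None, float("nan"), "", "N/A", "john@email.com", 75000.0]
kinds = [classify_missing(v) for v in samples]
print(kinds)  # ['null', 'nan', 'empty', 'placeholder', None, None]
```

The point to carry over to Spark is that each category needs its own test: a null check alone will not catch `NaN`, empty strings, or placeholder text.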

Let's create a sample dataset with various types of missing values.

In [None]:
# Create a DataFrame with various missing value patterns
customer_data = [
    (1, "John Smith", 28, "john@email.com", 75000.0, "New York", "2023-01-15"),
    (2, "Jane Doe", None, "jane@email.com", 82000.0, "Los Angeles", "2023-02-20"),
    (3, "Bob Wilson", 35, None, 65000.0, "Chicago", "2023-03-10"),
    (4, "Alice Brown", 42, "alice@email.com", None, "Houston", None),
    (5, None, 31, "unknown@email.com", 70000.0, "Phoenix", "2023-05-01"),
    (6, "Charlie Lee", 29, "", 58000.0, "", "2023-06-15"),
    (7, "Diana Prince", None, "diana@email.com", 95000.0, None, "2023-07-20"),
    (8, "Edward Kim", 38, "N/A", float('nan'), "Boston", "2023-08-25"),
    (9, "Fiona Garcia", 45, "fiona@email.com", 88000.0, "Seattle", "2023-09-30"),
    (10, "George Hall", None, None, None, None, None),
    (11, "Hannah White", 33, "hannah@email.com", 72000.0, "Denver", "2023-11-10"),
    (12, "Ivan Torres", 27, "ivan@email.com", 63000.0, "Miami", "2023-12-05")
]

columns = ["customer_id", "name", "age", "email", "salary", "city", "signup_date"]
df_customers = spark.createDataFrame(customer_data, columns)

print("Customer Data with Missing Values:")
df_customers.show(truncate=False)

In [None]:
# Check the schema
print("Schema:")
df_customers.printSchema()

### Counting Missing Values

The first step in handling missing data is to understand its extent.

In [None]:
# Method 1: Count nulls per column
print("Null counts per column:")
null_counts = df_customers.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_customers.columns
])
null_counts.show()

In [None]:
# Method 2: Count nulls AND NaN values (important for numeric columns)
print("Null + NaN counts for numeric columns:")
df_customers.select([
    count(when(col("salary").isNull() | isnan(col("salary")), 1)).alias("salary_missing")
]).show()

In [None]:
# Method 3: Comprehensive missing value report
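def missing_value_report(df):
    """Summarise nulls, empty/placeholder strings, and NaN values for every column of df.

    Each count() below is a separate Spark job, so this is intended for small exploratory data.
    """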
    total_rows = df.count()
    report_data = []
    for col_name in df.columns:
        col_type = str(df.schema[col_name].dataType)
        null_count = df.filter(col(col_name).isNull()).count()
        empty_count = 0
        if "String" in col_type:
            empty_count = df.filter(
                (col(col_name) == "") | (col(col_name) == "N/A") | (col(col_name) == "Unknown")
            ).count()
        nan_count = 0
        if "Double" in col_type or "Float" in col_type:
            nan_count = df.filter(isnan(col(col_name))).count()
        total_missing = null_count + empty_count + nan_count
        missing_pct = (total_missing / total_rows) * 100
        report_data.append((col_name, col_type, null_count, empty_count, nan_count, total_missing, round(missing_pct, 2)))
    report_columns = ["Column", "Type", "Nulls", "Empty", "NaN", "Total_Missing", "Missing_%"]
    return spark.createDataFrame(report_data, report_columns)

print("="*60)
print("COMPREHENSIVE MISSING VALUE REPORT")
print("="*60)
report = missing_value_report(df_customers)
report.show(truncate=False)

## 2.2 Strategies for Missing Value Treatment

There are several strategies for handling missing values:

1. **Deletion**: Remove rows or columns with missing values
2. **Imputation**: Fill missing values with estimated values (mean, median, mode)
3. **Flagging**: Create indicator variables for missingness
4. **Keep as-is**: Leave missing values for algorithms that can handle them
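
The four strategies can be sketched on a tiny list before applying them to a DataFrame. This is a plain-Python illustration under simple assumptions (a made-up `ages` list with `None` marking a gap), not the Spark implementation used later in this part.

```python
from statistics import median

ages = [28, None, 35, 42, None]  # made-up values; None marks a missing entry

# 1. Deletion: keep only the observed values.
deleted = [a for a in ages if a is not None]

# 2. Imputation: replace each gap with the median of the observed values.
fill_value = median(deleted)
imputed = [fill_value if a is None else a for a in ages]

# 3. Flagging: record where the gaps were before they are filled.
was_missing = [int(a is None) for a in ages]

# 4. Keep as-is: `ages` itself stays untouched.
print(deleted)      # [28, 35, 42]
print(imputed)      # [28, 35, 35, 42, 35]
print(was_missing)  # [0, 1, 0, 0, 1]
```

Flagging is usually combined with imputation, so that a model can still see which values were originally missing.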

## 2.3 Dropping Null Values

Spark's `na.drop()` method provides flexible options for removing rows with missing values.

In [None]:
# Original count
print(f"Original row count: {df_customers.count()}")

# Drop rows with ANY null or NaN value (most aggressive).
# Empty strings and placeholders such as "N/A" are not treated as missing.
df_drop_any = df_customers.na.drop(how="any")
print(f"After dropping rows with ANY null: {df_drop_any.count()}")
df_drop_any.show()

In [None]:
# Drop rows where ALL values are null (least aggressive)
df_drop_all = df_customers.na.drop(how="all")
print(f"After dropping rows where ALL are null: {df_drop_all.count()}")

In [None]:
# Drop rows where specific columns are null (most practical)
df_drop_subset = df_customers.na.drop(subset=["name", "email"])
print(f"After dropping rows where name OR email is null: {df_drop_subset.count()}")
df_drop_subset.show()

In [None]:
# Keep only rows with at least `thresh` non-null values (thresh overrides `how`)
df_drop_thresh = df_customers.na.drop(thresh=5)
print(f"After dropping rows with < 5 non-null values: {df_drop_thresh.count()}")
df_drop_thresh.show()
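
The `how`, `thresh`, and `subset` options are easier to reason about with the rule written out. The following plain-Python sketch (not Spark code) mimics the row-level decision; `is_missing` and `keep_row` are hypothetical helpers, and the three rows are simplified versions of customers 6, 8, and 10.

```python
import math

def is_missing(v):
    # None and float NaN count as missing; empty strings do not.
    return v is None or (isinstance(v, float) and math.isnan(v))

def keep_row(row, how="any", thresh=None, subset=None):
    """Return True if the row survives the drop rule."""
    values = [row[k] for k in (subset or row.keys())]
    present = sum(not is_missing(v) for v in values)
    if thresh is not None:   # thresh takes precedence over how
        return present >= thresh
    if how == "any":         # drop if any value is missing
        return present == len(values)
    return present > 0       # how == "all": drop only if every value is missing

rows = [
    {"id": 6, "name": "Charlie Lee", "email": "", "salary": 58000.0, "city": ""},
    {"id": 8, "name": "Edward Kim", "email": "N/A", "salary": float("nan"), "city": "Boston"},
    {"id": 10, "name": "George Hall", "email": None, "salary": None, "city": None},
]

any_kept = [r["id"] for r in rows if keep_row(r, how="any")]
thresh_kept = [r["id"] for r in rows if keep_row(r, thresh=4)]
subset_kept = [r["id"] for r in rows if keep_row(r, subset=["name", "email"])]
print(any_kept, thresh_kept, subset_kept)  # [6] [6, 8] [6, 8]
```

Note that customer 6 survives every rule even though the email and city are empty strings, which is why placeholder values need to be converted to null before dropping.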

## 2.4 Filling Missing Values (Imputation)

Spark's `na.fill()` method allows filling missing values with specified values.

In [None]:
# Fill all numeric columns with a single value
print("Fill all numeric nulls with 0:")
df_customers.na.fill(0).show()

In [None]:
# Fill specific columns with specific values (dictionary approach)
fill_values = {
    "name": "Unknown",
    "age": 30,
    "email": "no_email@placeholder.com",
    "salary": 0.0,
    "city": "Unknown",
    "signup_date": "1970-01-01"
}

print("Fill with specific values per column:")
df_filled = df_customers.na.fill(fill_values)
df_filled.show(truncate=False)

### Statistical Imputation

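For numerical columns, a statistic computed from the observed values, such as the mean or median, is usually a better fill value than an arbitrary constant. The median is less sensitive to outliers than the mean.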

In [None]:
# Calculate the mean and approximate median of the age column (aggregates skip nulls)
age_stats = df_customers.select(
    mean(col("age")).alias("mean_age"),
    expr("percentile_approx(age, 0.5)").alias("median_age")
).collect()[0]

mean_age = age_stats["mean_age"]
median_age = age_stats["median_age"]

print(f"Mean age: {mean_age:.2f}")
print(f"Median age: {median_age}")

# Fill with mean (int() truncates it to match the integer age column)
df_mean_imputed = df_customers.na.fill({"age": int(mean_age)})
print("\nAge filled with mean:")
df_mean_imputed.select("customer_id", "name", "age").show()

In [None]:
# Calculate the mean and median salary, excluding both nulls and NaN values
salary_stats = df_customers.filter(
    col("salary").isNotNull() & ~isnan(col("salary"))
).select(
    mean(col("salary")).alias("mean_salary"),
    expr("percentile_approx(salary, 0.5)").alias("median_salary")
).collect()[0]

print(f"Mean salary: ${salary_stats['mean_salary']:,.2f}")
print(f"Median salary: ${salary_stats['median_salary']:,.2f}")
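
As a cross-check on small data, the same statistics can be computed in plain Python. The sketch below copies the twelve ages from the customer list and uses `median_low`, which returns an actual element of the data; `percentile_approx` is approximate on large data, so exact agreement should not be assumed in general. The outlier value 250 is made up to show why the median is often preferred.

```python
from statistics import mean, median_low

# Ages copied from the customer list above; None marks a missing value.
ages = [28, None, 35, 42, 31, 29, None, 38, 45, None, 33, 27]
observed = [a for a in ages if a is not None]

mean_age = mean(observed)
median_age = median_low(observed)
print(round(mean_age, 2), median_age)  # 34.22 33

# One extreme (made-up) value moves the mean far more than the median.
with_outlier = observed + [250]
print(round(mean(with_outlier), 1), median_low(with_outlier))  # 55.8 33
```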

## 2.5 Advanced Imputation Techniques

In [None]:
# Group-wise imputation: Fill missing values based on group statistics
from pyspark.sql.window import Window

# Calculate average salary per city
city_avg_salary = df_customers.filter(
    col("salary").isNotNull() & ~isnan(col("salary")) & col("city").isNotNull()
).groupBy("city").agg(
    spark_round(mean("salary"), 2).alias("city_avg_salary")
)

print("Average salary per city:")
city_avg_salary.show()

In [None]:
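# Fallback chain with coalesce: own salary, then city average, then overall average.
# coalesce only skips nulls, so NaN is first turned into null by the when(~isnan(...)) wrapper.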
overall_avg_salary = salary_stats['mean_salary']

df_smart_imputed = df_customers.join(
    city_avg_salary, on="city", how="left"
).withColumn(
    "salary_imputed",
    coalesce(
        when(~isnan(col("salary")), col("salary")),
        col("city_avg_salary"),
        lit(overall_avg_salary)
    )
).select(
    "customer_id", "name", "city", "salary", "city_avg_salary", "salary_imputed"
)

print("Smart imputation using city averages:")
df_smart_imputed.show()
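
The fallback logic behind `coalesce` can be written out in a few lines. This plain-Python sketch (not Spark code) uses hypothetical helpers `coalesce` and `nan_to_none`, and the city averages and `OVERALL_AVG` are made-up numbers chosen only to show each branch.

```python
import math

def coalesce(*values):
    """Return the first argument that is not None, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None

def nan_to_none(v):
    # COALESCE only skips nulls, so NaN has to be turned into None first.
    return None if isinstance(v, float) and math.isnan(v) else v

OVERALL_AVG = 70000.0  # hypothetical overall average

rows = [
    ("New York", 75000.0, 75000.0),   # salary present: keep it
    ("Boston", float("nan"), None),   # NaN salary, no city average: overall average
    ("Houston", None, 80000.0),       # null salary, city average available
]
imputed = [coalesce(nan_to_none(s), city_avg, OVERALL_AVG) for _, s, city_avg in rows]
print(imputed)  # [75000.0, 70000.0, 80000.0]
```

Without the `nan_to_none` step, the NaN salary would count as "present" and be returned unchanged.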

In [None]:
# Forward Fill using Window functions (useful for time-series)
from pyspark.sql.functions import last, first

ts_data = [
    ("2024-01-01", 100.0),
    ("2024-01-02", None),
    ("2024-01-03", None),
    ("2024-01-04", 105.0),
    ("2024-01-05", None),
    ("2024-01-06", 108.0),
    ("2024-01-07", None),
]

df_ts = spark.createDataFrame(ts_data, ["date", "value"])
print("Time series with missing values:")
df_ts.show()

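# Forward fill: for each row, take the last non-null value seen so far.
# A window without partitionBy moves all rows to a single partition, so on
# large data add partitionBy(...) on a suitable key.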
window_forward = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, 0)

df_forward_fill = df_ts.withColumn(
    "value_ffill",
    last(col("value"), ignorenulls=True).over(window_forward)
)

print("After forward fill:")
df_forward_fill.show()
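
Forward fill is simple to state procedurally: walk through the values in order and carry the last observed value into each gap. A minimal plain-Python sketch of that idea, using the same seven values (the `forward_fill` helper is hypothetical):

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value seen so far."""
    filled, last_seen = [], None
    for v in values:
        if v is not None:
            last_seen = v
        filled.append(last_seen)
    return filled

series = [100.0, None, None, 105.0, None, 108.0, None]
print(forward_fill(series))
# [100.0, 100.0, 100.0, 105.0, 105.0, 108.0, 108.0]

print(forward_fill([None, 5.0]))  # [None, 5.0]
```

A gap at the very start has nothing to carry forward, so it stays missing and needs a separate rule.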

In [None]:
# Creating missing value indicator columns
df_with_indicators = df_customers.withColumn(
    "age_was_missing",
    when(col("age").isNull(), 1).otherwise(0)
).withColumn(
    "salary_was_missing",
    when(col("salary").isNull() | isnan(col("salary")), 1).otherwise(0)
)

print("Data with missing value indicators:")
df_with_indicators.select(
    "customer_id", "name", "age", "age_was_missing", "salary", "salary_was_missing"
).show()

## 2.6 Practical Exercise: Customer Data Cleaning

Now it's your turn! Clean the customer dataset by applying appropriate imputation strategies.

**Tasks:**
1. Handle missing names (fill with "Customer_[ID]")
2. Impute missing ages with the median age
3. Replace empty/placeholder emails with a standardized format
4. Impute missing salaries with the median salary
5. Fill missing cities with "Unknown"
6. Handle missing signup dates appropriately
7. Remove the NaN value from the salary column
8. Create a data quality score

In [None]:
# Your code here



---
# Part 3: Data Type Handling and Schema Management
---

## 3.1 Understanding Spark Data Types

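Correct data types matter for both correctness and efficiency: numeric comparisons, aggregations, and date arithmetic only behave as expected when the values are not stored as strings.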

**Numeric Types:** ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType

**String and Binary:** StringType, BinaryType

**Date/Time:** DateType, TimestampType

**Complex Types:** ArrayType, MapType, StructType

In [None]:
# Create a DataFrame with mixed type issues
messy_data = [
    ("1", "25", "50000.50", "2024-01-15", "true"),
    ("2", "thirty", "60000", "15/02/2024", "yes"),
    ("3", "35", "$75,000", "2024-03-20", "1"),
    ("4", "40", "80000.00", "March 25, 2024", "false"),
    ("5", "N/A", "invalid", "2024-04-30", "no"),
]

df_messy = spark.createDataFrame(messy_data, ["id", "age", "salary", "date", "active"])

print("Messy data with type issues:")
df_messy.show()
df_messy.printSchema()

## 3.2 Type Casting and Conversion

In [None]:
# Simple casting
df_cast = df_messy.withColumn("id_int", col("id").cast(IntegerType()))
print("ID cast to integer:")
df_cast.select("id", "id_int").show()

In [None]:
# Casting with potential failures - non-numeric strings become null
df_age_cast = df_messy.withColumn("age_int", col("age").cast(IntegerType()))
print("Age cast to integer (note nulls for invalid values):")
df_age_cast.select("age", "age_int").show()

In [None]:
# Clean salary string before casting
df_salary_clean = df_messy.withColumn(
    "salary_cleaned",
    regexp_replace(col("salary"), "[$,]", "")
).withColumn(
    "salary_double",
    col("salary_cleaned").cast(DoubleType())
)

print("Salary cleaning and casting:")
df_salary_clean.select("salary", "salary_cleaned", "salary_double").show()
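
The clean-then-cast idea can be tried on the five salary strings in plain Python. This is a sketch rather than Spark code: `to_double` is a hypothetical helper, and Python's `float()` does not accept exactly the same inputs as a Spark cast, so only the overall behaviour (invalid input becomes missing) should be carried over.

```python
import re

def to_double(raw):
    """Strip '$' and ',' then convert to float; return None when that fails."""
    if raw is None:
        return None
    cleaned = re.sub(r"[$,]", "", raw)
    try:
        return float(cleaned)
    except ValueError:
        return None

salaries = ["50000.50", "60000", "$75,000", "80000.00", "invalid"]
print([to_double(s) for s in salaries])
# [50000.5, 60000.0, 75000.0, 80000.0, None]
```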

In [None]:
# Boolean conversion with multiple representations
df_bool = df_messy.withColumn(
    "active_bool",
    when(lower(col("active")).isin("true", "yes", "1", "t", "y"), True)
    .when(lower(col("active")).isin("false", "no", "0", "f", "n"), False)
    .otherwise(None)
)

print("Boolean conversion:")
df_bool.select("active", "active_bool").show()

## 3.3 Handling Date and Timestamp Fields

In [None]:
# Common date formats
date_examples = [
    ("2024-01-15", "ISO format"),
    ("15/02/2024", "European DD/MM/YYYY"),
    ("03/20/2024", "American MM/DD/YYYY"),
    ("March 25, 2024", "Written format"),
    ("2024.04.30", "Dot separator"),
]

df_dates = spark.createDataFrame(date_examples, ["date_string", "format_name"])
df_dates.show(truncate=False)

In [None]:
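# Parse dates with multiple format patterns using coalesce.
# Order matters: an ambiguous value such as 03/04/2024 is taken by the first
# pattern that parses it (here dd/MM/yyyy).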
df_parsed = df_dates.withColumn(
    "parsed_date",
    coalesce(
        to_date(col("date_string"), "yyyy-MM-dd"),
        to_date(col("date_string"), "dd/MM/yyyy"),
        to_date(col("date_string"), "MM/dd/yyyy"),
        to_date(col("date_string"), "MMMM dd, yyyy"),
        to_date(col("date_string"), "yyyy.MM.dd")
    )
)

print("Dates parsed from multiple formats:")
df_parsed.show(truncate=False)
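
The "try each format in turn" logic is the same in plain Python, which makes the ordering issue easy to see. The sketch below uses `datetime.strptime` with format codes that correspond to the Spark patterns above; `FORMATS` and `parse_date` are hypothetical names, and `%B` assumes English month names in the current locale.

```python
from datetime import date, datetime

# Order matters: the first format that parses wins.
FORMATS = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%B %d, %Y", "%Y.%m.%d"]

def parse_date(text):
    """Return a date from the first matching format, or None if none match."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None

print(parse_date("15/02/2024"))  # 2024-02-15
print(parse_date("03/20/2024"))  # 2024-03-20 (day/month fails, month/day succeeds)
print(parse_date("03/04/2024"))  # 2024-04-03 (ambiguous: read as day/month)
print(parse_date("not a date"))  # None
```

If a source is known to use the American convention, its pattern should come first, or better, each source should be parsed with its own single format.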

In [None]:
# Extract components from dates
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, quarter, date_format

df_date_parts = df_parsed.filter(col("parsed_date").isNotNull()).select(
    col("date_string"),
    col("parsed_date"),
    year(col("parsed_date")).alias("year"),
    month(col("parsed_date")).alias("month"),
    dayofmonth(col("parsed_date")).alias("day"),
    quarter(col("parsed_date")).alias("quarter"),
    date_format(col("parsed_date"), "EEEE").alias("day_name")
)

print("Date components extracted:")
df_date_parts.show(truncate=False)

## 3.4 Schema Validation and Enforcement

In [None]:
# Define an expected schema
expected_schema = StructType([
    StructField("customer_id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
    StructField("email", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("city", StringType(), nullable=True),
    StructField("signup_date", DateType(), nullable=True)
])

print("Expected Schema:")
for field in expected_schema.fields:
    print(f"  {field.name}: {field.dataType} (nullable={field.nullable})")

In [None]:
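# Cast columns to the expected types, add any missing columns as nulls, and
# reorder the columns to match the schema.
# Note: casting does not enforce nullable=False, and values that cannot be cast become null.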
def enforce_schema(df, target_schema):
    result_df = df
    for field in target_schema.fields:
        col_name = field.name
        col_type = field.dataType
        if col_name in df.columns:
            result_df = result_df.withColumn(col_name, col(col_name).cast(col_type))
        else:
            result_df = result_df.withColumn(col_name, lit(None).cast(col_type))
    return result_df.select([field.name for field in target_schema.fields])

df_enforced = enforce_schema(df_customers, expected_schema)
print("Schema-enforced DataFrame:")
df_enforced.printSchema()
df_enforced.show(5)

---
# Part 4: Text Preprocessing
---

## 4.1 String Functions in Spark

Text data often requires extensive preprocessing before analysis.

In [None]:
# Create sample text data
text_data = [
    (1, "   Hello World   ", "john.doe@email.com"),
    (2, "APACHE SPARK", "JANE.DOE@EMAIL.COM"),
    (3, "data science", "bob_wilson@company.co.uk"),
    (4, "  Machine Learning  ", "alice-brown@domain.org"),
    (5, "NLP is Great!!!", "charlie.lee123@test.com"),
]

df_text = spark.createDataFrame(text_data, ["id", "text", "email"])
print("Sample text data:")
df_text.show(truncate=False)

## 4.2 Case Normalization and Trimming

In [None]:
# Case conversion
df_case = df_text.select(
    col("text"),
    lower(col("text")).alias("lowercase"),
    upper(col("text")).alias("uppercase"),
    expr("initcap(text)").alias("titlecase")
)

print("Case conversion:")
df_case.show(truncate=False)

In [None]:
# Trimming whitespace
df_trim = df_text.select(
    col("text"),
    trim(col("text")).alias("trimmed"),
    ltrim(col("text")).alias("left_trimmed"),
    rtrim(col("text")).alias("right_trimmed"),
    length(col("text")).alias("original_len"),
    length(trim(col("text"))).alias("trimmed_len")
)

print("Whitespace trimming:")
df_trim.show(truncate=False)

## 4.3 Tokenization and Text Splitting

In [None]:
# Create sample sentences
sentences_data = [
    (1, "Apache Spark is a unified analytics engine for big data processing"),
    (2, "Natural language processing enables computers to understand text"),
    (3, "Machine learning models require clean preprocessed data"),
    (4, "Text mining extracts valuable insights from unstructured data"),
]

df_sentences = spark.createDataFrame(sentences_data, ["id", "sentence"])
df_sentences.show(truncate=False)

In [None]:
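# Basic tokenization using split. The delimiter is a regex: a single space works for
# clean text, while "\\s+" also handles tabs and repeated spaces.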
from pyspark.sql.functions import split, size, explode

df_tokens = df_sentences.withColumn(
    "words",
    split(lower(col("sentence")), " ")
).withColumn(
    "word_count",
    size(col("words"))
)

print("Tokenized sentences:")
df_tokens.show(truncate=False)

In [None]:
# Explode tokens and calculate word frequency
df_exploded = df_tokens.select(
    col("id"),
    explode(col("words")).alias("word")
)

word_freq = df_exploded.groupBy("word").agg(
    count("*").alias("frequency")
).orderBy(col("frequency").desc())

print("Word frequencies:")
word_freq.show(15)
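
Splitting on a single space works for the clean sentences above, but real text often has repeated spaces. The plain-Python sketch below (not Spark code, with a made-up sentence) shows the difference between a single-space delimiter and a whitespace-run delimiter, followed by a word count with `collections.Counter`.

```python
import re
from collections import Counter

sentence = "Big data tools  make big   data simple"  # note the repeated spaces

naive = sentence.lower().split(" ")                   # one token per single space
robust = re.split(r"\s+", sentence.lower().strip())   # one token per whitespace run

print(naive)
# ['big', 'data', 'tools', '', 'make', 'big', '', '', 'data', 'simple']
print(robust)
# ['big', 'data', 'tools', 'make', 'big', 'data', 'simple']

word_freq = Counter(robust)
print(word_freq.most_common(2))  # [('big', 2), ('data', 2)]
```

The empty strings produced by the naive split would be counted as words, inflating `word_count` and appearing in the frequency table.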

## 4.4 Stop Word Removal

In [None]:
# Define common English stop words
stop_words = [
    "a", "an", "the", "and", "or", "but", "is", "are", "was", "were",
    "to", "of", "in", "for", "on", "with", "at", "by", "from",
    "it", "its", "this", "that", "these", "those",
    "be", "been", "being", "have", "has", "had"
]

from pyspark.sql.functions import array_except, array

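# Note: array_except returns distinct elements, so repeated words in a sentence
# are also collapsed to a single occurrence.
stop_words_array = array([lit(w) for w in stop_words])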

df_no_stopwords = df_tokens.withColumn(
    "words_filtered",
    array_except(col("words"), stop_words_array)
)

print("Words with stop words removed:")
df_no_stopwords.select("sentence", "words", "words_filtered").show(truncate=False)
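
One detail worth knowing is that a set-style difference, which is how `array_except` behaves, also drops repeated words, whereas a filter keeps them. A plain-Python sketch with a made-up word list shows the two results side by side.

```python
words = ["data", "is", "the", "new", "oil", "and", "data", "is", "everywhere"]
stop_words = {"is", "the", "and"}

# Filtering keeps word order and repeated words.
filtered = [w for w in words if w not in stop_words]

# A set-style difference additionally drops repeats (first occurrence kept here).
seen, distinct = set(), []
for w in filtered:
    if w not in seen:
        seen.add(w)
        distinct.append(w)

print(filtered)  # ['data', 'new', 'oil', 'data', 'everywhere']
print(distinct)  # ['data', 'new', 'oil', 'everywhere']
```

If word frequencies matter downstream, prefer an approach that keeps duplicates, such as the `StopWordsRemover` shown in the next cell.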

In [None]:
# Using Spark ML's StopWordsRemover
from pyspark.ml.feature import StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol="sentence", outputCol="words_ml")
df_tokenized_ml = tokenizer.transform(df_sentences)

remover = StopWordsRemover(inputCol="words_ml", outputCol="words_cleaned")
df_cleaned_ml = remover.transform(df_tokenized_ml)

print("Using Spark ML StopWordsRemover:")
df_cleaned_ml.select("sentence", "words_ml", "words_cleaned").show(truncate=False)

## 4.5 Text Cleaning Pipeline

In [None]:
# Sample product reviews (messy text data)
reviews_data = [
    (1, "  GREAT product!!!  Would buy again.   ", 5),
    (2, "Terrible... don't waste your money ", 1),
    (3, "Good quality, fast shipping!!!", 4),
    (4, "   not bad, could be better...   ", 3),
    (5, "AMAZING!!!!! Best purchase EVER!!!!!", 5),
    (6, "meh, it's OK I guess", 3),
    (7, "Product arrived damaged", 1),
    (8, "10/10 would recommend to friends!", 5),
]

df_reviews = spark.createDataFrame(reviews_data, ["id", "review", "rating"])
print("Raw reviews:")
df_reviews.show(truncate=False)

In [None]:
def clean_text_pipeline(df, text_col, output_col="text_cleaned"):
    """Trim, lowercase, strip non-alphanumeric characters, and collapse whitespace in text_col."""
    result = df \
        .withColumn("_step1_trim", trim(col(text_col))) \
        .withColumn("_step2_lower", lower(col("_step1_trim"))) \
        .withColumn("_step3_alphanum",
                    regexp_replace(col("_step2_lower"), "[^a-z0-9\\s]", "")) \
        .withColumn("_step4_spaces",
                    regexp_replace(col("_step3_alphanum"), "\\s+", " ")) \
        .withColumn(output_col, trim(col("_step4_spaces"))) \
        .drop("_step1_trim", "_step2_lower", "_step3_alphanum", "_step4_spaces")
    return result

df_reviews_cleaned = clean_text_pipeline(df_reviews, "review", "review_cleaned")

print("Cleaned reviews:")
df_reviews_cleaned.select("review", "review_cleaned").show(truncate=False)
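
The four steps can be checked on individual strings with Python's `re` module before running them at scale. This is a plain-Python sketch of the same sequence (the `clean_text` helper is hypothetical), and it also reveals two side effects of stripping punctuation.

```python
import re

def clean_text(text):
    text = text.strip().lower()                # steps 1-2: trim and lowercase
    text = re.sub(r"[^a-z0-9\s]", "", text)    # step 3: keep letters, digits, whitespace
    text = re.sub(r"\s+", " ", text)           # step 4: collapse whitespace runs
    return text.strip()

print(clean_text("  GREAT product!!!  Would buy again.   "))
# great product would buy again
print(clean_text("Terrible... don't waste your money "))
# terrible dont waste your money
print(clean_text("10/10 would recommend to friends!"))
# 1010 would recommend to friends
```

Removing characters outright merges "don't" into "dont" and "10/10" into "1010". Replacing punctuation with a space instead is one alternative if such tokens matter for the analysis.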

## 4.6 Practical Exercise: Product Review Cleaning

**Tasks:**
1. Clean the review text (remove special chars, normalize case, trim)
2. Extract the word count from each cleaned review
3. Identify reviews that mention negative keywords (return, refund, broken)
4. Calculate the average rating for reviews with negative keywords vs others

In [None]:
# Extended reviews dataset
extended_reviews = [
    (1, "LOVE IT! Fast shipping, great quality. Will order more!", 5),
    (2, "Product broke after 1 week. Requesting refund immediately.", 1),
    (3, "Decent product for the price. Nothing special but works.", 3),
    (4, "DO NOT BUY! Cheap quality, had to return it!", 1),
    (5, "Excellent! Better than expected. 100% recommend.", 5),
    (6, "Item was damaged during shipping. Broken on arrival.", 1),
    (7, "Good value for money. Satisfied with purchase.", 4),
    (8, "WORST PURCHASE EVER! Want my money back!!!", 1),
    (9, "Nice product, works as described. Happy customer.", 4),
    (10, "Meh, it's okay. Nothing to complain about.", 3),
]

df_extended = spark.createDataFrame(extended_reviews, ["id", "review", "rating"])
df_extended.show(truncate=False)

In [None]:
# Your code here



---
# Part 5: Regular Expressions in Spark
---

## 5.1 Introduction to Regex in PySpark

**Key Functions:**
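- `rlike(pattern)` - Returns true if any part of the string matches the pattern (use `^` and `$` to require a full match)
- `regexp_extract(col, pattern, idx)` - Extracts capture group `idx` from the first match, or an empty string if there is no match
- `regexp_replace(col, pattern, replacement)` - Replaces every match with the replacement text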

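**Common Regex Patterns:**

| Pattern | Meaning |
|---------|----------|
| `\d` | Any digit (0-9) |
| `\w` | Any word character (letter, digit, or underscore) |
| `\s` | Any whitespace character |
| `.` | Any single character except a line break |
| `*` | Zero or more of the preceding element |
| `+` | One or more of the preceding element |
| `[]` | Character class (any one of the listed characters) |
| `()` | Capture group |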
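
Each of these building blocks can be tried quickly with Python's `re` module. Spark uses Java regular expressions, which agree with Python on these basics, although details differ (for example, Java's `\w` is ASCII-only by default, while Python's also matches accented letters). The strings below are made up for illustration.

```python
import re

digits = re.findall(r"\d+", "Order 66 shipped in 3 days")         # ['66', '3']
words = re.findall(r"\w+", "bob_wilson@test.org")                 # ['bob_wilson', 'test', 'org']
parts = re.split(r"\s+", "a  b\tc")                               # ['a', 'b', 'c']
any_char = re.findall(r"a.c", "abc a-c ac")                       # ['abc', 'a-c']
vowels = re.findall(r"[aeiou]", "spark")                          # ['a']
groups = re.search(r"(\d{3})-(\d{4})", "call 555-1234").groups()  # ('555', '1234')

# * allows zero repetitions, + requires at least one.
star_ok = re.fullmatch(r"ab*", "a") is not None   # True
plus_ok = re.fullmatch(r"ab+", "a") is not None   # False

# A partial match is enough for search(); anchors force a full match.
partial = re.search(r"\d{3}", "abc12345") is not None     # True
anchored = re.search(r"^\d{3}$", "abc12345") is not None  # False

print(digits, words, parts, any_char, vowels, groups)
print(star_ok, plus_ok, partial, anchored)
```

The last two lines matter for validation: an unanchored pattern reports a match as soon as any substring fits.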

In [None]:
# Sample data for regex operations
contact_data = [
    (1, "John Smith", "john.smith@email.com", "(555) 123-4567", "123 Main St, New York, NY 10001"),
    (2, "Jane Doe", "jane_doe@company.co.uk", "555-987-6543", "456 Oak Ave, Los Angeles, CA 90001"),
    (3, "Bob Wilson", "bob123@test.org", "5551234567", "789 Pine Rd, Chicago, IL 60601"),
    (4, "Alice Brown", "alice.brown@domain.net", "(555)456-7890", "321 Elm Blvd, Houston, TX 77001"),
    (5, "Charlie Lee", "invalid-email", "phone: 555-111-2222", "PO Box 100, Phoenix, AZ 85001"),
]

df_contacts = spark.createDataFrame(
    contact_data,
    ["id", "name", "email", "phone", "address"]
)

print("Contact data:")
df_contacts.show(truncate=False)

## 5.2 Pattern Matching with rlike

In [None]:
# Check if email is valid format (the domain may have several labels, e.g. company.co.uk)
email_pattern = r"^[\w.+-]+@(?:[\w-]+\.)+[a-zA-Z]{2,}$"

df_email_check = df_contacts.withColumn(
    "is_valid_email",
    col("email").rlike(email_pattern)
)

print("Email validation:")
df_email_check.select("name", "email", "is_valid_email").show(truncate=False)

In [None]:
# Filter for specific domain emails
df_company_emails = df_contacts.filter(
    col("email").rlike(r"@company\.")
)

print("Company domain emails:")
df_company_emails.select("name", "email").show()
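
Validation patterns are easy to get subtly wrong, so it helps to try them on a handful of strings first. The plain-Python sketch below (not Spark code) uses an assumed pattern, `EMAIL_RE`, that allows several dot-separated domain labels so that addresses such as `jane_doe@company.co.uk` pass. Treat it as illustrative rather than a complete email validator.

```python
import re

# Assumed pattern for this sketch: local part, "@", one or more "label." groups,
# then a final label of at least two letters.
EMAIL_RE = re.compile(r"^[\w.+-]+@(?:[\w-]+\.)+[a-zA-Z]{2,}$")

emails = [
    "john.smith@email.com",
    "jane_doe@company.co.uk",
    "invalid-email",
    "a@b",
    "",
]
valid = {e: bool(EMAIL_RE.match(e)) for e in emails}
for email, ok in valid.items():
    print(f"{email!r:28} {ok}")
```

A pattern that permits only a single dot after the `@` would reject the `.co.uk` address, which is a common source of false negatives.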

## 5.3 Extracting Data with regexp_extract

In [None]:
# Extract email components
df_email_parts = df_contacts.withColumn(
    "email_username",
    regexp_extract(col("email"), r"^([\w.+-]+)@", 1)
).withColumn(
    "email_domain",
    regexp_extract(col("email"), r"@([\w-]+)\.", 1)
).withColumn(
    "email_tld",
    regexp_extract(col("email"), r"\.([a-zA-Z.]+)$", 1)
)

print("Extracted email components:")
df_email_parts.select("email", "email_username", "email_domain", "email_tld").show(truncate=False)

In [None]:
# Extract and normalize phone numbers
df_phone_extract = df_contacts.withColumn(
    "phone_digits",
    regexp_replace(col("phone"), r"[^\d]", "")
).withColumn(
    "area_code",
    regexp_extract(regexp_replace(col("phone"), r"[^\d]", ""), r"^(\d{3})", 1)
).withColumn(
    "phone_normalized",
    concat(
        lit("("),
        substring(regexp_replace(col("phone"), r"[^\d]", ""), 1, 3),
        lit(") "),
        substring(regexp_replace(col("phone"), r"[^\d]", ""), 4, 3),
        lit("-"),
        substring(regexp_replace(col("phone"), r"[^\d]", ""), 7, 4)
    )
)

print("Phone number extraction and normalization:")
df_phone_extract.select("phone", "phone_digits", "area_code", "phone_normalized").show(truncate=False)

In [None]:
# Extract address components
df_address_parts = df_contacts.withColumn(
    "street",
    regexp_extract(col("address"), r"^([^,]+),", 1)
).withColumn(
    "city",
    regexp_extract(col("address"), r", ([^,]+), [A-Z]{2}", 1)
).withColumn(
    "state",
    regexp_extract(col("address"), r", ([A-Z]{2}) \d", 1)
).withColumn(
    "zip_code",
    regexp_extract(col("address"), r"(\d{5})$", 1)
)

print("Extracted address components:")
df_address_parts.select("address", "street", "city", "state", "zip_code").show(truncate=False)
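
When a delimiter occurs more than once, the choice of quantifier decides where a capture group stops. The plain-Python sketch below illustrates this on one of the sample addresses; the same behaviour applies to Java regex in Spark.

```python
import re

address = "123 Main St, New York, NY 10001"

# Greedy: .+ runs as far as it can, so the group ends at the LAST comma.
greedy = re.search(r"^(.+),", address).group(1)

# Negated class: [^,]+ cannot cross a comma, so the group ends at the FIRST one.
first_field = re.search(r"^([^,]+),", address).group(1)

# Lazy: .+? expands only as far as needed, which also stops at the first comma.
lazy = re.search(r"^(.+?),", address).group(1)

print(greedy)       # 123 Main St, New York
print(first_field)  # 123 Main St
print(lazy)         # 123 Main St
```

For delimiter-separated fields, the negated character class is usually the clearest way to say "everything up to the next comma".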

## 5.4 Replacing Patterns with regexp_replace

In [None]:
# Mask sensitive data
df_masked = df_contacts.withColumn(
    "email_masked",
    regexp_replace(
        col("email"),
        r"^(.{2}).*(@.*)$",
        r"$1***$2"
    )
).withColumn(
    "phone_masked",
    regexp_replace(
        regexp_replace(col("phone"), r"[^\d]", ""),
        r"^(\d{3})(\d{3})(\d{4})$",
        r"(***) ***-$3"
    )
)

print("Masked sensitive data:")
df_masked.select("name", "email", "email_masked", "phone", "phone_masked").show(truncate=False)
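
The masking patterns can be verified on single strings in plain Python. The only syntax difference is in the replacement text: Python refers to capture groups as `\1`, while Spark (Java regex) uses `$1`. The `mask_email` and `mask_phone` helpers are hypothetical names for this sketch.

```python
import re

def mask_email(email):
    # Keep the first two characters and everything from "@" onwards.
    return re.sub(r"^(.{2}).*(@.*)$", r"\1***\2", email)

def mask_phone(phone):
    digits = re.sub(r"\D", "", phone)  # strip everything except digits
    # Only a 10-digit number matches; the last four digits stay visible.
    return re.sub(r"^(\d{3})(\d{3})(\d{4})$", r"(***) ***-\3", digits)

print(mask_email("john.smith@email.com"))  # jo***@email.com
print(mask_email("invalid-email"))         # invalid-email
print(mask_phone("(555) 123-4567"))        # (***) ***-4567
print(mask_phone("phone: 555-111-2222"))   # (***) ***-2222
print(mask_phone("12345"))                 # 12345
```

Values that do not match the pattern pass through unmasked, so in a real pipeline it is worth deciding explicitly what should happen to them.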

In [None]:
# Clean and standardize text
messy_text_data = [
    (1, "Hello    World!!!"),
    (2, "Multiple   spaces   here"),
    (3, "Lots of!!!!! punctuation???"),
    (4, "   Leading and trailing   "),
    (5, "MixED CaSe TeXt"),
]

df_messy_text = spark.createDataFrame(messy_text_data, ["id", "text"])

df_standardized = df_messy_text.withColumn(
    "text_clean",
    trim(
        regexp_replace(
            regexp_replace(
                lower(col("text")),
                r"([!?.]){2,}",
                r"$1"
            ),
            r"\s+",
            " "
        )
    )
)

print("Text standardization:")
df_standardized.show(truncate=False)

## 5.5 Practical Exercise: Log File Parsing

Parse web server access logs using regex to extract structured information.

**Tasks:**
1. Extract IP address, timestamp, HTTP method, URL path, status code, and response size
2. Filter for error status codes (4xx and 5xx)
3. Count requests by HTTP method
4. Identify the most accessed URLs

In [None]:
# Sample Apache access logs
log_data = [
    (1, '192.168.1.100 - - [15/Jan/2024:10:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234'),
    (2, '10.0.0.50 - - [15/Jan/2024:10:31:00 +0000] "POST /api/users HTTP/1.1" 201 567'),
    (3, '192.168.1.101 - - [15/Jan/2024:10:31:15 +0000] "GET /about.html HTTP/1.1" 200 890'),
    (4, '172.16.0.25 - - [15/Jan/2024:10:31:30 +0000] "GET /missing.html HTTP/1.1" 404 234'),
    (5, '192.168.1.100 - - [15/Jan/2024:10:31:45 +0000] "GET /api/data HTTP/1.1" 500 100'),
    (6, '10.0.0.75 - - [15/Jan/2024:10:32:00 +0000] "PUT /api/users/1 HTTP/1.1" 200 456'),
    (7, '192.168.1.102 - - [15/Jan/2024:10:32:15 +0000] "DELETE /api/users/2 HTTP/1.1" 204 0'),
    (8, '172.16.0.30 - - [15/Jan/2024:10:32:30 +0000] "GET /index.html HTTP/1.1" 200 1234'),
    (9, '10.0.0.50 - - [15/Jan/2024:10:32:45 +0000] "POST /api/login HTTP/1.1" 401 89'),
    (10, '192.168.1.100 - - [15/Jan/2024:10:33:00 +0000] "GET /api/products HTTP/1.1" 200 5678'),
]

df_logs = spark.createDataFrame(log_data, ["id", "log_line"])
print("Sample access logs:")
df_logs.show(truncate=False)

In [None]:
# Your code here: Parse the log files



---
# Part 6: Building Complete Preprocessing Pipelines
---

## 6.1 Combining Multiple Preprocessing Steps

In [None]:
# Real-world messy dataset
raw_data = [
    ("1", "  JOHN SMITH  ", "john.smith@EMAIL.com", "(555) 123-4567", "25", "$50,000.00", "2024-01-15"),
    ("2", "jane doe", "JANE_DOE@COMPANY.CO.UK", "555-987-6543", "N/A", "60000", "15/02/2024"),
    ("3", "Bob Wilson", "invalid-email", "5551234567", "35", "75,000", "March 20, 2024"),
    ("4", None, "alice@domain.net", "(555)456-7890", "28", None, "2024-04-10"),
    ("5", "Charlie Lee", "", "phone: 555-111-2222", "forty", "$80,000", None),
]

df_raw = spark.createDataFrame(
    raw_data,
    ["id", "name", "email", "phone", "age", "salary", "hire_date"]
)

print("Raw messy data:")
df_raw.show(truncate=False)

In [None]:
def preprocess_employee_data(df):
    # Step 1: Clean and standardize name
    df = df.withColumn(
        "name_clean",
        when(col("name").isNull() | (trim(col("name")) == ""),
             concat(lit("Employee_"), col("id")))
        .otherwise(expr("initcap(trim(name))"))
    )
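    # Step 2: Clean and validate email
    # The domain part allows several dot-separated labels, so addresses such
    # as user@company.co.uk are accepted
    email_pattern = r"^[\w.+-]+@([\w-]+\.)+[a-zA-Z]{2,}$"
    df = df.withColumn(
        "email_clean",
        when(lower(trim(col("email"))).rlike(email_pattern),
             lower(trim(col("email"))))
        .otherwise(None)
    )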
    # Step 3: Normalize phone number
    df = df.withColumn(
        "phone_digits",
        regexp_replace(col("phone"), r"[^\d]", "")
    ).withColumn(
        "phone_clean",
        when(length(col("phone_digits")) == 10,
             concat(
                 lit("("), substring(col("phone_digits"), 1, 3), lit(") "),
                 substring(col("phone_digits"), 4, 3), lit("-"),
                 substring(col("phone_digits"), 7, 4)
             ))
        .otherwise(None)
    )
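    # Step 4: Convert age to integer
    # Non-numeric strings ("N/A", "forty") become null when ANSI mode is off;
    # with spark.sql.ansi.enabled=true the cast raises an error instead
    df = df.withColumn("age_clean", col("age").cast(IntegerType()))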
    # Step 5: Clean and convert salary
    df = df.withColumn(
        "salary_clean",
        regexp_replace(col("salary"), r"[$,]", "").cast(DoubleType())
    )
    # Step 6: Parse hire date (try each known format; coalesce keeps the first non-null result)
    df = df.withColumn(
        "hire_date_clean",
        coalesce(
            to_date(col("hire_date"), "yyyy-MM-dd"),
            to_date(col("hire_date"), "dd/MM/yyyy"),
            to_date(col("hire_date"), "MMMM dd, yyyy")
        )
    )
    return df.select(
        col("id").cast(IntegerType()).alias("id"),
        col("name_clean").alias("name"),
        col("email_clean").alias("email"),
        col("phone_clean").alias("phone"),
        col("age_clean").alias("age"),
        col("salary_clean").alias("salary"),
        col("hire_date_clean").alias("hire_date")
    )

df_clean = preprocess_employee_data(df_raw)

print("Cleaned data:")
df_clean.show(truncate=False)
df_clean.printSchema()
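
The per-field rules inside `preprocess_employee_data` can be checked without a cluster by mirroring them in plain Python. The sketch below covers two of them, email validation and multi-format date parsing. It is an approximation under assumptions: `clean_email` and `parse_date` are hypothetical helper names, the email pattern is one that accepts multi-label domains such as `company.co.uk`, and the `strptime` formats are Python equivalents of the Spark patterns `yyyy-MM-dd`, `dd/MM/yyyy`, and `MMMM dd, yyyy` (month names are parsed using the default English locale).

```python
import re
from datetime import date, datetime
from typing import Optional

EMAIL_RE = re.compile(r"^[\w.+-]+@(?:[\w-]+\.)+[a-zA-Z]{2,}$")
DATE_FORMATS = ("%Y-%m-%d", "%d/%m/%Y", "%B %d, %Y")


def clean_email(raw: Optional[str]) -> Optional[str]:
    # Trim and lowercase, then keep the value only if it looks like an email.
    if raw is None:
        return None
    candidate = raw.strip().lower()
    return candidate if EMAIL_RE.match(candidate) else None


def parse_date(raw: Optional[str]) -> Optional[date]:
    # Try each format in turn and return the first successful parse,
    # which is what coalesce(to_date(...), to_date(...), ...) does in Spark.
    if raw is None:
        return None
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw, fmt).date()
        except ValueError:
            continue
    return None


print(clean_email("JANE_DOE@COMPANY.CO.UK"))
print(clean_email("invalid-email"))
print(parse_date("15/02/2024"))
print(parse_date("March 20, 2024"))
```

Writing the rules as small functions first makes it easy to list the inputs each rule should accept or reject before translating them into column expressions.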

## 6.2 Data Quality Reporting

In [None]:
def generate_quality_report(df_original, df_cleaned):
    total_rows = df_original.count()
    print("="*60)
    print("DATA QUALITY REPORT")
    print("="*60)
    print(f"\nTotal rows: {total_rows}")
    # Guard against division by zero on an empty DataFrame
    if total_rows == 0:
        print("No rows to report on.")
        print("="*60)
        return
    print("\n--- Missing Value Summary (Cleaned Data) ---")
    # Count nulls once per column and reuse the counts for the overall score
    null_counts = {}
    for col_name in df_cleaned.columns:
        null_count = df_cleaned.filter(col(col_name).isNull()).count()
        null_counts[col_name] = null_count
        null_pct = (null_count / total_rows) * 100
        status = "OK" if null_pct < 10 else "WARNING" if null_pct < 30 else "CRITICAL"
        print(f"  {col_name}: {null_count} nulls ({null_pct:.1f}%) [{status}]")
    total_cells = total_rows * len(df_cleaned.columns)
    null_cells = sum(null_counts.values())
    completeness = ((total_cells - null_cells) / total_cells) * 100
    print("\n--- Overall Data Quality Score ---")
    print(f"  Completeness: {completeness:.1f}%")
    print("="*60)

generate_quality_report(df_raw, df_clean)
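
The report combines two small pieces of arithmetic: a per-column status derived from the null percentage, and an overall completeness score. The sketch below isolates both so the thresholds can be tested on their own. The function names `null_status` and `completeness_pct` are hypothetical, and the null counts in the example are made-up numbers, not the output of the cell above.

```python
def null_status(null_pct: float) -> str:
    # Same thresholds as the report: under 10% OK, under 30% WARNING.
    if null_pct < 10:
        return "OK"
    if null_pct < 30:
        return "WARNING"
    return "CRITICAL"


def completeness_pct(null_counts: dict, total_rows: int) -> float:
    # Share of non-null cells across all columns, as a percentage.
    total_cells = total_rows * len(null_counts)
    if total_cells == 0:
        return 0.0
    null_cells = sum(null_counts.values())
    return (total_cells - null_cells) / total_cells * 100


# Hypothetical counts for a 5-row, 3-column table.
example_counts = {"id": 0, "name": 0, "email": 2}
for name, nulls in example_counts.items():
    print(name, null_status(nulls / 5 * 100))
print(f"{completeness_pct(example_counts, 5):.1f}%")
```

With 2 null cells out of 15, the example evaluates to roughly 86.7% completeness, and the `email` column (40% null) falls into the CRITICAL band.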

## 6.3 Final Challenge: End-to-End Data Cleaning

Apply everything you have learned to clean a small but realistic dataset.

**Dataset:** E-commerce transaction data with multiple quality issues

**Tasks:**
1. Identify and report all data quality issues
2. Handle missing values appropriately for each column
3. Standardize text fields (product names, categories)
4. Parse and validate dates
5. Clean and validate email addresses
6. Extract useful information using regex
7. Create a comprehensive data quality report
8. Output the cleaned dataset
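
For Task 1 it can help to decide up front which values count as missing, since this dataset mixes real nulls with empty strings and placeholders. The sketch below profiles a few rows in plain Python. It is only a starting point under assumptions: the token set `MISSING_TOKENS` and the helpers `is_missing` and `missing_by_column` are hypothetical, and the three rows are copied from the dataset defined in the next cell.

```python
# Placeholder strings treated as missing (an assumption; extend as needed).
MISSING_TOKENS = {"", "n/a", "na", "null", "none"}


def is_missing(value) -> bool:
    # None, empty/whitespace-only strings, and known placeholders are missing.
    return value is None or value.strip().lower() in MISSING_TOKENS


def missing_by_column(rows, columns):
    # Count missing-like values per column.
    counts = {c: 0 for c in columns}
    for row in rows:
        for column, value in zip(columns, row):
            if is_missing(value):
                counts[column] += 1
    return counts


columns = ["order_id", "product", "category", "email",
           "price", "quantity", "order_date", "status"]
rows = [
    ("ORD002", "wireless mouse", "electronics", "JANE@COMPANY.CO.UK", "29.99", None, "15/01/2024", "completed"),
    ("ORD004", None, "CLOTHING", "bob@test.org", "N/A", "1", "2024-01-16", "Cancelled"),
    ("ORD008", "  Winter Jacket  ", "Clothing", "N/A", "$149.99", None, "2024-01-18", "SHIPPED"),
]

print(missing_by_column(rows, columns))
```

In Spark, the same rule could be expressed as a `when(...)` condition that maps placeholder strings to null before any imputation step runs.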

In [None]:
# Complex e-commerce dataset
ecommerce_data = [
    ("ORD001", "  LAPTOP Computer  ", "ELECTRONICS", "john@email.com", "$999.99", "2", "2024-01-15 10:30:00", "Completed"),
    ("ORD002", "wireless mouse", "electronics", "JANE@COMPANY.CO.UK", "29.99", None, "15/01/2024", "completed"),
    ("ORD003", "USB-C Cable (6ft)", "Accessories", "invalid-email", "$12.50", "5", "January 16, 2024", "PENDING"),
    ("ORD004", None, "CLOTHING", "bob@test.org", "N/A", "1", "2024-01-16", "Cancelled"),
    ("ORD005", "Running Shoes - Size 10", "  clothing  ", "", "89.00", "2", None, "Shipped"),
    ("ORD006", "HDMI Cable!!!", "accessories", "alice@domain.net", "$15.00", "10", "2024-01-17 14:20:00", "completed"),
    ("ORD007", "smartphone case", "Electronics", "charlie@email.com", "$25", "3", "17-Jan-2024", "Processing"),
    ("ORD008", "  Winter Jacket  ", "Clothing", "N/A", "$149.99", None, "2024-01-18", "SHIPPED"),
]

df_ecommerce = spark.createDataFrame(
    ecommerce_data,
    ["order_id", "product", "category", "email", "price", "quantity", "order_date", "status"]
)

print("Raw e-commerce data:")
df_ecommerce.show(truncate=False)

In [None]:
# Your code here: Build a complete preprocessing pipeline



---
# Cleanup
---

Stop your Spark session when you are done so that its resources are released.

In [None]:
# Stop the Spark session
spark.stop()
print("Spark session stopped.")

---
# Summary
---

In this lab, you learned:

1. **Missing Value Handling**
   - Identifying missing data (null, NaN, empty strings, placeholders)
   - Dropping rows/columns with missing values
   - Imputation strategies (mean, median, mode, group-wise)
   - Forward/backward fill for time series
   - Creating missing value indicators

2. **Data Type Management**
   - Understanding Spark data types
   - Type casting and conversion
   - Handling dates and timestamps
   - Schema validation and enforcement

3. **Text Preprocessing**
   - Case normalization and trimming
   - Tokenization and text splitting
   - Stop word removal
   - Building text cleaning pipelines

4. **Regular Expressions**
   - Pattern matching with rlike
   - Data extraction with regexp_extract
   - Pattern replacement with regexp_replace
   - Complex parsing tasks (emails, phones, addresses, logs)

5. **Building Pipelines**
   - Combining multiple preprocessing steps
   - Creating reusable functions
   - Data quality reporting

---
# Additional Resources
---

- Apache Spark Documentation: https://spark.apache.org/docs/latest/
- PySpark API Reference: https://spark.apache.org/docs/latest/api/python/
- Spark SQL Functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
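- Regular Expression Reference (Python `re`; note that Spark's `rlike`, `regexp_extract`, and `regexp_replace` use Java regex syntax, e.g. `$1` rather than `\1` in replacements): https://docs.python.org/3/library/re.html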
- Spark ML Feature Transformers: https://spark.apache.org/docs/latest/ml-features.html