# PySpark Data Pipeline Example with Caching

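This notebook demonstrates a simple data pipeline in PySpark and shows common use cases for the `.cache()` method. File paths such as `path/to/file.csv` and column names such as `column_name` are placeholders; replace them with your own data before running.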

## 1. Initial Setup

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, length, lower, regexp_replace, trim
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Start (or reuse, via getOrCreate) the Spark session shared by every later cell.
spark = (
    SparkSession.builder
    .appName("Data Quality Pipeline")
    .getOrCreate()
)

# Route INFO-and-above log records to the root logger. basicConfig only takes
# effect if the root logger has no handlers yet.
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## 2. Loading Data

In [None]:
# Explicit column types for the input CSV; every field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("name", StringType()),
        ("age", IntegerType()),
        ("email", StringType()),
        ("signup_date", StringType()),
    )
])

# The first line of the file holds column names.
df = spark.read.schema(schema).option("header", True).csv("path/to/file.csv")
logger.info("Data loaded successfully")

## 3. Data Cleaning and Transformation

In [None]:
# Normalise the raw columns:
#   - name: lower-cased and trimmed
#   - email: all whitespace removed (raw string so "\s" is a regex class,
#     not an invalid Python escape sequence)
#   - signup_date: "/" separators replaced with "-"
#   - ingestion_time: timestamp of when the row was processed
df = (
    df
    .withColumn("name", trim(lower(col("name"))))
    .withColumn("email", regexp_replace(col("email"), r"\s+", ""))
    .withColumn("signup_date", regexp_replace(col("signup_date"), "/", "-"))
    .withColumn("ingestion_time", current_timestamp())
)
logger.info("Data cleaned and transformed")

## 4. Data Validation

In [None]:
def validate_email(email):
    """Return ``email`` if it looks like a well-formed address, else ``None``.

    Used as a Spark UDF over a nullable column, so ``None`` input must be
    tolerated rather than raising (``re`` functions reject ``None``).

    Args:
        email: Candidate address string, or ``None``.

    Returns:
        The unchanged ``email`` when the whole string matches
        ``local@domain.tld`` (exactly one ``@`` and at least one ``.`` after
        it), otherwise ``None``.
    """
    import re  # imported locally so the function is self-contained when shipped to executors

    if email is None:
        return None
    # fullmatch (not match) so trailing junk such as a second "@..." is rejected.
    if re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
        return email
    return None

from pyspark.sql.functions import udf

# Wrap the Python validator as a Spark UDF producing strings.
validate_email_udf = udf(validate_email, returnType=StringType())

# valid_email holds the address when validation passes and null otherwise,
# so keeping only non-null rows drops the invalid ones.
df = (
    df.withColumn("valid_email", validate_email_udf(col("email")))
      .where(col("valid_email").isNotNull())
)
logger.info("Data validated")

## 5. Data Enrichment

In [None]:
# Character count of each cleaned (trimmed, lower-cased) name; Spark's
# length() yields null for a null name. The aggregation cell groups by this column.
df = df.withColumn("name_length", length(col("name")))
logger.info("Data enriched")

## 6. Aggregation and Analysis

In [None]:
# Average age per name length. groupBy().avg() names its output column
# "avg(age)"; rename it to a plain identifier because older Spark versions
# refuse to write Parquet column names containing parentheses.
result = df.groupBy("name_length").avg("age").withColumnRenamed("avg(age)", "avg_age")
logger.info("Aggregation and analysis complete")

## 7. Saving Data

In [None]:
# Write the aggregate as Parquet, replacing any output from a previous run.
result.write.parquet("path/to/output/folder", mode="overwrite")
logger.info("Data saved successfully")

## 8. Monitoring and Logging

In [None]:
logger.log(logging.INFO, "Pipeline completed successfully")  # reached only if every earlier cell succeeded

## Using `.cache()` in PySpark

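The `.cache()` method marks a DataFrame or RDD for persistence, so that subsequent actions reuse the stored data instead of recomputing it from the source. Caching is lazy: nothing is stored until the first action computes the data. For DataFrames, the default storage level keeps data in memory and spills to disk when it does not fit; for RDDs, the default is memory only. Caching is helpful when the same dataset is accessed multiple times. Release the cache with `.unpersist()` once the data is no longer needed.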

### 1. Repeated Operations on the Same DataFrame

In [None]:
# cache() returns the same DataFrame, so it can be chained onto the read.
df = spark.read.csv("path/to/large_file.csv", header=True, inferSchema=True).cache()

# The first action fills the cache; the groupBy below then reads the cached
# rows instead of scanning the CSV again.
df.count()
df.groupBy("column_name").count().show()

### 2. Data Exploration and Profiling

In [None]:
df = spark.read.parquet("path/to/data.parquet")

# Three separate actions follow; caching lets them share a single read of the files.
df.cache()

df.describe().show()
# Group by one column and average a *different* one. Averaging the grouping
# key itself would just echo each group's key back.
df.groupBy("group_column").agg({"value_column": "mean"}).show()
df.filter(df["value_column"] > 100).show()

### 3. ETL Pipelines

In [None]:
raw_df = spark.read.json("path/to/data.json")

# Keep a named handle on the cached, transformed frame so it can be unpersisted
# later. The cache only pays off if transformed_df feeds more than one action
# (e.g. several filtered outputs); call transformed_df.unpersist() when done.
transformed_df = raw_df.withColumn("new_column", raw_df["existing_column"] * 2).cache()

df = transformed_df.filter(transformed_df["new_column"] > 100)

# "overwrite" keeps the cell re-runnable; the default save mode errors if the path exists.
df.write.mode("overwrite").parquet("path/to/output.parquet")

### 4. Iterative Machine Learning Algorithms

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler

# Pack the raw feature columns into the single "features" vector column.
df = spark.read.parquet("path/to/data.parquet")
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")

# KMeans fits iteratively over its input, so cache the assembled vectors once
# rather than re-reading and re-assembling them on every pass.
df = assembler.transform(df).select("features").cache()

kmeans = KMeans(seed=1, k=3)
model = kmeans.fit(df)

evaluator = ClusteringEvaluator()
predictions = model.transform(df)
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette score: {silhouette}")

### 5. Join Operations

In [None]:
df1 = spark.read.csv("path/to/data1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("path/to/data2.csv", header=True, inferSchema=True)
df3 = spark.read.csv("path/to/data3.csv", header=True, inferSchema=True)

# df1 is the shared left side of both joins below, so it is the frame worth
# caching; df2 and df3 each take part in a single join.
df1.cache()

joined_df1, joined_df2 = (df1.join(other, on="key_column") for other in (df2, df3))

joined_df1.show()
joined_df2.show()

### 6. Data Subsets for Model Evaluation

In [None]:
df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)

# 80/20 train/test split; the fixed seed makes it repeatable for the same input.
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=42)

# Cache both splits, assuming training and evaluation each scan their split
# more than once (typical for iterative estimators).
test_df.cache()
train_df.cache()

# NOTE(review): train_model and evaluate_model are not defined anywhere in this
# notebook, so this cell raises NameError as written. Supply your own
# implementations (e.g. a pyspark.ml estimator fit plus an evaluator) first.
model = train_model(train_df)
evaluate_model(model, test_df)

### Releasing Cached Data

In [None]:
# Release cached data. At this point `df` is the frame read in the previous
# cell, which was never cached itself (only its splits were), so
# df.unpersist() alone would leave every cache from this section in place.
train_df.unpersist()
test_df.unpersist()

# Frames cached in earlier examples were rebound (df was reassigned several
# times) and can no longer be unpersisted by name, so drop everything still
# cached in this Spark session.
spark.catalog.clearCache()