# Lesson 21 - Real-World Projects

These technical notes on PySpark are structured for professional learners and are suitable as training material or reference documentation. They focus on PySpark functionality, architecture, coding practices, and advanced concepts, using real-world project contexts as examples.

---

## PySpark Technical Notes for Professional Learners

### Introduction to Apache Spark and PySpark

**Theory:**

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python (PySpark), and R, along with an optimized engine that supports general execution graphs. Key characteristics include:

1.  **Speed:** Spark can be significantly faster than Hadoop MapReduce due to in-memory computation capabilities and optimized execution graphs (DAGs).
2.  **Ease of Use:** Rich APIs (like DataFrames and SQL) simplify complex data processing tasks.
3.  **Generality:** Spark combines SQL, streaming analytics, machine learning (MLlib), and graph processing (GraphX) within a single framework.
4.  **Runs Everywhere:** Spark can run on Hadoop YARN, Kubernetes, its own standalone cluster manager, or Apache Mesos (deprecated since Spark 3.2), on-premises or in the cloud.

**PySpark** is the Python API for Apache Spark. It allows data scientists and engineers to leverage the power of Spark using the familiar Python language and its rich ecosystem of libraries (like Pandas, NumPy, scikit-learn), while Spark handles the distributed computation across a cluster.

**Core Abstraction: Resilient Distributed Datasets (RDDs)**
While historically foundational, modern PySpark development primarily uses higher-level abstractions like DataFrames. RDDs represent an immutable, partitioned collection of elements that can be operated on in parallel. They offer fault tolerance through lineage information (how an RDD was derived from others). You might encounter RDDs when needing low-level control or dealing with unstructured data, but DataFrames are generally preferred for structured/semi-structured data due to performance optimizations.

**Primary Abstraction: DataFrames**
Built on top of RDDs, DataFrames organize data into named columns, similar to tables in relational databases or pandas DataFrames. They benefit from Spark's Catalyst optimizer and Tungsten execution engine, providing significant performance improvements over RDDs for structured data processing. DataFrames can be manipulated using domain-specific language (DSL) methods (e.g., `select`, `filter`, `groupBy`) or standard SQL queries.

### Spark Architecture Fundamentals

**Theory:**

Understanding Spark's architecture is crucial for writing efficient applications and debugging performance issues.

1.  **Driver Program:** The process running the `main()` function of your application (e.g., your PySpark script). It creates the `SparkContext` (or `SparkSession`), which coordinates the execution.
    *   Responsibilities: Hosts the application logic, analyzes, distributes, and schedules work across the Executors, maintains metadata about the application.
2.  **Cluster Manager:** An external service responsible for acquiring resources on the cluster (e.g., YARN, Mesos, Kubernetes, Spark Standalone). The Driver requests resources from the Cluster Manager.
3.  **Executor:** A process launched for an application on a worker node.
    *   Responsibilities: Executes tasks assigned by the Driver, holds data partitions in memory or on disk (cache), reports results and status back to the Driver. Each application has its own Executors.
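4.  **SparkSession:** The unified entry point for Spark functionality since Spark 2.0. It wraps a `SparkContext` (available as `spark.sparkContext`) and replaces the older `SQLContext` and `HiveContext` entry points. Structured Streaming is also accessed through it; only the legacy DStream API still requires a separate `StreamingContext`.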
5.  **Job:** A parallel computation consisting of multiple tasks, triggered by a Spark *action* (e.g., `count()`, `collect()`, `save()`).
6.  **Stage:** Each job is divided into smaller sets of tasks called *stages*, separated by *shuffle* operations (wide transformations). Tasks within a stage can run in parallel without data shuffling.
7.  **Task:** A unit of work sent by the Driver to an Executor to be executed on a specific data partition.

**Lazy Evaluation:** Spark transformations (e.g., `select`, `filter`, `map`) are *lazy*. They don't execute immediately. Instead, Spark builds a Directed Acyclic Graph (DAG) of transformations. The computation starts only when an *action* (e.g., `show`, `count`, `collect`, `save`) is called. This allows Spark to optimize the entire execution plan.

**Code Example: Initializing SparkSession**

```python
# Import the SparkSession module
from pyspark.sql import SparkSession

# Create a SparkSession instance
# .appName(): Sets a name for the application, visible in the Spark UI.
# .master(): Specifies the cluster manager URL ('local[*]' runs locally using all available cores).
#           For cluster deployment, this would be 'yarn', 'k8s://<api_server>', etc.
# .config(): Allows setting various Spark configuration properties.
# .getOrCreate(): Gets an existing SparkSession or creates a new one if none exists.
spark = SparkSession.builder \
    .appName("PySparkFundamentals") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Access SparkContext (though often not needed directly with DataFrames)
sc = spark.sparkContext
sc.setLogLevel("WARN") # Reduce log verbosity

print(f"SparkSession initialized. Spark version: {spark.version}")

# Example: Stop the SparkSession when done (important in scripts)
# spark.stop()
```

**Explanation:**

*   `from pyspark.sql import SparkSession`: Imports the necessary class.
*   `SparkSession.builder`: Accesses the builder pattern to configure the session.
*   `.appName("...")`: Assigns a name for tracking the application.
*   `.master("local[*]")`: Tells Spark to run locally using as many worker threads as logical cores on the machine. For production, this would point to your cluster manager (e.g., `yarn`).
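*   `.config("spark.driver.memory", "2g")`: Sets the memory allocated to the Driver process. This property must be in place before the driver JVM starts; if the JVM is already running (e.g., under `spark-submit` in client mode, or in a notebook with an active session), set it with `spark-submit --driver-memory` or in `spark-defaults.conf` instead.
*   `.config("spark.executor.memory", "4g")`: Sets the memory allocated to each Executor process on a cluster. In `local[*]` mode, tasks run inside the driver JVM, so this setting has no practical effect there. Both are crucial tuning parameters in cluster deployments.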
*   `.getOrCreate()`: Constructs the `SparkSession` object.
*   `spark.sparkContext`: Accesses the underlying `SparkContext` if needed for RDD operations or lower-level configuration.
*   `sc.setLogLevel("WARN")`: Filters log messages to show only warnings and errors, making output cleaner.
*   `spark.stop()`: Releases the resources used by the Spark application. Essential to call at the end of a script.

---

### Project Example 1: Retail Sales Dashboard Data Aggregation

**Goal:** Process raw retail sales data (e.g., CSV files) to compute daily/monthly sales summaries per store or product category, suitable for feeding a dashboard.

**PySpark Concepts Illustrated:** Data Ingestion (CSV), Schema Inference/Definition, DataFrame Transformations (`select`, `withColumn`, `filter`, `groupBy`, `agg`), Data Output (Parquet).

**Theory: DataFrames and Transformations**

DataFrames provide a structured view of data. Common operations include:

*   **Reading Data:** `spark.read.format(...).load(...)` or shortcuts like `spark.read.csv(...)`. Schema can be inferred or explicitly defined for robustness and performance.
*   **Selecting Columns:** `df.select("col1", "col2")`
*   **Filtering Rows:** `df.filter(df["col_name"] > value)` or `df.where("col_name > value")` (using SQL syntax).
*   **Adding/Modifying Columns:** `df.withColumn("new_col", expression)`
*   **Grouping and Aggregating:** `df.groupBy("key_col").agg(agg_function("value_col"))`
*   **Transformations:**
    *   **Narrow:** Each input partition contributes to at most one output partition (e.g., `select`, `filter`, `withColumn`). No data shuffling across the network is required, so these are efficient.
    *   **Wide:** Input partitions contribute to multiple output partitions (e.g., `groupBy`, `join` on non-partitioned keys). Requires shuffling data between executors, which is expensive.

**Code Example:**

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, date_format, month, year  # note: this `sum` shadows Python's built-in sum()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Initialize SparkSession (assuming already done from previous section)
# spark = SparkSession.builder.appName("RetailSales").getOrCreate()

# 1. Data Ingestion with Explicit Schema
# Define the schema for better performance and data integrity
schema = StructType([
    StructField("TransactionID", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("StoreID", StringType(), True),
    StructField("ProductID", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("UnitPrice", DoubleType(), True)
])

# Load data from CSV
# header=True: Uses the first line as header.
# schema=schema: Applies the defined schema.
# timestampFormat: Specifies the format for parsing timestamp strings.
sales_df = spark.read.csv(
    "path/to/your/sales_data.csv",
    header=True,
    schema=schema,
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

# 2. Initial Exploration (Optional but Recommended)
print("Sales Data Schema:")
sales_df.printSchema()
print("Sample Sales Data:")
sales_df.show(5, truncate=False)

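# 3. Data Cleaning / Preparation (Example: Handle potential nulls if necessary)
# For this example, assume the data is clean. Note that in the CSV reader's default
# PERMISSIVE mode, fields that cannot be parsed against the schema are set to null,
# so real pipelines may still need null handling (e.g., dropna() or fillna()).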

# 4. Feature Engineering & Transformation
# Calculate total sale amount per transaction
sales_df = sales_df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

# Extract date components for aggregation
sales_df = sales_df.withColumn("SaleDate", date_format(col("Timestamp"), "yyyy-MM-dd"))
sales_df = sales_df.withColumn("SaleMonth", date_format(col("Timestamp"), "yyyy-MM"))
# Alternative using month() and year() functions
# sales_df = sales_df.withColumn("SaleYear", year(col("Timestamp")))
# sales_df = sales_df.withColumn("SaleMonthNum", month(col("Timestamp")))

# 5. Aggregation - Daily Sales per Store
daily_sales_summary = sales_df.groupBy("StoreID", "SaleDate") \
    .agg(
        sum("TotalAmount").alias("TotalDailySales"),
        count("TransactionID").alias("TotalDailyTransactions")
    ) \
    .orderBy("StoreID", "SaleDate")

print("Daily Sales Summary per Store:")
daily_sales_summary.show(10)

# 6. Aggregation - Monthly Sales per Store
monthly_sales_summary = sales_df.groupBy("StoreID", "SaleMonth") \
    .agg(
        sum("TotalAmount").alias("TotalMonthlySales"),
        count("TransactionID").alias("TotalMonthlyTransactions")
    ) \
    .orderBy("StoreID", "SaleMonth")

print("Monthly Sales Summary per Store:")
monthly_sales_summary.show(10)


# 7. Data Output
# Save the aggregated results, often in an optimized format like Parquet
# mode("overwrite"): If the destination exists, replace its contents.
# partitionBy(): Optional - physically partitions data on disk for faster reads on filtered queries.
daily_sales_summary.write \
    .mode("overwrite") \
    .partitionBy("StoreID") \
    .parquet("path/to/output/daily_sales_summary.parquet")

monthly_sales_summary.write \
    .mode("overwrite") \
    .parquet("path/to/output/monthly_sales_summary.parquet")

# Remember to stop the SparkSession
# spark.stop()
```

**Explanation:**

*   **`schema = StructType([...])`**: Defines the expected structure and data types of the input CSV. Using an explicit schema avoids a potentially slow and error-prone schema inference step, and ensures data type correctness.
*   **`spark.read.csv(...)`**: Reads the CSV file into a DataFrame using the specified options and schema.
*   **`printSchema()`**: Displays the DataFrame's schema (column names and types).
*   **`show(5, truncate=False)`**: Displays the first 5 rows without truncating column content. Useful for quick inspection.
*   **`withColumn("TotalAmount", ...)`**: Adds a new column `TotalAmount` calculated by multiplying `Quantity` and `UnitPrice`. `col()` is used to refer to existing columns.
*   **`withColumn("SaleDate", ...)`**: Extracts the date part from the `Timestamp` column using `date_format`.
*   **`groupBy("StoreID", "SaleDate")`**: Groups the DataFrame rows based on unique combinations of `StoreID` and `SaleDate`. This is a wide transformation, potentially involving data shuffling.
*   **`.agg(...)`**: Performs aggregations on each group.
    *   `sum("TotalAmount").alias("...")`: Calculates the sum of `TotalAmount` for each group and names the resulting column `TotalDailySales`.
    *   `count("TransactionID").alias("...")`: Counts the number of transactions in each group.
*   **`.orderBy(...)`**: Sorts the results for better readability (can be expensive on large data).
*   **`write.mode("overwrite")...`**: Specifies how to save the DataFrame. `overwrite` replaces existing data. Other modes include `append`, `ignore`, `errorifexists`.
*   **`.partitionBy("StoreID")`**: Instructs Spark to partition the output data into separate directories based on the values in the `StoreID` column. This significantly speeds up queries that filter by `StoreID` (e.g., `spark.read.parquet(...).filter(col("StoreID") == "S101")`).
*   **`.parquet(...)`**: Saves the DataFrame in the Parquet format. Parquet is a columnar storage format optimized for analytical queries, offering good compression and performance.

**Use Cases:** Business intelligence reporting, feeding dashboards, data warehousing ETL.

---

### Project Example 2: Fraud Detection Pipeline (Feature Engineering)

**Goal:** Process transaction data and user activity logs to engineer features that can be used by a machine learning model to detect potentially fraudulent activities.

**PySpark Concepts Illustrated:** Joins, Window Functions, UDFs (and alternatives), Caching/Persistence.

**Theory: Joins, Window Functions, and Performance**

*   **Joins:** Combine rows from two DataFrames based on a related column. Spark supports various join types (`inner`, `left_outer`, `right_outer`, `full_outer`, `left_semi`, `left_anti`). Performance depends heavily on data size, partitioning, and whether a *Broadcast Join* can be used (if one DataFrame is small enough to fit in each Executor's memory).
*   **Window Functions:** Perform calculations across a set of table rows that are somehow related to the current row (similar to SQL window functions). Unlike `groupBy`, they don't collapse rows; they return a value for *each* row based on a "window" of related rows defined by `PARTITION BY` and `ORDER BY` clauses. Examples: calculating running totals, ranking, accessing previous/next row's value.
*   **User-Defined Functions (UDFs):** Allow executing custom Python code on DataFrame columns. While flexible, UDFs can be performance bottlenecks, so whenever possible use built-in Spark SQL functions, which operate directly on Spark's internal optimized format (Tungsten), and use UDFs only when the required logic cannot be expressed with built-in functions. Pandas UDFs (Vectorized UDFs) offer better performance by working on Pandas Series/DataFrames via Apache Arrow. The main costs of regular Python UDFs are:
    *   Spark cannot optimize the Python code within the UDF.
    *   Data must be serialized/deserialized between the JVM (Spark) and the Python interpreter.
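*   **Caching (`.cache()`, `.persist()`):** Stores the DataFrame's partitions on the Executors, in memory, on disk, or both, depending on the storage level. This is beneficial when a DataFrame is used multiple times in the execution plan (e.g., in iterative algorithms or when branching logic uses the same base data). Caching is lazy: partitions are stored when the first action computes them. For DataFrames, `cache()` is shorthand for `persist()` with the default storage level, which keeps data in memory and spills to disk (`MEMORY_AND_DISK`, or `MEMORY_AND_DISK_DESER` in PySpark 3.x); for RDDs, `cache()` means `MEMORY_ONLY`. Use judiciously, as memory is a finite resource.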

**Code Example:**

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, avg, lag, lead, rank, datediff, expr, udf
from pyspark.sql.types import DoubleType
# from pyspark.sql.pandas.functions import pandas_udf # Alternative for better performance

# Assume 'spark' session is available and 'sales_df' from Project 1 exists
# Let's assume another DataFrame 'user_activity_df' exists
# Schema: UserID, ActivityTimestamp, ActivityType, DeviceID

# Sample DataFrames (Replace with actual loading)
# sales_df = spark.read.parquet("path/to/sales_data.parquet") # Assuming it includes UserID
# user_activity_df = spark.read.parquet("path/to/user_activity.parquet")

# For demonstration, let's create dummy DataFrames
data_sales = [("T1", "U1", "2023-10-26 10:00:00", 100.0),
              ("T2", "U2", "2023-10-26 10:05:00", 50.0),
              ("T3", "U1", "2023-10-26 11:15:00", 200.0),
              ("T4", "U1", "2023-10-27 09:30:00", 10.0),
              ("T5", "U2", "2023-10-27 14:00:00", 75.0),
              ("T6", "U1", "2023-10-27 14:05:00", 300.0)] # High value txn shortly after low value
columns_sales = ["TransactionID", "UserID", "Timestamp", "Amount"]
sales_df = spark.createDataFrame(data_sales, columns_sales) \
    .withColumn("Timestamp", col("Timestamp").cast("timestamp"))

data_activity = [("U1", "2023-10-26 09:58:00", "login", "DevA"),
                 ("U2", "2023-10-26 10:03:00", "login", "DevB"),
                 ("U1", "2023-10-26 11:13:00", "update_profile", "DevA"),
                 ("U1", "2023-10-27 09:00:00", "login", "DevC"), # Login from new device
                 ("U2", "2023-10-27 13:55:00", "login", "DevB")]
columns_activity = ["UserID", "ActivityTimestamp", "ActivityType", "DeviceID"]
user_activity_df = spark.createDataFrame(data_activity, columns_activity) \
    .withColumn("ActivityTimestamp", col("ActivityTimestamp").cast("timestamp"))


# 1. Feature Engineering using Window Functions
# Calculate average transaction amount for the user *up to the current transaction*
# Define the window specification: Partition by UserID, order by Timestamp
user_window = Window.partitionBy("UserID").orderBy("Timestamp") \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow) # Frame: all earlier rows up to and including the current row

sales_with_avg = sales_df.withColumn(
    "UserAvgTxnAmount",
    avg("Amount").over(user_window)
)

# Calculate time difference between consecutive transactions for the same user
user_window_prev = Window.partitionBy("UserID").orderBy("Timestamp")
sales_with_time_diff = sales_with_avg.withColumn(
    "PrevTxnTimestamp",
    lag("Timestamp", 1).over(user_window_prev) # Get timestamp of previous row in window
)

# Calculate time diff in seconds
sales_with_time_diff = sales_with_time_diff.withColumn(
    "TimeSinceLastTxn_sec",
    col("Timestamp").cast("long") - col("PrevTxnTimestamp").cast("long")
)
sales_with_time_diff = sales_with_time_diff.fillna(0, subset=["TimeSinceLastTxn_sec"]) # Handle first transaction

print("Sales with Window Features:")
sales_with_time_diff.show(truncate=False)

# 2. Feature Engineering using Joins
# Goal: add the user's most recent activity *before* each transaction.
# A common pattern is an equi-join on UserID combined with a timestamp inequality
# (ActivityTimestamp <= Timestamp); Spark accepts such mixed join conditions.
# An aggregation or window function then keeps only the latest matching activity per transaction.

# Simpler example: join sales with each user's *latest* activity overall
# (less precise for fraud, because that activity may occur after the transaction)
latest_activity = user_activity_df.groupBy("UserID") \
                                .agg(expr("max(ActivityTimestamp)").alias("LastActivityTimestamp"))

sales_joined = sales_with_time_diff.join(
    latest_activity,
    on="UserID",
    how="left"
)

print("Sales joined with latest activity timestamp:")
sales_joined.show(truncate=False)

# More relevant: Join transaction with activity *immediately preceding* it.
# This often requires careful handling of timestamps and potentially complex joins or window functions.

# Example: Feature - Transaction Amount compared to User's Average
# This was already computed (`UserAvgTxnAmount`)


# 3. Feature Engineering using UDF (Use with Caution - Prefer built-in functions)
# Example: Calculate a custom risk score based on amount and time (Illustrative only)
def calculate_risk_score(amount, avg_amount, time_since_last):
    # Return a neutral score when any input is missing or the average is zero
    if amount is None or avg_amount is None or avg_amount == 0 or time_since_last is None:
        return 0.0
    amount_ratio = amount / avg_amount
    time_factor = 1 / (1 + time_since_last / 3600.0) # Higher score if time is short (in hours)

    # Simple heuristic (replace with actual model/logic)
    score = amount_ratio * time_factor
    return float(score)

# Register the function as a UDF, specifying the return type
risk_score_udf = udf(calculate_risk_score, DoubleType())

# Apply the UDF
# Note: Ensure columns passed to UDF are not null or handle nulls inside the UDF
final_features = sales_with_time_diff.withColumn(
    "CustomRiskScore",
    risk_score_udf(col("Amount"), col("UserAvgTxnAmount"), col("TimeSinceLastTxn_sec"))
)

print("Final Features with Custom UDF Score:")
final_features.select("TransactionID", "UserID", "Amount", "UserAvgTxnAmount", "TimeSinceLastTxn_sec", "CustomRiskScore").show(truncate=False)


# 4. Caching Intermediate Results
# If 'final_features' DataFrame is used multiple times later (e.g., for model training AND analysis)
final_features.persist() # Or .cache()

# Perform actions that use the cached DataFrame
total_txns = final_features.count()  # avoid shadowing pyspark.sql.functions.count
print(f"Total transactions processed: {total_txns}")
high_risk = final_features.filter(col("CustomRiskScore") > 1.5).count()
print(f"Transactions with risk score > 1.5: {high_risk}")

# Unpersist when done to free up memory
final_features.unpersist()

# spark.stop()
```

**Explanation:**

*   **`Window.partitionBy(...).orderBy(...)`**: Defines a window specification. Partitions data by `UserID` and orders rows within each partition by `Timestamp`. `rowsBetween` defines the frame boundaries (e.g., all preceding rows up to the current row).
*   **`avg("Amount").over(user_window)`**: Calculates the average `Amount` over the specified window frame for each row.
*   **`lag("Timestamp", 1).over(...)`**: Accesses the value of the `Timestamp` column from the previous row within the window partition. `lead()` accesses the next row.
*   **`.cast("long")`**: Converts timestamp to Unix epoch seconds (as long integer) for easy subtraction to find time differences.
*   **`.fillna(0, ...)`**: Replaces null values (which occur for the first transaction where `lag` finds no previous row) with 0.
*   **`join(..., on="UserID", how="left")`**: Performs a left outer join. Keeps all rows from the left DataFrame (`sales_with_time_diff`) and adds matching data from the right (`latest_activity`). If no match is found for a `UserID`, columns from the right DataFrame will be null.
*   **`udf(calculate_risk_score, DoubleType())`**: Registers the Python function `calculate_risk_score` as a Spark UDF, specifying that it returns a `DoubleType`.
*   **`risk_score_udf(...)`**: Applies the UDF to the specified columns. Spark serializes row data, sends it to a Python process, executes the function, and deserializes the result. *This is often slower than built-in functions.*
*   **`.persist()` / `.cache()`**: Marks `final_features` for caching on the executors (for DataFrames, by default in memory, spilling to disk when needed). Caching is lazy, so the data is materialized by the first action (`count()` here); later actions, such as the filtered count, read from the cache instead of recomputing the entire lineage.
*   **`.unpersist()`**: Removes the DataFrame from the cache, freeing up executor memory. It's crucial to unpersist when the cached data is no longer needed.

**Performance Considerations:**

*   **Joins:** Analyze join strategies using `df.explain()`. Ensure join keys are well-distributed. Consider broadcasting smaller tables (`spark.sql.autoBroadcastJoinThreshold` config or `broadcast()` hint). Ensure data types of join keys match.
*   **Window Functions:** Can be expensive, especially with large partitions or complex window frames. Ensure `partitionBy` keys are effective.
*   **UDFs:** Avoid if possible. Explore built-in functions first. If necessary, consider Pandas UDFs for vectorized execution, which can be significantly faster for complex numerical or string operations.
*   **Caching:** Use `cache()` or `persist()` strategically for DataFrames that are accessed multiple times. Monitor memory usage via the Spark UI. Choose appropriate storage levels (`MEMORY_ONLY`, `MEMORY_AND_DISK`, etc.) based on data size and memory availability. Don't cache everything.

**Use Cases:** Real-time fraud detection systems (often combined with streaming), risk modeling, anomaly detection, cybersecurity analytics.

---

### Project Example 3: Customer Segmentation on E-Commerce Data

**Goal:** Group customers into distinct segments based on their purchasing behavior (e.g., frequency, recency, monetary value - RFM analysis) or other attributes. This often involves feature engineering followed by clustering algorithms (like K-Means from MLlib). Here, we focus on the PySpark data preparation part.

**PySpark Concepts Illustrated:** Advanced Aggregations, Date/Time Functions, Data Scaling (MLlib), VectorAssembler (MLlib).

**Theory: Feature Engineering for Segmentation**

Customer segmentation requires creating meaningful features that capture distinct customer behaviors. RFM analysis is a common technique:

*   **Recency:** How recently did the customer make a purchase? (e.g., days since last purchase).
*   **Frequency:** How often does the customer purchase? (e.g., total number of transactions).
*   **Monetary Value:** How much does the customer spend? (e.g., total or average transaction value).

These features often need to be scaled before being fed into distance-based clustering algorithms like K-Means. Spark's MLlib library provides tools for this.
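For reference, the scaling step can be written out as follows, where $x_{ij}$ is feature $j$ (Recency, Frequency, or MonetaryValue) of customer $i$, $n$ is the number of customers, and $\bar{x}_j$ and $s_j$ are that feature's mean and corrected sample standard deviation (which, per the MLlib documentation, is what `StandardScaler` uses):

$$
s_j = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_{ij}-\bar{x}_j\right)^2},\qquad
\tilde{x}_{ij} =
\begin{cases}
x_{ij}/s_j & \text{withMean = False}\\
\left(x_{ij}-\bar{x}_j\right)/s_j & \text{withMean = True}
\end{cases}
$$

Subtracting the mean shifts every point by the same vector, so Euclidean distances are unchanged: $\|(\mathbf{x}-\boldsymbol{\mu})-(\mathbf{y}-\boldsymbol{\mu})\| = \|\mathbf{x}-\mathbf{y}\|$. Dividing by $s_j$, by contrast, changes each feature's relative weight, which is why scaling matters for K-Means while centering does not.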

**Code Example:**

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as spark_max, count, sum, datediff, current_date, lit
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

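# Assume 'spark' session and a 'sales_df' with UserID, TransactionID, Timestamp, and TotalAmount columns are available
# (Project 1's schema has no UserID column, so it would need to be added)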

# 1. Calculate RFM Features
# Find the most recent date in the dataset (or use current date)
# Using current_date() assumes data is up-to-date. If historical, use max timestamp.
max_date_df = sales_df.agg(spark_max("Timestamp").alias("MaxTimestamp"))
# Extract the actual date value to use in calculations
max_date = max_date_df.first()["MaxTimestamp"]
# Use lit() to treat max_date as a literal value in DataFrame operations
# Alternatively, crossJoin max_date_df if needed in complex scenarios.

# Calculate Recency, Frequency, Monetary Value per UserID
rfm_df = sales_df.groupBy("UserID").agg(
    # Recency: Days since last purchase from the reference date
    datediff(lit(max_date), spark_max("Timestamp")).alias("Recency"),
    # Frequency: Total number of transactions
    count("TransactionID").alias("Frequency"),
    # Monetary: Total amount spent by the user
    sum("TotalAmount").alias("MonetaryValue")
)

print("RFM Features:")
rfm_df.show(10)

# 2. Prepare Features for ML Clustering (using MLlib)
# K-Means requires features combined into a single vector column.

# Define input columns for the vector
feature_cols = ["Recency", "Frequency", "MonetaryValue"]

# Use VectorAssembler to combine features into a single vector column
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_unscaled"
)

# Use StandardScaler to scale features (important for distance-based algorithms like K-Means)
# StandardScaler divides each feature by its standard deviation and can optionally subtract the mean first.
# withStd=True: Scale data to unit standard deviation.
# withMean=True: Center data with the mean before scaling (produces dense output).
scaler = StandardScaler(
    inputCol="features_unscaled",
    outputCol="features_scaled",
    withStd=True,
    withMean=False # Centering does not change Euclidean distances, so K-Means does not need it
)

# Create a Pipeline to chain the assembler and scaler
pipeline = Pipeline(stages=[assembler, scaler])

# Fit the pipeline to the data (calculates necessary statistics like mean/std dev)
scaler_model = pipeline.fit(rfm_df)

# Transform the data to add the scaled features column
rfm_scaled_df = scaler_model.transform(rfm_df)

print("RFM Features Scaled for Clustering:")
rfm_scaled_df.select("UserID", "Recency", "Frequency", "MonetaryValue", "features_scaled").show(10, truncate=False)

# 3. Next Steps (Conceptual - MLlib K-Means)
# from pyspark.ml.clustering import KMeans
#
# kmeans = KMeans(featuresCol="features_scaled", k=5) # Specify number of clusters (k)
# model = kmeans.fit(rfm_scaled_df)
# predictions = model.transform(rfm_scaled_df)
# predictions.select("UserID", "prediction").show() # 'prediction' column holds the cluster ID

# 4. Output Prepared Features (Optional)
rfm_scaled_df.write \
    .mode("overwrite") \
    .parquet("path/to/output/customer_features_scaled.parquet")

# spark.stop()
```

**Explanation:**

*   **`agg(spark_max(...))`**: Used to find the latest timestamp in the entire dataset, which serves as the reference point for calculating recency. (`max` is imported as `spark_max` to avoid shadowing Python's built-in `max`.)
*   **`first()["MaxTimestamp"]`**: Retrieves the maximum timestamp value from the single-row DataFrame created by `agg`.
*   **`lit(max_date)`**: Creates a literal column containing the `max_date` value, allowing it to be used in functions like `datediff` across all rows.
*   **`datediff(lit(max_date), spark_max("Timestamp"))`**: Calculates the difference in days between the reference date and the user's latest purchase date (`spark_max` within the `groupBy`).
*   **`count("TransactionID")`**: Counts transactions per user for Frequency.
*   **`sum("TotalAmount")`**: Sums purchase amounts per user for Monetary value.
*   **`VectorAssembler`**: An MLlib *Transformer* that combines a list of columns into a single vector column (`features_unscaled`). This is a standard requirement for most MLlib algorithms.
*   **`StandardScaler`**: An MLlib *Estimator*. When `fit()` is called, it computes the standard deviation (and optionally mean) for each feature in the input vector column. The resulting *Model* (`scaler_model`) can then `transform()` the data, applying the scaling.
*   **`Pipeline`**: Chains multiple MLlib stages (Transformers and Estimators) together. `fit()` calls `fit()` on Estimators and `transform()` on Transformers sequentially. This simplifies the workflow.
*   **`scaler_model.transform(rfm_df)`**: Applies the fitted pipeline (assembler then scaler) to the RFM data, producing the `features_scaled` column.
*   **`features_scaled` column:** Contains the standardized RFM values in a vector format, ready for clustering algorithms like K-Means.

**Use Cases:** Targeted marketing campaigns, personalized recommendations, customer churn prediction, understanding customer lifetime value.

---

### Project Example 4: Twitter Sentiment Analysis (Streaming)

**Goal:** Process a stream of tweets in near real-time, perform sentiment analysis, and aggregate results (e.g., count positive/negative tweets per topic).

**PySpark Concepts Illustrated:** Structured Streaming (Sources, Sinks, Transformations), Windowing on Event Time, Watermarking.

**Theory: Structured Streaming**

Structured Streaming is Spark's scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It treats a live data stream as a continuously growing table. You can apply standard DataFrame/SQL operations on this "input table" and output the results to a "result table".

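*   **Input Sources:** Built-in sources include Kafka, files (various formats), sockets (for testing), and the rate source (for testing). Kinesis and Azure Event Hubs are available through separate connectors.
*   **Output Sinks:** Built-in sinks include Kafka, files, console and memory (for debugging), `foreach`, and `foreachBatch` (for custom logic, e.g., writing to databases).
*   **Model:** By default, Spark processes the stream as a series of small micro-batches, with end-to-end latencies as low as roughly 100 ms. Exactly-once guarantees hold end to end when the source is replayable (e.g., Kafka), the query uses checkpointing, and the sink is idempotent or transactional.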
*   **Event Time Processing:** Allows handling data based on timestamps embedded in the data itself (event time), rather than when Spark processes it (processing time). Crucial for handling out-of-order data.
*   **Windowing:** Group data into time windows (e.g., 5-minute tumbling windows) for aggregation. Requires an event time column.
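*   **Watermarking:** A mechanism to handle late-arriving data in event-time processing. Spark tracks the maximum event time seen so far; the watermark is that maximum minus a user-defined delay threshold. Windows that end before the watermark are finalized and their state is dropped, which bounds the amount of state maintained, and data arriving behind the watermark may be discarded.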
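The interplay of tumbling windows and the watermark can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation, under these assumptions: event times are plain seconds, each event goes to a 5-minute window by flooring its timestamp (Spark also aligns tumbling windows to the epoch by default), the watermark is recomputed at the end of each micro-batch as the maximum event time seen minus a 10-minute delay, and an event is discarded only if its window has already closed. The names `window_start` and `WindowedCounter` are hypothetical.

```python
from collections import defaultdict

WINDOW = 5 * 60   # tumbling window size in seconds (5 minutes)
DELAY = 10 * 60   # watermark delay threshold in seconds (10 minutes)


def window_start(event_time, size=WINDOW):
    """Floor an event time (seconds since the epoch) to its tumbling-window start."""
    return event_time - event_time % size


class WindowedCounter:
    """Toy model of a watermarked tumbling-window count (not Spark's code)."""

    def __init__(self):
        self.max_event_time = None
        self.watermark = None           # no watermark until the first batch ends
        self.state = defaultdict(int)   # window start -> running count
        self.dropped = []               # events whose window had already closed

    def process_batch(self, event_times):
        """Process one micro-batch; return the windows finalized by this batch."""
        for t in event_times:
            start = window_start(t)
            # Drop events whose window ended at or before the current watermark.
            if self.watermark is not None and start + WINDOW <= self.watermark:
                self.dropped.append(t)
                continue
            self.state[start] += 1
            if self.max_event_time is None or t > self.max_event_time:
                self.max_event_time = t
        # End of batch: watermark = max event time seen - delay (never moves back).
        if self.max_event_time is not None:
            self.watermark = self.max_event_time - DELAY
        # Windows ending at or before the watermark are final: emit and evict state.
        closed = {s: c for s, c in self.state.items()
                  if self.watermark is not None and s + WINDOW <= self.watermark}
        for s in closed:
            del self.state[s]
        return closed


counter = WindowedCounter()
print(counter.process_batch([0, 60, 400]))   # {}  (watermark = -200)
print(counter.process_batch([1300]))         # {0: 2, 300: 1}  (watermark = 700)
print(counter.process_batch([500, 650]))     # {}
print(counter.dropped, dict(counter.state))  # [500] {1200: 1, 600: 1}
```

In the last batch, the event at 500 s is dropped because its window `[300, 600)` already closed, while the event at 650 s is still counted even though it is older than the watermark (700 s), because its window `[600, 900)` is still open. This is one reason late data is not guaranteed to be dropped the moment it falls behind the watermark.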

**Code Example:**

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr, window, udf, lower
from pyspark.sql.types import StructType, StructField, StringType

# Assume 'spark' session is available

# 1. Define Schema for incoming JSON data (e.g., from Kafka)
# Assuming tweets have 'text' and 'created_at' fields
tweet_schema = StructType([
    StructField("created_at", StringType(), True), # Assuming string format initially
    StructField("id_str", StringType(), True),
    StructField("text", StringType(), True)
    # Add other fields as needed (e.g., user info)
])

# 2. Define Input Stream
# Read from a Kafka source (replace with actual brokers and topic)
# Requires 'spark-sql-kafka-0-10' package:
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.x.x ...
streaming_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "your_kafka_broker:9092") \
    .option("subscribe", "twitter_topic") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka messages have 'key', 'value', 'topic', 'partition', 'offset', 'timestamp', 'timestampType'
# The tweet data is usually in the 'value' column (as bytes)

# 3. Process the Stream
# Decode the 'value' column from bytes to string, then parse JSON
# Select the 'value' column, cast it to STRING
# Use from_json to parse the string using the defined schema
parsed_stream = streaming_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), tweet_schema).alias("data")) \
    .select("data.*") # Flatten the struct fields into columns

# Convert 'created_at' string to a timestamp
# Example input: "Wed Oct 26 10:00:00 +0000 2023"
# Spark 3.x's datetime parser does not accept day-of-week symbols ('E') when parsing,
# so strip the leading "Wed " (characters 1-4) and parse the remainder.
# (Alternatively, set spark.sql.legacy.timeParserPolicy=LEGACY.)
# Note: Timestamp parsing in streams needs care; ensure format consistency.
#       It might be better to pre-process timestamps into a standard format upstream.
parsed_stream = parsed_stream.withColumn(
    "event_time",
    expr("to_timestamp(substring(created_at, 5), 'MMM dd HH:mm:ss Z yyyy')")
)

# Filter out rows where timestamp parsing failed
parsed_stream = parsed_stream.filter(col("event_time").isNotNull())

# 4. Perform Sentiment Analysis (Illustrative - using a simple UDF)
# In a real application, use a pre-trained model or a more robust library.
def basic_sentiment(text):
    # Null tweet text arrives in a Python UDF as None
    if text is None:
        return "neutral"
    # Input is already lowercased by Spark's built-in lower() below
    if "happy" in text or "good" in text or "excellent" in text:
        return "positive"
    elif "sad" in text or "bad" in text or "terrible" in text:
        return "negative"
    else:
        return "neutral"

sentiment_udf = udf(basic_sentiment, StringType())

# Apply sentiment analysis (lowercase with a built-in function before calling the UDF)
sentiment_stream = parsed_stream.withColumn("sentiment", sentiment_udf(lower(col("text"))))

# 5. Aggregate results over a time window with Watermarking
# Count tweets by sentiment in 5-minute tumbling windows based on 'event_time'
# Watermark: Allow data to be up to 10 minutes late before being dropped.
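windowed_counts = sentiment_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"), # Tumbling window
        col("sentiment")
    ) \
    .count()
# No orderBy() here: sorting a streaming aggregate is only supported in
# 'complete' output mode, and this query uses 'update' mode.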

# 6. Define Output Sink and Start the Query
# Output to console for debugging (use 'append', 'complete', or 'update' mode)
# 'update': Only rows updated since the last trigger will be output.
# 'complete': The entire updated result table will be output. Requires aggregation.
# 'append': Only new rows added since the last trigger will be output. For aggregations this
#           requires a watermark; each window's row is emitted once, after the watermark passes its end.
query = windowed_counts \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Keep the query running until terminated
query.awaitTermination()

# To stop programmatically (e.g., in another thread or based on a condition)
# query.stop()
# spark.stop()
```
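Because `sentiment_udf` wraps an ordinary Python function, its logic can be unit-tested without starting a SparkSession. The sketch below restates the keyword rules of `basic_sentiment` as a standalone function, including a guard for `None` (a null column value reaches a Python UDF as `None`). It lowercases inside the function because no Spark `lower()` runs in a plain-Python test. The keyword tuples are the same illustrative placeholders as in the UDF, not a real sentiment model.

```python
POSITIVE_WORDS = ("happy", "good", "excellent")
NEGATIVE_WORDS = ("sad", "bad", "terrible")


def basic_sentiment(text):
    """Keyword-based placeholder sentiment using substring matching, as in the UDF."""
    if text is None:  # null values arrive in a Python UDF as None
        return "neutral"
    text = text.lower()
    if any(word in text for word in POSITIVE_WORDS):
        return "positive"
    if any(word in text for word in NEGATIVE_WORDS):
        return "negative"
    return "neutral"


# Plain-Python checks; no Spark needed
assert basic_sentiment("What a GOOD day") == "positive"
assert basic_sentiment("This is terrible") == "negative"
assert basic_sentiment("Just a tweet") == "neutral"
assert basic_sentiment(None) == "neutral"
```

Substring matching also fires inside longer words (for example, `"goodbye"` is classified as positive), which is one reason this function is only a stand-in for a trained model.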

**Explanation:**

*   **`spark.readStream.format("kafka")...load()`**: Sets up the connection to Kafka as a streaming source.
*   **`selectExpr("CAST(value AS STRING)")`**: Selects the Kafka message value and casts it from binary to a string.
*   **`from_json(col("value"), tweet_schema).alias("data")`**: Parses the JSON string in the `value` column according to `tweet_schema`, creating a struct column named `data`.
*   **`.select("data.*")`**: Promotes the fields within the `data` struct to top-level columns (`created_at`, `id_str`, `text`).
*   **`withColumn("event_time", expr("to_timestamp(...)"))`**: Parses the string timestamp into a Spark `TimestampType` column, which is required for event-time processing. The pattern must match the input exactly, and Spark 3.x's parser does not accept day-of-week (`E`) symbols in parsing patterns.
*   **`sentiment_udf = udf(...)`**: Defines a simple UDF for sentiment analysis (placeholder for a real model).
*   **`.withWatermark("event_time", "10 minutes")`**: Specifies the event-time column and the lateness threshold. The watermark is the maximum `event_time` seen so far minus 10 minutes. State for a window is kept until the watermark passes the end of that window, and data older than the watermark may be dropped (Spark does not guarantee that every late row is dropped). This is essential for bounding state in aggregations.
*   **`groupBy(window(...), col(...))`**: Groups data by a time window (here, 5-minute non-overlapping/tumbling windows on `event_time`) and the `sentiment` column.
*   **`.count()`**: Aggregates the count within each group (window, sentiment).
*   **`writeStream.outputMode("update")...start()`**: Configures the output sink (console) and starts the streaming query execution. `update` mode is suitable for aggregations with watermarking, outputting only the rows whose counts changed in the current micro-batch.
*   **`query.awaitTermination()`**: Blocks the main thread, keeping the application alive to process the stream continuously.
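The difference between the three output modes can be sketched with plain Python dictionaries. In this simplified model (an illustration, not Spark's implementation), a result table maps hypothetical `(window_start_seconds, sentiment)` keys to counts; `previous` and `current` are the table before and after one micro-batch, and `already_emitted` records the rows that append mode has written, since it writes each finalized row exactly once. The mode strings mirror the arguments of `.outputMode()`; the function name `rows_to_emit` is hypothetical.

```python
WINDOW = 300  # window size in seconds (5 minutes)


def rows_to_emit(mode, previous, current, watermark, already_emitted):
    """Return the rows a sink would receive for one trigger (toy model)."""
    if mode == "complete":
        # The whole result table, every trigger.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed in this trigger.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only finalized windows (end <= watermark), each emitted once.
        final = {k: v for k, v in current.items()
                 if k[0] + WINDOW <= watermark and k not in already_emitted}
        already_emitted.update(final)  # records the emitted keys
        return final
    raise ValueError(f"unknown output mode: {mode}")


before = {(0, "positive"): 3, (0, "negative"): 1}
after = {(0, "positive"): 4, (0, "negative"): 1, (300, "neutral"): 2}
emitted = set()

print(rows_to_emit("update", before, after, 200, emitted))
# {(0, 'positive'): 4, (300, 'neutral'): 2}
print(rows_to_emit("append", before, after, 200, emitted))
# {}  (window [0, 300) is not final yet)
print(rows_to_emit("append", after, after, 300, emitted))
# {(0, 'positive'): 4, (0, 'negative'): 1}
```

Update mode keeps console output small for a running aggregation, while append mode trades latency for emitting each window only once, after the watermark has finalized it.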

**Use Cases:** Real-time monitoring dashboards, alerting systems, online machine learning model updates, IoT data processing.

---

### Advanced Concepts: Performance Tuning & Optimization

**Theory:**

Writing efficient PySpark code involves understanding how Spark executes jobs and applying optimization techniques.

1.  **Partitioning:**
    *   Data in Spark is processed in partitions. The number of partitions affects parallelism. Too few partitions lead to underutilization of cluster resources; too many can cause overhead (task scheduling, small data per task).
    *   **`df.rdd.getNumPartitions()`**: Check the current number of partitions.
    *   **`repartition(n)` / `repartition(col)`**: Increases or decreases the number of partitions, involving a full data shuffle. Use `repartition(col)` to partition data based on column values, which can optimize subsequent joins or groupBys on that column (co-location of data).
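    *   **`coalesce(n)`**: Decreases the number of partitions *without* a full shuffle by merging existing partitions (preferring partitions on the same executor). This makes it cheaper than `repartition` for *reducing* the partition count, but it can produce uneven partition sizes, and a drastic reduction (e.g., to 1) also reduces the parallelism of the upstream computation in the same stage. It is a common choice before writing output to reduce the number of small files. Requesting more partitions than currently exist leaves the count unchanged.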
    *   **Configuration:** `spark.sql.shuffle.partitions` (default 200) controls the number of partitions created after shuffles (joins, groupBys). Tune based on data size and cluster capacity.

2.  **Caching/Persistence:** (Covered in Fraud Detection example) Re-use computed DataFrames efficiently. Monitor cache usage in Spark UI. Choose appropriate `StorageLevel` (MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, etc.).

3.  **Broadcast Joins:**
    *   When joining a large DataFrame with a small one, Spark can *broadcast* the small DataFrame to all executors. This avoids shuffling the large DataFrame.
    *   Controlled by `spark.sql.autoBroadcastJoinThreshold` (default 10MB). Spark automatically broadcasts tables smaller than this threshold.
    *   Can be forced using `from pyspark.sql.functions import broadcast; large_df.join(broadcast(small_df), on="key")`.
    *   Monitor `explain()` output and Spark UI (DAG visualization) to see if broadcast joins are used.

4.  **Catalyst Optimizer & Tungsten:**
    *   **Catalyst:** Spark's extensible query optimizer. It analyzes DataFrame/SQL queries, applies rule-based optimizations to the logical plan (e.g., predicate pushdown, column pruning, constant folding), then generates one or more physical plans and selects one, using a cost model where statistics are available.
    *   **Tungsten:** The Spark initiative behind efficient physical execution. It stores data in a compact binary row format with explicit (on- or off-heap) memory management, reducing garbage collection overhead, and improves CPU efficiency through whole-stage code generation.
    *   **Leverage Them:** Use DataFrame API and Spark SQL functions whenever possible, as they allow Catalyst and Tungsten to perform extensive optimizations. Avoid UDFs that hide logic from the optimizer.

5.  **Data Formats:** Choose efficient storage formats like Parquet or ORC. They offer columnar storage (only read needed columns), predicate pushdown (filter data at the storage layer), and good compression. Avoid text-based formats like CSV or JSON for intermediate or large datasets.

6.  **Spark UI:** An essential tool for monitoring and debugging. Access it typically via `http://<driver-node>:4040`. Key sections:
    *   **Jobs:** View active and completed jobs triggered by actions.
    *   **Stages:** See stages within jobs, identify shuffles, track task progress.
    *   **Storage:** Monitor cached/persisted RDDs/DataFrames and memory usage.
    *   **Environment:** Check Spark configuration properties.
    *   **Executors:** View resource usage (memory, disk, cores) per executor, check for bottlenecks or failures.
    *   **SQL:** Inspect query plans (logical, physical), identify bottlenecks like inefficient joins or scans.

7.  **Skew Handling:** Data skew (where some partitions have vastly more data than others due to specific key values) can cripple performance, as stages wait for the longest-running tasks. Techniques include:
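    *   Salting: Add a random salt (e.g., an integer from 0 to N-1) to skewed keys so their rows spread across several tasks. For aggregations, aggregate per salted key, then strip the salt and aggregate again; for joins, replicate the matching rows of the other side once per salt value.
    *   Splitting skewed keys: Handle highly frequent keys separately.
    *   Adaptive Query Execution (AQE, Spark 3.0+): Spark re-optimizes query plans at runtime using shuffle statistics. Among other things, it coalesces small shuffle partitions and splits skewed partitions in sort-merge joins (`spark.sql.adaptive.skewJoin.enabled`). AQE is controlled by `spark.sql.adaptive.enabled`, which is on by default since Spark 3.2.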
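The salting technique from the skew-handling item can be illustrated without Spark. The plain-Python sketch below makes a few assumptions: a hypothetical dataset where one `"hot"` key dominates, a deterministic salt `i % NUM_SALTS` in place of the usual random salt (so the result is reproducible), and a hypothetical `dims` lookup table for the join variant. It shows the two-stage aggregation, and that replicating the other join side once per salt value keeps every match. Salting guarantees smaller key groups; how those groups land on partitions still depends on the hash partitioner.

```python
from collections import Counter

NUM_SALTS = 4

# Hypothetical skewed input: one "hot" key dominates the data set.
keys = ["hot"] * 1000 + ["a", "b", "c"] * 10

# Stage 1: attach a salt to every row and count per (key, salt).
# Real jobs usually draw a random salt; i % NUM_SALTS keeps this sketch reproducible.
salted = [(key, i % NUM_SALTS) for i, key in enumerate(keys)]
partial_counts = Counter(salted)

# Stage 2: strip the salt and sum the partial counts per original key.
totals = Counter()
for (key, _salt), count in partial_counts.items():
    totals[key] += count

# Same answer, but the largest group a single task must handle shrinks 4x.
largest_unsalted = max(Counter(keys).values())  # 1000 rows for "hot"
largest_salted = max(partial_counts.values())   # 250 rows per ("hot", salt)
assert totals == Counter(keys)

# Join variant: replicate each row of the other side once per salt value,
# so every salted key on the skewed side still finds its match.
dims = {"hot": "Hot product", "a": "A", "b": "B", "c": "C"}
dims_salted = {(k, s): v for k, v in dims.items() for s in range(NUM_SALTS)}
joined = [(k, dims_salted[(k, s)]) for k, s in salted]

print(largest_unsalted, largest_salted, len(joined))  # 1000 250 1030
```

The cost of salting is the extra aggregation stage (or the replicated join side, which grows by a factor of `NUM_SALTS`), so it pays off only when a few keys are heavily skewed.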

**Code Example Snippets (Illustrative):**

```python
# Assumes my_df, aggregated_df, large_df, and small_lookup_df are existing DataFrames

# Check partitions
print(f"Number of partitions: {my_df.rdd.getNumPartitions()}")

# Repartition before a join/groupBy (can help co-location)
# Careful: Causes a shuffle
partitioned_df = my_df.repartition(200, "user_id") # Partition by user_id into 200 partitions

# Coalesce before writing to reduce small files (more efficient than repartition for reduction)
final_df_coalesced = aggregated_df.coalesce(10)
final_df_coalesced.write.parquet("path/to/output")

# Force broadcast join hint
from pyspark.sql.functions import broadcast, col
joined_df = large_df.join(broadcast(small_lookup_df), large_df["key"] == small_lookup_df["id"], "left")

# View execution plan
large_df.filter(col("value") > 100).groupBy("category").count().explain()
```
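The contrast between `repartition` and `coalesce` in the snippets above can be modeled with Python lists standing in for partitions. This is an illustrative sketch, not Spark's algorithm: `hash_repartition` routes every row by a key hash (using `zlib.crc32` as a stand-in; Spark's hash partitioning uses Murmur3), so rows with the same key end up together, while `coalesce_partitions` only merges whole existing partitions into consecutive groups, so nothing is re-routed but the results can be uneven. The function names and sample rows are hypothetical.

```python
import zlib


def stable_hash(key):
    """Deterministic stand-in for Spark's partitioning hash (Spark uses Murmur3)."""
    return zlib.crc32(str(key).encode("utf-8"))


def hash_repartition(partitions, num_partitions, key_fn):
    """Model of repartition(n, col): a full shuffle routing every row by key hash."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for row in part:
            out[stable_hash(key_fn(row)) % num_partitions].append(row)
    return out


def coalesce_partitions(partitions, num_partitions):
    """Model of coalesce(n): merge whole existing partitions; rows are not re-routed."""
    if num_partitions >= len(partitions):
        # As with coalesce, asking for more partitions leaves the count unchanged.
        return [list(p) for p in partitions]
    out = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        out[i * num_partitions // len(partitions)].extend(part)  # consecutive groups
    return out


# Rows are (user_id, amount), spread over four uneven input partitions.
parts = [
    [("u1", 5), ("u2", 3)],
    [("u1", 7)],
    [("u3", 1), ("u2", 2), ("u1", 4), ("u3", 6)],
    [("u2", 8)],
]

by_user = hash_repartition(parts, 3, key_fn=lambda row: row[0])
for user in ("u1", "u2", "u3"):
    holders = [i for i, p in enumerate(by_user) if any(r[0] == user for r in p)]
    assert len(holders) == 1  # co-location: each user_id lands in exactly one partition

merged = coalesce_partitions(parts, 2)
print([len(p) for p in merged])  # [3, 5]
```

Co-location is what makes `repartition(200, "user_id")` useful before a join or `groupBy` on `user_id`, while the uneven `[3, 5]` split shows why `coalesce` is cheap but can leave partitions unbalanced.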

**Best Practices:**

*   Use DataFrames and Spark SQL for structured data.
*   Define explicit schemas when reading data.
*   Use efficient file formats (Parquet, ORC).
*   Filter data early (`filter()` before `join()` or `groupBy()`).
*   Avoid `collect()` on large DataFrames.
*   Use built-in functions over UDFs where possible. Consider Pandas UDFs otherwise.
*   Tune `spark.sql.shuffle.partitions` and executor/driver memory.
*   Leverage partitioning (`repartition` in memory, `partitionBy` when writing).
*   Use `cache()`/`persist()` strategically.
*   Monitor applications using the Spark UI.
*   Understand Adaptive Query Execution (AQE), available since Spark 3.0 and enabled by default since Spark 3.2.

---

This comprehensive set of notes covers PySpark fundamentals, architecture, core APIs through practical project examples, and advanced optimization techniques, suitable for professional training and reference.