# Distributed ETL & Machine Learning using Apache Spark

This notebook implements a parallel, distributed machine learning pipeline
on financial OHLCV (open, high, low, close, volume) data. Apache Spark
Structured Streaming ingests the data as a live ETL stream, and Spark ML
trains a model on each micro-batch.

### Architecture Layers Covered
- Distributed Processing Layer
- Model Training Layer
- Model Aggregation Layer
- Analytics & Serving Layer

## 1. Spark Session Initialization
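`SparkSession` is the entry point to Spark's DataFrame, Structured Streaming, and ML APIs. `getOrCreate()` reuses an already running session if one exists, and the log level is set to `ERROR` to keep the cell output readable.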


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, DoubleType, DateType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder \
    .appName("DistributedFinancialETL") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")


## 2. Schema Definition (OHLCV Data)
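Declares the column names and types of the OHLCV dataset. The schema is given explicitly because streaming file sources do not infer one by default.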


In [2]:
schema = StructType() \
    .add("Date", DateType()) \
    .add("Open", DoubleType()) \
    .add("High", DoubleType()) \
    .add("Low", DoubleType()) \
    .add("Close", DoubleType()) \
    .add("Volume", DoubleType()) \
    .add("Symbol", StringType())
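To make the schema concrete, the following is a minimal pure-Python sketch of how one CSV line maps onto these seven typed columns. It is not Spark code: `FIELD_PARSERS` and `parse_row` are hypothetical names introduced for illustration, the sample values are invented, and the assumption that an unparsable field becomes a null mirrors what Spark's default permissive CSV mode typically does rather than reproducing it exactly.

```python
from datetime import date

# Hypothetical pure-Python mirror of the notebook's schema: (column name, parser).
FIELD_PARSERS = [
    ("Date", date.fromisoformat),   # DateType, assuming yyyy-MM-dd text
    ("Open", float),                # DoubleType
    ("High", float),
    ("Low", float),
    ("Close", float),
    ("Volume", float),
    ("Symbol", str),                # StringType
]

def parse_row(line):
    """Split one CSV line and cast each field; missing or unparsable fields become None."""
    tokens = line.strip().split(",")
    row = {}
    for index, (name, parser) in enumerate(FIELD_PARSERS):
        raw = tokens[index] if index < len(tokens) else ""
        try:
            row[name] = parser(raw) if raw != "" else None
        except ValueError:
            row[name] = None
    return row

# Illustrative values only; they are not taken from the dataset.
good = parse_row("1999-03-10,45.1,45.9,44.8,45.665,1200000,ABC")
bad = parse_row("1999-03-11,n/a,45.9,44.8,45.88,1300000,ABC")
print(good["Close"], bad["Open"])
```

A row such as `bad`, which ends up with a `None` field, corresponds to the kind of row that `dropna()` removes in the ETL step.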


## 3. Distributed Streaming ETL
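Spark watches the input directory and treats each newly arrived CSV file as new stream data. Setting `maxFilesPerTrigger` to 1 limits every micro-batch to a single file, and `dropna()` discards any row that contains a null value.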


In [3]:
stream_df = spark.readStream \
    .schema(schema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("/content/sample_data/data_stream/")

etl_df = stream_df.dropna()
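Because the stream picks up whatever files appear in the watched directory, files are best moved into it in one step rather than written in place. The sketch below shows one way to do that with the standard library. The helper name `drop_csv_atomically`, the `_staging` sibling directory, the file name, and the sample row are all assumptions made for illustration; rows are written without a header line because the reader above does not set a header option.

```python
import csv
import os
import tempfile

def drop_csv_atomically(rows, stream_dir, filename):
    """Write rows to a staging file, then rename it into the watched directory."""
    os.makedirs(stream_dir, exist_ok=True)
    # Stage in a sibling directory so the stream never lists a half-written file.
    staging_dir = stream_dir.rstrip("/\\") + "_staging"
    os.makedirs(staging_dir, exist_ok=True)

    fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=staging_dir)
    with os.fdopen(fd, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)

    final_path = os.path.join(stream_dir, filename)
    # os.replace is atomic when both paths are on the same filesystem.
    os.replace(tmp_path, final_path)
    return final_path

# Demo against a throwaway directory instead of /content/sample_data/data_stream/.
demo_dir = os.path.join(tempfile.mkdtemp(), "data_stream")
rows = [["1999-03-10", 45.1, 45.9, 44.8, 45.665, 1200000, "ABC"]]  # invented values
path = drop_csv_atomically(rows, demo_dir, "batch_0001.csv")
print(os.path.basename(path))
```

With `maxFilesPerTrigger` set to 1, each file delivered this way would be expected to form its own micro-batch.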


## 4. Model Training, Aggregation & Serving
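Each micro-batch is handed to `train_model` as a regular DataFrame. The function fits a linear regression on that batch, evaluates it, saves the fitted model, and reloads it to run inference.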
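The evaluation step reports the root mean squared error (RMSE) between `Close` and `prediction`. As a rough sketch of what that metric computes, the snippet below applies the formula in plain Python to the five rows displayed for Batch 1 in the output at the end of this notebook. The function name `rmse` is an illustrative helper, not part of Spark.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction) ** 2))."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and equally long")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# The five Close / prediction pairs displayed for Batch 1.
close = [45.665, 45.88, 44.77, 46.052, 46.447]
pred = [45.1474054424904, 45.449368401321806, 44.7361557151062,
        45.60527753806399, 46.187687470691]
print(round(rmse(close, pred), 3))
```

The value for these five rows differs from the RMSE printed for the batch, because the evaluator covers every row in the batch rather than only the rows shown.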


In [4]:
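def train_model(batch_df, batch_id):
    # Called by foreachBatch with each micro-batch as a regular DataFrame.
    if batch_df.count() == 0:
        return

    # Combine the input columns into the single vector column Spark ML expects.
    assembler = VectorAssembler(
        inputCols=["Open", "High", "Low", "Volume"],
        outputCol="features"
    )

    # Assemble once and reuse the result for both training and serving.
    assembled = assembler.transform(batch_df)
    data = assembled.select("features", "Close")

    # Model Training Layer: fit a linear regression on this batch only.
    lr = LinearRegression(featuresCol="features", labelCol="Close")
    model = lr.fit(data)

    # Score the same rows used for training, so the RMSE below is in-sample.
    predictions = model.transform(data)

    evaluator = RegressionEvaluator(
        labelCol="Close",
        predictionCol="prediction",
        metricName="rmse"
    )

    rmse = evaluator.evaluate(predictions)
    print(f"Batch {batch_id} - RMSE: {rmse}")

    # Model Aggregation Layer: persist one model per batch.
    # Models are stored side by side; they are not combined here.
    model_path = f"/tmp/spark_ml_models/lr_model_batch_{batch_id}"
    model.write().overwrite().save(model_path)

    # Analytics & Serving Layer: reload the saved model and score the batch.
    loaded_model = LinearRegressionModel.load(model_path)
    served_predictions = loaded_model.transform(assembled)

    served_predictions.select("Date", "Close", "prediction").show(5)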
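The aggregation layer above saves one model per batch and does not merge them. One possible way to combine them, shown here purely as an assumption and not as the notebook's method, is to average the linear coefficients weighted by the number of rows each model was trained on. The helper `average_linear_models` and the numbers in the demo are hypothetical; in PySpark the inputs could be read from each fitted model's `coefficients` and `intercept` attributes.

```python
def average_linear_models(models):
    """Row-weighted average of (coefficients, intercept, n_rows) tuples."""
    total_rows = sum(n_rows for _, _, n_rows in models)
    if total_rows == 0:
        raise ValueError("at least one model with a positive row count is required")

    width = len(models[0][0])
    coefficients = [0.0] * width
    intercept = 0.0
    for coefs, bias, n_rows in models:
        if len(coefs) != width:
            raise ValueError("all models must have the same number of coefficients")
        weight = n_rows / total_rows
        for index, value in enumerate(coefs):
            coefficients[index] += weight * value
        intercept += weight * bias
    return coefficients, intercept

# Invented numbers: two per-batch models trained on 100 and 300 rows.
per_batch = [
    ([1.0, 2.0], 0.5, 100),
    ([3.0, 4.0], 1.5, 300),
]
print(average_linear_models(per_batch))  # prints ([2.5, 3.5], 1.25)
```

Averaging parameters is only an approximation of fitting a single model on all batches together, so the combined model would still need to be evaluated on held-out data.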


## 5. Start Streaming Execution
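`start()` launches the streaming query in the background, and `awaitTermination()` blocks the cell until the query fails or is stopped, for example with `query.stop()` or by interrupting the kernel.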


In [None]:
query = etl_df.writeStream \
    .foreachBatch(train_model) \
    .start()

query.awaitTermination()


Batch 1 - RMSE: 0.4117673563747198
+----------+------+------------------+
|      Date| Close|        prediction|
+----------+------+------------------+
|1999-03-10|45.665|  45.1474054424904|
|1999-03-11| 45.88|45.449368401321806|
|1999-03-12| 44.77|  44.7361557151062|
|1999-03-15|46.052| 45.60527753806399|
|1999-03-16|46.447|   46.187687470691|
+----------+------+------------------+
only showing top 5 rows
Batch 2 - RMSE: 0.5546923008937202
+----------+------+------------------+
|      Date| Close|        prediction|
+----------+------+------------------+
|2005-02-25|105.79|  105.753031334879|
|2005-02-28|105.08|104.95027324085483|
|2005-03-01|105.62|105.80692604266632|
|2005-03-02|105.57|105.99933384290371|
|2005-03-03|105.61|105.57199084636645|
+----------+------+------------------+
only showing top 5 rows
Batch 3 - RMSE: 0.14776488757051942
+----------+------+------------------+
|      Date| Close|        prediction|
+----------+------+------------------+
|2005-02-25|21.637|21.50484