# Data Processing Job
Pre-process the 2023 trip data (35M+ rows) for training and the first quarter of 2024 (10M+ rows) for testing, using a Spark cluster and PySpark.

### What does this notebook do?
- Runs a pyspark script using PySparkProcessor on a spark cluster
- Production grade data processing:
    - Processes the training data first, then stores the fitted one-hot encoding and other transformation state so the same transformations can be applied to the test data
- PySpark script
    - Input: a Glue Data Catalog table that presents the NYC trip data from the different monthly parquet files under one unified schema (generated by a Glue ETL job)
    - Output: processed/transformed data, randomly partitioned into 128 shards
    - Processing/Transformation:
        - Calculates the target
        - Filters outliers
        - Extracts seasonality features, e.g. hour_of_the_day, day_of_the_week
        - Assigns rare categories to 'Other'
        - One-hot encodes categorical features and assembles them, as vectors, into a single column "features"
        - Generates a summary of a 0.1% sample of the data
        - Stores feature metadata, e.g. target and feature names
        - Stores preprocessor details used to transform the test data:
            - the `Pipeline`
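The "assign rare categories to 'Other'" step is stateful: the set of common categories must be learned from the training data once and reused unchanged for the test data. The plain-Python sketch below illustrates that fit-on-train, apply-on-test pattern. Only the file name `common_categories_pick_drop_loc.json` comes from this notebook; the share threshold, the JSON layout, the `pickup_zone` key and the zone labels are hypothetical, and the real script does this with Spark.

```python
import json
import os
import tempfile
from collections import Counter

OTHER = "Other"

def fit_common_categories(values, min_share):
    """Train mode: keep labels whose share of rows is at least min_share (hypothetical rule)."""
    counts = Counter(values)
    total = sum(counts.values())
    return sorted(label for label, n in counts.items() if n / total >= min_share)

def apply_common_categories(values, common):
    """Train and test mode: map any label outside the stored set (rare or unseen) to 'Other'."""
    keep = set(common)
    return [v if v in keep else OTHER for v in values]

# Train mode: fit on the training data and persist the learned state.
train_zones = ["JFK"] * 5 + ["Midtown"] * 4 + ["Harlem"]
common = fit_common_categories(train_zones, min_share=0.2)
path = os.path.join(tempfile.mkdtemp(), "common_categories_pick_drop_loc.json")
with open(path, "w") as f:
    json.dump({"pickup_zone": common}, f)  # hypothetical JSON layout

# Test mode: reload the stored categories instead of recomputing them on test data.
with open(path) as f:
    stored = json.load(f)["pickup_zone"]
print(apply_common_categories(["JFK", "Harlem", "Newark"], stored))
# ['JFK', 'Other', 'Other']
```

Recomputing the common set on the test data would let the two datasets disagree about which labels exist, which would change the one-hot vector layout between training and testing.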

#### PySpark Feature Extraction and Transformations
https://spark.apache.org/docs/latest/ml-features.html

**Important Operations used here:**
- StringIndexer: Encodes a string column as numeric indices ordered by label frequency (the default stringOrderType is "frequencyDesc"): if "hot" is the most frequent string it gets index 0, the next most frequent string gets 1, and so on.
- OneHotEncoder: Maps each category index to a binary (sparse) vector; with the default dropLast=True, the last category is encoded as all zeros. It operates on numeric indices, so string columns must first be encoded with StringIndexer.
- VectorAssembler: Assembles the one-hot encoded vectors, together with the other numerical features in the dataframe, into a single vector column called "features".
- GroupBy: To create summaries
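What these stages compute can be illustrated without a cluster. The sketch below is plain Python, not Spark code: it mimics StringIndexer with frequencyDesc ordering (ties broken alphabetically, as the Spark documentation describes), OneHotEncoder with its default dropLast=True, and VectorAssembler concatenating columns in order. Spark emits sparse vectors; dense lists are used here for readability, and the values are hypothetical.

```python
from collections import Counter

def string_index(values):
    """Mimic StringIndexer(stringOrderType="frequencyDesc"): most frequent label -> 0."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Mimic OneHotEncoder: with drop_last=True the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

def assemble(*parts):
    """Mimic VectorAssembler: concatenate vectors and scalars in column order."""
    out = []
    for part in parts:
        out.extend(part if isinstance(part, list) else [float(part)])
    return out

labels = ["hot", "cold", "hot", "hot", "cold", "mild"]
mapping = string_index(labels)
print(mapping)  # {'hot': 0, 'cold': 1, 'mild': 2}

numeric_feature = 2.5  # hypothetical numerical column value
print(assemble(one_hot(mapping["cold"], len(mapping)), numeric_feature))  # [0.0, 1.0, 2.5]
```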

**Advice:**
- Do all pre-processing, especially stateful transformations (those that need a scan over the whole dataset, such as category frequencies), before distributed training. In multi-node training with the XGBoost estimator, preprocessing is not synchronized across instances, so when the data is sharded each instance would compute its own, different statistics.
- Use parquet files: they are smaller and faster to read and write than CSV. pandas.read_parquet (with the pyarrow engine) accepts a directory and reads all parquet files in it, including nested subdirectories, whereas pandas.read_csv reads a single file.
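To see why stateful steps belong before distributed training, the plain-Python sketch below computes the "top categories" (a typical stateful statistic) once over the whole dataset and once per shard. The zone labels, the two-shard split and `k=2` are hypothetical.

```python
from collections import Counter

def top_k(values, k):
    """Return the k most frequent labels (ties broken alphabetically)."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))[:k]

# Hypothetical pickup-zone labels, split across two training instances.
shard_a = ["JFK", "JFK", "Midtown", "Midtown", "Midtown", "Midtown", "SoHo"]
shard_b = ["JFK", "JFK", "JFK", "SoHo", "SoHo", "Harlem"]

print(top_k(shard_a + shard_b, k=2))  # ['JFK', 'Midtown']  (full-dataset scan)
print(top_k(shard_a, k=2))            # ['Midtown', 'JFK']  (instance A alone)
print(top_k(shard_b, k=2))            # ['JFK', 'SoHo']     (instance B alone)
```

Instance B would keep "SoHo" and send "Midtown" to "Other", so the two instances would build incompatible features. Computing the statistic once, over all the data, avoids this.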

In [None]:
import sagemaker
print(sagemaker.__version__)
# If < 2.27.0, upgrade:
#!pip install --upgrade sagemaker

In [7]:
import sagemaker 
import boto3 
import pandas as pd 
from datetime import datetime

In [None]:
REGION = sagemaker.session.Session().boto_region_name
print("REGION: ", REGION) 

boto3_session = boto3.Session(region_name=REGION)

sagemaker_boto3_client = boto3_session.client("sagemaker")
s3_boto3_client = boto3_session.client("s3")
sagemaker_session = sagemaker.session.Session(boto_session=boto3_session, sagemaker_client=sagemaker_boto3_client)

BUCKET = sagemaker_session.default_bucket()
PREFIX = "NYC_Taxi_Prediction"

ROLE=sagemaker.get_execution_role()
print("ROLE: ", ROLE)
print("BUCKET: ", BUCKET) 
print("PREFIX: ", PREFIX) 

s3_dir_uri = f"s3://{BUCKET}/{PREFIX}"
print(s3_dir_uri)

## Pipeline Parameters

Pipeline parameters are conceptually similar to command-line arguments (argparse) in a Python script. Both allow external users or systems to provide input values at runtime instead of hardcoding them.

Unlike command-line arguments, however, pipeline parameters are automatically logged and tracked with each pipeline execution.
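On the command-line side of that analogy, the processing jobs below pass arguments such as `--mode`, `--num-shards` and `--start-date` to `scripts/pyspark_data_processing.py`. That script is not shown in this notebook, so the parser below is a hypothetical sketch of how it might read them; the argument names match the jobs below, while the defaults, the `choices` and the `date.fromisoformat` validation are assumptions.

```python
import argparse
from datetime import date

def parse_args(argv=None):
    """Hypothetical parser for the arguments passed to the PySpark script."""
    parser = argparse.ArgumentParser(description="NYC taxi preprocessing (sketch)")
    parser.add_argument("--mode", choices=["train", "test"], required=True)
    parser.add_argument("--database-name", required=True)   # Glue database
    parser.add_argument("--table-name", required=True)      # Glue table
    parser.add_argument("--num-shards", type=int, default=128)
    # Parse ISO dates so that a typo fails fast instead of filtering silently.
    parser.add_argument("--start-date", type=date.fromisoformat, required=True)
    parser.add_argument("--end-date", type=date.fromisoformat, required=True)
    parser.add_argument("--out-path", required=True)
    return parser.parse_args(argv)

args = parse_args([
    "--mode", "train",
    "--database-name", "nyc_taxi_data",
    "--table-name", "schema_corrected_data",
    "--num-shards", "128",
    "--start-date", "2023-01-01",
    "--end-date", "2023-12-31",
    "--out-path", "s3://my-bucket/NYC_Taxi_Prediction/01_dataprocessing_jobs/run",  # hypothetical
])
print(args.mode, args.num_shards, args.start_date)  # train 128 2023-01-01
```

argparse converts dashes to underscores, so `--database-name` is read as `args.database_name`.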


In [None]:
# current timestamp
current_timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") 
print(current_timestamp)

# Pipeline name and its execution name
pipeline_name = "NYC-Taxi-Prediction"
pipeline_execution_name = f'{pipeline_name}-{current_timestamp}'

# Data Processing Parameters
s3_dataprocess_out_dir_uri = f'{s3_dir_uri}/01_dataprocessing_jobs/{pipeline_execution_name}'
print(s3_dataprocess_out_dir_uri)
# To reuse the output of an earlier run, set the S3 prefix explicitly instead, e.g.:
#s3_dataprocess_out_dir_uri = "s3://sagemaker-us-east-1-205930620783/NYC_Taxi_Prediction/01_dataprocessing_jobs/NYC-Taxi-Prediction-2025-08-15-03-34-04"

## Data Preprocessing Step

### PySparkProcessor vs. SKLearnProcessor
PySparkProcessor is the recommended choice for distributed data processing with Spark. SKLearnProcessor does not distribute work across instances on its own: with instance_count > 1, every instance runs the same script independently, so the extra instances only help if the script, or an input configured with ShardedByS3Key, splits the data between them.

For distributed training, by contrast, you can use the XGBoost estimator with instance_count > 1 and, optionally, the ShardedByS3Key distribution on the training input. SageMaker launches the instances, and XGBoost uses the Rabit protocol to aggregate gradient statistics across them while building trees.

You can also use Spark's machine-learning library (MLlib) inside PySparkProcessor for distributed model training, but reserve that for specific use cases.
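The 128 output shards matter once training is distributed: with ShardedByS3Key, SageMaker gives each of the n instances approximately 1/n of the S3 objects under the input prefix. The round-robin assignment below only illustrates that proportion; it is not SageMaker's actual object-to-instance mapping, and the key names are hypothetical.

```python
def assign_shards(keys, instance_count):
    """Illustrative round-robin split: each instance receives ~1/n of the objects."""
    assignment = {i: [] for i in range(instance_count)}
    for pos, key in enumerate(sorted(keys)):
        assignment[pos % instance_count].append(key)
    return assignment

# Hypothetical shard file names, one per output partition.
keys = [f"train/part-{i:05d}.parquet" for i in range(128)]
plan = assign_shards(keys, instance_count=4)
print({i: len(files) for i, files in plan.items()})  # {0: 32, 1: 32, 2: 32, 3: 32}
```

A single large file cannot be split this way, which is one reason to write many shards rather than one output file.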


In [None]:
from sagemaker.spark.processing import PySparkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

pyspark_processor = PySparkProcessor(
    base_job_name=f'{pipeline_name}-DataProc-train',
    framework_version='3.3',
    role=ROLE,
    sagemaker_session=sagemaker_session,
    instance_type="ml.m5.xlarge",
    instance_count=4,
    max_runtime_in_seconds=1200
    #dependency_location=,
)

pyspark_processor.run(
    inputs=[],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output/",
            destination=s3_dataprocess_out_dir_uri
        )
    ],
    submit_app="scripts/pyspark_data_processing.py",
    arguments=[
        "--mode", "train",
        "--database-name", "nyc_taxi_data",
        "--table-name", "schema_corrected_data",
        "--num-shards", "128",
        "--start-date", "2023-01-01",
        "--end-date", "2023-12-31",
        "--out-path", f"{s3_dataprocess_out_dir_uri}"
    ],
    # spark_event_logs_s3_uri=s3_dataprocess_out_dir_uri,  # optional: upload Spark event logs to S3
)


In [None]:
from sagemaker.spark.processing import PySparkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

pyspark_processor = PySparkProcessor(
    base_job_name=f'{pipeline_name}-DataProc-test',
    framework_version='3.3',
    role=ROLE,
    sagemaker_session=sagemaker_session,
    instance_type="ml.m5.xlarge",
    instance_count=4,
    max_runtime_in_seconds=1200
)

pyspark_processor.run(
    submit_app="scripts/pyspark_data_processing.py",
    inputs=[
        ProcessingInput(
            source=f"{s3_dataprocess_out_dir_uri}/common_categories_pick_drop_loc.json",
            destination="/opt/ml/processing/input/"
        )
    ],
    arguments=[
        "--mode", "test",
        "--database-name", "nyc_taxi_data",
        "--table-name", "schema_corrected_data",
        "--num-shards", "128",
        "--start-date", "2024-01-01",
        "--end-date", "2024-03-31",  # the first quarter of 2024 ends on March 31
        "--out-path", f"{s3_dataprocess_out_dir_uri}"
    ]
)
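The test window should cover the whole first quarter of 2024. Assuming the script treats `--end-date` as inclusive (the training window 2023-01-01 to 2023-12-31 suggests it does), Q1 2024 ends on 2024-03-31, and a window ending on 2024-03-30 would silently drop the last day. The helpers below are a hypothetical sketch for deriving quarter bounds instead of typing them by hand.

```python
import calendar
from datetime import date, datetime

def quarter_bounds(year, quarter):
    """Return the first and last calendar day of a quarter (both inclusive)."""
    first_month = 3 * (quarter - 1) + 1
    last_month = first_month + 2
    last_day = calendar.monthrange(year, last_month)[1]  # days in the last month
    return date(year, first_month, 1), date(year, last_month, last_day)

def in_window(pickup, start, end):
    """Inclusive filter on the pickup date (hypothetical stand-in for the script's filter)."""
    return start <= pickup.date() <= end

start, end = quarter_bounds(2024, 1)
print(start, end)  # 2024-01-01 2024-03-31

late_trip = datetime(2024, 3, 31, 23, 15)
print(in_window(late_trip, start, end))               # True
print(in_window(late_trip, start, date(2024, 3, 30)))  # False
```

In PySpark, `Column.between(lower, upper)` is likewise inclusive on both ends.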
