# End-to-End Deployment & Integration for Real-Time Transaction Success Prediction

This Jupyter Notebook provides a comprehensive implementation for deploying and integrating the best-performing transaction success prediction model from Phase 4 of our Transaction Success Rate Optimization Project.

## Table of Contents 
1. [Load Best Model & Preprocess Data](#1-load-best-model--preprocess-data)
2. [Build API with FastAPI](#2-build-api-with-fastapi)
3. [Deploy Model as a Microservice](#3-deploy-model-as-a-microservice)
4. [Implement CI/CD Pipeline](#4-implement-cicd-pipeline)
5. [Conduct A/B Testing for Business Impact](#5-conduct-ab-testing-for-business-impact)
6. [Monitor & Improve the Deployment](#6-monitor--improve-the-deployment)


<a id="home"></a>

<a id="1-load-best-model--preprocess-data"></a>

## 1. Load Best Model & Preprocess Data

First, let's import necessary libraries and load our best-performing model.

[Back to Top](#home)

In [1]:
import sys
import os

# Get the absolute path of the parent directory
sys.path.insert(0, os.path.abspath('../'))

# import the necessary packages
from src.dependencies import *
from src.utils import *


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Define paths
MODEL_PATH = "../models/Optimized XGBoost.pkl"
PIPELINE_PATH = "../models/feature_engineering_pipeline.pkl"

In [3]:
def load_model_and_pipeline():
    """
    Load the trained model and preprocessing pipeline
    """
    try:
        logger.info(f"Loading model from {MODEL_PATH}")
        model = joblib.load(MODEL_PATH)
        
        logger.info(f"Loading preprocessing pipeline from {PIPELINE_PATH}")
        preprocessing_pipeline = joblib.load(PIPELINE_PATH)
        
        return model, preprocessing_pipeline
    except Exception as e:
        logger.error(f"Error loading model or pipeline: {str(e)}")
        raise

In [4]:
# Load model and preprocessing pipeline
model, preprocessing_pipeline = load_model_and_pipeline()

logger.info(f"Model type: {type(model).__name__}")
logger.info(f"Pipeline type: {type(preprocessing_pipeline).__name__}")

2025-03-22 19:42:04,779 - __main__ - INFO - Loading model from ../models/Optimized XGBoost.pkl
2025-03-22 19:42:04,972 - __main__ - INFO - Loading preprocessing pipeline from ../models/feature_engineering_pipeline.pkl
2025-03-22 19:42:04,972 - __main__ - INFO - Model type: Pipeline
2025-03-22 19:42:04,972 - __main__ - INFO - Pipeline type: Pipeline


In [5]:
def preprocess_transaction_data(transaction_data):
    """
    Preprocess incoming transaction data using the feature engineering pipeline
    
    Args:
        transaction_data (dict or pd.DataFrame): Raw transaction data
        
    Returns:
        pd.DataFrame: Preprocessed transaction features ready for prediction
    """
    try:
        # Convert dict to DataFrame if necessary
        if isinstance(transaction_data, dict):
            transaction_df = pd.DataFrame([transaction_data])
        else:
            transaction_df = transaction_data.copy()
            
        # Apply the preprocessing pipeline
        preprocessed_data = preprocessing_pipeline.fit_transform(transaction_df)
        
        return preprocessed_data
    except Exception as e:
        logger.error(f"Error preprocessing transaction data: {str(e)}")
        raise

def make_prediction(transaction_data):
    """
    Make prediction on transaction success probability
    
    Args:
        transaction_data (dict or pd.DataFrame): Raw transaction data
        
    Returns:
        dict: Prediction results with success probability
    """
    try:
        # Preprocess the data
        preprocessed_data = preprocess_transaction_data(transaction_data)
        
        # Make prediction
        success_probability = model.predict_proba(preprocessed_data)[:, 1]
        
        # Prepare results
        results = {
            "transaction_id": transaction_data.get("transaction_id", "unknown"),
            "success_probability": float(success_probability[0]),
            "recommended_action": "route" if success_probability[0] > 0.5 else "review"
        }
        
        return results
    except Exception as e:
        logger.error(f"Error making prediction: {str(e)}")
        raise

In [6]:
# Test with a sample transaction
sample_transaction = {
    "transaction_id": "test-456",
    "timestamp": "2025-03-21 14:30:45",
    "merchant_id": "MERCH00123",
    "customer_id": "CUST002345",
    "customer_location": "suburban",
    "payment_amount": 75.50,
    "payment_method": "credit_card",
    "device_type": "mobile",
    "network_latency": 98.45,
    "result": "pending",
    "latency_bin_encoded": 2,
    "network_latency_scaled": 0.142367,
    "merchant_rolling_avg_amount_scaled": 0.05672,
    "merchant_success_rate_scaled": 0.912345,
    "device_success_rate_scaled": 0.974562,
    "payment_method_rolling_success_scaled": 0.899874,
    "location_success_rate_scaled": 0.765432,
    "payment_location_success_rate_scaled": 0.832145,
    "merchant_transaction_count_log": 4.56789,
    "hourly_transaction_volume_log": 6.12345,
    'amount_log': 4.321,
    'merchant_rolling_avg_amount': 100.0,
    'hourly_transaction_volume': 1000,
    'merchant_success_rate': 0.95,
    'time_of_day': 'afternoon',
    'latency_bin': 'medium',
    'day_name': 'Monday',
    'amount_bin': 'low',
}


prediction_result = make_prediction(sample_transaction)
print(f"Prediction result: {prediction_result}")

Prediction result: {'transaction_id': 'test-456', 'success_probability': 0.9820582270622253, 'recommended_action': 'route'}


<a id="2-build-api-with-fastapi"></a>

## 2. Build API with FastAPI

Now, let's create a REST API using FastAPI to serve our model.


[Back to Top](#home)



In [9]:
# app.py content
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load model and pipeline
MODEL_PATH = "../models/Optimized XGBoost.pkl"
PIPELINE_PATH = "../models/feature_engineering_pipeline.pkl"

model = joblib.load(MODEL_PATH)
preprocessing_pipeline = joblib.load(PIPELINE_PATH)

# Initialize FastAPI app
app = FastAPI(
    title="Transaction Success Predictor API",
    description="API for predicting transaction success probability",
    version="1.0.0"
)

# Define data models
class TransactionData(BaseModel):
    transaction_id: str = Field(..., description="Unique transaction identifier")
    timestamp: str = Field(..., description="Timestamp of the transaction in YYYY-MM-DD HH:MM:SS format")
    merchant_id: str = Field(..., description="Unique identifier for the merchant")
    customer_id: str = Field(..., description="Unique identifier for the customer")
    customer_location: str = Field(..., description="Location type of the customer (urban, suburban, etc.)")
    payment_amount: float = Field(..., description="Transaction payment amount")
    payment_method: str = Field(..., description="Payment method used (credit_card, bank_transfer, etc.)")
    device_type: str = Field(..., description="Type of device used for transaction (mobile, web, etc.)")
    network_latency: float = Field(..., description="Latency in milliseconds for transaction processing")
    result: str = Field(..., description="Transaction result (success, pending, failed)")
    
    latency_bin_encoded: int = Field(..., description="Encoded category of network latency")
    network_latency_scaled: float = Field(..., description="Scaled network latency value")
    merchant_rolling_avg_amount_scaled: float = Field(..., description="Scaled rolling average transaction amount for the merchant")
    merchant_success_rate_scaled: float = Field(..., description="Scaled success rate of the merchant")
    device_success_rate_scaled: float = Field(..., description="Scaled success rate of transactions per device")
    payment_method_rolling_success_scaled: float = Field(..., description="Scaled rolling success rate for the payment method")
    location_success_rate_scaled: float = Field(..., description="Scaled success rate of transactions per customer location")
    payment_location_success_rate_scaled: float = Field(..., description="Scaled success rate of transactions per payment location")
    
    merchant_transaction_count_log: float = Field(..., description="Log-transformed count of transactions for the merchant")
    hourly_transaction_volume_log: float = Field(..., description="Log-transformed transaction volume per hour")
    amount_log: float = Field(..., description="Log-transformed transaction amount")
    merchant_rolling_avg_amount: float = Field(..., description="Rolling average transaction amount for the merchant")
    hourly_transaction_volume: int = Field(..., description="Total transaction volume per hour")
    merchant_success_rate: float = Field(..., description="Overall success rate of the merchant")
    
    time_of_day: str = Field(..., description="Time of the day category (morning, afternoon, evening, etc.)")
    latency_bin: str = Field(..., description="Latency bin category (low, medium, high)")
    day_name: str = Field(..., description="Name of the day (Monday, Tuesday, etc.)")
    amount_bin: str = Field(..., description="Transaction amount category (low, medium, high)")

    class Config:
        json_schema_extra = {
            "example": {
                "transaction_id": "test-456",
                "timestamp": "2025-03-21 14:30:45",
                "merchant_id": "MERCH00123",
                "customer_id": "CUST002345",
                "customer_location": "suburban",
                "payment_amount": 75.50,
                "payment_method": "credit_card",
                "device_type": "mobile",
                "network_latency": 98.45,
                "result": "pending",
                "latency_bin_encoded": 2,
                "network_latency_scaled": 0.142367,
                "merchant_rolling_avg_amount_scaled": 0.05672,
                "merchant_success_rate_scaled": 0.912345,
                "device_success_rate_scaled": 0.974562,
                "payment_method_rolling_success_scaled": 0.899874,
                "location_success_rate_scaled": 0.765432,
                "payment_location_success_rate_scaled": 0.832145,
                "merchant_transaction_count_log": 4.56789,
                "hourly_transaction_volume_log": 6.12345,
                "amount_log": 4.321,
                "merchant_rolling_avg_amount": 100.0,
                "hourly_transaction_volume": 1000,
                "merchant_success_rate": 0.95,
                "time_of_day": "afternoon",
                "latency_bin": "medium",
                "day_name": "Monday",
                "amount_bin": "low"
            }
        }

class PredictionResponse(BaseModel):
    transaction_id: str
    success_probability: float
    recommended_action: str
    prediction_time: str
    model_version: str

class FeedbackData(BaseModel):
    transaction_id: str
    actual_success: bool
    prediction_probability: float
    
    class Config:
        json_schema_extra = {
            "example": {
                "transaction_id": "tx-12345",
                "actual_success": True,
                "prediction_probability": 0.87
            }
        }

class HealthResponse(BaseModel):
    status: str
    timestamp: str
    uptime: float
    model_version: str

# Track API start time
start_time = time.time()
feedback_data = []

# Helper functions
def preprocess_transaction_data(transaction_data):
    """Preprocess transaction data using the pipeline"""
    if isinstance(transaction_data, dict):
        transaction_df = pd.DataFrame([transaction_data])
    else:
        transaction_df = pd.DataFrame([transaction_data.dict()])
    
    # Convert to expected format
    return preprocessing_pipeline.fit_transform(transaction_df)

def log_feedback(feedback: FeedbackData):
    """Log feedback data for later model retraining"""
    feedback_data.append(feedback.dict())
    
    # If we've collected enough feedback, save to disk
    if len(feedback_data) >= 100:
        df = pd.DataFrame(feedback_data)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        df.to_csv(f"./feedback/feedback_data_{timestamp}.csv", index=False)
        feedback_data.clear()
    
    logger.info(f"Feedback logged for transaction {feedback.transaction_id}")

# API endpoints
app = FastAPI()

# Get absolute path
static_dir = os.path.join(os.path.dirname('__file__'), "static")

# Ensure directory exists
if not os.path.exists(static_dir):
    os.makedirs(static_dir)

# Mount the "static" directory
app.mount("/static", StaticFiles(directory=static_dir), name="static")

@app.get("/")
def read_root():
    return {"message": "Welcome to the Transaction Success Predictor API"}

@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    favicon_path = os.path.join(static_dir, "favicon.ico")
    if os.path.exists(favicon_path):
        return FileResponse(favicon_path)
    return {"error": "favicon.ico not found"}

@app.get("/", response_model=dict)
async def root():
    return {"message": "Welcome to the Transaction Success Predictor API"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(transaction: TransactionData):
    """
    Predict transaction success probability
    """
    try:
        # Preprocess data
        preprocessed_data = preprocess_transaction_data(transaction)
        
        # Make prediction
        success_probability = float(model.predict_proba(preprocessed_data)[:, 1][0])
        
        # Determine recommended action
        recommended_action = "route" if success_probability > 0.5 else "review"
        
        # Prepare response
        response = {
            "transaction_id": transaction.transaction_id,
            "success_probability": success_probability,
            "recommended_action": recommended_action,
            "prediction_time": datetime.now().isoformat(),
            "model_version": "1.0.0"
        }
        
        logger.info(f"Prediction made for transaction {transaction.transaction_id}: {success_probability:.4f}")
        return response
        
    except Exception as e:
        logger.error(f"Error making prediction: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}")

@app.post("/feedback")
async def feedback(feedback_data: FeedbackData, background_tasks: BackgroundTasks):
    """
    Log transaction outcome feedback for continuous learning
    """
    try:
        # Process feedback in the background to not block the API
        background_tasks.add_task(log_feedback, feedback_data)
        return {"status": "success", "message": "Feedback received"}
    except Exception as e:
        logger.error(f"Error processing feedback: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Feedback processing error: {str(e)}")

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """
    Check if the API is running correctly
    """
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "uptime": time.time() - start_time,
        "model_version": "1.0.0"
    }

# Run the API with uvicorn
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)

INFO:     Will watch for changes in these directories: ['c:\\Users\\USER\\Documents\\Documents\\Personal Development\\Portfolio Projects\\transaction_success_optimization\\notebooks']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [7508] using StatReload
INFO:     Stopping reloader process [7508]


In [11]:
# Define the API endpoint URL (assuming it's running locally)
API_URL = "http://localhost:8000"

# Test the predict endpoint
def test_predict_endpoint():
    sample_transaction = {
        "transaction_id": "test-789",
        "timestamp": "2025-03-22 09:15:30",
        "merchant_id": "MERCH00987",
        "customer_id": "CUST004567",
        "customer_location": "urban",
        "payment_amount": 150.75,
        "payment_method": "debit_card",
        "device_type": "web",
        "network_latency": 120.67,
        "result": "success",
        "latency_bin_encoded": 3,
        "network_latency_scaled": 0.189654,
        "merchant_rolling_avg_amount_scaled": 0.07895,
        "merchant_success_rate_scaled": 0.945678,
        "device_success_rate_scaled": 0.985432,
        "payment_method_rolling_success_scaled": 0.923456,
        "location_success_rate_scaled": 0.812345,
        "payment_location_success_rate_scaled": 0.876543,
        "merchant_transaction_count_log": 5.12345,
        "hourly_transaction_volume_log": 6.78901,
        'amount_log': 5.012,
        'merchant_rolling_avg_amount': 200.0,
        'hourly_transaction_volume': 1200,
        'merchant_success_rate': 0.97,
        'time_of_day': 'morning',
        'latency_bin': 'high',
        'day_name': 'Tuesday',
        'amount_bin': 'medium',
    }
    
    response = requests.post(
        f"{API_URL}/predict",
        json=sample_transaction
    )
    
    print(f"Status Code: {response.status_code}")
    print(f"Response: {json.dumps(response.json(), indent=2)}")
    
    return response.json()

# Test the feedback endpoint
def test_feedback_endpoint(transaction_id, actual_success, prediction_probability):
    feedback_data = {
        "transaction_id": transaction_id,
        "actual_success": actual_success,
        "prediction_probability": prediction_probability
    }
    
    response = requests.post(
        f"{API_URL}/feedback",
        json=feedback_data
    )
    
    print(f"Status Code: {response.status_code}")
    print(f"Response: {json.dumps(response.json(), indent=2)}")

# Test the health endpoint
def test_health_endpoint():
    response = requests.get(f"{API_URL}/health")
    
    print(f"Status Code: {response.status_code}")
    print(f"Response: {json.dumps(response.json(), indent=2)}")

# Run the tests
prediction = test_predict_endpoint()
test_feedback_endpoint(
    prediction["transaction_id"],
    True,
    prediction["success_probability"]
)
test_health_endpoint()

Status Code: 200
Response: {
  "transaction_id": "test-789",
  "success_probability": 0.9820582270622253,
  "recommended_action": "route",
  "prediction_time": "2025-03-22T19:46:42.752450",
  "model_version": "1.0.0"
}
Status Code: 200
Response: {
  "status": "success",
  "message": "Feedback received"
}
Status Code: 200
Response: {
  "status": "healthy",
  "timestamp": "2025-03-22T19:46:46.830827",
  "uptime": 10969.854054927826,
  "model_version": "1.0.0"
}


## 3. Deploy Model as a Microservice
<a id="3-deploy-model-as-a-microservice"></a>

Let's create a Dockerfile to containerize our API.  

[Back to Top](#home)

#### 🚀 Running FastAPI Application with Docker

##### **1️⃣ Install Docker**
Ensure Docker is installed on your system. If not, download and install it from:  
🔗 [https://www.docker.com/get-started](https://www.docker.com/get-started)

---

##### **2️⃣ Prepare the Required Files**
Ensure your project directory contains:
- `Dockerfile` (already provided)
- `requirements.txt` (containing dependencies)
- `app.py` (your FastAPI application)
- A `models/` directory (if required by your app)

---

##### **3️⃣ Create the Dockerfile**
Create a `Dockerfile` with the following content:

```dockerfile
# Use a lightweight Python base image
FROM python:3.9-slim

# Set the working directory
WORKDIR /app

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model files
COPY ./models/ ./models/

# Create directory for feedback data
RUN mkdir -p ./feedback

# Copy the FastAPI application
COPY app.py .

# Expose port 8000
EXPOSE 8000

# Start the application using Gunicorn + Uvicorn
CMD ["gunicorn", "app:app", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
```
---

##### 4️⃣ Create a Build & Run Script

Create a new file named build_and_run.sh and add the following content:
```
#!/bin/bash

# Build Docker image
docker build -t transaction-success-predictor:latest .

# Run the container
docker run -d -p 8000:8000 --name transaction-predictor transaction-success-predictor:latest

echo "✅ API is running at http://localhost:8000"
echo "📄 Documentation available at http://localhost:8000/docs"
```

##### 5️⃣ Grant Execution Permission to the Script

Run the following command in your terminal to make the script executable:
```
chmod +x build_and_run.sh
```
##### 6️⃣ Build and Run the Docker Container

Execute the script to build the Docker image and start the container:
```
./build_and_run.sh
```
##### 7️⃣ Verify That the API is Running

Once the container is running, visit:

🔗 [Base URL: ]('http://localhost:8000')http://localhost:8000  
🔗 [Swagger Docs: ]('http://localhost:8000/docs')http://localhost:8000/docs

##### 8️⃣ Stopping and Removing the Container

To stop the running container:
```
docker stop transaction-predictor
```
To remove the container:
```
docker rm transaction-predictor
```
To delete the built Docker image:
```
docker rmi transaction-success-predictor:latest
```

##### 9️⃣ Deploy to AWS

To deploy the application to AWS, run the deploy_to_aws.py script inside the app folder:  

```
python app/deploy_to_aws.py

```

Ensure that:  

1. You have AWS CLI configured with the necessary credentials.
2. The script is set up to handle deployment correctly.


## 4. Implement CI/CD Pipeline

<a id="4-implement-cicd-pipeline"></a>

Let's create a GitHub Actions workflow to automate our CI/CD pipeline.    

[Back to Top](#home)

##### A yml file has been created `.github/workflows/deploy.yml` for GitHub Actions workflow to automate our CI/CD pipeline.  
##### We can also use the unit test file `tests/test_app.py` to ensure our API is working correctly

## 5. Conduct A/B Testing for Business Impact

<a id="5-conduct-ab-testing-for-business-impact"></a>

We will implement an A/B testing framework to compare the old routing strategy versus our new model-based routing strategy with `ab_testing.py` file.    

[Back to Top](#home)

## 6. Monitor & Improve the Deployment

<a id="6-monitor--improve-the-deployment"></a>

Monitoring and development improvement will be done with the files `app/monitoring.py` and `scr/model_retrainig.py`.    

[Back to Top](#home)