In [None]:
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import pandas as pd
import numpy as np

# Load dataset
# NOTE: this is a machine-specific absolute path, kept as the default so existing
# runs keep working. Point DATA_PATH at your local copy of the CSV.
DATA_PATH = "/Users/konstantinosdalampekis/VS_code/Quantum_Challenge/Quantum-Fraud-Detection-/Datasets/PS_20174392719_1491204439457_log.csv"
TARGET_COLUMN = 'isFraud'
data = pd.read_csv(DATA_PATH)

# Encode the 'type' column (one indicator column per transaction type)
encoder = OneHotEncoder(sparse_output=False)
type_encoded = encoder.fit_transform(data[['type']])
type_encoded_columns = [f"type_{cat}" for cat in encoder.categories_[0]]

# Create derived features
data['balance_diff_org'] = data['oldbalanceOrg'] - data['newbalanceOrig']
data['balance_diff_dest'] = data['oldbalanceDest'] - data['newbalanceDest']
# The small epsilon avoids division by zero for empty origin accounts.
data['amount_to_balance_ratio'] = data['amount'] / (data['oldbalanceOrg'] + 1e-5)

# High suspicion flag.
# The thresholds (ratio > 0.9, balance drop < -10000) are expressed in raw units,
# so the flag is computed BEFORE standardization; after scaling they would be
# compared against z-scores and lose their meaning.
high_suspicion_flag = (
    (data['amount_to_balance_ratio'] > 0.9) |
    (data['balance_diff_org'] < -10000)
).astype(int)

# Combine features into the final dataset (the target column travels along)
final_features = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig',
                  'oldbalanceDest', 'newbalanceDest', 'balance_diff_org',
                  'balance_diff_dest', 'amount_to_balance_ratio', TARGET_COLUMN]
type_encoded_frame = pd.DataFrame(
    type_encoded, columns=type_encoded_columns, index=data.index
)
data = pd.concat([data[final_features], type_encoded_frame], axis=1)

# Scale the features.
# The target must NOT be standardized: downstream code compares it with 0 / 1,
# and a scaled label matches neither value.
scaled_columns = [col for col in final_features if col != TARGET_COLUMN] + type_encoded_columns
scaler = StandardScaler()
data[scaled_columns] = scaler.fit_transform(data[scaled_columns])
data[TARGET_COLUMN] = data[TARGET_COLUMN].astype(int)

# Display the processed dataset
print(data.head())

# Add sequential features
n = 5  # Number of previous transactions to consider

# Create cumulative statistics over the last n rows (in file order) of the
# standardized amount.
data['cumulative_amount_last_n'] = data['amount'].rolling(window=n, min_periods=1).sum()
data['transaction_count_last_n'] = data['amount'].rolling(window=n, min_periods=1).count()

# Attach the suspicion flag computed on the raw values above
data['high_suspicion_flag'] = high_suspicion_flag

# Remove redundant features
features_to_keep = [
    'step', 'amount', 'balance_diff_org', 'balance_diff_dest',
    'amount_to_balance_ratio', 'cumulative_amount_last_n',
    'transaction_count_last_n', 'high_suspicion_flag'
] + list(type_encoded_columns)

data = data[features_to_keep + [TARGET_COLUMN]]  # Retain target column



In [None]:
import numpy as np
from gym import Env
from gym.spaces import Discrete, Box

class FraudDetectionEnv(Env):
    """
    Gym-style environment that replays consecutive transactions from a DataFrame.

    Each step presents the feature vector of one row; the agent answers with
    0 (Non-Fraud) or 1 (Fraud) and is rewarded by comparing the answer with the
    row's 'isFraud' label. An episode is a window of up to `max_steps`
    consecutive rows starting at a random position.
    """

    # Reward for each (action, is_fraud) pair.
    _REWARD_TABLE = {
        (1, 1): 2.0,   # True Positive: fraud correctly flagged
        (1, 0): -0.5,  # False Positive: legitimate transaction flagged
        (0, 1): -2.5,  # False Negative: fraud missed (heaviest penalty)
        (0, 0): 0.5,   # True Negative
    }

    def __init__(self, data, max_steps=100):
        """
        Initialize the Fraud Detection Environment.
        :param data: Preprocessed transaction dataset; must contain an 'isFraud'
                     column with 0/1 labels. All other columns are used as features.
        :param max_steps: Number of steps in each episode.
        :raises ValueError: If `data` has no rows.
        """
        if len(data) == 0:
            raise ValueError("FraudDetectionEnv requires a non-empty dataset")
        self.data = data
        self.features = data.columns.drop(['isFraud'])  # All features except target
        # Extract the feature matrix once. Selecting `data[self.features]` on every
        # step would copy the whole frame each time. The cache is a snapshot:
        # later mutations of `data` are not reflected in the observations.
        self._observations = data[self.features].to_numpy(dtype=np.float32)
        self.action_space = Discrete(2)  # Actions: {0: Non-Fraud, 1: Fraud}
        self.observation_space = Box(
            low=-np.inf, high=np.inf, shape=(len(self.features),), dtype=np.float32
        )
        self.max_steps = max_steps
        self.current_step = 0
        self.episode_rewards = 0
        self.current_index = 0

    def _observation(self):
        """Return a copy of the feature vector at the current row."""
        return self._observations[self.current_index].copy()

    def reset(self):
        """
        Reset the environment to start a new episode.
        :return: Initial state (float32 features of a randomly chosen start row).
        """
        self.current_step = 0
        self.episode_rewards = 0
        # randint's upper bound is exclusive and must be positive; fall back to
        # starting at row 0 when the dataset is not longer than one episode.
        start_upper_bound = max(1, len(self.data) - self.max_steps)
        self.current_index = np.random.randint(0, start_upper_bound)
        return self._observation()

    def step(self, action):
        """
        Perform an action and advance to the next transaction.
        :param action: 0 (Non-Fraud) or 1 (Fraud).
        :return: Tuple (next_state, reward, done, info). `next_state` is None
                 once the episode is done.
        :raises ValueError: If `action` or the row's label is not 0 or 1.
        """
        row = self.data.iloc[self.current_index]
        is_fraud = row['isFraud']

        # Fail loudly instead of leaving `reward` unassigned when the label was
        # altered upstream (e.g. standardized) or the action is out of range.
        if is_fraud not in (0, 1):
            raise ValueError(
                f"'isFraud' must be 0 or 1, got {is_fraud!r} at row {self.current_index}"
            )
        if action not in (0, 1):
            raise ValueError(f"action must be 0 or 1, got {action!r}")

        # Reward System
        reward = self._REWARD_TABLE[(int(action), int(is_fraud))]

        # Bonus reward for suspicious patterns
        # NOTE(review): these thresholds are applied to the columns as stored in
        # `data`; if those columns were standardized upstream, confirm that 0.9 and
        # -5000 are still meaningful on that scale.
        if action == 1 and (row['amount_to_balance_ratio'] > 0.9 or row['balance_diff_org'] < -5000):
            reward += 0.2

        # Update environment state
        self.episode_rewards += reward
        self.current_step += 1
        self.current_index += 1
        done = self.current_step >= self.max_steps or self.current_index >= len(self.data) - 1

        # Get the next state or mark the episode as done
        next_state = None if done else self._observation()

        return next_state, reward, done, {}

    def render(self):
        """
        Render the current episode summary.
        """
        print(f"Step: {self.current_step}, Total Rewards: {self.episode_rewards}")


Training Loop with Epsilon-Greedy Decay (ε)

Hybrid QNN

Hybrid Actor

In [None]:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import numpy as np

# # class HybridActor(nn.Module):
# #     def __init__(self, num_qubits, num_layers, num_actions):
# #         super(HybridActor, self).__init__()
# #         self.q_weights = nn.Parameter(torch.randn(num_layers, num_qubits))
# #         self.fc = nn.Linear(num_qubits, num_actions)

# #     def forward(self, x):
# #         # Quantum layer: Variational Quantum Circuit (VQC)
# #         q_out = variational_circuit(x, self.q_weights.detach().numpy())
# #         q_out = torch.tensor(q_out, dtype=torch.float32)
        
# #         # Classical layer: Fully connected
# #         return torch.softmax(self.fc(q_out), dim=-1)

Actor Critic

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# Define Actor-Critic Networks
class Actor(nn.Module):
    """Policy network: a 128-unit ReLU hidden layer followed by a softmax over actions."""

    def __init__(self, input_dim, num_actions):
        super().__init__()
        # Layers are created in the same order as before so seeded initialization matches.
        self.fc = nn.Linear(input_dim, 128)
        self.out = nn.Linear(128, num_actions)

    def forward(self, x):
        """Map a state (or batch of states) to action probabilities along the last dim."""
        hidden = self.fc(x).relu()
        logits = self.out(hidden)
        return logits.softmax(dim=-1)


class Critic(nn.Module):
    """State-value network: a 128-unit ReLU hidden layer followed by a scalar output."""

    def __init__(self, input_dim):
        super().__init__()
        # Layers are created in the same order as before so seeded initialization matches.
        self.fc = nn.Linear(input_dim, 128)
        self.out = nn.Linear(128, 1)

    def forward(self, x):
        """Return the estimated value of a state (last dimension has size 1)."""
        hidden = self.fc(x).relu()
        return self.out(hidden)


# Seed the RNGs so that episode sampling, weight init and action sampling are reproducible
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Initialize Environment, Actor, and Critic
env = FraudDetectionEnv(data, max_steps=100)
input_dim = len(env.features)
num_actions = env.action_space.n
actor = Actor(input_dim, num_actions)
critic = Critic(input_dim)

# Define Optimizers
actor_optimizer = optim.Adam(actor.parameters(), lr=0.01)
critic_optimizer = optim.Adam(critic.parameters(), lr=0.01)

# Training Parameters
num_episodes = 500
gamma = 0.99  # Discount factor

# Epsilon-Greedy Parameters
epsilon_start = 1.0
epsilon_end = 0.1
epsilon_decay = 0.995
epsilon = epsilon_start

# Training Loop
reward_history = []
for episode in range(num_episodes):
    state = env.reset()
    done = False
    log_probs = []
    values = []
    rewards = []

    while not done:
        state_tensor = torch.tensor(state, dtype=torch.float32)

        # Epsilon-soft action selection.
        # With probability epsilon the action is uniform, otherwise it follows the
        # actor. Sampling from the mixture records a log-probability for EVERY
        # step, so log_probs, values and rewards always have the same length
        # (the former exploration branch stored none, which broke the loss).
        action_probs = actor(state_tensor)
        behaviour_probs = (1.0 - epsilon) * action_probs + epsilon / num_actions
        action_dist = torch.distributions.Categorical(probs=behaviour_probs)
        action = action_dist.sample()
        log_probs.append(action_dist.log_prob(action))

        # Critic: Get state value
        value = critic(state_tensor)
        values.append(value)

        # Step environment
        next_state, reward, done, _ = env.step(action.item())
        rewards.append(reward)
        state = next_state if not done else None

    # Decay Epsilon
    epsilon = max(epsilon_end, epsilon * epsilon_decay)

    # Compute Returns (Discounted Rewards): accumulate backwards, then flip once
    returns = []
    discounted_return = 0.0
    for step_reward in reversed(rewards):
        discounted_return = step_reward + gamma * discounted_return
        returns.append(discounted_return)
    returns.reverse()
    returns = torch.tensor(returns, dtype=torch.float32)

    # Convert per-step values / log-probs to 1-D tensors of length T
    values = torch.cat(values).view(-1)
    log_probs = torch.stack(log_probs).view(-1)

    # Normalize Returns (std of a single sample is undefined, so skip in that case)
    if returns.numel() > 1:
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

    # Compute Advantages (detached: the actor must not back-propagate into the critic)
    advantages = returns - values.detach()

    # Actor Loss (Policy Gradient)
    actor_loss = -(log_probs * advantages).mean()

    # Critic Loss: squared error between returns and the critic's predictions.
    # This must use the non-detached `values`; a loss built from the detached
    # advantages has no gradient and backward() would raise.
    critic_loss = (returns - values).pow(2).mean()

    # Optimize Actor and Critic
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    critic_optimizer.zero_grad()
    critic_loss.backward()
    critic_optimizer.step()

    # Track total rewards
    total_reward = sum(rewards)
    reward_history.append(total_reward)

    # Logging
    if episode % 10 == 0:
        print(f"Episode {episode}: Total Reward = {total_reward}, Epsilon = {epsilon:.4f}")

# Plot Training Rewards
plt.plot(reward_history)
plt.title("Total Rewards Over Episodes")
plt.xlabel("Episodes")
plt.ylabel("Total Reward")
plt.show()


### **Evaluation Metrics for Fraud Detection**

1. **Accuracy**:  
   Overall correctness of predictions.  
   $$
   \text{Accuracy} = \frac{\text{True Positives} + \text{True Negatives}}{\text{Total Predictions}}
   $$

2. **Precision**:  
   Proportion of correctly predicted fraud cases among all predicted fraud cases.  
   $$
   \text{Precision} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Positives}}
   $$

3. **Recall (Sensitivity)**:  
   Proportion of correctly predicted fraud cases out of all actual fraud cases.  
   $$
   \text{Recall} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Negatives}}
   $$

4. **F1-Score**:  
   Harmonic mean of precision and recall.  
   $$
   \text{F1-Score} = 2 \times \frac{\text{Precision} \times \text{Recall}}{\text{Precision} + \text{Recall}}
   $$

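5. **AUC-ROC**:  
   Area under the ROC curve: summarizes the trade-off between the true positive rate and the false positive rate across all decision thresholds (1.0 is a perfect ranking, 0.5 is no better than chance).  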


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def evaluate_agent(actor, env, num_episodes=100):
    """
    Evaluate the trained Actor-Critic agent on the environment.

    The agent acts greedily (argmax of the actor's probabilities). Predictions
    are compared with the 'isFraud' label of the row the environment is
    currently presenting. Metrics are printed and a confusion matrix is plotted.

    :param actor: Trained Actor network.
    :param env: FraudDetectionEnv environment.
    :param num_episodes: Number of episodes for evaluation.
    :return: Tuple (metrics dict, list of true labels, list of predicted labels).
             'AUC-ROC' is NaN when the sampled rows contain only one class.
    """
    y_true = []
    y_pred = []
    y_score = []  # predicted probability of the Fraud action, used for AUC-ROC

    # No gradients are needed for evaluation
    with torch.no_grad():
        for _ in range(num_episodes):
            state = env.reset()
            done = False

            while not done:
                state_tensor = torch.tensor(state, dtype=torch.float32)
                action_probs = actor(state_tensor)
                action = torch.argmax(action_probs).item()

                # Store true label and prediction. The label is read BEFORE stepping,
                # while current_index still points at the row being classified.
                y_true.append(int(env.data.iloc[env.current_index]['isFraud']))
                y_pred.append(action)
                y_score.append(action_probs[1].item())  # index 1 == Fraud action

                # Step the environment
                state, _, done, _ = env.step(action)

    # Compute metrics. zero_division=0 keeps precision/recall/F1 defined (as 0)
    # when the agent predicts no fraud or the sample holds no fraud.
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    # AUC-ROC is a ranking metric: it needs scores, not hard labels, and it is
    # undefined (roc_auc_score raises) unless both classes are present.
    if len(set(y_true)) == 2:
        auc_roc = roc_auc_score(y_true, y_score)
    else:
        auc_roc = float('nan')

    metrics = {
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'AUC-ROC': auc_roc
    }

    # Confusion Matrix (labels fixed so the matrix is always 2x2 and matches the tick labels)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Non-Fraud", "Fraud"], yticklabels=["Non-Fraud", "Fraud"])
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

    # Print Metrics
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")

    return metrics, y_true, y_pred

# Evaluate the trained agent with the default number of evaluation episodes (100).
# evaluate_agent prints the metrics and shows the confusion matrix as a side effect;
# the returned values are kept for further inspection.
metrics, y_true, y_pred = evaluate_agent(actor, env)
