In [1]:
!pip install Faker

Collecting Faker
  Downloading Faker-30.8.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-30.8.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Faker
Successfully installed Faker-30.8.0


In [2]:
import pandas as pd
from faker import Faker
from datetime import datetime

fake = Faker()

def generate_sanctioned_entities():
    global sanctioned_countries
    # List of sanctioned countries
    sanctioned_countries = ["Russia", "China", "North Korea", "Cuba", "Myanmar", "Venezuela", "Panama"]

    # Define specific risk levels for each country
    country_risk_mapping = {
        "Russia": "High", "China": "Medium",
        "North Korea": "High", "Cuba": "Medium",
        "Venezuela": "Medium", "Panama": "Low"
    }

    sanctioned_entities_data = []

    for i in range(1, len(sanctioned_countries) + 1):
        sanction_id = f"SANC{i:04d}"
        entity_name = f"Entity {fake.company()}"
        country = sanctioned_countries[i - 1]
        risk_level = country_risk_mapping[country]  # Assign risk level based on country mapping
        blacklist_date = fake.date_between(start_date='-10y', end_date='today')

        sanctioned_entities_data.append([sanction_id, entity_name, country, risk_level, blacklist_date])

    sanctioned_entities_df1 = pd.DataFrame(sanctioned_entities_data, columns=[
        "SanctionID", "EntityName", "Country", "RiskLevel", "BlacklistDate"
    ])

    return sanctioned_entities_df1

In [3]:
def generate_customer_data(num_customers):
    customer_data = []
    global countries, sanctioned_countries

    job_titles = ["Engineer", "Doctor", "Teacher", "Software Developer", "Data Scientist",
                  "Banker", "Consultant", "Artist", "Freelancer", "Manager", "Sales Executive"]
    countries = ["USA", "Germany", "France", "Japan", "India", "Brazil", "UK", "Canada"]
    sanctioned_countries = ["Russia", "China", "North Korea", "Cuba", "Venezuela", "Panama"]
    customer_account_types = ["Personal", "Business", "VIP", "Basic"]
    account_types = ["Loan", "Investment", "Saving", "Retirement"]

    for i in range(1000, num_customers+1000):
        customer_id = f"CUST{i:06d}"
        account_id = f"ACC{i:06d}"
        customer_name = fake.name()
        dob = fake.date_of_birth(minimum_age=18, maximum_age=60)
        country = random.choice(countries)
        address = fake.address()
        gender = random.choice(["Male", "Female"])
        job_title = random.choice(job_titles)
        AccountType = random.choice(account_types)
        CustomerAccount = random.choice(customer_account_types)

        # Salary will depend on the job title
        if job_title in ["Engineer", "Doctor", "Software Developer", "Data Scientist"]:
            salary = round(random.uniform(50000, 200000), 2)
        elif job_title in ["Teacher", "Artist", "Freelancer"]:
            salary = round(random.uniform(30000, 80000), 2)
        else:
            salary = round(random.uniform(40000, 150000), 2)

        customer_data.append([customer_id, account_id, customer_name, dob, country, address, gender, job_title, salary, AccountType, CustomerAccount])
    customer_df = pd.DataFrame(customer_data, columns=[
        "CustomerID", "AccountID", "CustomerName", "DateOfBirth", "Country", "Address", "Gender", "JobTitle", "Salary", "AccountType", "CustomerAccount"
    ])
    return customer_df

In [None]:
def generate_unique_transaction_ids(num_transactions):
    transaction_ids = [f"TRANS{i:06d}" for i in range(1000, num_transactions + 1000)]
    return list(transaction_ids)

In [None]:
def generate_transactions(customer_df, transaction_ids, num_transactions_per_customer=20):
    transaction_data = []
    transaction_types = ["Deposit", "Withdrawal", "Transfer", "Payment"]

    transaction_idx = 0
    for _, customer in customer_df.iterrows():
        customer_id = customer['CustomerID']
        account_id = customer['AccountID']
        for j in range(num_transactions_per_customer):
            if transaction_idx >= len(transaction_ids):
                raise IndexError(f"Not enough transaction IDs for the total number of transactions required!")

            transaction_id = transaction_ids[transaction_idx]  # Use existing TransactionID
            transaction_idx += 1  # Move to the next ID
            transaction_date = fake.date_time_between(start_date='-1y', end_date='now')
            transaction_type = random.choice(transaction_types)

            # Usual transactions: 50 - 100,000, but 35% above 500,000 for anomaly detection
            if random.random() < 0.35:
                transaction_amount = round(random.uniform(500000, 1000000), 2)
            else:
                transaction_amount = round(random.uniform(50, 100000), 2)

            transaction_location = random.choice(countries + sanctioned_countries)

            transaction_data.append([
                transaction_id, customer_id, account_id, transaction_date, transaction_type, transaction_amount, transaction_location
            ])

    transaction_df = pd.DataFrame(transaction_data, columns=[
        "TransactionID", "CustomerID", "AccountID", "TransactionDate", "TransactionType", "TransactionAmount", "TransactionLocation"
    ])

    return transaction_df

In [None]:
def generate_cross_border_transactions(customer_df, transaction_ids, num_transactions_per_customer=20):
    cross_border_data = []
    transaction_types = ["Deposit", "Withdrawal", "Transfer", "Payment"]

    transaction_idx = 0
    for _, customer in customer_df.iterrows():
        customer_id = customer['CustomerID']
        account_id = customer['AccountID']
        for j in range(num_transactions_per_customer):
            transaction_id = transaction_ids[transaction_idx]  # Use existing TransactionID
            transaction_idx += 1
            transaction_date = fake.date_time_between(start_date='-1y', end_date='now')
            transaction_type = random.choice(transaction_types)

            # Normal and large transactions
            if random.random() < 0.4:
                transaction_amount = round(random.uniform(500000, 1000000), 2)
            else:
                transaction_amount = round(random.uniform(50, 100000), 2)

            # 30% of customers have transactions in sanctioned countries
            if random.random() < 0.3:
                transaction_location = random.choice(sanctioned_countries)
            else:
                transaction_location = random.choice(countries)

            cross_border_data.append([
                transaction_id, customer_id, account_id, transaction_date, transaction_type, transaction_amount, transaction_location
            ])

    cross_border_df = pd.DataFrame(cross_border_data, columns=[
        "TransactionID", "CustomerID", "AccountID", "TransactionDate", "TransactionType", "TransactionAmount", "TransactionLocation"
    ])

    return cross_border_df

In [None]:
# Generate Sanctioned Entities Transactions
def generate_sanctioned_entities_transactions(customer_df, transaction_ids, num_transactions_per_customer=20):
    sanctioned_entities_data = []

    transaction_idx = 0
    for _, customer in customer_df.iterrows():
        customer_id = customer['CustomerID']
        customer_name = customer['CustomerName']
        customer_country = customer['Country']
        customer_dob = customer['DateOfBirth']

        for j in range(num_transactions_per_customer):
            transaction_id = transaction_ids[transaction_idx]  # Use existing TransactionID
            transaction_idx += 1
            transaction_date = fake.date_time_between(start_date='-1y', end_date='now')

            # 30% of customers have transactions in sanctioned countries
            if random.random() < 0.3:
                transaction_location = random.choice(sanctioned_countries)
            else:
                transaction_location = customer_country  # Normal country for the customer

            sanctioned_entities_data.append([
                transaction_id, customer_id, customer_name, customer_country, customer_dob, transaction_date, transaction_location
            ])

    sanctioned_entities_df = pd.DataFrame(sanctioned_entities_data, columns=[
        "TransactionID", "CustomerID", "CustomerName", "Country", "DateOfBirth", "TransactionDate", "TransactionLocation"
    ])

    return sanctioned_entities_df

In [None]:
# Generate Unmatched Accounts Transactions
def generate_unmatched_accounts(customer_df, transaction_ids, num_transactions_per_customer=1):
    unmatched_data = []
    transaction_idx = 0

    for _, customer in customer_df.iterrows():
        customer_id = customer['CustomerID']
        account_id = customer['AccountID']
        account_balance = round(random.uniform(10000, 500000), 2)  # Initial account balance
        open_date = fake.date_between(start_date="-5y", end_date="-1y")

        # Randomly decide if the account is still open or closed for a short period
        if random.random() < 0.30:  # 30% of accounts closed shortly after opening
            close_date = fake.date_between(start_date=open_date, end_date=open_date + timedelta(days=30))
        elif random.random() < 0.4:  # 40% of accounts closed later
            close_date = fake.date_between(start_date=open_date, end_date="today")
        else:  # 30% of accounts remain open (no close date)
            close_date = None

        for j in range(num_transactions_per_customer):
            transaction_id = transaction_ids[transaction_idx]  # Use existing TransactionID
            transaction_idx += 1
            end_date = close_date if close_date else datetime.now().date()
            transaction_date = fake.date_time_between(start_date=open_date, end_date=end_date)
            transaction_amount = round(random.uniform(50, 50000), 2)

            # Deduct transaction amount from account balance
            account_balance -= transaction_amount

            unmatched_data.append([
                account_id, customer_id, open_date, close_date, account_balance, transaction_id, transaction_amount, transaction_date
            ])

    unmatched_df = pd.DataFrame(unmatched_data, columns=[
        "AccountID", "CustomerID", "OpenDate", "CloseDate", "AccountBalance", "TransactionID", "TransactionAmount", "TransactionDate"
    ])

    return unmatched_df

In [None]:
# Generate Customer Behavior Transactions
def generate_customer_behavior_transactions(customer_df, transaction_ids, num_transactions_per_customer=20):
    behavior_data = []
    transaction_idx = 0

    for _, customer in customer_df.iterrows():
        customer_id = customer['CustomerID']
        for j in range(num_transactions_per_customer):
            transaction_id = transaction_ids[transaction_idx]  # Use existing TransactionID
            transaction_idx += 1
            transaction_date = fake.date_time_between(start_date='-1y', end_date='now')

            # Regular transactions, with occasional spikes
            if random.random() < 0.4:
                transaction_amount = round(random.uniform(100000, 500000), 2)
            else:
                transaction_amount = round(random.uniform(50, 10000), 2)

            behavior_data.append([
                transaction_id, customer_id, transaction_amount, transaction_date
            ])

    behavior_df = pd.DataFrame(behavior_data, columns=[
        "TransactionID", "CustomerID", "TransactionAmount", "TransactionDate"
    ])

    return behavior_df

In [None]:
# Generate Time Gap Transactions
def generate_time_gap_transactions(customer_df, transaction_ids, num_transactions_per_customer=20):
    time_gap_data = []
    transaction_idx = 0

    for _, customer in customer_df.iterrows():
        customer_id = customer['CustomerID']
        prev_transaction_date = None
        for j in range(num_transactions_per_customer):
            transaction_id = transaction_ids[transaction_idx]  # Use existing TransactionID
            transaction_idx += 1

            # Generate small time gaps for some transactions and large for others
            if prev_transaction_date:
                if random.random() < 0.5:  # Small gap (same day)
                    transaction_date = prev_transaction_date + timedelta(hours=random.randint(1, 6))
                else:  # Next day
                    transaction_date = prev_transaction_date + timedelta(days=1)
            else:
                transaction_date = fake.date_time_between(start_date='-1y', end_date='now')

            prev_transaction_date = transaction_date
            time_gap_data.append([transaction_id, customer_id, transaction_date])

    time_gap_df = pd.DataFrame(time_gap_data, columns=["TransactionID", "CustomerID", "TransactionDate"])

    return time_gap_df

In [None]:
import xlsxwriter
# Number of customers and transactions per customer
num_customers = 20000
num_transactions_per_customer = 20
total_transactions = num_customers * num_transactions_per_customer


# Generate unique transaction IDs
transaction_ids = generate_unique_transaction_ids(total_transactions)

# Generate data with consistent TransactionIDs
df = generate_sanctioned_entities()
customer_df = generate_customer_data(num_customers)
transaction_df = generate_transactions(customer_df, transaction_ids, num_transactions_per_customer)
cross_border_df = generate_cross_border_transactions(customer_df, transaction_ids, num_transactions_per_customer)
behavior_df = generate_customer_behavior_transactions(customer_df, transaction_ids, num_transactions_per_customer)
time_gap_df = generate_time_gap_transactions(customer_df, transaction_ids, num_transactions_per_customer)
unmatched_df = generate_unmatched_accounts(customer_df, transaction_ids, num_transactions_per_customer)
sanctioned_entities_df = generate_sanctioned_entities_transactions(customer_df, transaction_ids, num_transactions_per_customer)

# Export all tables to Excel
with pd.ExcelWriter("AML_synthetic_data.xlsx", engine='xlsxwriter') as writer:
    df.to_excel(writer, sheet_name="Sanctioned_Entities.csv", index=False)
    customer_df.to_excel(writer, sheet_name="Customer_Informations", index=False)
    transaction_df.to_excel(writer, sheet_name="Unusual_Transactions", index=False)
    cross_border_df.to_excel(writer, sheet_name="Cross_Border_Transactions", index=False)
    behavior_df.to_excel(writer, sheet_name="Customer_Behavior", index=False)
    time_gap_df.to_excel(writer, sheet_name="Time_Gap_Transactions", index=False)
    unmatched_df.to_excel(writer, sheet_name="Unmatched_Accounts", index=False)
    sanctioned_entities_df.to_excel(writer, sheet_name="Sanctioned_Entities_Matching", index=False)


In [None]:
customer_df.to_csv("Customer_Information.csv", index=False)
transaction_df.to_csv("Unusual_Transactions.csv", index=False)
cross_border_df.to_csv("Cross_Border_Transactions.csv", index=False)
behavior_df.to_csv("Customer_Behavior.csv", index=False)
time_gap_df.to_csv("Time_Gap_Transactions.csv", index=False)
unmatched_df.to_csv("Unmatched_Accounts.csv", index=False)
sanctioned_entities_df.to_csv("Sanctioned_Entities_Matching.csv", index=False)

In [None]:
# Load the Excel file
file_path = 'AML_synthetic_data.xlsx'
account_types = ["Loan", "Investment", "Saving", "Retirement"]

# Load the customer information sheet
customer_info_df = pd.read_excel(file_path, sheet_name='Customer_Information')

# Add the 'AccountType' column with random values
customer_info_df['AccountType'] = [random.choice(account_types) for _ in range(len(customer_info_df))]

# Save the updated data into a new sheet in the same Excel file
with pd.ExcelWriter(file_path, engine='openpyxl', mode='a') as writer:
    customer_info_df.to_excel(writer, sheet_name='Customer_Inforomation', index=False)

print("AccountType column added and saved to a new sheet in the existing Excel file.")

AccountType column added and saved to a new sheet in the existing Excel file.


In [None]:
def generate_banks(num_banks):
    bank_data = []

    # List of example countries
    countries = [
        "USA", "Canada", "UK", "Germany", "France", "Italy",
        "Spain", "Australia", "Japan", "China", "India", "Brazil",
        "South Africa", "Mexico", "Russia", "Netherlands", "Sweden",
        "Switzerland", "Singapore", "New Zealand", "Turkey"
    ]

    for i in range(1, num_banks + 1):
        bank_id = f"BANK{i:04d}"  # Create unique BankID
        bank_name = fake.company()  # Generate random bank name
        bank_country = random.choice(countries)  # Randomly select a country
        branch_name = fake.city()  # Generate random branch name

        bank_data.append([bank_id, bank_name, bank_country, branch_name])

    # Create DataFrame
    banks_df = pd.DataFrame(bank_data, columns=["BankID", "BankName", "BankCountry", "BranchName"])

    return banks_df

# Generate 60 banks
banks_df = generate_banks(60)

# Save to Excel
banks_df.to_excel("Synthetic_Bank_Data.xlsx", index=False)

print("Synthetic bank data generated and saved in CSV, Excel, and XML formats.")

Synthetic bank data generated and saved in CSV, Excel, and XML formats.
