# Data Engineering Project in the financial services industry
This project intends to show how we can use Data engineering to avail data to a marketing department in the financial services organization to help them maximize their efforts in marketing particular products to specific customers


# Data Generation
Before we generate any data ,we are going to create a data model that is going to hold our data inside a relational database. The data model has 9 tables. 



![Data Model](./images/customer_database_schema_version_1.png)


# Required Libraries 
For all the required libraries we are going to put them inside a file called "requirements.txt" . Then we are going to run a python command 

In [None]:
pip install -r requirements.txt


# Database Creation
I use mysql Workbench which is the official Graphical User Interface tool for MySQL. I do it by setting up a new connection to the local mysql

![Database connection](./images/database_connection.png)


I go further to actually create the mock database and its tables. 

In [None]:
create database bank;

show databases;

use bank;

CREATE TABLE Customer (
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(255),
    date_of_birth DATE,
    gender VARCHAR(2)
    identification_number VARCHAR(50),
);


CREATE TABLE Account (
    account_id INT ,
    customer_id INT,
    account_type VARCHAR(50),
    balance DECIMAL(18, 2),
    status_type VARCHAR(20)
);

CREATE TABLE Transactions (
    transaction_id ,
    account_id INT,
    transaction_type VARCHAR(50),
    amount DECIMAL(18, 2),
    transaction_date DATETIME,
    notes TEXT,
);

CREATE TABLE Payments (
    payment_id INT ,
    account_id INT,
    recipient VARCHAR(100),
    amount DECIMAL(18, 2),
    payment_date DATE,
    payment_method VARCHAR(50)
);

CREATE TABLE CreditHistory (
    credit_id INT ,
    customer_id INT,
    credit_score INT,
    credit_limit DECIMAL(18, 2),
    payment_history TEXT,
    credit_utilization DECIMAL(5, 2)
);

CREATE TABLE Security (
    security_id INT ,
    customer_id INT,
    username VARCHAR(50),
    password VARCHAR(100),
    access_level VARCHAR(50)
);

CREATE TABLE Relationships (
    relationship_id INT ,
    customer1_id INT,
    customer2_id INT,
    relationship_type VARCHAR(50),
    
);

CREATE TABLE CommunicationLog (
    log_id INT ,
    customer_id INT,
    communication_type VARCHAR(50),
    timestamp DATETIME,
    summary TEXT
);

CREATE TABLE Compliance (
    compliance_id INT ,
    customer_id INT,
    consent_given BOOLEAN,
    disclosures TEXT,
    regulatory_requirements_met BOOLEAN,
);

# Database Connection

We are going to have our database connection using the code below.

In [4]:
import mysql.connector
# import create_customers
import credentials
# Replace these variables with your actual database credentials


db_config = {
    'user': credentials.user,
    'password': credentials.password,
    'host': credentials.host,
    'database': credentials.database,
}

print(db_config)

# Establish a connection to the database
connection = mysql.connector.connect(**db_config)
try:
    # Establish a connection to the database
    connection = mysql.connector.connect(**db_config)
    if connection.is_connected():
        db_info = connection.get_server_info()
        print(f"Connected to MySQL Server version {db_info}")

except mysql.connector.Error as e:
    print(f"Error connecting to MySQL: {e}")

{'user': 'admin', 'password': 'Headphone123', 'host': 'localhost', 'database': 'bank'}
Connected to MySQL Server version 8.2.0


# Creation of Mock Data 
In the cells below , we are going to be creating mock data for all our nine tables.

### Customers Table

In [5]:
from faker import Faker
import subprocess
import random
fake = Faker()

cursor = connection.cursor()

def make_customers():
    customer_data = []
    # Repeat the code a million times
    for _ in range(10000):
        # Generate a fake customer_id
        fake_customer_id = fake.port_number()
        print("Fake customer_id:", fake_customer_id)

        # Generate a fake name
        fake_name = fake.name()
        print("Fake name:", fake_name)

        # Generate a fake name
        fake_phone = fake.basic_phone_number()
        print("Fake phone:", fake_phone)

        # Generate a fake email address
        fake_email = fake.email()
        print("Fake Email:", fake_email)

        # Generate a fake address
        fake_address = fake.address()
        print("Fake Address:", fake_address)

        # Generate a fake date_of_birth
        fake_date_of_birth = fake.date_of_birth()
        print("Fake date_of_birth:", fake_date_of_birth)

        # Generate a fake identification_number
        fake_identification_number = fake.passport_number()
        print("Fake identification_number:", fake_identification_number)

        # Generate a fake gender
        fake_gender = fake.passport_gender()
        print("Fake gender:", fake_gender)

        #create a tuple named row
        row = (fake_customer_id,fake_name,fake_email,fake_phone,fake_address,fake_date_of_birth,fake_identification_number,fake_gender)
        print(row)
        #append into customer_data_list
        customer_data.append(row)

    return customer_data

def delete_duplicates():
    # Execute a query to select a specific column from a table
    column_name = 'customer_id'  # Replace 'column_name_here' with your column name
    table_name = 'Customer'    # Replace 'table_name_here' with your table name
    query = f"DELETE t1 FROM {table_name} t1 JOIN (SELECT {column_name} FROM {table_name} GROUP BY {column_name} HAVING COUNT(*) > 1) t2 ON t1.{column_name} = t2.{column_name};"
    return query


customers = make_customers()

# SQL statement to perform the insertions
insert_customers = """
    INSERT INTO Customer (customer_id, name, email, phone, address, date_of_birth, identification_number,gender)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""

# Execute the SQL statement for each set of data
for entry in customers:
    cursor.execute(insert_customers, entry)
    print(entry)

# Commit the changes to the database
connection.commit()

#Delete any duplicates
cursor.execute(delete_duplicates())

# Commit the changes to the database
connection.commit()
# Close the cursor and connection


## Accounts Table

In [None]:


# Execute a query to select a specific column from a table
column_name = 'customer_id'  # Replace 'column_name_here' with your column name
table_name = 'Customer'    # Replace 'table_name_here' with your table name
query = f"SELECT {column_name} FROM {table_name}"

cursor.execute(query)

# Fetch the results
customer_ids = cursor.fetchall()

# Print the selected column
print(customer_ids)

account = ['Savings Account','Checking/Current Account','Certificate of Deposit (CD)','Individual Retirement Account (IRA)','Business Account','Trust Account','Custodial Account']

status = ['Active','Inactive/Dormant','Closed','Blocked/Frozen','Limited/Restricted','Defaulted']

def create_accounts():
    accounts_data = []

    for customer_id in customer_ids:

        # Generate a random number between 1 and 10 (you can adjust the range)
        # One person can have maximum of 5 accounts
        random_iterations = random.randint(1, 5)

        # Run the for loop for the random number of times
        for i in range(random_iterations):
            # Generate a fake account_id
            fake_account_id = fake.port_number()
            print("Fake customer_id:", fake_account_id)

            #Generate a fake account type
            account_type = random.choice(account)

            #Generate a fake status
            status_type = random.choice(status)

            # Generate a fake balance
            fake_balance = fake.random_number()
            print("Fake balance:", fake_balance)

            #create a tuple named row
            row = (fake_account_id,customer_id[0],account_type,fake_balance,status_type)
            # print(row)
            #append into customer_data_list
            accounts_data.append(row)

            print(f"This is iteration number {i + 1}")

    return accounts_data
    

#Run the create accounts function
accounts = create_accounts()

# SQL statement to perform the insertions
insert_account = """
    INSERT INTO Account (account_id, customer_id, account_type, balance, status_type)
    VALUES (%s, %s, %s, %s, %s)
"""

# Execute the SQL statement for each set of data
for entry in accounts:
    cursor.execute(insert_account, entry)
    print(entry)

# Commit the changes to the database
connection.commit()


# Transactions

In [None]:

# Execute a query to select a specific column from a table
column_name = 'account_id'  # Replace 'column_name_here' with your column name
table_name = 'Account'    # Replace 'table_name_here' with your table name
query = f"SELECT {column_name} FROM {table_name}"

cursor.execute(query)

# Fetch the results
account_ids = cursor.fetchall()

# Print the selected column
print(account_ids)

transaction_type = ['Purchase','Financial','Online','Credit','Investment','Payment ','Subscription','Wire']


def create_transactions():
    transactional_data = []

    for account_id in account_ids:

        # Generate a random number between 1 and 10 (you can adjust the range)
        # One person can have maximum of 5 accounts
        random_iterations = random.randint(1, 2)

        # Run the for loop for the random number of times
        for i in range(random_iterations):
            # Generate a fake transaction_id
            fake_transaction_id = fake.port_number()
            print("Fake transaction_id:", fake_transaction_id)

            # Generate a fake transaction type
            fake_transaction_type = random.choice(transaction_type)
            print("Fake transaction_type:", fake_transaction_type)


            # Generate a fake amount
            fake_amount = fake.random_number()
            print("Fake amount:", fake_amount)

            # Generate a fake transactional_date
            fake_transactional_date = fake.date()
            print("Fake transactional date:", fake_transactional_date)

            #Generate a fake payment method
            # fake_payment_method = random.choice(payment_method)

            #create a tuple named row
            row = (fake_transaction_id,account_id[0],fake_transaction_type,fake_amount,fake_transactional_date)
            # print(row)
            #append into customer_data_list
            transactional_data.append(row)

            print(f"This is iteration number {i + 1}")

    return transactional_data
    
def delete_duplicates():
    # Execute a query to select a specific column from a table
    column_name = 'account_id'  # Replace 'column_name_here' with your column name
    table_name = 'Account'    # Replace 'table_name_here' with your table name
    query = f"DELETE t1 FROM {table_name} t1 JOIN (SELECT {column_name} FROM {table_name} GROUP BY {column_name} HAVING COUNT(*) > 1) t2 ON t1.{column_name} = t2.{column_name};"
    return query

#Run the create accounts function
transactions = create_transactions()

# SQL statement to perform the insertions
insert_transaction = """
    INSERT INTO Transactions (transaction_id, account_id, transaction_type, amount, transaction_date)
    VALUES (%s, %s, %s, %s, %s)
"""

# Execute the SQL statement for each set of data
for entry in transactions:
    cursor.execute(insert_transaction, entry)
    print(entry)

# Commit the changes to the database
connection.commit()

# Delete any duplicates
cursor.execute(delete_duplicates())

# Commit the changes to the database
connection.commit()
     

        

# Payments

In [None]:

# Execute a query to select a specific column from a table
column_name = 'account_id'  # Replace 'column_name_here' with your column name
table_name = 'Account'    # Replace 'table_name_here' with your table name
query = f"SELECT {column_name} FROM {table_name}"

cursor.execute(query)

# Fetch the results
account_ids = cursor.fetchall()

# Print the selected column
print(account_ids)

payment_method = ['Credit Card','Debit Card','Cash','Mobile Payment','Bank Transfer','E-wallet','Cryptocurrency','Contactless Card','Check','Prepaid Card']

def create_payments():
    payments_data = []

    for account_id in account_ids:

        # Generate a random number between 1 and 10 (you can adjust the range)
        # One person can have maximum of 5 accounts
        random_iterations = random.randint(1, 50)

        # Run the for loop for the random number of times
        for i in range(random_iterations):
            # Generate a fake account_id
            fake_payment_id = fake.port_number()
            print("Fake payment_id:", fake_payment_id)

            # Generate a fake amount
            fake_receipient = fake.company()
            print("Fake receipient:", fake_receipient)


            # Generate a fake amount
            fake_amount = fake.random_number()
            print("Fake amount:", fake_amount)

            # Generate a fake payment_date
            fake_payment_date = fake.date()
            print("Fake payment date:", fake_payment_date)

            #Generate a fake payment method
            fake_payment_method = random.choice(payment_method)

            #create a tuple named row
            row = (fake_payment_id,account_id[0],fake_receipient,fake_amount,fake_payment_date,fake_payment_method)
            # print(row)
            #append into customer_data_list
            payments_data.append(row)

            print(f"This is iteration number {i + 1}")

    return payments_data
    
# def delete_duplicates():
#     # Execute a query to select a specific column from a table
#     column_name = 'account_id'  # Replace 'column_name_here' with your column name
#     table_name = 'Account'    # Replace 'table_name_here' with your table name
#     query = f"DELETE t1 FROM {table_name} t1 JOIN (SELECT {column_name} FROM {table_name} GROUP BY {column_name} HAVING COUNT(*) > 1) t2 ON t1.{column_name} = t2.{column_name};"
#     return query

#Run the create accounts function
payments = create_payments()

# SQL statement to perform the insertions
insert_payment = """
    INSERT INTO Payments (payment_id, account_id, recipient, amount, payment_date, payment_method)
    VALUES (%s, %s, %s, %s, %s, %s)
"""

# Execute the SQL statement for each set of data
for entry in payments:
    cursor.execute(insert_payment, entry)
    print(entry)

# Commit the changes to the database
connection.commit()

#Delete any duplicates
# cursor.execute(delete_duplicates())

# # Commit the changes to the database
# connect_to_database.connection.commit()
     
# Close the cursor and connection
cursor.close()
        

Now that we have our Mocked Data Created we move on to the Data Engineering process , Here we have our data in the production environment , the transaction environment. 