# ETL Pipeline for Immigration Data

This notebook demonstrates an ETL (Extract, Transform, Load) pipeline for immigration data. The pipeline includes:

1. Adding new records to the full dataset
2. Transforming the full dataset (enrichment, structural changes, categorization)
3. Extracting and transforming only the latest (incremental) record
4. Saving and displaying results as tables


In [1]:
import pandas as pd
from tabulate import tabulate

# Load the dataset
file_path = r"K:\Code Projects\Cloned_Projects_From_Github\ETL_Extract_Justice_Chawanda_670444\Raw_Data\Immigration_Data.csv"  # Raw string so backslashes are not read as escape sequences
data = pd.read_csv(file_path)

# Display basic stats
print(f"Rows: {data.shape[0]}, Columns: {data.shape[1]}")
print(tabulate(data, headers='keys', tablefmt='grid'))

# Print extraction message
print(f"Extracted {data.shape[0]} rows fully.")

Rows: 104, Columns: 8
+-----+----------------+-------------------+------------------+----------------+--------------------+---------------------------+------------------+------------------+
|     | immigrant_id   | passport_number   | name             | country        | purpose_of_visit   | contact                   | payment_status   | timestamp        |
|   0 | IM0001         | A12345678         | Emily Smith      | United Kingdom | Tourism            | emily.smith@email.com     | Paid             | 01/06/2025 00:00 |
+-----+----------------+-------------------+------------------+----------------+--------------------+---------------------------+------------------+------------------+
|   1 | IM0002         | B23456789         | John Doe         | United States  | Business           | johndoe@email.com         | Paid             | 01/06/2025 02:16 |
+-----+----------------+-------------------+------------------+----------------+--------------------+---------------------------+---------

In [2]:
import os
from datetime import datetime
import pandas as pd

# Simulate a last extraction time
last_extraction_file = "last_extraction.txt"
if not os.path.exists(last_extraction_file):
    with open(last_extraction_file, "w") as f:
        f.write("2025-06-01 00:00:00")  # Initial extraction time

# Read the last extraction time
with open(last_extraction_file, "r", encoding="utf-8") as f:
    content = f.read().strip()
    try:
        last_extraction_time = datetime.strptime(content, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        # If the file content is invalid, reset to a default time
        last_extraction_time = datetime(2025, 6, 1, 0, 0, 0)
        with open(last_extraction_file, "w", encoding="utf-8") as fw:
            fw.write(last_extraction_time.strftime("%Y-%m-%d %H:%M:%S"))

# Filter new or updated records using mixed datetime formats
new_data = data[pd.to_datetime(data['timestamp'], format='mixed') > last_extraction_time]

# Print the number of rows extracted incrementally
print(f"Extracted {new_data.shape[0]} rows incrementally since last check.")

# Update the last extraction time
with open(last_extraction_file, "w", encoding="utf-8") as f:
    f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Describe the last recorded timestamp
if not data.empty:
    last_timestamp = data['timestamp'].iloc[-1]
    description = f"The last record was added on: {last_timestamp}"
else:
    description = "The dataset is empty."

# Write the description to the text file.
# Note: this replaces the timestamp written just above with free text, so the
# next run cannot parse the file and falls back to the default extraction time.
with open(last_extraction_file, "w", encoding="utf-8") as f:
    f.write(description)

Extracted 51 rows incrementally since last check.
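
The cell above stores the last extraction time in `last_extraction.txt` and then overwrites it with a description string, which the next run cannot parse. One possible way to keep the stored time machine-readable is sketched below, assuming the description can live in a separate file. The helper names `read_watermark` and `write_watermark`, the note file, and the temporary directory are hypothetical and used only for illustration.

```python
import tempfile
from datetime import datetime
from pathlib import Path

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_WATERMARK = datetime(2025, 6, 1, 0, 0, 0)  # same default as the cell above


def read_watermark(path: Path) -> datetime:
    """Return the stored extraction time, or the default if missing or unparseable."""
    try:
        return datetime.strptime(path.read_text(encoding="utf-8").strip(), TIME_FORMAT)
    except (FileNotFoundError, ValueError):
        return DEFAULT_WATERMARK


def write_watermark(path: Path, moment: datetime) -> None:
    """Store only the timestamp so that the next run can parse it."""
    path.write_text(moment.strftime(TIME_FORMAT), encoding="utf-8")


# Demonstration in a temporary directory (hypothetical file names).
with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "last_extraction.txt"
    note_file = Path(tmp) / "last_extraction_note.txt"

    write_watermark(state_file, datetime(2025, 7, 1, 14, 11, 0))
    # The human-readable description goes to its own file.
    note_file.write_text("The last record was added on: 2025/07/01 14:11", encoding="utf-8")

    restored = read_watermark(state_file)
    missing = read_watermark(Path(tmp) / "missing.txt")

print(restored)
print(missing)
```

Keeping the state file to a single parseable timestamp means the incremental filter compares against the real previous run rather than the default.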


In [3]:
# Ask the user if they want to add a new record
add_record = input("Do you want to add a new record? (yes/no): ").strip().lower()
if add_record == 'yes':
    # Collect new record details from the user
    new_record = {
        "immigrant_id": input("Enter immigrant ID: "),
        "passport_number": input("Enter passport number: "),
        "name": input("Enter name: "),
        "country": input("Enter country: "),
        "purpose_of_visit": input("Enter purpose of visit: "),
        "contact": input("Enter contact: "),
        "payment_status": input("Enter payment status: "),
        "timestamp": datetime.now().strftime("%Y/%m/%d %H:%M")
    }

    # Append the new record to the dataset
    data = pd.concat([data, pd.DataFrame([new_record])], ignore_index=True)

    # Save the updated dataset back to the file
    data.to_csv(file_path, index=False)

    # Update the last_extraction.txt file with the description and timestamp of the very last record
    last_timestamp = data['timestamp'].iloc[-1]
    description = f"The last record was added on: {last_timestamp}"
    with open(last_extraction_file, "w") as f:
        f.write(description)

    print("New record added successfully and timestamp updated!")
else:
    print("No new record added.")

New record added successfully and timestamp updated!


## Section 4: Transform Full & Transform_Incremental

This section applies three transformations to the full dataset:
1. **Enrichment**: Add a calculated `age` column derived from the `date_of_birth` column (generated synthetically when the source file does not provide one).
2. **Structural**: Standardize the `timestamp` column so that it always includes seconds.
3. **Categorization**: Group records into regions based on the `country` column.

The transformed data is saved as `transformed_full.csv` and `transformed_incremental.csv` in the `Tranformed` folder.
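
The structural step (timestamps that always include seconds) does not appear explicitly in the transformation cell that follows, so here is a minimal sketch of one way it could be done. It assumes the two layouts visible in this notebook's output, `DD/MM/YYYY HH:MM` and `YYYY/MM/DD HH:MM`. Reading `01/06/2025` as 1 June is itself an assumption, and `KNOWN_FORMATS` and `standardize_timestamps` are hypothetical names.

```python
import pandas as pd

# Assumed input layouts, inferred from the sample rows shown in this notebook.
# Treating "01/06/2025" as day-first (1 June 2025) is an assumption.
KNOWN_FORMATS = ["%d/%m/%Y %H:%M", "%Y/%m/%d %H:%M"]


def standardize_timestamps(series: pd.Series) -> pd.Series:
    """Return timestamps as 'YYYY-MM-DD HH:MM:SS' strings.

    Each known format is tried in turn; values that match none stay missing.
    """
    parsed = pd.to_datetime(series, format=KNOWN_FORMATS[0], errors="coerce")
    for fmt in KNOWN_FORMATS[1:]:
        # Fill only the values that earlier formats could not parse.
        parsed = parsed.fillna(pd.to_datetime(series, format=fmt, errors="coerce"))
    return parsed.dt.strftime("%Y-%m-%d %H:%M:%S")


sample = pd.Series(["01/06/2025 00:00", "01/06/2025 02:16", "2025/07/01 14:11"])
standardized = standardize_timestamps(sample)
print(standardized.tolist())
```

Parsing with explicit formats avoids the day/month ambiguity that per-value format inference can introduce for dates such as `01/06/2025`.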

---


In [4]:
import pandas as pd
import numpy as np
import os

# Ensure the 'Tranformed' folder exists
os.makedirs('Tranformed', exist_ok=True)

# Load the full dataset
full_data = pd.read_csv(r"K:\Code Projects\Cloned_Projects_From_Github\ETL_Extract_Justice_Chawanda_670444\Raw_Data\Immigration_Data.csv")  # Change this path to your actual dataset location

# Add a 'date_of_birth' column with random dates between 1970-01-01 and 2005-12-31 if not present
if 'date_of_birth' not in full_data.columns:
    np.random.seed(42)  # Fixed seed so the generated dates are reproducible
    start_date = pd.to_datetime('1970-01-01')
    end_date = pd.to_datetime('2005-12-31')
    num_rows = len(full_data)
    random_days = np.random.randint(0, (end_date - start_date).days, num_rows)
    full_data['date_of_birth'] = (start_date + pd.to_timedelta(random_days, unit='D')).date

# Calculate 'age' in whole years from 'date_of_birth' (approximation: elapsed days // 365, ignoring leap days)
today = pd.to_datetime('today')
full_data['age'] = (today.normalize() - pd.to_datetime(full_data['date_of_birth']).dt.normalize()).dt.days // 365

# Bin countries into continent-based regions
def country_to_region(country):
    africa = ['Nigeria', 'Kenya', 'South Africa', 'Egypt', 'Ghana']
    europe = ['United Kingdom', 'France', 'Germany', 'Italy', 'Spain']
    asia = ['China', 'India', 'Japan', 'Pakistan', 'Bangladesh']
    north_america = ['United States', 'Canada', 'Mexico']
    south_america = ['Brazil', 'Argentina', 'Colombia']
    oceania = ['Australia', 'New Zealand']
    if country in africa:
        return 'Africa'
    elif country in europe:
        return 'Europe'
    elif country in asia:
        return 'Asia'
    elif country in north_america:
        return 'North America'
    elif country in south_america:
        return 'South America'
    elif country in oceania:
        return 'Oceania'
    else:
        return 'Other'

if 'country' in full_data.columns:
    full_data['country_group'] = full_data['country'].apply(country_to_region)

print('Added date of birth, calculated age, and grouped countries into regions for full & incremental transformation.')

Added date of birth, calculated age, and grouped countries into regions for full & incremental transformation.
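
The age calculation in the cell above divides elapsed days by 365, which can report a birthday a few days early because leap days accumulate. As an illustration only, a calendar-based alternative might look like the sketch below. The helper name `age_on` and the fixed reference date are assumptions chosen to make the example reproducible.

```python
from datetime import date

import pandas as pd


def age_on(birth: date, today: date) -> int:
    """Completed years between birth and today, compared by calendar date."""
    had_birthday = (today.month, today.day) >= (birth.month, birth.day)
    return today.year - birth.year - (0 if had_birthday else 1)


reference = date(2025, 7, 1)  # fixed reference date (assumption for the example)

births = pd.Series(pd.to_datetime(["1989-11-27", "1970-07-10"]))
ages = births.dt.date.apply(lambda d: age_on(d, reference))
print(ages.tolist())

# For comparison: the days // 365 approximation for the second birth date.
approx = (reference - date(1970, 7, 10)).days // 365
print(approx)
```

For the second date the birthday has not yet been reached on the reference date, so the calendar-based result is one year lower than the approximation.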


### Saving and Displaying Transformed Data

The next two cells perform the following actions:

- Save the transformed `full_data` and `incremental_data` DataFrames as `transformed_full.csv` and `transformed_incremental.csv` in the `Tranformed` folder.
- Print confirmation messages once the files have been saved.
- Display the first 10 rows of the full dataset and the incremental record as formatted tables using the `tabulate` library.

This step ensures that the results of the ETL process are persisted and can be visually inspected for correctness.

In [5]:
# 4. Save and display the results as tables

from tabulate import tabulate
import numpy as np
from datetime import datetime

# Define the desired column order
column_order = ['immigrant_id','passport_number','name','date_of_birth','age','contact','country',
                'country_group','purpose_of_visit','payment_status','timestamp']

# Put the columns listed in `order` first (when present), followed by any remaining columns
def reorder_columns(df, order):
    cols = [col for col in order if col in df.columns] + [col for col in df.columns if col not in order]
    return df[cols]


full_data = reorder_columns(full_data, column_order)

# Alias kept because the next cell reads from full_data_fixed
full_data_fixed = full_data


# Save the transformed full dataset in the 'Tranformed' folder
full_data.to_csv('Tranformed/transformed_full.csv', index=False)

print('Saved full_data to transformed_full.csv.')

print('\nFull Data (first 10 rows):')
print(tabulate(full_data_fixed.head(10), headers='keys', tablefmt='github', showindex=False))

Saved full_data to transformed_full.csv.

Full Data (first 10 rows):
| immigrant_id   | passport_number   | name             | date_of_birth   |   age | contact                   | country        | country_group   | purpose_of_visit   | payment_status   | timestamp        |
|----------------|-------------------|------------------|-----------------|-------|---------------------------|----------------|-----------------|--------------------|------------------|------------------|
| IM0001         | A12345678         | Emily Smith      | 1989-11-27      |    35 | emily.smith@email.com     | United Kingdom | Europe          | Tourism            | Paid             | 01/06/2025 00:00 |
| IM0002         | B23456789         | John Doe         | 1972-05-10      |    53 | johndoe@email.com         | United States  | North America   | Business           | Paid             | 01/06/2025 02:16 |
| IM0003         | C34567890         | Wei Zhang        | 1984-10-04      |    40 | weizhang@email.cn      

In [6]:
# 4. Save and display the results as tables

# Take the most recent record as the incremental slice
incremental_data = full_data_fixed.tail(1)

# Save the transformed incremental dataset in the 'Tranformed' folder
incremental_data.to_csv('Tranformed/transformed_incremental.csv', index=False)

print('Saved incremental_data to transformed_incremental.csv')
print(tabulate(incremental_data, headers='keys', tablefmt='github', showindex=False))

Saved incremental_data to transformed_incremental.csv
| immigrant_id   | passport_number   | name           | date_of_birth   |   age | contact              | country        | country_group   | purpose_of_visit   | payment_status   | timestamp        |
|----------------|-------------------|----------------|-----------------|-------|----------------------|----------------|-----------------|--------------------|------------------|------------------|
| IM0104         | UIYU8789323       | Chifundo Banda | 1995-05-18      |    30 | chifundo@example.com | United Kingdom | Europe          | Work               | Paid             | 2025/07/01 14:11 |
