# PulseFit ETL Process: From Raw to Clean Data
**Objective**: Extract data from raw CSV files, perform cleaning and transformations, and load the results into a processed format suitable for BI analysis.

## Phase 1: Extraction & Initial Inspection
In this phase, we load our datasets and perform a high-level scan to understand the quality of the data we are working with.

In [38]:
import pandas as pd
import numpy as np
import os

# Set display options
pd.set_option('display.max_columns', None)

# Paths
RAW_DATA_PATH = "data/raw/"
PROCESSED_DATA_PATH = "data/processed/"

### 1.1 Loading the datasets

In [39]:
members_raw = pd.read_csv(os.path.join(RAW_DATA_PATH, "members_raw.csv"))
visits_raw = pd.read_csv(os.path.join(RAW_DATA_PATH, "visits_raw.csv"))
payments_raw = pd.read_csv(os.path.join(RAW_DATA_PATH, "payments_raw.csv"))

print(f"- Members: {members_raw.shape[0]} rows")
print(f"- Visits: {visits_raw.shape[0]} rows")
print(f"- Payments: {payments_raw.shape[0]} rows")

- Members: 500 rows
- Visits: 1996 rows
- Payments: 1500 rows


### 1.2 Initial Data Quality Audit
Let's look at the structure and find basic issues like missing values and wrong data types.

In [40]:
print("--- Members Info ---")
print(members_raw.info())
print("\n--- Visits Info ---")
print(visits_raw.info())
print("\n--- Payments Info ---")
print(payments_raw.info())

--- Members Info ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   member_id        500 non-null    object
 1   full_name        500 non-null    object
 2   email            476 non-null    object
 3   phone_number     500 non-null    object
 4   gender           494 non-null    object
 5   join_date        500 non-null    object
 6   membership_type  500 non-null    object
 7   status           500 non-null    object
 8   home_branch      500 non-null    object
 9   birth_year       500 non-null    int64 
dtypes: int64(1), object(9)
memory usage: 39.2+ KB
None

--- Visits Info ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1996 entries, 0 to 1995
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   visit_id        1996 non-null   object
 1   member_id     

### 1.3 Identifying Inconsistencies
Scan the categorical columns for human-entry errors such as branch-name typos, inconsistent gender labels, and mixed-case membership types.

In [41]:
print("Unique Branches in Members:", members_raw['home_branch'].unique())
print("\nUnique Genders in Members:", members_raw['gender'].unique())
print("\nMembership Types:", members_raw['membership_type'].unique())

Unique Branches in Members: ['Nabeul Plage' 'Sousse Corniche' 'Sfax Center' 'Nabeul' 'Bizerte Port'
 'Sfax_Center' 'tunis lake' 'Sousse' 'Tunis Lake']

Unique Genders in Members: ['M' 'F' 'Male' nan 'Femal']

Membership Types: ['Annual' 'VIP' '6-Month' 'Student' 'Monthly' 'MONTHLY' 'ANNUAL' 'STUDENT'
 '6-MONTH']
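Reading `unique()` output by eye works for nine values but scales poorly. A minimal sketch of an automated check, assuming that case and underscore differences are the main source of variants (the toy labels are copied from the output above; the `key` normalization is an illustrative choice, not part of the original notebook):

```python
import pandas as pd

# Toy sample reusing a few of the branch labels printed above.
branches = pd.Series(
    ["Nabeul Plage", "Sfax Center", "Sfax_Center", "tunis lake", "Tunis Lake"]
)

# Comparison key: lower-case, underscores turned into spaces, trimmed.
key = branches.str.lower().str.replace("_", " ", regex=False).str.strip()

# Count how many distinct raw spellings collapse onto each key.
variants = branches.groupby(key).nunique()

# Keys with more than one raw spelling are likely typos of the same branch.
suspects = variants[variants > 1]
print(suspects)
```

This kind of key only catches spelling variants. Abbreviations such as 'Sousse' for 'Sousse Corniche' still need a manual mapping.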


### 1.4 Detailed Quality Audit

In [42]:
print("     Missing Values Check:")
print(members_raw.isnull().sum())

print("\n   Duplicate Members Check:")
dups = members_raw[members_raw.duplicated(subset=['full_name', 'email'], keep=False)]
print(f"Found {len(dups)} potential duplicate records.")

print("\n   Value Range Checks:")
print("Payments Amount Min/Max:", payments_raw['amount'].min(), "/", payments_raw['amount'].max())
print("Visits Check-out Missing:", visits_raw['check_out_time'].isnull().sum())

     Missing Values Check:
member_id           0
full_name           0
email              24
phone_number        0
gender              6
join_date           0
membership_type     0
status              0
home_branch         0
birth_year          0
dtype: int64

   Duplicate Members Check:
Found 307 potential duplicate records.

   Value Range Checks:
Payments Amount Min/Max: -450 / 8000
Visits Check-out Missing: 198
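The 307 figure counts every row that belongs to a repeated `(full_name, email)` group, because `keep=False` flags the first occurrence as well as the repeats. It is therefore larger than the number of rows that deduplication removes later (500 - 323 = 177). A toy illustration of the difference, using made-up names and emails:

```python
import pandas as pd

# Hypothetical records: one name appears three times, one twice, one once.
toy = pd.DataFrame({
    "full_name": ["Ali B", "Ali B", "Ali B", "Sara K", "Sara K", "Omar T"],
    "email": ["a@x.tn", "a@x.tn", "a@x.tn", "s@x.tn", "s@x.tn", "o@x.tn"],
})
keys = ["full_name", "email"]

# keep=False marks every row of a repeated group, including the first one.
in_dup_group = toy.duplicated(subset=keys, keep=False).sum()

# keep="first" marks only the repeats, i.e. the rows drop_duplicates would remove.
to_be_dropped = toy.duplicated(subset=keys, keep="first").sum()

print(in_dup_group, to_be_dropped)
```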


## Phase 2: Data Cleaning & Transformation

### 2.1 Cleaning Members Data
- Normalize Branch Names
- Fix Gender Categories
- Deduplicate members
- Handle missing emails (the cell below leaves them as NaN)

In [43]:
members_clean = members_raw.copy()

# 1. Standardize branch names by mapping known typos and short forms to canonical names
branch_map = {
    'tunis lake': 'Tunis Lake', 
    'Sfax_Center': 'Sfax Center', 
    'Sousse': 'Sousse Corniche', # Assuming they're the same branch
    'Nabeul': 'Nabeul Plage'
}
members_clean['home_branch'] = members_clean['home_branch'].replace(branch_map)

# 2. Standardize Gender
members_clean['gender'] = members_clean['gender'].replace({'Male': 'M', 'Femal': 'F'})
members_clean['gender'] = members_clean['gender'].fillna('U') # U for Unknown

# 3. Remove Duplicates (Keeping first occurrence)
members_clean = members_clean.drop_duplicates(subset=['full_name', 'email'], keep='first')

print(f"Members dataset after cleaning: {members_clean.shape[0]} rows")

Members dataset after cleaning: 323 rows
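The cell above does not touch missing emails, and deduplicating on `['full_name', 'email']` treats two missing emails as equal. One possible way to make both points explicit is sketched below. The `has_email` column and the toy rows are hypothetical and only illustrate the idea:

```python
import numpy as np
import pandas as pd

# Hypothetical members: two share a name and both lack an email.
toy = pd.DataFrame({
    "member_id": ["m1", "m2", "m3"],
    "full_name": ["Ali B", "Ali B", "Sara K"],
    "email": [np.nan, np.nan, "sara@example.tn"],
})

# Hypothetical flag so dashboards can filter on contactable members
# without guessing or fabricating an address.
toy["has_email"] = toy["email"].notna()

# pandas treats NaN values as equal when looking for duplicates,
# so m2 is dropped as a duplicate of m1 here.
deduped = toy.drop_duplicates(subset=["full_name", "email"], keep="first")
print(deduped)
```

If two people with the same name and no email can be distinct members, a stricter key (for example one that includes `phone_number`) may be worth considering.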


### 2.2 Cleaning Visits Data
- Convert timestamps to datetime objects
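- Impute missing check-out times (check-in time plus the median visit duration of the branch, falling back to the global median)
- Fix logic errors (check-out < check-in); this step is not applied in the cells below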

In [44]:
# 1. Initialize the clean version
visits_clean = visits_raw.copy()
# 2. Convert raw strings to proper Datetime objects
visits_clean['check_in_time'] = pd.to_datetime(visits_clean['check_in_time'])
visits_clean['check_out_time'] = pd.to_datetime(visits_clean['check_out_time'])

In [45]:
# 1. Calculate duration for completed sessions only
temp_df = visits_clean.dropna(subset=['check_out_time']).copy()
temp_df['duration'] = temp_df['check_out_time'] - temp_df['check_in_time']
temp_df

| | visit_id | member_id | branch | check_in_time | check_out_time | duration |
|---|---|---|---|---|---|---|
| 0 | c7b42034 | 26e8e627 | Tunis Lake | 2025-06-08 18:02:00 | 2025-06-08 18:40:00 | 0 days 00:38:00 |
| 1 | 9cb80428 | b357874e | Nabeul | 2023-12-14 09:27:00 | 2023-12-14 11:24:00 | 0 days 01:57:00 |
| 2 | 718c9145 | a4ed006b | Nabeul | 2023-03-29 10:03:00 | 2023-03-29 11:16:00 | 0 days 01:13:00 |
| 3 | e0f08686 | 6971d7b6 | Nabeul Plage | 2024-09-25 13:16:00 | 2024-09-25 15:12:00 | 0 days 01:56:00 |
| 5 | 7b05197e | eb517e91 | Sfax_Center | 2025-06-14 08:05:00 | 2025-06-14 09:50:00 | 0 days 01:45:00 |
| ... | ... | ... | ... | ... | ... | ... |
| 1990 | 36fee19b | fe773c65 | tunis lake | 2024-01-23 14:56:00 | 2024-01-23 15:53:00 | 0 days 00:57:00 |
| 1991 | 3efa1f11 | 966e2a16 | Sfax Center | 2024-06-24 06:10:00 | 2024-06-24 08:09:00 | 0 days 01:59:00 |
| 1992 | bf5f7820 | dd49cdfe | Sfax Center | 2025-01-06 21:20:00 | 2025-01-06 23:08:00 | 0 days 01:48:00 |
| 1994 | eadc0e36 | cfcb50a1 | Sfax_Center | 2024-04-08 12:49:00 | 2024-04-08 13:24:00 | 0 days 00:35:00 |


In [46]:
# 2. Get median duration per branch
# transform() maps the median of the group back to every row based on its 'branch'
branch_medians = temp_df.groupby('branch')['duration'].transform('median')
branch_medians

0      0 days 01:10:30
1      0 days 01:16:30
2      0 days 01:16:30
3      0 days 01:12:30
5      0 days 01:14:30
             ...      
1990   0 days 01:17:00
1991   0 days 01:15:00
1992   0 days 01:15:00
1994   0 days 01:14:30
1995   0 days 01:10:30
Name: duration, Length: 1798, dtype: timedelta64[ns]

In [47]:
# 3. Create a mapping of Branch -> Median Duration
# This allows us to look up the duration for rows that are MISSING check_out_time
medians_map = temp_df.groupby('branch')['duration'].median()
medians_map

branch
Bizerte Port      0 days 01:11:30
Nabeul            0 days 01:16:30
Nabeul Plage      0 days 01:12:30
Sfax Center       0 days 01:15:00
Sfax_Center       0 days 01:14:30
Sousse            0 days 01:15:00
Sousse Corniche   0 days 01:14:00
Tunis Lake        0 days 01:10:30
tunis lake        0 days 01:17:00
Name: duration, dtype: timedelta64[ns]
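The mapping above still contains the raw spellings ('Sfax Center' and 'Sfax_Center', 'Tunis Lake' and 'tunis lake'), so each variant gets its own median. A hedged sketch of one alternative: apply the same `branch_map` used for members before grouping, so that each physical branch contributes a single median. The toy durations are invented for illustration:

```python
import pandas as pd

# Same mapping as in section 2.1.
branch_map = {
    "tunis lake": "Tunis Lake",
    "Sfax_Center": "Sfax Center",
    "Sousse": "Sousse Corniche",
    "Nabeul": "Nabeul Plage",
}

# Invented visit durations (in minutes) for two branches with spelling variants.
toy = pd.DataFrame({
    "branch": ["Sfax Center", "Sfax_Center", "Sfax_Center", "Tunis Lake", "tunis lake"],
    "duration": pd.to_timedelta([60, 70, 80, 50, 100], unit="min"),
})

# Normalize first, then aggregate: variants now fall into the same group.
toy["branch"] = toy["branch"].replace(branch_map)
medians = toy.groupby("branch")["duration"].median()
print(medians)
```

Whether this changes the imputed values noticeably depends on how far the per-variant medians differ. In the output above they are within a few minutes of each other.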

In [48]:
# 4. Apply the imputation (medians_map comes from the previous cell)
global_median = temp_df['duration'].median()
mask = visits_clean['check_out_time'].isnull()
# Look up the branch median for each missing row; use the global median if the branch has no data
imputed_duration = visits_clean.loc[mask, 'branch'].map(medians_map).fillna(global_median)
visits_clean.loc[mask, 'check_out_time'] = visits_clean.loc[mask, 'check_in_time'] + imputed_duration
# Imputation fills values in place, so the row count is unchanged
print(f"Visits dataset after cleaning: {visits_clean.shape[0]} rows")

Visits dataset after cleaning: 1996 rows


In [49]:
print(visits_clean['check_out_time'].isnull().sum())

0
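The section plan also lists logic errors (check-out earlier than check-in), which the cells above do not address. A minimal sketch of one way to handle them, assuming that an inverted pair means the check-out value is the wrong one and can be re-imputed from the median of the valid visits. That assumption and the toy timestamps are illustrative only:

```python
import pandas as pd

# Invented visits: the second row has a check-out before its check-in.
toy = pd.DataFrame({
    "check_in_time": pd.to_datetime(
        ["2024-01-01 10:00", "2024-01-01 12:00", "2024-01-01 15:00"]
    ),
    "check_out_time": pd.to_datetime(
        ["2024-01-01 11:00", "2024-01-01 11:30", "2024-01-01 16:30"]
    ),
})

# Flag rows where the session would have a negative duration.
bad = toy["check_out_time"] < toy["check_in_time"]

# Median duration of the valid rows only (60 and 90 minutes here).
valid = toy.loc[~bad]
fallback = (valid["check_out_time"] - valid["check_in_time"]).median()

# Assumption: trust the check-in and rebuild the check-out from it.
toy.loc[bad, "check_out_time"] = toy.loc[bad, "check_in_time"] + fallback
print(toy)
```

Dropping the flagged rows, or swapping the two timestamps, would be equally defensible depending on how the errors arise in the source system.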


### 2.3 Cleaning Payments Data
- Remove non-positive amounts (refunds are assumed to be out of scope for this BI dashboard)
- Correct data-entry outliers (an amount above 1500 TND is treated as a 10x typo and divided by 10)
- Normalize membership type naming

In [50]:
payments_clean = payments_raw.copy()

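# 1. Keep strictly positive amounts (drops negatives and zeros); copy to avoid chained-assignment warnings
payments_clean = payments_clean[payments_clean['amount'] > 0].copy()

# 2. Handle 10x entry errors (max expected price is roughly 1000 TND for Annual;
#    the 1500 threshold leaves some headroom above it)
outlier_mask = payments_clean['amount'] > 1500
print(f"Found {outlier_mask.sum()} outliers. Correcting 10x entry errors...")
payments_clean.loc[outlier_mask, 'amount'] = payments_clean.loc[outlier_mask, 'amount'] / 10

# 3. Normalize membership types
# Note: str.capitalize() lower-cases everything after the first character,
# so 'VIP' becomes 'Vip' and '6-Month' becomes '6-month'
payments_clean['membership_type'] = payments_clean['membership_type'].str.capitalize()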

print(f"Payments dataset after cleaning: {payments_clean.shape[0]} rows")

Found 4 outliers. Correcting 10x entry errors...
Payments dataset after cleaning: 1484 rows


In [51]:
print("number of negative payments: ",payments_clean[payments_clean['amount'] < 0].shape[0])
print("number of null payments: ",payments_clean[payments_clean['amount'].isnull()].shape[0])
print("number of outlier payments: ", payments_clean[payments_clean['amount'] > 1500].shape[0])
print("unique membership types: ",payments_clean['membership_type'].unique())

number of negative payments:  0
number of null payments:  0
number of outlier payments:  0
unique membership types:  ['Monthly' 'Vip' 'Annual' 'Student' '6-month']
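The labels produced here ('Vip', '6-month') differ from the spellings seen in the members table ('VIP', '6-Month'), which matters if the two tables are later joined or grouped on `membership_type`. A hedged sketch of an explicit mapping that keeps the original capitalization; the `CANONICAL` dictionary is an assumption built from the values printed in section 1.3:

```python
import pandas as pd

# Assumed canonical labels, keyed by a lower-cased, trimmed version of the raw value.
CANONICAL = {
    "monthly": "Monthly",
    "annual": "Annual",
    "student": "Student",
    "vip": "VIP",
    "6-month": "6-Month",
}

raw = pd.Series(["Annual", "VIP", "6-Month", "MONTHLY", "STUDENT", "6-MONTH"])

# Look up each value by its normalized key; unknown labels keep their raw form.
key = raw.str.strip().str.lower()
normalized = key.map(CANONICAL).fillna(raw)

print(normalized.unique())
```

The same mapping could be applied to `members_clean['membership_type']`, which still holds the upper-case variants after section 2.1.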


## Phase 3: Loading (Saving the results)
Now we save the clean datasets to `data/processed/` for downstream BI analysis.

In [52]:
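# Create the output folder if it does not exist yet, otherwise to_csv fails
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)
members_clean.to_csv(os.path.join(PROCESSED_DATA_PATH, "members_clean.csv"), index=False)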
visits_clean.to_csv(os.path.join(PROCESSED_DATA_PATH, "visits_clean.csv"), index=False)
payments_clean.to_csv(os.path.join(PROCESSED_DATA_PATH, "payments_clean.csv"), index=False)

print("ETL Complete! Clean datasets saved to", PROCESSED_DATA_PATH)

ETL Complete! Clean datasets saved to data/processed/
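CSV files do not store column types, so the datetime columns converted in section 2.2 come back as plain text unless the reader asks for parsing. A small round-trip sketch using a temporary folder and invented rows:

```python
import os
import tempfile

import pandas as pd

# Invented visits with a proper datetime column.
visits = pd.DataFrame({
    "visit_id": ["v1", "v2"],
    "check_in_time": pd.to_datetime(["2024-01-01 10:00", "2024-01-02 18:30"]),
})

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "visits_clean.csv")
    visits.to_csv(path, index=False)

    # Default read: the timestamp column is loaded as text.
    plain = pd.read_csv(path)

    # parse_dates restores the datetime dtype on load.
    parsed = pd.read_csv(path, parse_dates=["check_in_time"])

print(plain["check_in_time"].dtype)
print(parsed["check_in_time"].dtype)
```

Downstream consumers of `visits_clean.csv` would need the same `parse_dates` argument, or an equivalent setting in the BI tool's import step.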
