# 🛠️ ETL Pipeline Project

This notebook demonstrates a clean and professional Extract, Transform, Load (ETL) pipeline using Python. The objective is to retrieve data, process it for analysis, and load it into a structured format ready for downstream tasks such as reporting or modeling.

**Author:** [Melek SFAXI]  
**Date:** June 2025  
**Tools Used:** Python, Pandas, SQLAlchemy, etc.

In [1]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv
load_dotenv()  # Load .env variables


True

In [2]:
def extract_data(file_path):
    """
    Extract data from a CSV file and return it as a DataFrame.

    Args:
        file_path: Location of the CSV file (anything ``pd.read_csv`` accepts).

    Returns:
        The parsed ``pandas.DataFrame``, or ``None`` if the file could not be
        read or parsed. Callers must check for ``None`` before using the result.
    """
    import logging  # local import keeps this cell self-contained

    try:
        return pd.read_csv(file_path)
    except Exception:
        # Keep the best-effort "None on failure" contract, but record the
        # reason (with traceback) instead of discarding the error silently.
        logging.getLogger(__name__).exception(
            "Failed to read CSV file %r", file_path
        )
        return None

In [3]:
# Load the raw OLTP subscription export that the rest of the notebook transforms.
data = extract_data("Functional Task - OLTP_Subscription.csv")
# extract_data() reports failure by returning None; stop here with a clear
# message rather than failing later with an unrelated TypeError.
if data is None:
    raise RuntimeError(
        "Could not load 'Functional Task - OLTP_Subscription.csv'; "
        "check that the file exists and is a valid CSV."
    )

In [4]:
# Get full structure summary

In [5]:
# Per-column non-null counts for the rows whose StudentGender is "Unknown".
data[data['StudentGender']=="Unknown"].count()

SessionName               274
TrackName                 274
Hackerspace               274
Country                   274
GroupName                 274
ProductSchedule           274
Student                   274
StudentGender             274
InstructorFullName        274
InstructorEmail           274
SubscriptionStartDate     274
SubscriptionEndDate       274
SubscriptionProgress      274
SubscriptionHasDiploma    274
DiplomaDate               171
instructor_diploma        171
StudentBirthDate          228
professionalExperience    274
Industry                  274
dtype: int64

In [6]:
# Standardizing the 'ProductSchedule' column in the DataFrame
# This function maps each raw 'ProductSchedule' value to one standardized category.
def standardize_schedule(value):
    """
    Map a raw ProductSchedule value to a standardized schedule category.

    The value is stripped and lower-cased, then tested against the rules
    below in order; the first matching rule wins.

    Args:
        value: Raw schedule label. Anything that is not a string (None, NaN,
            numbers) is treated as unclassifiable.

    Returns:
        One of 'Morning', 'Afternoon', 'Night', 'Full Day', 'Kids', 'B2B',
        'Experimental', 'Code Schedule' or 'Other'.
    """
    # Missing or non-text entries have no .strip(); send them to the
    # catch-all category instead of raising AttributeError.
    if not isinstance(value, str):
        return 'Other'

    val = value.strip().lower()  # normalize text
    # Morning (exact match only)
    if val == 'morning':
        return 'Morning'
    # Afternoon ('fridayeveningtn' is also classified as Afternoon)
    if 'afternoon' in val or 'fridayeveningtn' in val:
        return 'Afternoon'
    # Night
    if 'night' in val:
        return 'Night'
    # Full Day (exact match only)
    if val == 'full day':
        return 'Full Day'
    # Kids
    if 'kids' in val:
        return 'Kids'
    # B2B
    if 'b2b' in val:
        return 'B2B'
    # Experimental
    if 'exp' in val:
        return 'Experimental'
    # Code Schedule (P, F, W, N patterns); must stay after the named
    # categories so that e.g. 'night' and 'full day' are not captured here.
    if val.startswith(('p', 'f', 'w', 'n')):
        return 'Code Schedule'
    # Other (catch all)
    return 'Other'
# Derive the standardized category for every row's ProductSchedule value
# and store it in a new column (the raw column is left unchanged).
data['ProductScheduleStandardized'] = data['ProductSchedule'].apply(standardize_schedule)

In [7]:
# List the distinct standardized schedule categories (values only, not counts).
data["ProductScheduleStandardized"].unique()

array(['Code Schedule', 'Night', 'Other', 'Morning', 'Afternoon', 'Kids',
       'Experimental', 'Full Day', 'B2B'], dtype=object)

In [8]:
# Parse both subscription boundary columns as datetimes; values that cannot
# be parsed become NaT because of errors='coerce'.
for date_col in ("SubscriptionStartDate", "SubscriptionEndDate"):
    data[date_col] = pd.to_datetime(data[date_col], errors='coerce')

In [9]:
# Convert 'SubscriptionProgress' from percentage text to a float column.
progress = data["SubscriptionProgress"]
# Only text needs the '%' removed. Skipping this step for an already-numeric
# column lets the cell be re-run without failing on the missing .str accessor.
if not pd.api.types.is_numeric_dtype(progress):
    # '%' is a literal character here, so plain (non-regex) replacement is used.
    progress = progress.str.replace('%', '', regex=False)
data["SubscriptionProgress"] = progress.astype(float)

In [10]:
# Inspect the distinct values present in SubscriptionHasDiploma.
data["SubscriptionHasDiploma"].unique()

array([ True, False])

In [11]:
# Parse the diploma dates (unparseable values become NaT), then replace the
# missing ones with the text 'No Diploma' and store the whole column as strings.
parsed_diploma_dates = pd.to_datetime(data['DiplomaDate'], errors='coerce')
data['DiplomaDate'] = parsed_diploma_dates.fillna('No Diploma').astype(str)
# Sanity check: number of nulls remaining in the column.
data["DiplomaDate"].isnull().sum()

0

In [12]:
# Inspect the distinct Industry values.
data["Industry"].unique()

array(['Sales', 'Research And Development', 'Marketing', 'Design',
       'Management', 'Engineering And Technology', 'Unknown',
       'Operations', 'Finance And Accounting And Legal'], dtype=object)

In [13]:
# Replace missing instructor_diploma values with the text 'No Diploma'
# and store the column as strings.
data['instructor_diploma'] = data['instructor_diploma'].fillna('No Diploma').astype(str)

In [14]:
# Count the missing birth dates. They are deliberately left as NaN: the age
# calculation later parses this column with pd.to_datetime(errors='coerce'),
# which would turn a text placeholder such as 'Unknown' straight back into NaT.
data["StudentBirthDate"].isna().sum()

OLAP

In [15]:
# Month of the subscription start (monthly period rendered as a string);
# used as the time dimension when the data is grouped for the OLAP table.
data['SubscriptionMonth'] = data['SubscriptionStartDate'].dt.to_period('M').astype(str)

In [16]:
# Integer version of SubscriptionHasDiploma so that diplomas can be summed
# during aggregation.
data['DiplomaFlag'] = data['SubscriptionHasDiploma'].astype(int)

In [17]:
def experience_bucket(exp):
    """
    Map a professionalExperience value to an experience-level label.

    Args:
        exp: Numeric amount of professional experience; may be missing
            (None / NaN).

    Returns:
        'Unknown' for a missing value, 'No Experience' for 0, 'Junior' for
        values up to 2, 'Mid-Level' for values up to 5, otherwise 'Senior'.
    """
    # NaN fails every comparison below and would otherwise fall through to
    # 'Senior'; report it as 'Unknown', as categorize_age does for missing ages.
    if pd.isna(exp):
        return 'Unknown'
    if exp == 0:
        return 'No Experience'
    elif exp <= 2:
        return 'Junior'
    elif exp <= 5:
        return 'Mid-Level'
    else:
        return 'Senior'

# Label every row with the experience level derived from professionalExperience.
data['ExperienceLevel'] = data['professionalExperience'].apply(experience_bucket)

In [18]:
# Parse birth dates; anything that is not a valid date becomes NaT.
data['StudentBirthDate'] = pd.to_datetime(data['StudentBirthDate'], errors='coerce')

# Age in completed calendar years. Dividing the elapsed days by 365 ignores
# leap days, which makes ages tick over several days before the real birthday.
today = pd.Timestamp('today')
birth = data['StudentBirthDate']
birthday_passed = (birth.dt.month < today.month) | (
    (birth.dt.month == today.month) & (birth.dt.day <= today.day)
)
# Rows without a valid birth date stay NaN because NaT yields NaN for .dt.year.
data['StudentAge'] = today.year - birth.dt.year - (~birthday_passed).astype(int)

In [19]:
# Keep only ages within the inclusive 6-65 range; everything else
# (including missing ages) becomes NaN.
age_values = data['StudentAge']
data['StudentAge'] = age_values.where(age_values.between(6, 65))
data['StudentAge'].isna().sum()
def categorize_age(age):
    """
    Return the age-band label for a single age.

    Missing ages (NaN / None) map to 'Unknown'. Otherwise the label is the
    first band whose exclusive upper bound is greater than the age, with
    '50+' for everything at or above 50.
    """
    if pd.isna(age):
        return 'Unknown'

    # (exclusive upper bound, label), checked in ascending order.
    bands = (
        (18, 'Under 18'),
        (25, '18-24'),
        (35, '25-34'),
        (50, '35-49'),
    )
    for upper_bound, label in bands:
        if age < upper_bound:
            return label
    return '50+'

# Label every row with the age band of its StudentAge ('Unknown' when missing).
data['AgeGroup'] = data['StudentAge'].apply(categorize_age)

In [20]:
# NOTE(review): this cell redefines experience_bucket, which is already
# defined earlier in the notebook; consider removing one of the two copies.
def experience_bucket(exp):
    """
    Map a professionalExperience value to an experience-level label.

    Args:
        exp: Numeric amount of professional experience; may be missing
            (None / NaN).

    Returns:
        'Unknown' for a missing value, 'No Experience' for 0, 'Junior' for
        values up to 2, 'Mid-Level' for values up to 5, otherwise 'Senior'.
    """
    # NaN fails every comparison below and would otherwise fall through to
    # 'Senior'; report it as 'Unknown', as categorize_age does for missing ages.
    if pd.isna(exp):
        return 'Unknown'
    if exp == 0:
        return 'No Experience'
    elif exp <= 2:
        return 'Junior'
    elif exp <= 5:
        return 'Mid-Level'
    else:
        return 'Senior'

# Recompute ExperienceLevel from professionalExperience using experience_bucket.
data['ExperienceLevel'] = data['professionalExperience'].apply(experience_bucket)

In [21]:
def churn_logic(row):
    """
    Derive the churn status of one subscription row.

    Args:
        row: Mapping-like record exposing 'DiplomaFlag' and
            'SubscriptionProgress'.

    Returns:
        'Completed' when DiplomaFlag equals 1; otherwise 'Likely Churned' if
        SubscriptionProgress is below 25, else 'Active / In Progress'.
    """
    # A diploma settles the status regardless of the recorded progress.
    if row['DiplomaFlag'] == 1:
        return 'Completed'
    low_progress = row['SubscriptionProgress'] < 25
    return 'Likely Churned' if low_progress else 'Active / In Progress'

# Evaluate churn_logic once per row (axis=1) to derive each subscription's churn status.
data['ChurnFlag'] = data.apply(churn_logic, axis=1)

In [22]:
# Print the columns, non-null counts and dtypes of the transformed DataFrame.
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5024 entries, 0 to 5023
Data columns (total 26 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   SessionName                  5024 non-null   object        
 1   TrackName                    5024 non-null   object        
 2   Hackerspace                  5024 non-null   object        
 3   Country                      5024 non-null   object        
 4   GroupName                    5024 non-null   object        
 5   ProductSchedule              5024 non-null   object        
 6   Student                      5024 non-null   object        
 7   StudentGender                5024 non-null   object        
 8   InstructorFullName           5024 non-null   object        
 9   InstructorEmail              5024 non-null   object        
 10  SubscriptionStartDate        5024 non-null   datetime64[ns]
 11  SubscriptionEndDate          5024 non-null 

# 📊 OLAP Integration

This section demonstrates how OLAP (Online Analytical Processing) capabilities are integrated into the pipeline. The cleaned data is grouped by its analytical dimensions, aggregate measures are computed for each group, and the resulting summary table (a flat, denormalized alternative to a full star schema) is loaded into a PostgreSQL database. This enables slicing, dicing, and efficient aggregation of data for business intelligence purposes.

In [23]:
# Dimensions of the OLAP table: one output row per distinct combination.
group_cols = [
    'SubscriptionMonth',
    'Country',
    'Hackerspace',
    'TrackName',
    'ProductScheduleStandardized',
    'StudentGender',
    'Industry',
    'AgeGroup',
    'ExperienceLevel',
    'ChurnFlag',  # 🆕 include churn state for better reporting
]

# Measures, named directly through named aggregation.
olap_table = (
    data.groupby(group_cols)
    .agg(
        TotalSubscriptions=('Student', 'count'),
        AvgProgress=('SubscriptionProgress', 'mean'),
        TotalDiplomas=('DiplomaFlag', 'sum'),
        AvgExperience=('professionalExperience', 'mean'),
    )
    .reset_index()
)

# Share of subscriptions in each group that ended with a diploma, rounded to 2 decimals.
diploma_share = olap_table['TotalDiplomas'].div(olap_table['TotalSubscriptions'])
olap_table['DiplomaRate'] = diploma_share.round(2)

In [24]:
from urllib.parse import quote_plus

# Export backup CSV
olap_table.to_csv("OLAP_Subscription_Dataset.csv", index=False)
print("Exported OLAP table to OLAP_Subscription_Dataset.csv")

# PostgreSQL connection info (the password comes from the environment / .env file)
username = 'avnadmin'
password = os.getenv("AIVEN_PASSWORD")
host = 'olapsubscription-sfaximalek5-beeb.f.aivencloud.com'
port = '11936'
database = 'defaultdb'

# Fail early with a clear message: a missing variable would otherwise put the
# literal text "None" into the URL and surface as an authentication error.
if not password:
    raise RuntimeError(
        "AIVEN_PASSWORD is not set; define it in the environment or the .env file."
    )

# Create connection string. Credentials are URL-escaped so that characters
# such as '@', ':' or '/' in the password cannot corrupt the URL.
connection_string = (
    f"postgresql+psycopg2://{quote_plus(username)}:{quote_plus(password)}"
    f"@{host}:{port}/{database}"
)
engine = create_engine(connection_string)

# Clean column names for SQL compatibility
olap_table.columns = olap_table.columns.str.lower().str.replace(' ', '_')

try:
    # Replace the target table with the freshly aggregated data.
    olap_table.to_sql(
        name='olap_subscription',
        con=engine,
        if_exists='replace',
        index=False,
        method='multi',   # send many rows per INSERT statement
        chunksize=1000,   # bound the size of each multi-row INSERT
    )
finally:
    # Release pooled connections whether or not the load succeeded.
    engine.dispose()

Exported OLAP table to OLAP_Subscription_Dataset.csv
