In [26]:
import pandas as pd

In [27]:
raw_data = [
# customer_id, call_minutes, data_gb, timestamp, region
[1001, 120, 10.5, "2025-09-25 10:30", "Delhi"],
[1002, 450, 55.0, "2025-09-25 11:00", "Mumbai"],
[1003, 200, 20.0, "25-09-2025 09:15", "Chennai"], # different date format
[1004, 600, 70.0, "2025/09/25 08:00", "Delhi"], # different date format
[1005, 50, 5.0, "2025-09-25 12:00", "Kolkata"],
[1005, 50, 5.0, "2025-09-25 12:00", "Kolkata"], # duplicate
[1006, None, 60.0, "2025-09-25 13:00", "Hyderabad"],# missing call_minutes
[1007, 510, None, "2025-09-25 14:00", "Delhi"], # missing data_gb
]
columns = ["customer_id", "call_minutes", "data_gb", "timestamp", "region"]
df_raw = pd.DataFrame(raw_data, columns=columns)
df_raw.to_csv("6_telecom_raw.csv", index=False)
print("Saved telecom_raw.csv")
df_raw

Saved telecom_raw.csv


Unnamed: 0,customer_id,call_minutes,data_gb,timestamp,region
0,1001,120.0,10.5,2025-09-25 10:30,Delhi
1,1002,450.0,55.0,2025-09-25 11:00,Mumbai
2,1003,200.0,20.0,25-09-2025 09:15,Chennai
3,1004,600.0,70.0,2025/09/25 08:00,Delhi
4,1005,50.0,5.0,2025-09-25 12:00,Kolkata
5,1005,50.0,5.0,2025-09-25 12:00,Kolkata
6,1006,,60.0,2025-09-25 13:00,Hyderabad
7,1007,510.0,,2025-09-25 14:00,Delhi


In [28]:
import sqlite3
import logging
import schedule
import time
import os

In [29]:
DB_PATH = "telecom_warehouse.db"
# Basic logging configuration
logging.basicConfig(
filename="telecom_pipeline.log",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
print("Database path:", DB_PATH)

Database path: telecom_warehouse.db


In [30]:
def extract_raw_data(csv_path="6_telecom_raw.csv"):
    """
    Step 1: Extract raw data from CSV file.
    Returns a pandas DataFrame.
    """
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"{csv_path} not found in current folder.")
    df = pd.read_csv(csv_path)
    logging.info("Extracted %d rows from %s", len(df), csv_path)
    return df

In [37]:
def transform_and_clean(df_raw):
    df = df_raw.copy()
    # 1) Remove duplicates
    before = len(df)
    df = df.drop_duplicates()
    logging.info("Removed %d duplicate rows", before - len(df))

    # 2) Standardize timestamp to datetime
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    bad_dates = df["timestamp"].isna().sum()
    if bad_dates > 0:
        logging.warning("Found %d bad timestamps; filling with default date", bad_dates)
        df["timestamp"] = df["timestamp"].fillna(pd.Timestamp("2025-09-25 00:00"))
    
    # 3) Handle missing numeric values: fill with column mean
    for col in ["call_minutes", "data_gb"]:
        if df[col].isna().sum() > 0:
            mean_val = df[col].mean()
            df[col] = df[col].fillna(mean_val)
            logging.info("Filled missing values in %s with mean=%.2f", col, mean_val)
    
    # 4) Aggregate total per customer
    agg = (
        df.groupby(["customer_id", "region"])
        .agg(
            total_call_minutes=("call_minutes", "sum"),
            total_data_gb=("data_gb", "sum"),
            last_activity=("timestamp", "max"),
        )
        .reset_index()
    )
    
    # 5) Flag heavy users
    agg["heavy_user"] = (agg["total_data_gb"] > 50) | (agg["total_call_minutes"] > 500)
    logging.info("Transformed data to %d aggregated customer rows", len(agg))
    return agg

In [38]:
df_raw = extract_raw_data("6_telecom_raw.csv") # raw CSV is only read, not modified
df_clean = transform_and_clean(df_raw)
# Save cleaned/aggregated data to a separate CSV
clean_csv_path = "telecom_cleaned.csv"
df_clean.to_csv(clean_csv_path, index=False)
print("Cleaned dataset (first few rows):")
print(df_clean.head())
print(f"\nCleaned data has been saved to: {clean_csv_path}")

Cleaned dataset (first few rows):
   customer_id   region  total_call_minutes  total_data_gb  \
0         1001    Delhi               120.0           10.5   
1         1002   Mumbai               450.0           55.0   
2         1003  Chennai               200.0           20.0   
3         1004    Delhi               600.0           70.0   
4         1005  Kolkata                50.0            5.0   

        last_activity  heavy_user  
0 2025-09-25 10:30:00       False  
1 2025-09-25 11:00:00        True  
2 2025-09-25 00:00:00       False  
3 2025-09-25 00:00:00        True  
4 2025-09-25 12:00:00       False  

Cleaned data has been saved to: telecom_cleaned.csv


# Load to SQLite “warehouse”

In [39]:
def load_to_warehouse(df_clean, db_path=DB_PATH):
    """
    Load cleaned data into SQLite as 'usage_summary' table.
    Creates an index on customer_id for fast lookups.
    """
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    # Write DataFrame to table (replace for each run)
    df_clean.to_sql("usage_summary", conn, if_exists="replace", index=False)
    # Create index on customer_id
    cur.execute("CREATE INDEX IF NOT EXISTS idx_usage_summary_cust ON usage_summary(customer_id);")
    conn.commit()
    conn.close()
    logging.info("Loaded %d rows into usage_summary table", len(df_clean))
load_to_warehouse(df_clean)

    
    # Quick check: read from DB and show
conn = sqlite3.connect(DB_PATH)
check_df = pd.read_sql_query("SELECT * FROM usage_summary;", conn)
conn.close()
print("Data inside SQLite usage_summary table:")
check_df

Data inside SQLite usage_summary table:


Unnamed: 0,customer_id,region,total_call_minutes,total_data_gb,last_activity,heavy_user
0,1001,Delhi,120.0,10.5,2025-09-25 10:30:00,0
1,1002,Mumbai,450.0,55.0,2025-09-25 11:00:00,1
2,1003,Chennai,200.0,20.0,2025-09-25 00:00:00,0
3,1004,Delhi,600.0,70.0,2025-09-25 00:00:00,1
4,1005,Kolkata,50.0,5.0,2025-09-25 12:00:00,0
5,1006,Hyderabad,321.666667,60.0,2025-09-25 13:00:00,1
6,1007,Delhi,510.0,36.75,2025-09-25 14:00:00,1


# Applying Governance

In [40]:
def enrich_with_pii(db_path=DB_PATH):
    """
    Adds fake PII columns to usage_summary to demonstrate role-based masking.
    """
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()

    # Add columns if they don't exist
    cur.execute("PRAGMA table_info(usage_summary);")
    cols = [row[1] for row in cur.fetchall()]
    if "customer_name" not in cols:
        cur.execute("ALTER TABLE usage_summary ADD COLUMN customer_name TEXT;")
    if "phone" not in cols:
        cur.execute("ALTER TABLE usage_summary ADD COLUMN phone TEXT;")

    # Simple mapping for demo
    name_map = {
        1001: "Asha Mehta",
        1002: "Ravi Kumar",
        1003: "Sneha Rao",
        1004: "Manoj Singh",
        1005: "Divya Jain",
        1006: "Rahul Roy",
        1007: "Neha Gupta",
    }
    # Update rows with names and fake phone numbers
    for cust_id, name in name_map.items():
        cur.execute(
        """
        UPDATE usage_summary
        SET customer_name = ?, phone = ?
        WHERE customer_id = ?;
        """,
        (name, f"9{cust_id}000000", cust_id)
        )

    conn.commit()
    conn.close()
    logging.info("Enriched usage_summary with fake PII fields.")
    
enrich_with_pii()

# Create role-based SQL views

In [41]:
def create_role_based_views(db_path=DB_PATH):
    """
    Create role-based views:
    - vw_usage_junior: masked PII
    - vw_usage_manager: full access
    """
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    # Junior view: NO real name/phone, only masked info
    cur.execute("""
    CREATE VIEW IF NOT EXISTS vw_usage_junior AS

    SELECT
    customer_id,
    region,
    total_call_minutes,
    total_data_gb,
    last_activity,
    heavy_user,
    'ANON' AS customer_name,
    'XXXXXX' AS phone
    FROM usage_summary;
    """)
    # Manager view: full details
    cur.execute("""
    CREATE VIEW IF NOT EXISTS vw_usage_manager AS
    SELECT
    customer_id,
    region,
    total_call_minutes,
    total_data_gb,
    last_activity,
    heavy_user,
    customer_name,
    phone
    FROM usage_summary;
    """)
    conn.commit()
    conn.close()
    logging.info("Created role-based views for junior and manager roles.")
create_role_based_views()

# Helper function to read data for a given role

In [42]:
def get_usage_for_role(role, db_path=DB_PATH, limit=10):
    """
    Utility: return a DataFrame for the requested role ('junior' or 'manager').
    """
    view_map = {
    "junior": "vw_usage_junior",
    "manager": "vw_usage_manager",
    }
    view_name = view_map.get(role.lower())
    if view_name is None:
        raise ValueError("Role must be 'junior' or 'manager'")
    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query(f"SELECT * FROM {view_name} LIMIT {limit};", conn)
    conn.close()
    return df

print("Junior analyst view:")
display(get_usage_for_role("junior"))
print("\nManager view:")
display(get_usage_for_role("manager"))


Junior analyst view:


Unnamed: 0,customer_id,region,total_call_minutes,total_data_gb,last_activity,heavy_user,customer_name,phone
0,1001,Delhi,120.0,10.5,2025-09-25 10:30:00,0,ANON,XXXXXX
1,1002,Mumbai,450.0,55.0,2025-09-25 11:00:00,1,ANON,XXXXXX
2,1003,Chennai,200.0,20.0,2025-09-25 00:00:00,0,ANON,XXXXXX
3,1004,Delhi,600.0,70.0,2025-09-25 00:00:00,1,ANON,XXXXXX
4,1005,Kolkata,50.0,5.0,2025-09-25 12:00:00,0,ANON,XXXXXX
5,1006,Hyderabad,321.666667,60.0,2025-09-25 13:00:00,1,ANON,XXXXXX
6,1007,Delhi,510.0,36.75,2025-09-25 14:00:00,1,ANON,XXXXXX



Manager view:


Unnamed: 0,customer_id,region,total_call_minutes,total_data_gb,last_activity,heavy_user,customer_name,phone
0,1001,Delhi,120.0,10.5,2025-09-25 10:30:00,0,Asha Mehta,91001000000
1,1002,Mumbai,450.0,55.0,2025-09-25 11:00:00,1,Ravi Kumar,91002000000
2,1003,Chennai,200.0,20.0,2025-09-25 00:00:00,0,Sneha Rao,91003000000
3,1004,Delhi,600.0,70.0,2025-09-25 00:00:00,1,Manoj Singh,91004000000
4,1005,Kolkata,50.0,5.0,2025-09-25 12:00:00,0,Divya Jain,91005000000
5,1006,Hyderabad,321.666667,60.0,2025-09-25 13:00:00,1,Rahul Roy,91006000000
6,1007,Delhi,510.0,36.75,2025-09-25 14:00:00,1,Neha Gupta,91007000000


In [45]:
def run_pipeline():
    """
    Runs the full pipeline once:
    1) Extract raw data from CSV
    2) Transform & clean
    3) Load to warehouse (SQLite)
    4) Enrich with PII and create role-based views
    """
    try:
        logging.info("Starting ETL pipeline run...")
        # 1 & 2: Extract + Transform
        df_raw = extract_raw_data("6_telecom_raw.csv")
        df_clean = transform_and_clean(df_raw)
        # 3: Load
        load_to_warehouse(df_clean, DB_PATH)
        # 4: Governance
        enrich_with_pii(DB_PATH)
        create_role_based_views(DB_PATH)
        logging.info("ETL pipeline run completed successfully.")
    except Exception as e:
        logging.exception("ETL pipeline run FAILED: %s", e)
        print("Pipeline failed, see log file for details.")

In [46]:
import time, schedule
schedule.clear()
schedule.every(20).seconds.do(run_pipeline)
runs = 3 # run 3 times then stop
for _ in range(runs):
    schedule.run_pending()
    time.sleep(10) # wait for the next tick
print(" Done. Scheduler exited after", runs, "runs.")

 Done. Scheduler exited after 3 runs.
