## Subscriber Cancellation Data Pipeline Project
---

### Project Overview

This project builds a fast, reliable, and scalable data pipeline from data on subscribers who have cancelled their subscriptions on a learning platform. The raw data is stored in a SQLite database, and it is messy. The project therefore takes the raw tables (*extract*), cleans and normalizes them to industry standards (*transform*), and then produces a tidy database for data engineers and an analytics-ready CSV for analysts (*load*).

### Key goals of this project:
    
- Clean and standardize the raw students, courses, and jobs data
- Normalize entities into dimension and fact tables
- Enforce schema consistency between pandas and SQLite
- Load validated data into a relational database and flat csv for downstream analysis

---

The notebook is designed with exploration in mind: this is where the cleaning and transformation decisions are made. The **ETL** steps themselves will live in Python scripts, which makes the pipeline easier to automate.
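The schema-consistency and load goals listed above could look roughly like the sketch below. It is a minimal illustration, not the project's actual load script: the `load` and `sqlite_types` helpers, the `JOBS_SQL_TYPES` mapping, the output table name, and the toy rows are all assumptions made for the example.

```python
import sqlite3
import tempfile
from pathlib import Path

import pandas as pd

# Hypothetical mapping from column name to declared SQLite type.
JOBS_SQL_TYPES = {"job_id": "INTEGER", "job_category": "TEXT", "avg_salary": "REAL"}


def load(df, table, sql_types, conn, csv_path):
    """Write df to a SQLite table with explicit column types and to a flat CSV."""
    df.to_sql(table, con=conn, if_exists="replace", index=False, dtype=sql_types)
    df.to_csv(csv_path, index=False)


def sqlite_types(conn, table):
    """Return {column name: declared type} as reported by PRAGMA table_info."""
    rows = conn.execute(f"PRAGMA table_info({table});").fetchall()
    return {row[1]: row[2] for row in rows}


# Toy frame standing in for a cleaned jobs table.
jobs_demo = pd.DataFrame(
    {
        "job_id": [1, 2],
        "job_category": ["analytics", "engineer"],
        "avg_salary": [86000.0, 101000.0],
    }
)

conn_demo = sqlite3.connect(":memory:")
csv_demo = Path(tempfile.mkdtemp()) / "jobs.csv"

load(jobs_demo, "jobs", JOBS_SQL_TYPES, conn_demo, csv_demo)

# Read back what SQLite recorded so it can be compared with the intended types.
written_types = sqlite_types(conn_demo, "jobs")
round_trip = pd.read_csv(csv_demo)
conn_demo.close()

print(written_types)
```

Passing an explicit type mapping, rather than letting pandas infer the column types, keeps the declared SQLite schema under the script's control, and reading the schema back gives a cheap check that the two sides agree.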

##### Imports and Configuration

In [1]:
import pandas as pd
import sqlite3
from pathlib import Path
import json

pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", None)

CWD = Path.cwd()
ROOTDIR = CWD.resolve().parents[0]
DATADIR = ROOTDIR / "data" 
RAWDB = DATADIR / "raw" / "cademycode.db"

##### Connect to the Database and Verify Schema

In [2]:
# db connection
conn = sqlite3.connect(RAWDB)
cur = conn.cursor()

In [3]:
# Table names
tables_qry = """
    SELECT name 
    FROM sqlite_master
    WHERE type = 'table';
"""
cur.execute(tables_qry)

table_names = [row[0] for row in cur.fetchall()]
table_names

['cademycode_students', 'cademycode_courses', 'cademycode_student_jobs']

In [19]:
# Table schemas
# PRAGMA table_info returns these fields for each column; "notnull" is 1 when
# the column is declared NOT NULL and 0 when NULLs are allowed.
columns = ("column_id", "name", "type", "notnull", "default", "primary_key")

for table in table_names:
    print(f"Table: {table}")
    print(columns)
    cur.execute(f"PRAGMA table_info({table});")

    for row in cur.fetchall():
        print(row)
    print("-"*50)

Table: cademycode_students
('column_id', 'name', 'type', 'notnull', 'default', 'primary_key')
(0, 'uuid', 'INTEGER', 0, None, 0)
(1, 'name', 'VARCHAR', 0, None, 0)
(2, 'dob', 'VARCHAR', 0, None, 0)
(3, 'sex', 'TEXT', 0, None, 0)
(4, 'contact_info', 'JSON', 0, None, 0)
(5, 'job_id', 'VARCHAR', 0, None, 0)
(6, 'num_course_taken', 'VARCHAR', 0, None, 0)
(7, 'current_career_path_id', 'VARCHAR', 0, None, 0)
(8, 'time_spent_hrs', 'VARCHAR', 0, None, 0)
--------------------------------------------------
Table: cademycode_courses
('column_id', 'name', 'type', 'notnull', 'default', 'primary_key')
(0, 'career_path_id', 'BIGINT', 0, None, 0)
(1, 'career_path_name', 'TEXT', 0, None, 0)
(2, 'hours_to_complete', 'BIGINT', 0, None, 0)
--------------------------------------------------
Table: cademycode_student_jobs
('column_id', 'name', 'type', 'notnull', 'default', 'primary_key')
(0, 'job_id', 'BIGINT', 0, None, 0)
(1, 'job_category', 'TEXT', 0, None, 0)
(2, 'avg_salary', 'BIGINT', 0, None, 0)
--------------------------------------------------
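When the schema needs to be reused later, for example to compare it with pandas dtypes, it can help to keep it as data instead of printed tuples. The sketch below is one way to do that using the field names SQLite itself reports for `PRAGMA table_info` (`cid`, `name`, `type`, `notnull`, `dflt_value`, `pk`). The `describe_table` helper and the in-memory `demo` table are hypothetical and exist only for illustration.

```python
import sqlite3

# Field names as SQLite reports them for PRAGMA table_info.
TABLE_INFO_FIELDS = ("cid", "name", "type", "notnull", "dflt_value", "pk")


def describe_table(conn, table):
    """Return one dict per column of `table`, keyed by the PRAGMA field names."""
    rows = conn.execute(f"PRAGMA table_info({table});").fetchall()
    return [dict(zip(TABLE_INFO_FIELDS, row)) for row in rows]


# Toy in-memory table (hypothetical, not the project's database).
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE demo (id INTEGER PRIMARY KEY, label TEXT NOT NULL, score REAL)"
)
schema = describe_table(demo_conn, "demo")
demo_conn.close()

for column in schema:
    print(column)
```

In this output a `notnull` value of 0 means the column accepts NULLs, which matches the raw tables above: none of their columns is declared NOT NULL, and none declares a primary key.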

##### Load Raw Data

In [20]:
std_qry = """
    SELECT *
    FROM cademycode_students;
"""

students = pd.read_sql(std_qry, conn)

In [21]:
courses_qry = """
    SELECT *
    FROM cademycode_courses;
"""

courses = pd.read_sql(courses_qry, conn)

In [22]:
jobs_qry = """
    SELECT *
    FROM cademycode_student_jobs;
"""

jobs = pd.read_sql(jobs_qry, conn)

##### Initial Inspection

##### Students

In [23]:
students.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 9 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   uuid                    5000 non-null   int64 
 1   name                    5000 non-null   object
 2   dob                     5000 non-null   object
 3   sex                     5000 non-null   object
 4   contact_info            5000 non-null   object
 5   job_id                  4995 non-null   object
 6   num_course_taken        4749 non-null   object
 7   current_career_path_id  4529 non-null   object
 8   time_spent_hrs          4529 non-null   object
dtypes: int64(1), object(8)
memory usage: 351.7+ KB


In [24]:
students.head()

| | uuid | name | dob | sex | contact_info | job_id | num_course_taken | current_career_path_id | time_spent_hrs |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Annabelle Avery | 1943-07-03 | F | {"mailing_address": "303 N Timber Key, Irondale, Wisconsin, 84736", "email": "annabelle_avery9376@woohoo.com"} | 7.0 | 6.0 | 1.0 | 4.99 |
| 1 | 2 | Micah Rubio | 1991-02-07 | M | {"mailing_address": "767 Crescent Fair, Shoals, Indiana, 37439", "email": "rubio6772@hmail.com"} | 7.0 | 5.0 | 8.0 | 4.4 |
| 2 | 3 | Hosea Dale | 1989-12-07 | M | {"mailing_address": "P.O. Box 41269, St. Bonaventure, Virginia, 83637", "email": "hosea_dale8084@coldmail.com"} | 7.0 | 8.0 | 8.0 | 6.74 |
| 3 | 4 | Mariann Kirk | 1988-07-31 | F | {"mailing_address": "517 SE Wintergreen Isle, Lane, Arkansas, 82242", "email": "kirk4005@hmail.com"} | 6.0 | 7.0 | 9.0 | 12.31 |
| 4 | 5 | Lucio Alexander | 1963-08-31 | M | {"mailing_address": "18 Cinder Cliff, Doyles borough, Rhode Island, 73737", "email": "alexander9810@hmail.com"} | 7.0 | 14.0 | 3.0 | 5.64 |


In [25]:
students.duplicated().sum()

np.int64(0)

In [26]:
students["uuid"].duplicated().sum()

np.int64(0)

In [None]:
students["sex"].value_counts()

sex
M    1995
F    1990
N    1015
Name: count, dtype: int64

In [28]:
students.isna().sum()

uuid                        0
name                        0
dob                         0
sex                         0
contact_info                0
job_id                      5
num_course_taken          251
current_career_path_id    471
time_spent_hrs            471
dtype: int64
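The identical counts for current_career_path_id and time_spent_hrs (471 each) suggest, but do not prove, that the two columns are missing in the same rows. A quick way to check is to combine the two missingness masks. The sketch below runs on a small toy frame with made-up values, so its numbers say nothing about the real table.

```python
import pandas as pd

# Toy stand-in for the two columns; values are invented for illustration.
toy = pd.DataFrame(
    {
        "current_career_path_id": ["1.0", None, "3.0", None],
        "time_spent_hrs": ["4.99", None, "5.64", "2.0"],
    }
)

path_missing = toy["current_career_path_id"].isna()
hours_missing = toy["time_spent_hrs"].isna()

# Count rows in each missingness combination.
summary = {
    "both_missing": int((path_missing & hours_missing).sum()),
    "only_path_missing": int((path_missing & ~hours_missing).sum()),
    "only_hours_missing": int((~path_missing & hours_missing).sum()),
}
print(summary)
```

If the two "only" counts come back as zero on the real data, the columns are missing together and can be treated as a single condition when deciding how to handle them.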

##### Courses

In [29]:
courses.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   career_path_id     10 non-null     int64 
 1   career_path_name   10 non-null     object
 2   hours_to_complete  10 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 368.0+ bytes


In [30]:
courses.head()

| | career_path_id | career_path_name | hours_to_complete |
|---|---|---|---|
| 0 | 1 | data scientist | 20 |
| 1 | 2 | data engineer | 20 |
| 2 | 3 | data analyst | 12 |
| 3 | 4 | software engineering | 25 |
| 4 | 5 | backend engineer | 18 |


In [31]:
courses.duplicated().sum()

np.int64(0)

In [None]:
courses["career_path_id"].duplicated().sum()

np.int64(0)

In [None]:
courses["career_path_name"].duplicated().sum()

np.int64(0)

In [None]:
courses["career_path_name"].value_counts()

career_path_name
data scientist               1
data engineer                1
data analyst                 1
software engineering         1
backend engineer             1
frontend engineer            1
iOS developer                1
android developer            1
machine learning engineer    1
ux/ui designer               1
Name: count, dtype: int64

In [35]:
courses.isna().sum()

career_path_id       0
career_path_name     0
hours_to_complete    0
dtype: int64

##### Jobs

In [36]:
jobs.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13 entries, 0 to 12
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   job_id        13 non-null     int64 
 1   job_category  13 non-null     object
 2   avg_salary    13 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 440.0+ bytes


In [37]:
jobs.head()

| | job_id | job_category | avg_salary |
|---|---|---|---|
| 0 | 1 | analytics | 86000 |
| 1 | 2 | engineer | 101000 |
| 2 | 3 | software developer | 110000 |
| 3 | 4 | creative | 66000 |
| 4 | 5 | financial services | 135000 |


In [38]:
jobs.duplicated().sum()

np.int64(3)
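Before dropping the three duplicated rows it can be useful to look at them. The sketch below shows one way to do that with `duplicated(keep=False)`, which flags every member of a duplicated group instead of only the repeats. The `toy_jobs` frame is an illustrative stand-in, not the real jobs table.

```python
import pandas as pd

# Toy frame with two duplicated rows (illustrative values only).
toy_jobs = pd.DataFrame(
    {
        "job_id": [1, 2, 3, 3, 1],
        "job_category": ["analytics", "engineer", "creative", "creative", "analytics"],
        "avg_salary": [86000, 101000, 66000, 66000, 86000],
    }
)

# keep=False marks all rows that have an exact copy elsewhere in the frame.
dupe_groups = toy_jobs[toy_jobs.duplicated(keep=False)].sort_values("job_id")

# drop_duplicates keeps the first occurrence; reset_index removes index gaps.
deduped = toy_jobs.drop_duplicates().reset_index(drop=True)

print(dupe_groups)
print(deduped)
```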

In [None]:
jobs["job_category"].value_counts()

job_category
creative              2
software developer    2
financial services    2
analytics             1
engineer              1
education             1
HR                    1
student               1
healthcare            1
other                 1
Name: count, dtype: int64

In [40]:
jobs.isna().sum()

job_id          0
job_category    0
avg_salary      0
dtype: int64

##### Cleaning

In [None]:
# json column validity check

def is_valid_json(val: str) -> bool:
    try:
        json.loads(val)
        return True
    except Exception:
        return False


students["contact_info"].apply(is_valid_json).value_counts()

contact_info
True    5000
Name: count, dtype: int64

In [None]:
contact_expanded = pd.json_normalize(
    students["contact_info"].map(json.loads)
)

contact_expanded.head()

| | mailing_address | email |
|---|---|---|
| 0 | 303 N Timber Key, Irondale, Wisconsin, 84736 | annabelle_avery9376@woohoo.com |
| 1 | 767 Crescent Fair, Shoals, Indiana, 37439 | rubio6772@hmail.com |
| 2 | P.O. Box 41269, St. Bonaventure, Virginia, 83637 | hosea_dale8084@coldmail.com |
| 3 | 517 SE Wintergreen Isle, Lane, Arkansas, 82242 | kirk4005@hmail.com |
| 4 | 18 Cinder Cliff, Doyles borough, Rhode Island, 73737 | alexander9810@hmail.com |


In [48]:
# Enforcing Types
students["dob"] = pd.to_datetime(students["dob"], errors="coerce")
students["job_id"] = (
    pd.to_numeric(students["job_id"], errors="coerce")
        .astype("Int64")
)

students["num_course_taken"] = (
    pd.to_numeric(students["num_course_taken"], errors="coerce")
        .astype("Int64")
)

students["current_career_path_id"] = (
    pd.to_numeric(students["current_career_path_id"], errors="coerce")
        .astype("Int64")
)

students["time_spent_hrs"] = pd.to_numeric(students["time_spent_hrs"], errors="coerce")
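The ID and count columns are cast to the nullable `Int64` dtype because they contain missing values, and a plain `int64` cast fails when NaN is present. The sketch below walks through the two-step conversion on a toy series; the values, including the unparseable "oops", are made up for illustration.

```python
import pandas as pd

# Toy raw column: numeric strings, a missing value, and an unparseable value.
raw = pd.Series(["7.0", None, "6.0", "oops"])

# Step 1: parse to float64; missing and unparseable entries become NaN.
as_float = pd.to_numeric(raw, errors="coerce")

# Step 2: cast to the nullable integer dtype; NaN becomes <NA>.
as_nullable_int = as_float.astype("Int64")

print(as_float.dtype, as_nullable_int.dtype)
print(as_nullable_int)
```

Note that `errors="coerce"` silently turns bad values into missing ones, so comparing the null count before and after the conversion is a reasonable safeguard.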

In [44]:
students = pd.concat(
    [students.drop(columns="contact_info", errors="ignore"), contact_expanded],
    axis=1
)

In [62]:
students["dob"].isna().sum()

np.int64(0)

In [45]:
students.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 10 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   uuid                    5000 non-null   int64         
 1   name                    5000 non-null   object        
 2   dob                     5000 non-null   datetime64[ns]
 3   sex                     5000 non-null   object        
 4   job_id                  4995 non-null   Int64         
 5   num_course_taken        4749 non-null   Int64         
 6   current_career_path_id  4529 non-null   Int64         
 7   time_spent_hrs          4529 non-null   float64       
 8   mailing_address         5000 non-null   object        
 9   email                   5000 non-null   object        
dtypes: Int64(3), datetime64[ns](1), float64(1), int64(1), object(4)
memory usage: 405.4+ KB


In [46]:
students.head()

| | uuid | name | dob | sex | job_id | num_course_taken | current_career_path_id | time_spent_hrs | mailing_address | email |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Annabelle Avery | 1943-07-03 | F | 7 | 6 | 1 | 4.99 | 303 N Timber Key, Irondale, Wisconsin, 84736 | annabelle_avery9376@woohoo.com |
| 1 | 2 | Micah Rubio | 1991-02-07 | M | 7 | 5 | 8 | 4.4 | 767 Crescent Fair, Shoals, Indiana, 37439 | rubio6772@hmail.com |
| 2 | 3 | Hosea Dale | 1989-12-07 | M | 7 | 8 | 8 | 6.74 | P.O. Box 41269, St. Bonaventure, Virginia, 83637 | hosea_dale8084@coldmail.com |
| 3 | 4 | Mariann Kirk | 1988-07-31 | F | 6 | 7 | 9 | 12.31 | 517 SE Wintergreen Isle, Lane, Arkansas, 82242 | kirk4005@hmail.com |
| 4 | 5 | Lucio Alexander | 1963-08-31 | M | 7 | 14 | 3 | 5.64 | 18 Cinder Cliff, Doyles borough, Rhode Island, 73737 | alexander9810@hmail.com |


In [50]:
jobs.drop_duplicates(inplace=True)

In [51]:
jobs["avg_salary"] = (
    pd.to_numeric(jobs["avg_salary"], errors="coerce")
      .astype("float64")
)

In [52]:
jobs.info()

<class 'pandas.core.frame.DataFrame'>
Index: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   job_id        10 non-null     int64  
 1   job_category  10 non-null     object 
 2   avg_salary    10 non-null     float64
dtypes: float64(1), int64(1), object(1)
memory usage: 320.0+ bytes


In [53]:
jobs.head()

| | job_id | job_category | avg_salary |
|---|---|---|---|
| 0 | 1 | analytics | 86000.0 |
| 1 | 2 | engineer | 101000.0 |
| 2 | 3 | software developer | 110000.0 |
| 3 | 4 | creative | 66000.0 |
| 4 | 5 | financial services | 135000.0 |


##### Join Feasibility Checks

In [63]:
students["uuid"].nunique() == len(students)

True

In [64]:
jobs["job_id"].nunique() == len(jobs)

True

In [65]:
courses["career_path_id"].nunique() == len(courses)

True

In [None]:
job_id_mask = students["job_id"].notna()

students.loc[job_id_mask, "job_id"] \
    .isin(jobs["job_id"]) \
    .value_counts()

job_id
True    4995
Name: count, dtype: Int64

In [None]:
career_path_id_mask = students["current_career_path_id"].notna()

students.loc[career_path_id_mask, "current_career_path_id"] \
    .isin(courses["career_path_id"]) \
    .value_counts()

current_career_path_id
True    4529
Name: count, dtype: Int64
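The checks above confirm that the keys are unique on the lookup side and that every non-null foreign key has a match. The same guarantees can also be asserted at join time. The sketch below is one possible approach, using toy frames with made-up rows: `validate="many_to_one"` makes pandas raise if the right-hand keys are not unique, and `indicator=True` records which rows found a match.

```python
import pandas as pd

# Toy frames (illustrative rows only); both keys use the nullable Int64 dtype.
toy_students = pd.DataFrame(
    {
        "uuid": [1, 2, 3],
        "job_id": pd.array([1, 2, None], dtype="Int64"),
    }
)
toy_jobs = pd.DataFrame(
    {
        "job_id": pd.array([1, 2], dtype="Int64"),
        "job_category": ["analytics", "engineer"],
    }
)

merged = toy_students.merge(
    toy_jobs,
    on="job_id",
    how="left",              # keep every student, matched or not
    validate="many_to_one",  # raise if job_id is duplicated in toy_jobs
    indicator=True,          # add a _merge column describing each row's match
)

match_counts = merged["_merge"].value_counts().to_dict()
print(merged)
print(match_counts)
```

Rows whose key is missing show up as `left_only`, which makes the students without a job_id or career path easy to isolate after the join.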

In [66]:
conn.close()

##### Cleaning Decisions

##### Students

- Expand contact_info into separate mailing_address and email columns
- Enforce types (dob: datetime64[ns]; job_id, num_course_taken, current_career_path_id: Int64; time_spent_hrs: float64)
- Drop the original contact_info column


##### Jobs

- Drop duplicate rows
- Enforce type (avg_salary: float64)
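When these decisions move from the notebook into the ETL scripts, they could be packaged as small transform functions. The sketch below is one possible shape, under stated assumptions: the function names `clean_students` and `clean_jobs` are hypothetical, and the toy rows are invented to exercise the code, not taken from the database.

```python
import json

import pandas as pd


def clean_students(students):
    """Apply the student cleaning decisions to a copy of the raw frame."""
    out = students.copy()
    # Expand the JSON contact column into one column per key.
    contact = pd.json_normalize(out["contact_info"].map(json.loads).tolist())
    contact.index = out.index
    out = pd.concat([out.drop(columns="contact_info"), contact], axis=1)
    # Enforce types; unparseable values become missing.
    out["dob"] = pd.to_datetime(out["dob"], errors="coerce")
    for col in ("job_id", "num_course_taken", "current_career_path_id"):
        out[col] = pd.to_numeric(out[col], errors="coerce").astype("Int64")
    out["time_spent_hrs"] = pd.to_numeric(out["time_spent_hrs"], errors="coerce")
    return out


def clean_jobs(jobs):
    """Drop exact duplicate rows and store avg_salary as float64."""
    out = jobs.drop_duplicates().reset_index(drop=True)
    out["avg_salary"] = pd.to_numeric(out["avg_salary"], errors="coerce").astype("float64")
    return out


# Toy inputs (hypothetical rows, shaped like the raw tables).
raw_students = pd.DataFrame(
    {
        "uuid": [1, 2],
        "name": ["Student A", "Student B"],
        "dob": ["1990-01-15", "1985-06-30"],
        "sex": ["F", "M"],
        "contact_info": [
            '{"mailing_address": "1 Example St", "email": "a@example.com"}',
            '{"mailing_address": "2 Example Ave", "email": "b@example.com"}',
        ],
        "job_id": ["7.0", None],
        "num_course_taken": ["6.0", "5.0"],
        "current_career_path_id": ["1.0", None],
        "time_spent_hrs": ["4.99", None],
    }
)
raw_jobs = pd.DataFrame(
    {
        "job_id": [1, 2, 1],
        "job_category": ["analytics", "engineer", "analytics"],
        "avg_salary": [86000, 101000, 86000],
    }
)

students_clean = clean_students(raw_students)
jobs_clean = clean_jobs(raw_jobs)
print(students_clean.dtypes)
print(jobs_clean)
```

Working on copies and returning new frames keeps each function free of side effects, which makes the steps easier to test and to rerun from a script.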