# Subscriber Pipeline Portfolio Project (Data Engineer)

## Scenario

We will work with a mock database of long-term cancelled subscribers for a fictional subscription company. The database is regularly updated from multiple sources and needs to be routinely cleaned and transformed into a usable shape with as little human intervention as possible.

## Inspect and clean the data

In [1]:
import pandas as pd
import sqlite3

In [2]:
database = sqlite3.connect('/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/cademycode.db')
print('Database loaded successfully')

Database loaded successfully


In [8]:
database_df = pd.read_sql_query('SELECT * FROM sqlite_master', database)
database_df

Unnamed: 0,type,name,tbl_name,rootpage,sql
0,table,cademycode_students,cademycode_students,2,CREATE TABLE cademycode_students (\n\tuuid INT...
1,table,cademycode_courses,cademycode_courses,5,CREATE TABLE cademycode_courses (\n\tcareer_pa...
2,table,cademycode_student_jobs,cademycode_student_jobs,6,CREATE TABLE cademycode_student_jobs (\n\tjob_...
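
The `sqlite_master` query above returns every schema object, so a database that also has indexes or views would list those too. A minimal sketch, assuming a throwaway in-memory database with hypothetical tables `students` and `jobs`, of restricting the listing to tables before loading them:

```python
import sqlite3

import pandas as pd

# Hypothetical in-memory database standing in for cademycode.db.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE students (uuid INTEGER, name TEXT)")
con.execute("CREATE TABLE jobs (job_id INTEGER, job_category TEXT)")
con.execute("CREATE INDEX idx_students_uuid ON students (uuid)")
con.execute("INSERT INTO students VALUES (1, 'Annabelle Avery')")
con.commit()

# Keep only real tables; indexes and views are also rows in sqlite_master.
tables = pd.read_sql_query(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name", con
)
table_names = tables["name"].tolist()

# Quote each identifier so an unusual table name cannot break the query.
frames = {
    name: pd.read_sql_query(f'SELECT * FROM "{name}"', con) for name in table_names
}
con.close()
print(table_names)
```

Filtering on `type = 'table'` keeps the later `SELECT * FROM ...` loop from trying to read an index as if it were a table.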


In [15]:
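def inspect_dataframe(dataframe):
    """Show a preview, summary, and per-column value counts for each table.

    `dataframe` is a dict that maps table names to DataFrames.
    """
    for table_name, df in dataframe.items():
        print(f"\nTable: {table_name}")
        display(df.head())
        print('\nDataFrame Info:')
        # df.info() prints its report and returns None, so display() also shows None.
        display(df.info())
        print('\nDataFrame Describe:')
        display(df.describe())
        for column in df.columns:
            print(f'\nValue Counts {column}:')
            display(df[column].value_counts())

# Load every table listed in sqlite_master into a dict of DataFrames.
table_name = database_df['name'].tolist()
df = {table: pd.read_sql_query(f'SELECT * FROM {table}', database) for table in table_name}

inspect_dataframe(df)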


Table: cademycode_students


Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs
0,1,Annabelle Avery,1943-07-03,F,"{""mailing_address"": ""303 N Timber Key, Irondal...",7.0,6.0,1.0,4.99
1,2,Micah Rubio,1991-02-07,M,"{""mailing_address"": ""767 Crescent Fair, Shoals...",7.0,5.0,8.0,4.4
2,3,Hosea Dale,1989-12-07,M,"{""mailing_address"": ""P.O. Box 41269, St. Bonav...",7.0,8.0,8.0,6.74
3,4,Mariann Kirk,1988-07-31,F,"{""mailing_address"": ""517 SE Wintergreen Isle, ...",6.0,7.0,9.0,12.31
4,5,Lucio Alexander,1963-08-31,M,"{""mailing_address"": ""18 Cinder Cliff, Doyles b...",7.0,14.0,3.0,5.64



DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 9 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   uuid                    5000 non-null   int64 
 1   name                    5000 non-null   object
 2   dob                     5000 non-null   object
 3   sex                     5000 non-null   object
 4   contact_info            5000 non-null   object
 5   job_id                  4995 non-null   object
 6   num_course_taken        4749 non-null   object
 7   current_career_path_id  4529 non-null   object
 8   time_spent_hrs          4529 non-null   object
dtypes: int64(1), object(8)
memory usage: 351.7+ KB


None


DataFrame Describe:


Unnamed: 0,uuid
count,5000.0
mean,2500.5
std,1443.520003
min,1.0
25%,1250.75
50%,2500.5
75%,3750.25
max,5000.0



Value Counts uuid:


uuid
1       1
3331    1
3338    1
3337    1
3336    1
       ..
1667    1
1666    1
1665    1
1664    1
5000    1
Name: count, Length: 5000, dtype: int64


Value Counts name:


name
Melvin Felt              2
Robbie Davies            2
Annabelle Avery          1
Kelley Munnickhuijsen    1
Rachel de Kruijff        1
                        ..
Cleopatra Singleton      1
Linwood Patrick          1
Marcia Beeldhouwer       1
Arnoldo Rodgers          1
Elton Otway              1
Name: count, Length: 4998, dtype: int64


Value Counts dob:


dob
1993-08-03    4
1955-05-27    4
1953-12-05    3
1995-06-13    3
1956-05-22    3
             ..
1960-09-21    1
1992-05-08    1
1979-03-22    1
1966-12-30    1
1994-12-23    1
Name: count, Length: 4492, dtype: int64


Value Counts sex:


sex
M    1995
F    1990
N    1015
Name: count, dtype: int64


Value Counts contact_info:


contact_info
{"mailing_address": "303 N Timber Key, Irondale, Wisconsin, 84736", "email": "annabelle_avery9376@woohoo.com"}      1
{"mailing_address": "768 Silent Skyway, Impact, Idaho, 90270", "email": "orval_devos4502@coldmail.com"}             1
{"mailing_address": "409 SW First Pike, Leary, North Carolina, 43428", "email": "rachel886@woohoo.com"}             1
{"mailing_address": "554 Broad Rose, Flagler Beach, Florida, 65799", "email": "allan8306@inlook.com"}               1
{"mailing_address": "872 Wagon Land, Guthrie Center, Colorado, 86498", "email": "isidro3025@woohoo.com"}            1
                                                                                                                   ..
{"mailing_address": "470 Essex Curve, Copan, Mississippi, 86309", "email": "cleopatra_singleton7791@inlook.com"}    1
{"mailing_address": "162 Iron Chase, Campbell Station, Oklahoma, 78795", "email": "patrick6416@inlook.com"}         1
{"mailing_address": "889 Mountain Alley, Qu


Value Counts job_id:


job_id
2.0    706
1.0    693
7.0    680
3.0    675
4.0    671
5.0    660
6.0    657
8.0    253
Name: count, dtype: int64


Value Counts num_course_taken:


num_course_taken
5.0     341
12.0    332
2.0     312
15.0    309
10.0    306
7.0     303
13.0    297
0.0     296
8.0     291
11.0    289
4.0     285
6.0     282
14.0    280
3.0     279
1.0     279
9.0     268
Name: count, dtype: int64


Value Counts current_career_path_id:


current_career_path_id
5.0     476
3.0     469
10.0    460
1.0     459
6.0     454
2.0     450
7.0     449
9.0     441
8.0     437
4.0     434
Name: count, dtype: int64


Value Counts time_spent_hrs:


time_spent_hrs
5.93     8
17.47    8
11.9     7
7.05     7
2.91     7
        ..
27.53    1
8.07     1
27.51    1
29.66    1
23.54    1
Name: count, Length: 2192, dtype: int64


Table: cademycode_courses


Unnamed: 0,career_path_id,career_path_name,hours_to_complete
0,1,data scientist,20
1,2,data engineer,20
2,3,data analyst,12
3,4,software engineering,25
4,5,backend engineer,18



DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   career_path_id     10 non-null     int64 
 1   career_path_name   10 non-null     object
 2   hours_to_complete  10 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 372.0+ bytes


None


DataFrame Describe:


Unnamed: 0,career_path_id,hours_to_complete
count,10.0,10.0
mean,5.5,21.9
std,3.02765,6.707376
min,1.0,12.0
25%,3.25,18.5
50%,5.5,20.0
75%,7.75,26.5
max,10.0,35.0



Value Counts career_path_id:


career_path_id
1     1
2     1
3     1
4     1
5     1
6     1
7     1
8     1
9     1
10    1
Name: count, dtype: int64


Value Counts career_path_name:


career_path_name
data scientist               1
data engineer                1
data analyst                 1
software engineering         1
backend engineer             1
frontend engineer            1
iOS developer                1
android developer            1
machine learning engineer    1
ux/ui designer               1
Name: count, dtype: int64


Value Counts hours_to_complete:


hours_to_complete
20    3
27    2
12    1
25    1
18    1
35    1
15    1
Name: count, dtype: int64


Table: cademycode_student_jobs


Unnamed: 0,job_id,job_category,avg_salary
0,1,analytics,86000
1,2,engineer,101000
2,3,software developer,110000
3,4,creative,66000
4,5,financial services,135000



DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13 entries, 0 to 12
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   job_id        13 non-null     int64 
 1   job_category  13 non-null     object
 2   avg_salary    13 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 444.0+ bytes


None


DataFrame Describe:


Unnamed: 0,job_id,avg_salary
count,13.0,13.0
mean,4.384615,89230.769231
std,2.662657,34727.879881
min,0.0,10000.0
25%,3.0,66000.0
50%,4.0,86000.0
75%,6.0,110000.0
max,9.0,135000.0



Value Counts job_id:


job_id
3    2
4    2
5    2
1    1
2    1
6    1
7    1
8    1
9    1
0    1
Name: count, dtype: int64


Value Counts job_category:


job_category
software developer    2
creative              2
financial services    2
analytics             1
engineer              1
education             1
HR                    1
student               1
healthcare            1
other                 1
Name: count, dtype: int64


Value Counts avg_salary:


avg_salary
110000    2
66000     2
135000    2
80000     2
86000     1
101000    1
61000     1
10000     1
120000    1
Name: count, dtype: int64

### Insights from cademycode_students

1. Data types: Every column except uuid is stored as object, even though the columns hold different kinds of data (dates, numbers, and so on).
2. Null data: The columns job_id, num_course_taken, current_career_path_id, and time_spent_hrs contain null values.
3. Duplicated names: Two names (Melvin Felt and Robbie Davies) each appear twice, although every uuid is unique.
4. Gender distribution: M (1995) and F (1990) are almost evenly represented, with a sizable group of N (1015, not specified).
5. Contact information: contact_info is a JSON string that holds a mailing_address and an email.
6. Job IDs: job_id takes eight distinct values (1 to 8), which probably refer to the cademycode_student_jobs table.
7. Number of courses taken: num_course_taken is the number of courses each user took.
8. Time spent: time_spent_hrs is the time each user spent on courses, in hours.
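
A repeated name does not by itself mean a repeated record. A minimal sketch, using toy rows with made-up dates of birth (the real values for these students are not shown in the output above), of one way to separate namesakes from likely duplicates:

```python
import pandas as pd

# Toy rows; the dates of birth are hypothetical, not taken from the database.
students = pd.DataFrame(
    {
        "uuid": [1, 2, 3, 4],
        "name": ["Melvin Felt", "Melvin Felt", "Robbie Davies", "Hosea Dale"],
        "dob": ["1980-01-01", "1992-06-15", "1975-03-09", "1989-12-07"],
    }
)

# Rows that share a name with at least one other row.
same_name = students[students.duplicated(subset="name", keep=False)]

# Rows that share both name and date of birth are stronger duplicate candidates.
same_person = students[students.duplicated(subset=["name", "dob"], keep=False)]

print(len(same_name), len(same_person))
```

If `same_person` is empty, the repeated names most likely belong to different people and the rows can stay.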

### Insights from cademycode_courses

1. Table data: The table lists 10 courses (career paths), each with the hours needed to complete it.
2. Data integrity: The values are complete and consistent, with no apparent issues.

### Insights from cademycode_student_jobs

1. Category and pay: The table holds a job category and an average salary for each job.
2. Pay statistics: The mean of avg_salary is 89,230.77, with a standard deviation of 34,727.88, a minimum of 10,000, and a maximum of 135,000.
3. Duplicated values: The job_id values 3, 4, and 5 each appear twice.
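
Before joining on job_id, it helps to confirm that the key is unique once exact duplicate rows are gone. A minimal sketch on a toy copy of a few job rows, with one row repeated exactly:

```python
import pandas as pd

# Toy copy of a few job rows, with job_id 3 repeated exactly.
jobs = pd.DataFrame(
    {
        "job_id": [1, 3, 3, 4],
        "job_category": ["analytics", "software developer", "software developer", "creative"],
        "avg_salary": [86000, 110000, 110000, 66000],
    }
)

jobs_clean = jobs.drop_duplicates().reset_index(drop=True)

# If job_id were still not unique here, the repeats would differ in another
# column and would need a manual decision instead of drop_duplicates().
key_is_unique = jobs_clean["job_id"].is_unique
print(len(jobs_clean), key_is_unique)
```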

### Next steps: cleaning and transforming the data

1. Data tidying:

a. Correct the data types of the columns in cademycode_students.

b. Handle the null values in job_id, num_course_taken, current_career_path_id, and time_spent_hrs.

c. Resolve the duplicated names and job_id values.

2. Data transformation:

a. Split contact_info into separate mailing_address and email columns.

b. Join the related tables (for example, cademycode_students to cademycode_student_jobs on job_id).

3. Final table creation:

Build a single final table from the joined, tidied data.

In [66]:
df_students = df['cademycode_students'].copy()

df_students['dob'] = pd.to_datetime(df_students['dob'], errors='coerce')
df_students['job_id'] = pd.to_numeric(df_students['job_id'], errors='coerce')
df_students['num_course_taken'] = pd.to_numeric(df_students['num_course_taken'], errors='coerce')
df_students['current_career_path_id'] = pd.to_numeric(df_students['current_career_path_id'], errors='coerce')
df_students['time_spent_hrs'] = pd.to_numeric(df_students['time_spent_hrs'], errors='coerce')


df_students.dtypes


uuid                               int64
name                              object
dob                       datetime64[ns]
sex                               object
contact_info                      object
job_id                           float64
num_course_taken                 float64
current_career_path_id           float64
time_spent_hrs                   float64
dtype: object

In [36]:
df_students.loc[:,'job_id'] = df_students['job_id'].fillna(0)
df_students.loc[:, 'num_course_taken'] = df_students['num_course_taken'].fillna(0)
df_students.loc[:, 'current_career_path_id'] = df_students['current_career_path_id'].fillna(df_students['current_career_path_id'].median())
df_students.loc[:, 'time_spent_hrs'] = df_students['time_spent_hrs'].fillna(df_students['time_spent_hrs'].median())

df_students.isnull().sum()

uuid                      0
name                      0
dob                       0
sex                       0
contact_info              0
job_id                    0
num_course_taken          0
current_career_path_id    0
time_spent_hrs            0
dtype: int64
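
Once nulls are filled with a median or with 0, imputed values can no longer be told apart from observed ones. A minimal sketch, assuming a toy frame and a hypothetical flag column `time_spent_hrs_was_missing`, of recording which rows were filled before overwriting them:

```python
import pandas as pd

# Toy frame with one missing value.
students = pd.DataFrame(
    {"uuid": [1, 2, 3, 4], "time_spent_hrs": [4.99, None, 6.74, 12.31]}
)

# Record which rows were missing before the values are overwritten.
students["time_spent_hrs_was_missing"] = students["time_spent_hrs"].isna()

# The median ignores missing values by default.
median_hours = students["time_spent_hrs"].median()
students["time_spent_hrs"] = students["time_spent_hrs"].fillna(median_hours)

print(students)
```

The flag column is optional; it only matters if later analysis needs to exclude or weight the imputed rows.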

In [67]:
import json

def extract_contact(contact_info):
    try:
        info = json.loads(contact_info)
        return pd.Series([info.get('mailing_address'), info.get('email')])
    except json.JSONDecodeError:
        return pd.Series([None, None])

df_students[['Mailing_address', 'email']] = df_students['contact_info'].apply(extract_contact)

display(df_students.head())

Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs,Mailing_address,email
0,1,Annabelle Avery,1943-07-03,F,"{""mailing_address"": ""303 N Timber Key, Irondal...",7.0,6.0,1.0,4.99,"303 N Timber Key, Irondale, Wisconsin, 84736",annabelle_avery9376@woohoo.com
1,2,Micah Rubio,1991-02-07,M,"{""mailing_address"": ""767 Crescent Fair, Shoals...",7.0,5.0,8.0,4.4,"767 Crescent Fair, Shoals, Indiana, 37439",rubio6772@hmail.com
2,3,Hosea Dale,1989-12-07,M,"{""mailing_address"": ""P.O. Box 41269, St. Bonav...",7.0,8.0,8.0,6.74,"P.O. Box 41269, St. Bonaventure, Virginia, 83637",hosea_dale8084@coldmail.com
3,4,Mariann Kirk,1988-07-31,F,"{""mailing_address"": ""517 SE Wintergreen Isle, ...",6.0,7.0,9.0,12.31,"517 SE Wintergreen Isle, Lane, Arkansas, 82242",kirk4005@hmail.com
4,5,Lucio Alexander,1963-08-31,M,"{""mailing_address"": ""18 Cinder Cliff, Doyles b...",7.0,14.0,3.0,5.64,"18 Cinder Cliff, Doyles borough, Rhode Island,...",alexander9810@hmail.com
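
The extraction step can also be written so that malformed or missing contact strings yield empty fields instead of raising. A minimal sketch, assuming a toy frame in which the second row is deliberately not valid JSON; the helper name `parse_contact` is hypothetical:

```python
import json

import pandas as pd

students = pd.DataFrame(
    {
        "uuid": [1, 2],
        "contact_info": [
            '{"mailing_address": "303 N Timber Key, Irondale, Wisconsin, 84736", '
            '"email": "annabelle_avery9376@woohoo.com"}',
            "not json",  # deliberately malformed
        ],
    }
)


def parse_contact(raw):
    """Return the decoded dict, or an empty dict if the value is not a JSON object."""
    try:
        parsed = json.loads(raw)
    except (TypeError, json.JSONDecodeError):
        return {}
    return parsed if isinstance(parsed, dict) else {}


parsed = students["contact_info"].map(parse_contact)
students["mailing_address"] = parsed.map(lambda d: d.get("mailing_address"))
students["email"] = parsed.map(lambda d: d.get("email"))

print(students[["uuid", "mailing_address", "email"]])
```

Catching `TypeError` as well covers values such as `None`, which `json.loads` rejects before it ever tries to decode.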


In [71]:
df_job = df['cademycode_student_jobs'].copy()
df_job_clean = df_job.drop_duplicates()
df_job_clean

Unnamed: 0,job_id,job_category,avg_salary
0,1,analytics,86000
1,2,engineer,101000
2,3,software developer,110000
3,4,creative,66000
4,5,financial services,135000
5,6,education,61000
6,7,HR,80000
7,8,student,10000
8,9,healthcare,120000
9,0,other,80000


In [73]:
df_courses = df['cademycode_courses'].copy()
df_courses

Unnamed: 0,career_path_id,career_path_name,hours_to_complete
0,1,data scientist,20
1,2,data engineer,20
2,3,data analyst,12
3,4,software engineering,25
4,5,backend engineer,18
5,6,frontend engineer,20
6,7,iOS developer,27
7,8,android developer,27
8,9,machine learning engineer,35
9,10,ux/ui designer,15


In [76]:
df_merge = pd.merge(df_students, df_job_clean, how='left', on='job_id')
df_merge_final = pd.merge(df_merge, df_courses, how='left', left_on='current_career_path_id', right_on='career_path_id')
df_merge_final

Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs,Mailing_address,email,job_category,avg_salary,career_path_id,career_path_name,hours_to_complete
0,1,Annabelle Avery,1943-07-03,F,"{""mailing_address"": ""303 N Timber Key, Irondal...",7.0,6.0,1.0,4.99,"303 N Timber Key, Irondale, Wisconsin, 84736",annabelle_avery9376@woohoo.com,HR,80000.0,1.0,data scientist,20.0
1,2,Micah Rubio,1991-02-07,M,"{""mailing_address"": ""767 Crescent Fair, Shoals...",7.0,5.0,8.0,4.40,"767 Crescent Fair, Shoals, Indiana, 37439",rubio6772@hmail.com,HR,80000.0,8.0,android developer,27.0
2,3,Hosea Dale,1989-12-07,M,"{""mailing_address"": ""P.O. Box 41269, St. Bonav...",7.0,8.0,8.0,6.74,"P.O. Box 41269, St. Bonaventure, Virginia, 83637",hosea_dale8084@coldmail.com,HR,80000.0,8.0,android developer,27.0
3,4,Mariann Kirk,1988-07-31,F,"{""mailing_address"": ""517 SE Wintergreen Isle, ...",6.0,7.0,9.0,12.31,"517 SE Wintergreen Isle, Lane, Arkansas, 82242",kirk4005@hmail.com,education,61000.0,9.0,machine learning engineer,35.0
4,5,Lucio Alexander,1963-08-31,M,"{""mailing_address"": ""18 Cinder Cliff, Doyles b...",7.0,14.0,3.0,5.64,"18 Cinder Cliff, Doyles borough, Rhode Island,...",alexander9810@hmail.com,HR,80000.0,3.0,data analyst,12.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4995,4996,Quentin van Harn,1967-07-07,N,"{""mailing_address"": ""591 Blue Berry, Coulee, I...",5.0,5.0,2.0,13.82,"591 Blue Berry, Coulee, Illinois, 65199",vanharn2778@woohoo.com,financial services,135000.0,2.0,data engineer,20.0
4996,4997,Alejandro van der Sluijs,1964-11-03,M,"{""mailing_address"": ""30 Iron Divide, Pewaukee ...",4.0,13.0,1.0,7.86,"30 Iron Divide, Pewaukee village, California, ...",alejandro4080@coldmail.com,creative,66000.0,1.0,data scientist,20.0
4997,4998,Brock Mckenzie,2004-11-25,M,"{""mailing_address"": ""684 Rustic Rest Avenue, C...",8.0,10.0,3.0,12.10,"684 Rustic Rest Avenue, Carmine, California, 5...",brock_mckenzie2025@inlook.com,student,10000.0,3.0,data analyst,12.0
4998,4999,Donnetta Dillard,1943-02-12,N,"{""mailing_address"": ""900 Indian Oval, Euclid, ...",3.0,6.0,5.0,14.86,"900 Indian Oval, Euclid, Iowa, 59683",dillard7526@inlook.com,software developer,110000.0,5.0,backend engineer,18.0


In [78]:
ori_length_df = len(df['cademycode_students'])
final_length_df = len(df_merge_final)

ori_length_df, final_length_df

(5000, 5000)
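
Comparing row counts after the fact catches a join that multiplied rows. pandas can also enforce the expectation at merge time through the `validate` and `indicator` parameters of `merge`. A minimal sketch on toy frames, where job_id 99 is a made-up value with no match:

```python
import pandas as pd

students = pd.DataFrame({"uuid": [1, 2, 3], "job_id": [7.0, 6.0, 99.0]})
jobs = pd.DataFrame(
    {"job_id": [6, 7], "job_category": ["education", "HR"], "avg_salary": [61000, 80000]}
)

# validate raises pandas.errors.MergeError if job_id repeats on the right side;
# indicator adds a _merge column that shows which rows found a match.
merged = students.merge(
    jobs, on="job_id", how="left", validate="many_to_one", indicator=True
)

unmatched = merged[merged["_merge"] == "left_only"]
print(len(merged), len(unmatched))
```

With `validate="many_to_one"`, forgetting to drop the duplicated job rows would fail loudly instead of silently inflating the final table.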

## Create the Output CSV

In [82]:
db_clean_con = sqlite3.connect('/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/clean_cademycode.db')
df_merge_final.to_sql('final_table', db_clean_con, if_exists= 'replace', index=False)
print('ok')

ok


In [83]:
df_db_final = pd.read_sql_query("SELECT * FROM final_table", db_clean_con)
df_db_final.to_csv('/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/final_output_subscriber_pipeline.csv', index=False)
print('CSV saved')

CSV saved


## Develop Unit Tests and Logs

In [84]:
import os, time

db_path = 'dev/cademycode.db'
last_mod_time = None

def is_database_updated():
    global last_mod_time
    current_mod_time = os.path.getmtime(db_path)
    if last_mod_time is None or current_mod_time > last_mod_time:
        last_mod_time = current_mod_time
        return True
    return False
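
Because `last_mod_time` lives in a module-level global, it resets to `None` every time the script starts, so the first check in each run always returns `True`. A minimal sketch of one possible alternative that keeps the timestamp in a small state file; the function name `is_file_updated` and both file names are hypothetical:

```python
import os
import tempfile
from pathlib import Path


def is_file_updated(data_path: Path, state_path: Path) -> bool:
    """Return True if data_path changed since the time recorded in state_path."""
    current = os.path.getmtime(data_path)
    previous = float(state_path.read_text()) if state_path.exists() else None
    if previous is None or current > previous:
        # repr() of a float round-trips exactly through float().
        state_path.write_text(repr(current))
        return True
    return False


# Demonstration on throwaway files.
with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "example.db"
    state_file = Path(tmp) / "last_mod_time.txt"
    data_file.write_bytes(b"v1")

    first_check = is_file_updated(data_file, state_file)
    second_check = is_file_updated(data_file, state_file)

    # Simulate a later modification by moving the mtime forward.
    later = os.path.getmtime(data_file) + 10
    os.utime(data_file, (later, later))
    third_check = is_file_updated(data_file, state_file)

print(first_check, second_check, third_check)
```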

In [85]:
import unittest

class TestDataCleaning(unittest.TestCase):
    def test_no_null_values(self):
        self.assertFalse(df_merge_final.isnull().values.any(), 'There are null values in final table')

    def test_correct_number_of_rows(self):
        original_length = len(df['cademycode_students'])
        final_length = len(df_merge_final)
        self.assertEqual(original_length, final_length, 'Number of rows differs after the join')

    def test_schema_consistency(self):
        # The joins add columns, so require the original columns to be a
        # subset of the final columns instead of requiring equality.
        original_schema = set(df['cademycode_students'].columns)
        final_schema = set(df_merge_final.columns)
        self.assertTrue(original_schema.issubset(final_schema), 'Final table is missing original columns')

In [87]:
import logging

logging.basicConfig(filename='/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/data_pipeline.log', level=logging.INFO, format='%(asctime)s:%(levelname)s:%(message)s')

def log_update(message):
    logging.info(message)

def log_error(message):
    logging.error(message)
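
`logging.basicConfig` does nothing when the root logger already has handlers, which can happen when a cell is re-run or when the environment has configured logging first. A minimal sketch, assuming a hypothetical logger name `subscriber_pipeline` and a temporary log file, of a named logger that does not depend on the root logger:

```python
import logging
import os
import tempfile


def build_logger(log_path: str) -> logging.Logger:
    """Create a file logger that does not depend on the root logger's state."""
    logger = logging.getLogger("subscriber_pipeline")  # hypothetical name
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers when the cell is re-run
        handler = logging.FileHandler(log_path)
        handler.setFormatter(
            logging.Formatter("%(asctime)s:%(levelname)s:%(message)s")
        )
        logger.addHandler(handler)
    return logger


# Demonstration with a throwaway log file.
log_file = os.path.join(tempfile.mkdtemp(), "data_pipeline.log")
logger = build_logger(log_file)
logger.info("Pipeline started")
logger.error("Example error entry")

for handler in logger.handlers:
    handler.flush()
with open(log_file) as f:
    log_text = f.read()
print(log_text)
```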

In [88]:
changelog_path = '/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/changelog.txt'

def write_changelog(version, new_rows_count, missing_data_count):
    with open(changelog_path, 'a') as f:
        f.write(f"Version: {version}\n")
        f.write(f"New rows added: {new_rows_count}\n")
        f.write(f"Missing data count: {missing_data_count}\n")
        f.write("\n")
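
The counts passed to `write_changelog` have to come from somewhere. A minimal sketch, assuming the previous run's output is available as a DataFrame, of deriving them by comparing uuid values between runs; the frames and the version label `1.0.1` are made up for illustration:

```python
import pandas as pd

# Toy stand-ins: the previous run's output and the current run's table.
previous_output = pd.DataFrame({"uuid": [1, 2, 3]})
current_output = pd.DataFrame(
    {"uuid": [1, 2, 3, 4, 5], "job_id": [7.0, None, 7.0, 6.0, None]}
)

# Rows whose uuid did not exist in the previous output count as new.
new_rows_count = int((~current_output["uuid"].isin(previous_output["uuid"])).sum())
missing_data_count = int(current_output.isnull().sum().sum())

entry = (
    "Version: 1.0.1\n"  # hypothetical version label
    f"New rows added: {new_rows_count}\n"
    f"Missing data count: {missing_data_count}\n"
)
print(entry)
```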

In [94]:
import json
import logging
import os
import sqlite3
import unittest

import pandas as pd

logging.basicConfig(filename='/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/data_pipeline.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s')

def log_update(message):
    logging.info(message)

def log_error(message):
    logging.error(message)

def extract_contact_info(contact_info):
    try:
        info = json.loads(contact_info.replace("'", '"'))
        return pd.Series([info.get('mailing_address'), info.get('email')])
    except json.JSONDecodeError:
        return pd.Series([None, None])

def is_database_updated():
    global last_mod_time
    current_mod_time = os.path.getmtime(db_path)
    if last_mod_time is None or current_mod_time > last_mod_time:
        last_mod_time = current_mod_time
        return True
    return False

def write_changelog(version, new_rows_count, missing_data_count):
    with open(changelog_path, 'a') as f:
        f.write(f"Version: {version}\n")
        f.write(f"New rows added: {new_rows_count}\n")
        f.write(f"Missing data count: {missing_data_count}\n")
        f.write("\n")

try:
    db_path = '/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/cademycode.db'
    changelog_path = '/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/changelog.txt'
    last_mod_time = None
    
    if is_database_updated():
        log_update("Updated database. Running the pipeline...")

        con = sqlite3.connect(db_path)
        print('Database connection established successfully.')

        tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", con)
        table_names = tables['name'].tolist()
        df = {table: pd.read_sql_query(f"SELECT * FROM {table}", con) for table in table_names}

        students_df = df['cademycode_students'].copy()
        students_df['dob'] = pd.to_datetime(students_df['dob'], errors='coerce')
        students_df['job_id'] = pd.to_numeric(students_df['job_id'], errors='coerce')
        students_df['num_course_taken'] = pd.to_numeric(students_df['num_course_taken'], errors='coerce')
        students_df['current_career_path_id'] = pd.to_numeric(students_df['current_career_path_id'], errors='coerce')
        students_df['time_spent_hrs'] = pd.to_numeric(students_df['time_spent_hrs'], errors='coerce')
        students_df.loc[:, 'job_id'] = students_df['job_id'].fillna(0)
        students_df.loc[:, 'current_career_path_id'] = students_df['current_career_path_id'].fillna(0)
        students_df.loc[:, 'num_course_taken'] = students_df['num_course_taken'].fillna(students_df['num_course_taken'].median())
        students_df.loc[:, 'time_spent_hrs'] = students_df['time_spent_hrs'].fillna(students_df['time_spent_hrs'].median())
        students_df[['mailing_address', 'email']] = students_df['contact_info'].apply(extract_contact_info)
        students_df.drop(columns=['contact_info'], inplace=True)

        jobs_df_cleaned = df['cademycode_student_jobs'].drop_duplicates()

        merged_df_cleaned = pd.merge(students_df, jobs_df_cleaned, how='left', left_on='job_id', right_on='job_id')
        final_df_cleaned = pd.merge(merged_df_cleaned, df['cademycode_courses'], how='left', left_on='current_career_path_id', right_on='career_path_id')

        final_df_cleaned = final_df_cleaned.assign(
            career_path_id=final_df_cleaned['career_path_id'].fillna(0),
            career_path_name=final_df_cleaned['career_path_name'].fillna('Unknown'),
            hours_to_complete=final_df_cleaned['hours_to_complete'].fillna(0)
        )

        clean_db_conn = sqlite3.connect('/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/clean_cademycode.db')
        final_df_cleaned.to_sql('final_table', clean_db_conn, if_exists='replace', index=False)
        final_df_cleaned.to_csv('/Users/rismansudarmaji/Downloads/subscriber-pipeline-starter-kit/dev/final_output_subscriber_pipeline.csv', index=False)

        log_update("Pipeline executed successfully.")

        original_length = len(df['cademycode_students'])
        final_length = len(final_df_cleaned)

        class TestDataCleaning(unittest.TestCase):
            def test_no_null_values(self):
                self.assertFalse(final_df_cleaned.isnull().values.any(), "There are null values in the final table")

            def test_correct_number_of_rows(self):
                self.assertEqual(original_length, final_length, "The number of rows differs after the merges")

            def test_schema_consistency(self):
                original_schema = set(df['cademycode_students'].columns)
                final_schema = set(final_df_cleaned.columns)
                original_schema.discard('contact_info')
                original_schema.update(['mailing_address', 'email'])
                self.assertTrue(original_schema.issubset(final_schema), "The final table schema does not include all original columns")

        if __name__ == '__main__':
            unittest.main(argv=['first-arg-is-ignored'], exit=False)

        new_rows_count = len(final_df_cleaned) - original_length
        missing_data_count = final_df_cleaned.isnull().sum().sum()
        write_changelog("1.0.0", new_rows_count, missing_data_count)

    else:
        log_update("No updates to the database. Pipeline not executed.")

except Exception as e:
    log_error(f"Error running the pipeline: {e}")
    raise

Database connection established successfully.


...
----------------------------------------------------------------------
Ran 3 tests in 0.006s

OK
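
In the cell above, the database table and CSV are written before the tests run. One possible arrangement is to run the tests first and publish only when they pass. A minimal sketch, assuming a toy final table and a hypothetical test class `TestFinalTable`:

```python
import unittest

import pandas as pd

# Toy stand-in for the final table.
final_table = pd.DataFrame({"uuid": [1, 2], "job_category": ["HR", "education"]})


class TestFinalTable(unittest.TestCase):
    def test_no_null_values(self):
        self.assertFalse(final_table.isnull().values.any())

    def test_uuid_is_unique(self):
        self.assertTrue(final_table["uuid"].is_unique)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestFinalTable)
result = unittest.TextTestRunner(verbosity=0).run(suite)

# Only publish (write the CSV, update the changelog) when every test passed.
should_publish = result.wasSuccessful()
print(should_publish)
```

Running the suite through `TextTestRunner` returns a result object, so the pipeline can branch on `wasSuccessful()` instead of only printing the test report.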


## Create a Bash Script