# Data Pipelines with Python

I am developing an automated data pipeline that can extract billing data from multiple sources and transform it into a structured format for efficient analysis and revenue reporting.

## Pre-requisites

In [1]:
# Pre-requisite 1
# ---
# Importing pandas library for data manipulation
import pandas as pd
# Importing numpy library for scientific computations
import numpy as np

## 1. Data Exploration

### Load and Review the datasets

In [22]:
# Dataset url = https://bit.ly/416WE1X
# Dataset1 - subscriptions
subscription_df = pd.read_csv('https://raw.githubusercontent.com/wambasisamuel/DE_Week07_Monday/main/dataset1.csv')
subscription_df.head(5)

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
0,101,04/01/2021,100,paid,credit card,PROMO1,USA
1,102,04/02/2021,200,paid,bank transfer,PROMO2,USA
2,103,04/02/2021,50,overdue,credit card,,UK
3,104,04/03/2021,75,disputed,e-wallet,PROMO3,UK
4,105,04/04/2021,125,paid,credit card,PROMO4,USA


In [27]:
# Dataset2 - Payments
payment_df = pd.read_csv('https://raw.githubusercontent.com/wambasisamuel/DE_Week07_Monday/main/dataset2.csv')
payment_df.head(5)

Unnamed: 0,customer_id,date_of_payment,amount_paid,payment_method,payment_status,late_payment_fee,country_of_payment
0,101,04/01/2021,100,credit card,paid,0,USA
1,102,04/03/2021,200,bank transfer,paid,0,USA
2,103,04/03/2021,75,credit card,paid,10,UK
3,104,04/04/2021,50,e-wallet,overdue,0,UK
4,105,04/05/2021,125,credit card,paid,0,USA


In [24]:
# Dataset3 - Refunds
refund_df = pd.read_csv('https://raw.githubusercontent.com/wambasisamuel/DE_Week07_Monday/main/dataset3.csv')
refund_df.head(5)

Unnamed: 0,customer_id,date_of_refund,refund_amount,reason_for_refund,country_of_refund
0,101,04/03/2021,100,product not as described,USA
1,102,04/06/2021,200,defective product,USA
2,103,04/07/2021,75,change of mind,UK
3,104,04/08/2021,50,product not received,UK
4,105,04/09/2021,25,product not as described,USA


## 2. Data Preparation

### Get column data types

In [30]:
# Function that prints the data types of columns for a given dataframe
def get_datatypes(df):
  df_name =[x for x in globals() if globals()[x] is df][0]
  print("\n" + df_name)
  print("=================")
  print(df.dtypes)

df_list = [subscription_df, payment_df, refund_df]

for df in df_list:
  get_datatypes(df)


subscription_df
customer_id             int64
date_of_purchase       object
total_amount_billed     int64
payment_status         object
payment_method         object
promo_code             object
country_of_purchase    object
dtype: object

df
customer_id            int64
date_of_payment       object
amount_paid            int64
payment_method        object
payment_status        object
late_payment_fee       int64
country_of_payment    object
dtype: object

refund_df
customer_id           int64
date_of_refund       object
refund_amount         int64
reason_for_refund    object
country_of_refund    object
dtype: object


### Missing Values

In [41]:
# Function that prints the sum of missing values per columns for a given dataframe
def get_nulls(df):
  df_name =[x for x in globals() if globals()[x] is df][0]
  print("\n" + df_name)
  print("=================\n")
  print(df.isna().sum())

for df in df_list:
  get_nulls(df)


subscription_df

customer_id            0
date_of_purchase       0
total_amount_billed    0
payment_status         0
payment_method         0
promo_code             3
country_of_purchase    0
dtype: int64

df

customer_id           0
date_of_payment       0
amount_paid           0
payment_method        0
payment_status        0
late_payment_fee      0
country_of_payment    0
dtype: int64

refund_df

customer_id          0
date_of_refund       0
refund_amount        0
reason_for_refund    0
country_of_refund    0
dtype: int64


### Duplicate data

In [32]:
# Function that prints number of duplicate records
def get_duplicates(df):
  df_name =[x for x in globals() if globals()[x] is df][0]
  print("\n" + df_name)
  print("=================\n")
  print(sum(df.duplicated()))

for df in df_list:
  get_duplicates(df)


subscription_df

0

df

0

refund_df

0


There are missing values for the Promo_code

## 3. Data Transformation 

### Date Formatting

In [54]:
# Convert the date/time columns
subscription_df['date_of_purchase'] = pd.to_datetime(subscription_df['date_of_purchase'], infer_datetime_format=True)
payment_df['date_of_payment'] = pd.to_datetime(payment_df['date_of_payment'], infer_datetime_format=True)
refund_df['date_of_refund'] = pd.to_datetime(refund_df['date_of_refund'], infer_datetime_format=True)

### Merging the Datasets

In [64]:
# Merge the datasets
subscription_payment_merge = pd.merge(left=subscription_df, right=payment_df, how='left', left_on=['customer_id','payment_status','payment_method'], right_on= ['customer_id','payment_status','payment_method'])
final_merged_df = pd.merge(left=subscription_payment_merge, right=refund_df, how='left', left_on=['customer_id'], right_on= ['customer_id'])

final_merged_df

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase,date_of_payment_x,amount_paid,late_payment_fee,country_of_payment,date_of_refund,refund_amount,reason_for_refund,country_of_refund,date_of_payment_y
0,101,2021-01-04,100,paid,credit card,PROMO1,USA,2021-01-04,100.0,0.0,USA,2021-03-04,100,product not as described,USA,2021-03-04
1,102,2021-02-04,200,paid,bank transfer,PROMO2,USA,2021-03-04,200.0,0.0,USA,2021-06-04,200,defective product,USA,2021-06-04
2,103,2021-02-04,50,overdue,credit card,,UK,NaT,,,,2021-07-04,75,change of mind,UK,2021-07-04
3,104,2021-03-04,75,disputed,e-wallet,PROMO3,UK,NaT,,,,2021-08-04,50,product not received,UK,2021-08-04
4,105,2021-04-04,125,paid,credit card,PROMO4,USA,2021-05-04,125.0,0.0,USA,2021-09-04,25,product not as described,USA,2021-09-04
5,106,2021-05-04,150,paid,credit card,,UK,2021-06-04,150.0,0.0,UK,2021-11-04,125,defective product,UK,2021-11-04
6,107,2021-06-04,75,overdue,e-wallet,PROMO5,USA,2021-07-04,75.0,20.0,USA,2021-12-04,150,change of mind,USA,2021-12-04
7,108,2021-06-04,100,overdue,bank transfer,PROMO6,USA,2021-07-04,100.0,30.0,USA,2021-04-13,75,product not as described,USA,2021-04-13
8,109,2021-07-04,50,paid,bank transfer,,UK,2021-08-04,50.0,0.0,UK,2021-04-13,100,defective product,UK,2021-04-13
9,110,2021-07-04,25,overdue,credit card,PROMO7,USA,NaT,,,,2021-04-14,50,product not received,USA,2021-04-14


### Unpaid Bills

In [65]:
final_merged_df = final_merged_df[final_merged_df['payment_status'] != 'paid']
final_merged_df

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase,date_of_payment_x,amount_paid,late_payment_fee,country_of_payment,date_of_refund,refund_amount,reason_for_refund,country_of_refund,date_of_payment_y
2,103,2021-02-04,50,overdue,credit card,,UK,NaT,,,,2021-07-04,75,change of mind,UK,2021-07-04
3,104,2021-03-04,75,disputed,e-wallet,PROMO3,UK,NaT,,,,2021-08-04,50,product not received,UK,2021-08-04
6,107,2021-06-04,75,overdue,e-wallet,PROMO5,USA,2021-07-04,75.0,20.0,USA,2021-12-04,150,change of mind,USA,2021-12-04
7,108,2021-06-04,100,overdue,bank transfer,PROMO6,USA,2021-07-04,100.0,30.0,USA,2021-04-13,75,product not as described,USA,2021-04-13
9,110,2021-07-04,25,overdue,credit card,PROMO7,USA,NaT,,,,2021-04-14,50,product not received,USA,2021-04-14
12,113,2021-09-04,50,disputed,credit card,PROMO10,USA,2021-10-04,50.0,0.0,USA,2021-04-16,50,product not as described,USA,2021-04-16
14,115,2021-10-04,75,overdue,e-wallet,PROMO12,UK,2021-12-04,75.0,15.0,UK,2021-04-18,75,product not received,UK,2021-04-18


## 4. Data Loading

### Output to csv file

In [66]:
final_merged_df.to_csv('unpaid_bills.csv', header=True, index=False)

## 5. Automation

I will configure a cronjob to run the notebook everydat at 4am:

0 4 * * * /path/to/notebook.ipynb