Data Pipelines with Python Project-simeon omeda
---



# Project Deliverable
Telecom companies often have to extract billing data from multiple CSV files generated from
various systems and transform it into a structured format for analysis and revenue reporting.
This process can be time-consuming and error-prone, and it can hinder decision-making. Manually
analyzing and reconciling billing data from different sources is a tedious task and often leads to
delays in generating revenue reports. Thus, there is a need for an automated data pipeline that
can extract billing data from multiple sources and transform it into a structured format for
efficient analysis and revenue reporting.
# Guidelines
Here are some guidelines and hints to help you create the data pipeline:
● Determine the requirements: First, you need to define the requirements of the data
pipeline, including the source and destination of the data, the type of data that needs to
be processed, the transformations that need to be applied, and the output format.
● Extract the data: Use Python to read the CSV files and extract the data.
● Clean the data: Perform data cleaning on the extracted data to remove any missing
values and outliers. For example, you can replace missing values with an appropriate
value or remove them altogether.
● Transform the data: Apply any necessary transformations on the data, such as data
type conversion, data aggregation, and data filtering, to prepare the data for analysis.
● Merge the datasets: Join the different datasets into a single dataset that can be used for
analysis.
● Load the data: Load the transformed data into a database or a file, such as a CSV file,
that can be easily analyzed.
● Automate the process: Automate the data pipeline by scheduling it to run at a specific
time, such as daily or weekly, so that it can update the analysis data automatically.
● Test the pipeline: Test the data pipeline to ensure it produces the correct results. This
can be done by comparing the results with the expected output or using a test dataset.
● Optimize the pipeline: Optimize the data pipeline to improve performance and reduce
errors. This can be done by optimizing the code, parallel processing, and reducing the
data size.
● Monitor the pipeline: Monitor the data pipeline to ensure that it runs smoothly and that
there are no errors or issues.
# Datasets
Here are three sample datasets (https://bit.ly/416WE1X) with billing data that can be joined. The
datasets contain some missing values and outliers:
Dataset 1:
● Customer ID (numeric)
● Date of purchase (MM/DD/YYYY)
● Total amount billed (numeric)
● Payment status (categorical - paid, overdue, disputed)
● Payment method (categorical - credit card, bank transfer, e-wallet)
● Promo code (text)
● Country of purchase (categorical)
Dataset 2:
● Customer ID (numeric)
● Date of payment (MM/DD/YYYY)
● Amount paid (numeric)
● Payment method (categorical - credit card, bank transfer, e-wallet)
● Payment status (categorical - paid, overdue, disputed)
● Late payment fee (numeric)
● Country of payment (categorical)
Dataset 3:
● Customer ID (numeric)
● Date of refund (MM/DD/YYYY)
● Refund amount (numeric)
● Reason for refund (text)
● Country of refund (categorical)
Notes:
1. The datasets can be joined using Customer ID, Date of purchase/payment/refund, and
country of purchase/payment/refund as keys.
2. The datasets may contain missing values and outliers for some fields, such as the total
amount billed or refund amount.
3. The payment status may be missing or incomplete for some of the transactions.
4. The promo code field may be empty for some of the purchases.
5. The reason for the refund may be missing for some of the refund transactions.

**Extract the data**: Use Python to read the CSV files and extract the data.


In [2]:
import pandas as pd
import os

In [3]:

# Load the datasets
df1 = pd.read_csv('dataset1.csv')
df2 = pd.read_csv('dataset2.csv')
df3 = pd.read_csv('dataset3.csv')

In [4]:
df1.head()

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
0,101,04/01/2021,100,paid,credit card,PROMO1,USA
1,102,04/02/2021,200,paid,bank transfer,PROMO2,USA
2,103,04/02/2021,50,overdue,credit card,,UK
3,104,04/03/2021,75,disputed,e-wallet,PROMO3,UK
4,105,04/04/2021,125,paid,credit card,PROMO4,USA


In [None]:
df2.head()

In [None]:
df3.head()
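
The reads above leave the date columns as plain strings. As a minimal sketch, assuming the dates follow the MM/DD/YYYY format listed in the dataset description, a small helper can parse them during extraction. The helper name `read_billing_csv` and the inline sample are hypothetical stand-ins, not part of the original notebook.

```python
import io

import pandas as pd

# Hypothetical inline sample standing in for dataset1.csv (same column names as above).
SAMPLE_CSV = """customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
101,04/01/2021,100,paid,credit card,PROMO1,USA
103,04/02/2021,50,overdue,credit card,,UK
"""


def read_billing_csv(source, date_column):
    """Read one billing CSV and convert its MM/DD/YYYY date column to datetime."""
    df = pd.read_csv(source)
    df[date_column] = pd.to_datetime(df[date_column], format="%m/%d/%Y")
    return df


df_sample = read_billing_csv(io.StringIO(SAMPLE_CSV), "date_of_purchase")
print(df_sample.dtypes)
```

With the real files, the same helper could be called as `read_billing_csv('dataset1.csv', 'date_of_purchase')`, and likewise with `date_of_payment` and `date_of_refund` for the other two datasets.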

**Clean the data:** Perform data cleaning on the extracted data to remove any missing
values and outliers. For example, you can replace missing values with an appropriate
value or remove them altogether.


In [5]:
df1.isnull().sum()

customer_id            0
date_of_purchase       0
total_amount_billed    0
payment_status         0
payment_method         0
promo_code             3
country_of_purchase    0
dtype: int64

In [6]:
# Drop the rows with a missing promo_code (3 rows, per the counts above)
df1 = df1.dropna()


In [7]:
df1.isnull().sum()

customer_id            0
date_of_purchase       0
total_amount_billed    0
payment_status         0
payment_method         0
promo_code             0
country_of_purchase    0
dtype: int64

In [8]:
df3.isnull().sum()

customer_id          0
date_of_refund       0
refund_amount        0
reason_for_refund    0
country_of_refund    0
dtype: int64
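
The cells above only deal with missing values, and they do so by dropping rows. The sketch below shows one possible alternative: filling empty text fields with a placeholder and flagging amount outliers with the common 1.5 × IQR rule. The function name `fill_and_flag`, the `NO_PROMO` placeholder, the `is_outlier` column, and the sample values are all assumptions made for illustration, and the IQR rule is a swapped-in convention rather than something the project brief prescribes.

```python
import pandas as pd


def fill_and_flag(df, amount_column, text_defaults):
    """Fill missing text fields with defaults and flag amounts outside the IQR fences."""
    cleaned = df.copy()
    for column, default in text_defaults.items():
        cleaned[column] = cleaned[column].fillna(default)

    # Tukey fences: anything beyond 1.5 * IQR from the quartiles is flagged
    q1 = cleaned[amount_column].quantile(0.25)
    q3 = cleaned[amount_column].quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    # Note: a missing amount is also flagged, because between() is False for NaN
    cleaned["is_outlier"] = ~cleaned[amount_column].between(lower, upper)
    return cleaned


# Hypothetical sample with one extreme amount and two empty promo codes
sample = pd.DataFrame({
    "customer_id": [101, 102, 103, 104, 105, 106],
    "total_amount_billed": [100, 200, 50, 75, 125, 5000],
    "promo_code": ["PROMO1", "PROMO2", None, "PROMO3", "PROMO4", None],
})
cleaned = fill_and_flag(sample, "total_amount_billed", {"promo_code": "NO_PROMO"})
print(cleaned)
```

Flagging instead of deleting keeps every billing row available for revenue totals, while still letting later steps filter on `is_outlier` if needed.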

**Transform the data:** Apply any necessary transformations on the data, such as data
type conversion, data aggregation, and data filtering, to prepare the data for analysis.
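
The notebook has no code cell for this step, so the following is a minimal sketch of the three transformations named above, run on a small hypothetical sample that reuses the column names of Dataset 1. The choice to exclude disputed transactions and to total by country is an assumption made only to illustrate filtering and aggregation.

```python
import pandas as pd

# Hypothetical sample; amounts are strings on purpose to show type conversion
sample = pd.DataFrame({
    "customer_id": [101, 102, 103, 104],
    "date_of_purchase": ["04/01/2021", "04/02/2021", "04/02/2021", "04/03/2021"],
    "total_amount_billed": ["100", "200", "50", "75"],
    "payment_status": ["paid", "paid", "overdue", "disputed"],
    "country_of_purchase": ["USA", "USA", "UK", "UK"],
})

# Type conversion: dates to datetime, amounts to numbers (bad values become NaN)
sample["date_of_purchase"] = pd.to_datetime(sample["date_of_purchase"], format="%m/%d/%Y")
sample["total_amount_billed"] = pd.to_numeric(sample["total_amount_billed"], errors="coerce")

# Filtering: keep only transactions that are not disputed
undisputed = sample[sample["payment_status"] != "disputed"]

# Aggregation: total amount billed per country
billed_by_country = (
    undisputed.groupby("country_of_purchase", as_index=False)["total_amount_billed"].sum()
)
print(billed_by_country)
```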
**Merge the datasets:** Join the different datasets into a single dataset that can be used for
analysis.


In [9]:
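# Inner joins on customer_id keep only customers present in all three datasets;
# columns that share a name (payment_status, payment_method) get _x/_y suffixes
df_merged = df1.merge(df2, on="customer_id").merge(df3, on="customer_id")
df_merged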

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status_x,payment_method_x,promo_code,country_of_purchase,date_of_payment,amount_paid,payment_method_y,payment_status_y,late_payment_fee,country_of_payment,date_of_refund,refund_amount,reason_for_refund,country_of_refund
0,101,04/01/2021,100,paid,credit card,PROMO1,USA,04/01/2021,100,credit card,paid,0,USA,04/03/2021,100,product not as described,USA
1,102,04/02/2021,200,paid,bank transfer,PROMO2,USA,04/03/2021,200,bank transfer,paid,0,USA,04/06/2021,200,defective product,USA
2,104,04/03/2021,75,disputed,e-wallet,PROMO3,UK,04/04/2021,50,e-wallet,overdue,0,UK,04/08/2021,50,product not received,UK
3,105,04/04/2021,125,paid,credit card,PROMO4,USA,04/05/2021,125,credit card,paid,0,USA,04/09/2021,25,product not as described,USA
4,107,04/06/2021,75,overdue,e-wallet,PROMO5,USA,04/07/2021,75,e-wallet,overdue,20,USA,04/12/2021,150,change of mind,USA
5,108,04/06/2021,100,overdue,bank transfer,PROMO6,USA,04/07/2021,100,bank transfer,overdue,30,USA,04/13/2021,75,product not as described,USA
6,110,04/07/2021,25,overdue,credit card,PROMO7,USA,04/08/2021,25,credit card,paid,0,USA,04/14/2021,50,product not received,USA
7,111,04/08/2021,175,paid,e-wallet,PROMO8,UK,04/09/2021,175,e-wallet,paid,0,UK,04/15/2021,175,defective product,UK
8,112,04/08/2021,200,paid,bank transfer,PROMO9,USA,04/10/2021,200,bank transfer,paid,0,USA,04/16/2021,200,change of mind,USA
9,113,04/09/2021,50,disputed,credit card,PROMO10,USA,04/10/2021,50,credit card,disputed,0,USA,04/16/2021,50,product not as described,USA
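
The dataset notes list the country as a join key alongside the customer ID. As a sketch of that variant, assuming the country columns are first renamed to a shared name, the merge below joins on both keys, uses readable suffixes instead of `_x`/`_y`, and keeps unmatched billing rows with a left join. The column name `country`, the suffixes, and the sample rows are hypothetical. The dates are left out of the key here because the output above shows payment dates that differ from purchase dates for the same customer, so an exact date match would drop those rows.

```python
import pandas as pd

# Hypothetical samples reusing the column names of Dataset 1 and Dataset 2
billing = pd.DataFrame({
    "customer_id": [101, 102, 103],
    "total_amount_billed": [100, 200, 50],
    "payment_status": ["paid", "paid", "overdue"],
    "country_of_purchase": ["USA", "USA", "UK"],
})
payments = pd.DataFrame({
    "customer_id": [101, 102],
    "amount_paid": [100, 200],
    "payment_status": ["paid", "paid"],
    "country_of_payment": ["USA", "USA"],
})

# Give the country columns one shared name so they can act as a join key
billing = billing.rename(columns={"country_of_purchase": "country"})
payments = payments.rename(columns={"country_of_payment": "country"})

merged = billing.merge(
    payments,
    how="left",                          # keep billing rows that have no payment yet
    on=["customer_id", "country"],
    suffixes=("_billing", "_payment"),   # clearer than the default _x/_y
    validate="one_to_one",               # raise if a key is duplicated on either side
)
print(merged)
```

The `validate` argument is a cheap guard: if a customer appears twice in either table, pandas raises an error instead of silently multiplying rows.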


**Load the data:** Load the transformed data into a database or a file, such as a CSV file,
that can be easily analyzed.


In [10]:

df_merged.to_csv("merged.csv", index=False)
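
The step description also mentions loading into a database. A minimal sketch of that option, assuming SQLite from the Python standard library as the target, is shown below. The table name `billing_merged` and the sample frame are hypothetical, and the in-memory database is used only to keep the example self-contained.

```python
import sqlite3

import pandas as pd

# Hypothetical stand-in for df_merged
merged_sample = pd.DataFrame({
    "customer_id": [101, 102],
    "total_amount_billed": [100, 200],
    "amount_paid": [100, 200],
})

# ":memory:" keeps the example self-contained; a file path would persist the data
conn = sqlite3.connect(":memory:")
try:
    # Replace the table on each run so repeated runs do not append duplicates
    merged_sample.to_sql("billing_merged", conn, if_exists="replace", index=False)
    row_count = conn.execute("SELECT COUNT(*) FROM billing_merged").fetchone()[0]
finally:
    conn.close()

print(row_count)
```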

**Automate the process:** Automate the data pipeline by scheduling it to run at a specific
time, such as daily or weekly, so that it can update the analysis data automatically.


In [13]:
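import os.path
from datetime import datetime, timedelta

from airflow import DAG
# In Airflow 2.x, PythonOperator lives in airflow.operators.python;
# airflow.operators.python_operator is the deprecated 1.x location
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2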

In [12]:
pip install apache-airflow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-airflow
  Downloading apache_airflow-2.5.2-py3-none-any.whl (11.6 MB)
Collecting mdit-py-plugins>=0.3.0
  Downloading mdit_py_plugins-0.3.5-py3-none-any.whl (52 kB)
Collecting flask-login>=0.6.2
  Downloading Flask_Login-0.6.2-py3-none-any.whl (17 kB)
Collecting flask-session>=0.4.0
  Downloading Flask_Session-0.4.0-py2.py3-none-any.whl (7.5 kB)
Collecting argcomplete>=1.10
  Downloading argcomplete-3.0.3-py3-none-any.whl (39 kB)
Collecting pathspec~=0.9.0
  Downloading pathspec-0.9.0-py2.py3-none-any.whl (31 kB)
Collecting pendulum>=2.0
  Downloading pendulum-2.1.2-cp39-cp39-manylinux1_x86_64.whl (155 kB)

In [None]:
default_args = {
    'owner': 'telecom',
    'depends_on_past': False,
    'start_date': datetime(2022, 2, 18),
    'email': ['someda@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

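Alternatively, schedule the script with cron. Run `crontab -e` and add the entry below, which runs the script at midnight every Sunday:

0 0 * * 7 /path/to/this/pipelinescript.py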
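
For cron to call the script directly, as the crontab entry above does, the file has to be executable and start with a shebang line. The sketch below shows one way such a script could be laid out, reusing the read, merge, and write steps from the earlier cells. The function name `run_pipeline` and the two command-line arguments are assumptions for illustration; the original notebook does not define a script.

```python
#!/usr/bin/env python3
"""Sketch of a cron-friendly entry point for the billing pipeline."""
import sys
from pathlib import Path

import pandas as pd


def run_pipeline(input_dir, output_path):
    """Read the three CSV files, merge them on customer_id, and write one CSV."""
    input_dir = Path(input_dir)
    df1 = pd.read_csv(input_dir / "dataset1.csv")
    df2 = pd.read_csv(input_dir / "dataset2.csv")
    df3 = pd.read_csv(input_dir / "dataset3.csv")
    merged = df1.merge(df2, on="customer_id").merge(df3, on="customer_id")
    merged.to_csv(output_path, index=False)
    return len(merged)


if __name__ == "__main__":
    # Hypothetical usage: pipelinescript.py INPUT_DIR OUTPUT_CSV
    if len(sys.argv) == 3:
        rows = run_pipeline(sys.argv[1], sys.argv[2])
        print(f"wrote {rows} rows to {sys.argv[2]}")
    else:
        print("usage: pipelinescript.py INPUT_DIR OUTPUT_CSV", file=sys.stderr)
```

Passing absolute paths matters under cron, because cron jobs do not start in the directory where the script lives.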

● **Test the pipeline**: Test the data pipeline to ensure it produces the correct results. This
can be done by comparing the results with the expected output or using a test dataset.
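
As a minimal sketch of comparing results with an expected output, the test below runs the same inner joins as the merge cell on a tiny hand-made dataset and checks the result with `pd.testing.assert_frame_equal`. The function names and the sample values are hypothetical.

```python
import pandas as pd


def merge_on_customer(df1, df2, df3):
    """Same inner joins on customer_id as in the merge cell."""
    return df1.merge(df2, on="customer_id").merge(df3, on="customer_id")


def test_merge_keeps_only_customers_in_all_three():
    df1 = pd.DataFrame({"customer_id": [101, 102, 103], "total_amount_billed": [100, 200, 50]})
    df2 = pd.DataFrame({"customer_id": [101, 102], "amount_paid": [100, 200]})
    df3 = pd.DataFrame({"customer_id": [101], "refund_amount": [100]})

    # Only customer 101 appears in all three inputs
    expected = pd.DataFrame({
        "customer_id": [101],
        "total_amount_billed": [100],
        "amount_paid": [100],
        "refund_amount": [100],
    })

    result = merge_on_customer(df1, df2, df3).reset_index(drop=True)
    pd.testing.assert_frame_equal(result, expected)


# Runs directly in a notebook cell; a test runner such as pytest would also collect it
test_merge_keeps_only_customers_in_all_three()
```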

● **Optimize the pipeline**: Optimize the data pipeline to improve performance and reduce
errors. This can be done by optimizing the code, parallel processing, and reducing the
data size.
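
Two common ways to reduce the data size with pandas are reading only the columns that are needed and processing the file in chunks. The sketch below illustrates both on a hypothetical inline sample that reuses the Dataset 1 column names. Whether the `category` dtype actually saves memory depends on how often the labels repeat, so it should be measured on the real files.

```python
import io

import pandas as pd

# Hypothetical inline sample standing in for dataset1.csv
SAMPLE_CSV = """customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
101,04/01/2021,100,paid,credit card,PROMO1,USA
102,04/02/2021,200,paid,bank transfer,PROMO2,USA
103,04/02/2021,50,overdue,credit card,,UK
104,04/03/2021,75,disputed,e-wallet,PROMO3,UK
"""

# Read only the needed columns and store repeated labels as categories
slim = pd.read_csv(
    io.StringIO(SAMPLE_CSV),
    usecols=["customer_id", "total_amount_billed", "payment_status", "country_of_purchase"],
    dtype={"payment_status": "category", "country_of_purchase": "category"},
)
print(slim.dtypes)

# Aggregate in chunks so the whole file never has to fit in memory at once
total_billed = 0
for chunk in pd.read_csv(io.StringIO(SAMPLE_CSV), usecols=["total_amount_billed"], chunksize=2):
    total_billed += int(chunk["total_amount_billed"].sum())
print(total_billed)
```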

● **Monitor the pipeline:** Monitor the data pipeline to ensure that it runs smoothly and that
there are no errors or issues.
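
One lightweight way to monitor a run is to check the merged output after each execution and log any problems. The sketch below uses the standard `logging` module; the logger name, the function `check_merged`, and the specific checks (required columns, minimum row count, missing values) are assumptions chosen for illustration.

```python
import logging

import pandas as pd

logger = logging.getLogger("billing_pipeline")  # hypothetical logger name


def check_merged(df, required_columns, min_rows=1):
    """Return a list of problems found in the merged output; empty means healthy."""
    problems = []

    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")

    if len(df) < min_rows:
        problems.append(f"expected at least {min_rows} rows, found {len(df)}")

    # Count missing values only in the required columns that are present
    present = [c for c in required_columns if c in df.columns]
    null_counts = df[present].isnull().sum()
    for column, count in null_counts[null_counts > 0].items():
        problems.append(f"{count} missing values in {column}")

    for problem in problems:
        logger.error(problem)
    if not problems:
        logger.info("merged output passed all checks (%d rows)", len(df))
    return problems


logging.basicConfig(level=logging.INFO)
sample = pd.DataFrame({"customer_id": [101, 102], "amount_paid": [100, None]})
problems = check_merged(sample, ["customer_id", "amount_paid", "refund_amount"])
```

When the pipeline runs under cron, redirecting this log output to a file gives a simple history of each run.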
