#Data Pipelines with Python Project

##Project Brief

Telecom companies often have to extract billing data from multiple CSV files generated from various systems and transform it into a structured format for analysis and revenue reporting. 

This process can be time-consuming, error-prone, and hinder decision-making. Manually analyzing and reconciling billing data from different sources is a tedious task and often leads to
delays in generating revenue reports. 

Thus, there is a need for an automated data pipeline that can extract billing data from multiple sources and transform it into a structured format for efficient analysis and revenue reporting.

##Extract the data

In [1]:
# import libraries
import pandas as pd
import numpy as np
import os

In [2]:
# Define the paths to the CSV files
dataset1_path = "dataset1.csv"
dataset2_path = "dataset2.csv"
dataset3_path = "dataset3.csv"

# Define the output path for the transformed data
output_path = "output.csv"

###Transform the data: Apply any necessary transformations on the data, such as data type conversion,

In [3]:
# Define the data types for each column in the datasets
dataset1_dtypes = {
    "customer_id": np.int64,
    # "date_of_purchase": np.datetime64,
    "total_amount_billed": np.float64,
    "payment_status": "category",
    "payment_method": "category",
    "promo_code": str,
    "country_of_purchase": "category"
}

dataset2_dtypes = {
    "customer_id": np.int64,
    # "date_of_payment": np.datetime64,
    "amount_paid": np.float64,
    "payment_method": "category",
    "payment_status": "category",
    "late_payment_fee": np.float64,
    "country_of_payment": "category"
}

dataset3_dtypes = {
    "customer_id": np.int64,
    # "date_of_refund": np.datetime64,
    "refund_amount": np.float64,
    "reason_for_refund": str,
    "country_of_refund": "category"
}

# Define the date parsers for each dataset
dataset1_date_parser = lambda x: pd.to_datetime(x, format="%m/%d/%Y", errors="coerce")
dataset2_date_parser = lambda x: pd.to_datetime(x, format="%m/%d/%Y", errors="coerce")
dataset3_date_parser = lambda x: pd.to_datetime(x, format="%m/%d/%Y", errors="coerce")




In [4]:
# Read the datasets into pandas dataframes
dataset1 = pd.read_csv(dataset1_path, dtype=dataset1_dtypes, parse_dates=["date_of_purchase"], date_parser=dataset1_date_parser)
dataset2 = pd.read_csv(dataset2_path, dtype=dataset2_dtypes, parse_dates=["date_of_payment"], date_parser=dataset2_date_parser)
dataset3 = pd.read_csv(dataset3_path, dtype=dataset3_dtypes, parse_dates=["date_of_refund"], date_parser=dataset3_date_parser)



#Clean the data

In [5]:
# Clean the data by replacing missing values with NaN
dataset1.replace(["", " ", "-"], np.nan, inplace=True)
dataset2.replace(["", " ", "-"], np.nan, inplace=True)
dataset3.replace(["", " ", "-"], np.nan, inplace=True)



#Merge the datasets:

In [6]:
# prepare the data for merging by checking their column names to determine which columns can be merged
print(f'Dataset 1\n{dataset1.columns}')
print(f'Dataset 2\n{dataset1.columns}')
print(f'Dataset 3\n{dataset1.columns}')

Dataset 1
Index(['customer_id', 'date_of_purchase', 'total_amount_billed',
       'payment_status', 'payment_method', 'promo_code',
       'country_of_purchase'],
      dtype='object')
Dataset 2
Index(['customer_id', 'date_of_purchase', 'total_amount_billed',
       'payment_status', 'payment_method', 'promo_code',
       'country_of_purchase'],
      dtype='object')
Dataset 3
Index(['customer_id', 'date_of_purchase', 'total_amount_billed',
       'payment_status', 'payment_method', 'promo_code',
       'country_of_purchase'],
      dtype='object')


In [7]:
# Drop the index columns
dataset1 = dataset1.reset_index(drop=True)
dataset2 = dataset2.reset_index(drop=True)
dataset3 = dataset3.reset_index(drop=True)


# Rename the columns
dataset1 = dataset1.rename(columns={"date_of_purchase": "date", "country_of_purchase": "country"})
dataset2 = dataset2.rename(columns={"date_of_payment": "date", "country_of_payment": "country"})
dataset3 = dataset3.rename(columns={"date_of_refund": "date", "country_of_refund": "country"})



In [8]:
# Merge the datasets by Customer ID, Date of purchase/payment/refund, and country of purchase/payment/refund
merged_data = pd.merge(dataset1, dataset2, on=['customer_id','date','country'],
                          how='outer', suffixes = ('_purchase', '_payment'))
merged_data = pd.merge(merged_data, dataset3, on=['customer_id','date','country'], 
                          how='outer', suffixes = ('_payment', '_refund'))

merged_data.head()
     


Unnamed: 0,customer_id,date,total_amount_billed,payment_status_purchase,payment_method_purchase,promo_code,country,amount_paid,payment_method_payment,payment_status_payment,late_payment_fee,refund_amount,reason_for_refund
0,101,2021-04-01,100.0,paid,credit card,PROMO1,USA,100.0,credit card,paid,0.0,,
1,102,2021-04-02,200.0,paid,bank transfer,PROMO2,USA,,,,,,
2,103,2021-04-02,50.0,overdue,credit card,,UK,,,,,,
3,104,2021-04-03,75.0,disputed,e-wallet,PROMO3,UK,,,,,,
4,105,2021-04-04,125.0,paid,credit card,PROMO4,USA,,,,,,


##Clean the data further

In [14]:
# Clean the data further to prepair it for anlysis 

# Replace missing values in 'amount_paid' with 0
merged_data['amount_paid'].fillna(value=0, inplace=True)

# Replace missing values in 'late_payment_fee' with 0
merged_data['late_payment_fee'].fillna(value=0, inplace=True)


# Replace missing values in 'refund_amount' with 0
merged_data['refund_amount'].fillna(value=0, inplace=True)

In [15]:
# Calculate the revenue by subtracting the refund amount and late payment fee from the total amount billed
merged_data["revenue"] = merged_data["total_amount_billed"] - merged_data["refund_amount"] - merged_data["late_payment_fee"]

#Load the data to an external file

In [16]:
# Load the transformed data into a CSV file
merged_data.to_csv(output_path, index=False)

# Print the first 5 rows of the transformed data
merged_data.head()

Unnamed: 0,customer_id,date,total_amount_billed,payment_status_purchase,payment_method_purchase,promo_code,country,amount_paid,payment_method_payment,payment_status_payment,late_payment_fee,refund_amount,reason_for_refund,revenue
0,101,2021-04-01,100.0,paid,credit card,PROMO1,USA,100.0,credit card,paid,0.0,0.0,,100.0
1,102,2021-04-02,200.0,paid,bank transfer,PROMO2,USA,0.0,,,0.0,0.0,,200.0
2,103,2021-04-02,50.0,overdue,credit card,,UK,0.0,,,0.0,0.0,,50.0
3,104,2021-04-03,75.0,disputed,e-wallet,PROMO3,UK,0.0,,,0.0,0.0,,75.0
4,105,2021-04-04,125.0,paid,credit card,PROMO4,USA,0.0,,,0.0,0.0,,125.0


#Automate the process:

In [None]:
# We can auotmate the process by setting up a cron job to execute automatically as desired 
# Here's an example of how to set up a cron job to run the data pipeline script every day at 3:00 AM:

0 3 * * * /path/to/python /path/to/data_pipeline.py

In [None]:
# We can monitor the pipeline by:
# Monitor logs: Logs can provide detailed information on pipeline activity and errors. 
# Monitor the pipeline logs regularly to detect and resolve any issues quickly.