# Project -- Data Pipelines with **Python**

## Project Deliverables



Telecom companies often have to extract billing data from multiple CSV files generated by different systems and transform it into a structured format for analysis and revenue reporting. Done manually, this process is time-consuming and error-prone, and it can slow down decision-making: analyzing and reconciling billing data from different sources is tedious and often delays revenue reports.

An automated data pipeline is therefore needed to extract billing data from multiple sources and transform it into a structured format for efficient analysis and revenue reporting.

## 1. Determine the requirements

Source of the data: three sample CSV datasets containing billing data (https://bit.ly/416WE1X).

Destination of the data: a local CSV file (`merge1.csv`).

Transformations to apply, and the data-quality issues they must handle:

1. The datasets can be joined using customer ID, date of purchase/payment/refund, and country of purchase/payment/refund as keys.
2. The datasets may contain missing values and outliers in some fields, such as the total amount billed or the refund amount.
3. The payment status may be missing or incomplete for some transactions.
4. The promo code field may be empty for some purchases.
5. The reason for the refund may be missing for some refund transactions.
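Requirement 1 joins on three keys, but each dataset names its date and country columns differently. A minimal sketch, assuming the column names that appear in the null-count outputs later in this notebook and using small hypothetical rows (not the real data), renames the key columns to shared names and then merges on all three:

```python
import pandas as pd

# Hypothetical sample rows, for illustration only (not the real datasets).
purchases = pd.DataFrame({
    "customer_id": [1, 2],
    "date_of_purchase": ["2023-01-05", "2023-01-06"],
    "country_of_purchase": ["UK", "USA"],
    "total_amount_billed": [100.0, 120.0],
})
payments = pd.DataFrame({
    "customer_id": [1, 2],
    "date_of_payment": ["2023-01-05", "2023-01-07"],
    "country_of_payment": ["UK", "USA"],
    "amount_paid": [100.0, 120.0],
})

# Give the key columns shared names so all three can be passed to `on=`.
keys = ["customer_id", "date", "country"]
purchases = purchases.rename(columns={"date_of_purchase": "date", "country_of_purchase": "country"})
payments = payments.rename(columns={"date_of_payment": "date", "country_of_payment": "country"})

# Inner join: a row survives only if customer, date and country all agree.
joined = purchases.merge(payments, on=keys, how="inner")
print(joined)
```

Exact-date matching is strict: customer 2 is dropped here because the payment date differs from the purchase date by one day.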


## 2. Extract the data

```python
import logging

import pandas as pd

# Set up a file logger for the pipeline
logging.basicConfig(filename='pipeline.log', level=logging.DEBUG)
```



```python
# Use Python to read the CSV files and extract the data.


def readCsv(f1, f2, f3):
    """
    Read the three CSV files and return one DataFrame per file.
    """
    try:
        dset1 = pd.read_csv(f1)
        dset2 = pd.read_csv(f2)
        dset3 = pd.read_csv(f3)
    except Exception:
        # Log the traceback and re-raise: falling through to `return` would
        # reference unassigned variables and hide the real error.
        logging.exception("Method readCsv exception occurred")
        raise

    return dset1, dset2, dset3
```
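`readCsv` takes exactly three paths. If more source files are added later, a loop scales better. A minimal sketch, using a hypothetical `read_csvs` helper (not part of the notebook) that keys each DataFrame by file name and applies the same log-and-re-raise pattern:

```python
import logging
import tempfile
from pathlib import Path

import pandas as pd

logger = logging.getLogger(__name__)


def read_csvs(paths):
    """Hypothetical helper: read each CSV into a DataFrame keyed by file stem.

    Failures are logged with a traceback and re-raised, so the caller never
    receives a half-populated result.
    """
    frames = {}
    for p in map(Path, paths):
        try:
            frames[p.stem] = pd.read_csv(p)
        except Exception:
            logger.exception("Failed to read %s", p)
            raise
    return frames


# Demo with a throwaway file containing hypothetical content.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "dataset1.csv"
    sample.write_text("customer_id,total_amount_billed\n1,100\n2,120\n")
    frames = read_csvs([sample])

print(frames["dataset1"].shape)
```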



```python
dset1, dset2, dset3 = readCsv('dataset1.csv', 'dataset2.csv', 'dataset3.csv')
# dset1.shape
```

## 3. Clean the data

Clean the extracted data by handling missing values and outliers, for example by replacing missing values with an appropriate default or by removing the affected rows. Here, missing promo codes in dataset 1 are filled with the placeholder value `PROMO0`.
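The cleaning step below fills missing values but does not treat outliers. One common approach is to clip values outside Tukey's fences, `[Q1 - k*IQR, Q3 + k*IQR]`. A minimal sketch, assuming the conventional `k = 1.5` (not a value derived from the billing data) and a hypothetical `clip_iqr` helper:

```python
import pandas as pd


def clip_iqr(s: pd.Series, k: float = 1.5) -> pd.Series:
    """Hypothetical helper: clip values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    return s.clip(lower=q1 - k * iqr, upper=q3 + k * iqr)


# Hypothetical billed amounts with one obvious outlier.
billed = pd.Series([90.0, 95.0, 100.0, 105.0, 110.0, 1000.0])
print(clip_iqr(billed).tolist())
```

Clipping keeps every row; if outliers are more likely data-entry errors, dropping them with a boolean mask on the same bounds is the alternative.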

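```python
def cleanDF(df):
    """
    Fill nulls in the promo_code column of dataset 1 and return the cleaned df.
    """
    try:
        # Fill only promo_code; a bare df.fillna('PROMO0') would also
        # overwrite gaps in every other column with the string 'PROMO0'.
        dset11 = df.fillna({'promo_code': 'PROMO0'})
    except Exception:
        logging.exception("Method cleanDF exception occurred")
        raise

    return dset11


dset11 = cleanDF(dset1)
```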

```python
dset11.shape
dset11.isnull().sum()
```

```
customer_id            0
date_of_purchase       0
total_amount_billed    0
payment_status         0
payment_method         0
promo_code             0
country_of_purchase    0
dtype: int64
```

```python
dset2.isnull().sum()
```

```
customer_id           0
date_of_payment       0
amount_paid           0
payment_method        0
payment_status        0
late_payment_fee      0
country_of_payment    0
dtype: int64
```

```python
dset3.isnull().sum()
```

```
customer_id          0
date_of_refund       0
refund_amount        0
reason_for_refund    0
country_of_refund    0
dtype: int64
```

## 4. Transform the data

Apply the necessary transformations to the data, such as data type conversion, aggregation, and filtering, to prepare it for analysis.
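Filtering is named as a transformation but is not applied in the cells below. A minimal sketch of boolean-mask filtering, assuming a hypothetical reporting window and a hypothetical rule that zero-value refunds are excluded:

```python
import pandas as pd

# Hypothetical refund rows, for illustration only.
refunds = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "date_of_refund": pd.to_datetime(["2023-01-10", "2023-02-15", "2023-03-01"]),
    "refund_amount": [50.0, 0.0, 75.0],
})

# Keep refunds with a positive amount inside the (hypothetical) window.
start, end = pd.Timestamp("2023-01-01"), pd.Timestamp("2023-02-28")
in_window = refunds["date_of_refund"].between(start, end)  # inclusive on both ends
filtered = refunds[in_window & (refunds["refund_amount"] > 0)]
print(filtered)
```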

```python
def aggregator(dset11, dset2, dset3):
    """
    Aggregate the average billed, paid, and refunded amounts per country.
    Returns the aggregated df and also writes it to means1.csv.
    """
    try:
        mean_billed_df = dset11.groupby('country_of_purchase')['total_amount_billed'].mean()
        mean_paid_df = dset2.groupby('country_of_payment')['amount_paid'].mean()
        mean_refund_df = dset3.groupby('country_of_refund')['refund_amount'].mean()

        # The three Series are aligned on their country index when combined
        frame = {'Average_Billed': mean_billed_df,
                 'Average_Paid': mean_paid_df,
                 'Average_Refund': mean_refund_df}

        mean_df = pd.DataFrame(frame)
        mean_df.index.name = 'country'  # gives the index column a header in the CSV
        mean_df.to_csv("means1.csv", index=True)
    except Exception:
        logging.exception("Method aggregator exception occurred")
        raise

    return mean_df


mean = aggregator(dset11, dset2, dset3)
mean
```

| Country | Average_Billed | Average_Paid | Average_Refund |
|---|---|---|---|
| UK | 95.833333 | 95.833333 | 100.0 |
| USA | 108.333333 | 108.333333 | 105.555556 |
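Averages alone can hide differences in volume between countries. A sketch using pandas named aggregation to compute several statistics per country in one `groupby` call; the rows and the output column names (`avg_billed`, `total_billed`, `n_purchases`) are hypothetical:

```python
import pandas as pd

# Hypothetical purchase rows, for illustration only.
purchases = pd.DataFrame({
    "country_of_purchase": ["UK", "UK", "USA"],
    "total_amount_billed": [90.0, 110.0, 120.0],
})

# Named aggregation: each keyword becomes an output column built from a
# (source column, aggregation function) pair.
summary = purchases.groupby("country_of_purchase").agg(
    avg_billed=("total_amount_billed", "mean"),
    total_billed=("total_amount_billed", "sum"),
    n_purchases=("total_amount_billed", "count"),
)
print(summary)
```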


```python
def transformDF(dset11, dset2, dset3):
    """
    Convert column dtypes and keep a single country column.
      dataset 1: date_of_purchase -> datetime; payment_status, payment_method,
                 promo_code, country_of_purchase -> category
      dataset 2: date_of_payment -> datetime; payment_method, payment_status,
                 country_of_payment -> category
      dataset 3: date_of_refund -> datetime; reason_for_refund,
                 country_of_refund -> category
    Returns the 3 transformed dfs.
    """
    try:
        # dataset 1 (astype returns a copy, so the caller's df is not mutated)
        dset11 = dset11.astype({c: 'category' for c in
                                ['payment_status', 'payment_method', 'promo_code', 'country_of_purchase']})
        dset11['date_of_purchase'] = pd.to_datetime(dset11['date_of_purchase'])

        # dataset 2
        dset2 = dset2.astype({c: 'category' for c in
                              ['payment_method', 'payment_status', 'country_of_payment']})
        dset2['date_of_payment'] = pd.to_datetime(dset2['date_of_payment'])

        # dataset 3
        dset3 = dset3.astype({c: 'category' for c in
                              ['reason_for_refund', 'country_of_refund']})
        dset3['date_of_refund'] = pd.to_datetime(dset3['date_of_refund'])

        # The country column is identical in all 3 datasets: drop 2, keep 1
        dset3 = dset3.drop(columns=['country_of_refund'])
        dset2 = dset2.drop(columns=['country_of_payment'])

        # Rename the remaining country column
        dset11 = dset11.rename(columns={"country_of_purchase": "country"})
    except Exception:
        logging.exception("Method transformDF exception occurred")
        raise

    return dset11, dset2, dset3
```
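By default `pd.to_datetime` raises on the first malformed value, which would abort `transformDF`. A minimal sketch of a more tolerant conversion, assuming the dates are stored as `YYYY-MM-DD` (the source does not show the format) and using hypothetical values:

```python
import pandas as pd

# Hypothetical raw values: one valid date, one impossible month, one gap.
raw = pd.Series(["2023-01-05", "2023-13-01", None])

# errors="coerce" turns unparseable or missing values into NaT instead of
# raising, so bad rows can be counted and handled explicitly afterwards.
dates = pd.to_datetime(raw, format="%Y-%m-%d", errors="coerce")
print(dates.isna().sum())
```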


```python
dset11, dset2, dset3 = transformDF(dset11, dset2, dset3)
```

## 5. Merge the datasets

Join the different datasets into a single dataset that can be used for
analysis.
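An inner join on `customer_id` alone silently drops customers missing from any dataset, and it multiplies rows when a customer appears more than once on both sides. A sketch with hypothetical rows, using the `indicator` and `validate` parameters of `DataFrame.merge` to surface both problems before settling on an inner join:

```python
import pandas as pd

# Hypothetical rows: customer 1 has two purchases, customer 3 never paid,
# customer 2 paid without a matching purchase.
billing = pd.DataFrame({"customer_id": [1, 1, 3], "total_amount_billed": [50.0, 70.0, 30.0]})
payments = pd.DataFrame({"customer_id": [1, 2], "amount_paid": [120.0, 40.0]})

# indicator=True adds a `_merge` column (both / left_only / right_only);
# validate="many_to_one" raises MergeError if `payments` has duplicate keys.
check = billing.merge(payments, on="customer_id", how="outer",
                      indicator=True, validate="many_to_one")
counts = check["_merge"].value_counts()
print(counts)
```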

```python
def mergeDF(dset11, dset2, dset3):
    """
    Inner-join the 3 dfs on the customer_id column and return the merged df.
    Columns present in two inputs (e.g. payment_method) get _x/_y suffixes.
    """
    try:
        df_merge = pd.merge(dset11, dset2, how='inner', on='customer_id')
        df_merge = pd.merge(df_merge, dset3, how='inner', on='customer_id')
    except Exception:
        logging.exception("Method mergeDF exception occurred")
        raise

    return df_merge


df_merge = mergeDF(dset11, dset2, dset3)
```

## 6. Load the dataset

Load the transformed data into a database or a file, such as a CSV file,
that can be easily analyzed.
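The pipeline below loads into a CSV file. If a database is preferred, pandas can write directly to SQLite through the standard-library `sqlite3` module. A minimal sketch, assuming a hypothetical table name `billing` and hypothetical sample rows; an in-memory database is used for the demo:

```python
import sqlite3

import pandas as pd

# Hypothetical merged rows, for illustration only.
merged = pd.DataFrame({"customer_id": [1, 2], "amount_paid": [100.0, 120.0]})

# ":memory:" keeps the demo self-contained; pass a file path such as
# "billing.db" (hypothetical name) to persist the table between runs.
con = sqlite3.connect(":memory:")
merged.to_sql("billing", con, if_exists="replace", index=False)

rows = con.execute("SELECT COUNT(*) FROM billing").fetchone()[0]
print(rows)
con.close()
```

`if_exists="replace"` rebuilds the table on every scheduled run; `"append"` would keep history instead.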

```python
df_merge.to_csv("merge1.csv", index=False)
```

## 7. Automate the process

Automate the data pipeline by scheduling it to run at a fixed time, such as daily or weekly, so that the analysis data is updated automatically.


```python
import os.path

import pandas as pd

if __name__ == '__main__':

    # Extract the data
    dset1, dset2, dset3 = readCsv('dataset1.csv', 'dataset2.csv', 'dataset3.csv')

    # Confirm that only DataFrames are returned
    # (note: assert statements are skipped when Python runs with -O)
    assert isinstance(dset1, pd.DataFrame)
    assert isinstance(dset2, pd.DataFrame)
    assert isinstance(dset3, pd.DataFrame)

    # Clean the data and confirm a df is returned
    dset11 = cleanDF(dset1)
    assert isinstance(dset11, pd.DataFrame)

    # Transform the data: aggregation
    mean = aggregator(dset11, dset2, dset3)
    assert isinstance(mean, pd.DataFrame)

    # Transform the data: dtype conversion and column renaming
    dset11, dset2, dset3 = transformDF(dset11, dset2, dset3)
    assert isinstance(dset11, pd.DataFrame)
    assert isinstance(dset2, pd.DataFrame)
    assert isinstance(dset3, pd.DataFrame)

    # Merge the datasets
    df_merge = mergeDF(dset11, dset2, dset3)
    assert isinstance(df_merge, pd.DataFrame)

    # Load the data to CSV
    df_merge.to_csv("merge1.csv", index=False)

    # Check that the merged data has been written to the CSV file
    assert os.path.isfile("merge1.csv")
```




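Use the steps below to set up a cron job that runs the pipeline script above every Monday at 09:00. The notebook first has to be saved as a Python script, for example with `jupyter nbconvert --to script`; `/path/to/this/pipelinescript.py` is a placeholder.

1. Run `crontab -e`.
2. Append the lines below, then save and close the file:

```
# Run Mondays at 09:00
0 9 * * 1 cd /path/to/this && python3 pipelinescript.py
```

The `cd` makes the relative CSV paths in the script resolve against the project folder, because cron does not start jobs there. If `python3` is not on cron's `PATH`, use the interpreter's absolute path instead.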
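Instead of relying on the working directory set in the crontab line, the script itself can resolve its files relative to its own location. A minimal sketch, assuming the CSV files sit next to the script; `BASE_DIR`, `inputs`, and `output` are hypothetical names:

```python
from pathlib import Path

# Resolve data files relative to the script rather than the current working
# directory, since cron typically starts jobs in the user's home directory.
# __file__ exists when run as a script; inside a notebook, fall back to cwd.
BASE_DIR = Path(__file__).resolve().parent if "__file__" in globals() else Path.cwd()

inputs = [BASE_DIR / name for name in ("dataset1.csv", "dataset2.csv", "dataset3.csv")]
output = BASE_DIR / "merge1.csv"
print(inputs[0].name, output.name)
```

These paths could then be passed to `readCsv(*inputs)` and `df_merge.to_csv(output, index=False)`.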


## 8. Test the pipeline

*The pipeline was tested with assertions that confirm each stage returns a DataFrame and that the merged output is written to `merge1.csv`.*
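Type checks confirm that each stage returns a DataFrame, but they cannot catch wrong values. A sketch of a value-level check, using a hypothetical `fill_promo` function that mirrors the documented intent of `cleanDF` (fill only `promo_code`) and hypothetical sample rows:

```python
import pandas as pd


def fill_promo(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for cleanDF: fill gaps in promo_code only."""
    return df.fillna({"promo_code": "PROMO0"})


sample = pd.DataFrame({
    "promo_code": ["SAVE10", None],
    "payment_status": ["paid", None],
})
result = fill_promo(sample)

# Check values, not just types: promo gaps are filled, other gaps are untouched.
assert result["promo_code"].tolist() == ["SAVE10", "PROMO0"]
assert result["payment_status"].isna().sum() == 1
print("value checks passed")
```

The same assertions could live in a test function run by a test runner, so they are not skipped when Python runs with `-O`.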


## 9. Optimize the pipeline

*The pipeline was optimized through data type conversion, e.g. switching `object` columns to the `category` dtype where feasible, which reduces memory use for columns with few distinct values.*
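The effect of the `category` conversion can be measured with `memory_usage(deep=True)`, which counts the Python string objects held by an `object` column. A minimal sketch on a hypothetical low-cardinality column:

```python
import pandas as pd

# Hypothetical column: 10,000 rows with only two distinct values.
status = pd.Series(["paid", "pending"] * 5_000)

# deep=True includes the memory of the string objects themselves.
as_object = status.memory_usage(deep=True)
as_category = status.astype("category").memory_usage(deep=True)
print(f"object: {as_object} bytes, category: {as_category} bytes")
```

The saving comes from storing each distinct string once plus a small integer code per row; for columns where most values are unique, `category` can use more memory than `object`.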


## 10. Monitor the pipeline

*Monitoring was implemented with try/except blocks inside each function, with errors logged to the file `pipeline.log`.*
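For an unattended job, log entries are more useful with timestamps, levels, and tracebacks. A sketch of such a setup, assuming Python 3.8+ (for `force=True`) and a hypothetical `compute_ratio` step standing in for a pipeline function:

```python
import logging

# Timestamps and levels make the log readable when the job runs unattended.
logging.basicConfig(
    filename="pipeline.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    force=True,  # Python 3.8+: replace any handlers configured earlier
)
log = logging.getLogger("pipeline")


def compute_ratio(a, b):
    """Hypothetical pipeline step showing the log-and-re-raise pattern."""
    try:
        return a / b
    except ZeroDivisionError:
        # logging.exception logs at ERROR level and appends the traceback.
        log.exception("compute_ratio failed for a=%s, b=%s", a, b)
        raise


try:
    compute_ratio(1, 0)
except ZeroDivisionError:
    print("error logged to pipeline.log")
```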