In [None]:
!pip install apache-airflow

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from sqlalchemy import create_engine
import logging

In [None]:
# Settings handed to the DAG as ``default_args``; Airflow applies them to
# every task in the DAG unless a task overrides them.
default_args = dict(
    owner='MTN Rwanda',
    depends_on_past=False,
    start_date=datetime(2023, 4, 18),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG is named 'data_pipeline' and uses the '@daily' schedule preset.
# NOTE(review): ``catchup`` is not set here, so whatever default the Airflow
# installation uses applies to the interval between start_date and now -
# confirm that is intended.
dag = DAG(
    'data_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
)

In [20]:
  # Data Extraction
def extract_data():
    """Read the customer, order and payment CSV files into DataFrames.

    The files are looked up by relative path, i.e. in the current working
    directory of the process that runs this function.

    Returns:
        A tuple ``(customer_data, order_data, payment_data)`` of pandas
        DataFrames, one per CSV file.
    """
    csv_paths = ('customer_data.csv', 'order_data.csv', 'payment_data.csv')
    customer_data, order_data, payment_data = (
        pd.read_csv(path) for path in csv_paths
    )
    return customer_data, order_data, payment_data

In [21]:
  # Data Transformation 
def transform_data(customer_data, order_data, payment_data):
    """Join customers, orders and payments and compute per-customer value metrics.

    The input frames are left untouched: date parsing is done on copies, so
    callers can keep using the raw frames after this call.

    Args:
        customer_data: DataFrame with at least ``customer_id``, ``email``,
            ``country``, ``gender`` and ``date_of_birth`` columns.
        order_data: DataFrame with at least ``customer_id``, ``order_id`` and
            ``order_date`` columns.
        payment_data: DataFrame with at least ``customer_id``, ``order_id``
            and ``payment_date`` columns.

        The merged data must also provide ``amount`` and ``product`` columns
        (presumably from the payment and order files respectively - confirm
        against the CSV schemas).

    Returns:
        A DataFrame with one row per (email, country, gender, date_of_birth)
        group and the columns ``total_amount_paid``, ``number_of_orders``,
        ``total_order_value``, ``lifespan``, ``average_order_value`` and
        ``clv``.
    """
    # Parse the date columns on copies; assigning into the arguments would
    # silently change the caller's DataFrames.
    customers = customer_data.assign(
        date_of_birth=pd.to_datetime(customer_data['date_of_birth'])
    )
    orders = order_data.assign(
        order_date=pd.to_datetime(order_data['order_date'])
    )
    payments = payment_data.assign(
        payment_date=pd.to_datetime(payment_data['payment_date'])
    )

    # Inner joins (pd.merge default): rows without a match on either side
    # are dropped.
    customer_orders = pd.merge(customers, orders, on='customer_id')
    merged = pd.merge(payments, customer_orders, on=['order_id', 'customer_id'])

    # The join keys are not needed once the frames are combined.
    merged = merged.drop(columns=['customer_id', 'order_id'])

    # One row per customer: total paid and number of (non-null) product rows.
    customer_lifetime_value = (
        merged
        .groupby(['email', 'country', 'gender', 'date_of_birth'])
        .agg(
            total_amount_paid=('amount', 'sum'),
            number_of_orders=('product', 'count'),
        )
        .reset_index()
    )

    # Amount paid per counted order.
    customer_lifetime_value['total_order_value'] = (
        customer_lifetime_value['total_amount_paid']
        / customer_lifetime_value['number_of_orders']
    )

    # CLV = (average order value) x (number of orders) x (lifespan).
    # ``lifespan`` here is the number of years between date_of_birth and today.
    elapsed = pd.Timestamp.today() - customer_lifetime_value['date_of_birth']
    customer_lifetime_value['lifespan'] = elapsed.dt.days / 365.25
    customer_lifetime_value['average_order_value'] = (
        customer_lifetime_value['total_order_value']
    )
    customer_lifetime_value['clv'] = (
        customer_lifetime_value['average_order_value']
        * customer_lifetime_value['number_of_orders']
        * customer_lifetime_value['lifespan']
    )

    return customer_lifetime_value

In [22]:
# Data Loading 
def load_data(transformed_data):
    """Append the transformed data to the ``mtnrwanda_data`` Postgres table.

    Args:
        transformed_data: pandas DataFrame to write; its index is not stored.

    Raises:
        Exception: any error raised while connecting or writing is logged and
            then re-raised unchanged so the caller sees the failure.
    """
    table_name = 'mtnrwanda_data'
    engine = None
    try:
        # load the transformed data into Postgres database
        engine = create_engine('postgresql://username:password@localhost:5432/dbname')

        # engine.begin() yields a connection inside a transaction: it commits
        # on success, rolls back on error, and always releases the connection.
        with engine.begin() as connection:
            # if_exists='append' creates the table when it is missing, so no
            # separate existence check / empty-frame create step is needed.
            transformed_data.to_sql(table_name, connection, if_exists='append', index=False)

        logging.info('Data loaded into Postgres database')

    except Exception as e:
        logging.error(f'Error loading data into Postgres database: {e}')
        raise
    finally:
        # Release the pooled connections held by this short-lived engine.
        if engine is not None:
            engine.dispose()


# transform_data and load_data take required arguments, but PythonOperator
# calls its callable without positional arguments. These wrappers fetch the
# upstream task's return value from XCom and pass it on explicitly.
# NOTE(review): this relies on Airflow 2 injecting ``ti`` by parameter name,
# and on an XCom configuration that can serialize pandas DataFrames - confirm
# for the target deployment.
def _transform_from_xcom(ti):
    """Pull the extracted frames from XCom and run ``transform_data`` on them."""
    customer_data, order_data, payment_data = ti.xcom_pull(task_ids='extract_data')
    return transform_data(customer_data, order_data, payment_data)


def _load_from_xcom(ti):
    """Pull the transformed frame from XCom and hand it to ``load_data``."""
    load_data(ti.xcom_pull(task_ids='transform_data'))


with dag:

    # The return value of each callable is pushed to XCom by PythonOperator.
    extract_data_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )

    transform_data_task = PythonOperator(
        task_id='transform_data',
        python_callable=_transform_from_xcom
    )

    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=_load_from_xcom
    )

    # define dependencies: extract -> transform -> load
    extract_data_task >> transform_data_task >> load_data_task

**Pipeline Documentation**

The data pipeline consists of three tasks:

**extract_data_task**: reads the CSV files containing customer, order, and payment data and returns them as Pandas dataframes.

**transform_data_task**: takes the three dataframes as inputs and performs data cleaning and transformation operations, such as merging, grouping, and aggregating, to compute the customer lifetime value for each customer.

**load_data_task**: takes the transformed data as input and loads it into a Postgres database. The data is inserted into a table named "mtnrwanda_data".

The three tasks are defined as PythonOperator instances in an Airflow DAG named "data_pipeline". The DAG is scheduled to run daily.

**Best practices**:

**Modularization**: The pipeline is split into three tasks, each responsible for a specific part of the data processing. This approach makes the code easier to understand, test, and maintain.

**Error handling**: The body of load_data_task is wrapped in a try/except block, which catches any exception raised during loading, logs an error message, and re-raises the exception so that Airflow marks the task as failed and can retry it. This practice makes failures visible instead of letting data be lost silently.

**Logging**: The load_data_task logs its progress and any errors that occur during the data loading process. This practice makes it easier to debug the pipeline and monitor its performance.

**Recommendations for deploying and running the pipeline on a cloud provider:**

**Use a managed service**: Consider using a managed service like Amazon Redshift or Google BigQuery for your data storage and processing needs. These services are highly scalable, fault-tolerant, and can handle large volumes of data. They also have built-in security and data governance features.

**Containerize your application**: Use containers to package your application and its dependencies so that it can be easily deployed on any cloud-based provider. This will allow you to move your application between different environments without worrying about compatibility issues.

**Use a managed workflow service**: Use a managed workflow service like AWS Step Functions or Google Cloud Composer to manage the execution of your data pipeline. These services provide a visual interface for building, scheduling, and monitoring workflows, and can automatically retry failed tasks.

