In [1]:
!pip install apache-airflow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-airflow
  Downloading apache_airflow-2.6.0-py3-none-any.whl (12.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.1/12.1 MB[0m [31m48.4 MB/s[0m eta [36m0:00:00[0m
Collecting flask-caching>=1.5.0
  Downloading Flask_Caching-2.0.2-py3-none-any.whl (28 kB)
Collecting unicodecsv>=0.14.1
  Downloading unicodecsv-0.14.1.tar.gz (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting marshmallow-oneofschema>=2.0.1
  Downloading marshmallow_oneofschema-3.0.1-py2.py3-none-any.whl (5.8 kB)
Collecting gunicorn>=20.1.0
  Downloading gunicorn-20.1.0-py3-none-any.whl (79 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.5/79.5 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
Collecting sqlalchemy<2.0,>=1.4
  Downloading SQLAlchemy-1.4.48-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x

In [3]:
import logging
from datetime import datetime
from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from sqlalchemy import create_engine

In [4]:
# Shared task settings handed to the DAG constructor as `default_args`.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 4, 19),
    retries=1,
    retry_delay=timedelta(minutes=5),
)

In [6]:
# Route log records at ERROR level and above to a file next to the notebook.
logging.basicConfig(level=logging.ERROR, filename='extract_data.log')

# The pipeline DAG; the tasks defined later in the notebook attach to it.
dag = DAG(
    'data_pipeline',
    schedule='@daily',
    default_args=default_args,
)

In [7]:
def extract_data():
    """Read the three source CSV files into pandas DataFrames.

    Reads ``customer_data.csv``, ``order_data.csv`` and ``payment_data.csv``
    relative to the current working directory.

    Returns:
        tuple: ``(customers, orders, payments)`` DataFrames, in that order.

    Raises:
        Exception: whatever ``pd.read_csv`` raised (for example
            ``FileNotFoundError``). The error is logged and then re-raised;
            previously it was swallowed and ``None`` was returned, which made
            callers that unpack three values fail with an unrelated TypeError.
    """
    try:
        customers = pd.read_csv('customer_data.csv')
        orders = pd.read_csv('order_data.csv')
        payments = pd.read_csv('payment_data.csv')
    except Exception as e:
        logging.error(f"An error occurred while extracting data: {e}")
        # Propagate so the failure is visible to the caller instead of
        # surfacing later as a confusing unpacking error.
        raise

    return customers, orders, payments

In [8]:
# Run the extraction once; the three frames are inspected in the cells below.
customers, orders, payments = extract_data()

In [9]:
# Inspect the customers DataFrame
customers

Unnamed: 0,customer_id,first_name,last_name,email,country,gender,date_of_birth
0,1,John,Doe,john.doe@example.com,USA,Male,1990-01-01
1,2,Jane,Smith,jane.smith@example.com,USA,Female,1992-05-12
2,3,Paul,Mukasa,paul.mukasa@example.com,Rwanda,Male,1985-07-28
3,4,Grace,Uwase,grace.uwase@example.com,Rwanda,Female,1995-12-31
4,5,Emmanuel,Mukiza,emmanuel.mukiza@example.com,Rwanda,Male,1987-03-15
5,6,Angela,Mirembe,angela.mirembe@example.com,Uganda,Female,1991-09-03
6,7,Joseph,Ndung'u,joseph.ndungu@example.com,Kenya,Male,1983-11-11
7,8,Esther,Kimani,esther.kimani@example.com,Kenya,Female,1998-02-20
8,9,David,Mwanza,david.mwanza@example.com,Tanzania,Male,1982-08-08
9,10,Fatma,Said,fatma.said@example.com,Tanzania,Female,1993-06-22


In [10]:
# Inspect the payments DataFrame
payments

Unnamed: 0,payment_id,customer_id,order_id,payment_date,amount
0,1,1,1,2023-01-01,500
1,2,2,2,2023-01-02,700
2,3,3,3,2023-01-03,1000
3,4,4,4,2023-01-04,100
4,5,5,5,2023-01-05,250


In [11]:
# Inspect the orders DataFrame
orders

Unnamed: 0,order_id,customer_id,order_date,product,price
0,1,1,2023-01-01,Phone,500
1,2,2,2023-01-02,Tablet,700
2,3,3,2023-01-03,TV,1000
3,4,4,2023-01-04,Headphones,100
4,5,5,2023-01-05,Smartwatch,250


In [12]:
def transform_data(customers, orders, payments, today=None):
    """Join customers, orders and payments and compute per-customer metrics.

    The three frames are inner-joined (customers to orders on ``customer_id``,
    then to payments on ``order_id`` and ``customer_id``), grouped per
    customer, and aggregated. The input frames are not modified.

    Args:
        customers: DataFrame with at least ``customer_id``, ``first_name``,
            ``last_name``, ``email``, ``country``, ``gender``, ``date_of_birth``.
        orders: DataFrame with at least ``order_id``, ``customer_id``,
            ``order_date``.
        payments: DataFrame with at least ``payment_id``, ``customer_id``,
            ``order_id``, ``payment_date``, ``amount``.
        today: Reference ``datetime`` for the age-based calculations.
            Defaults to ``datetime.now()``; pass a fixed value for
            reproducible results.

    Returns:
        DataFrame indexed by (first_name, last_name, email, country, gender,
        date_of_birth) with columns, in order: ``amount`` (sum),
        ``payment_id`` (distinct payment count), ``order_date`` (latest),
        ``date_of_birth``, ``total_order_value``,
        ``average_customer_lifespan``, ``number_of_orders_per_year``, ``clv``.
    """
    if today is None:
        today = datetime.now()

    # .assign returns new frames, so the caller's DataFrames keep their raw
    # (unparsed) date columns instead of being overwritten in place.
    customers = customers.assign(date_of_birth=pd.to_datetime(customers['date_of_birth']))
    orders = orders.assign(order_date=pd.to_datetime(orders['order_date']))
    payments = payments.assign(payment_date=pd.to_datetime(payments['payment_date']))

    # Inner joins: customers without orders, and orders without payments, drop out.
    customer_orders = pd.merge(customers, orders, on='customer_id')
    customer_orders_payments = pd.merge(customer_orders, payments, on=['order_id', 'customer_id'])

    # The join keys are not needed once the frames are combined.
    customer_orders_payments = customer_orders_payments.drop(['customer_id', 'order_id'], axis=1)

    customer_keys = ['first_name', 'last_name', 'email', 'country', 'gender', 'date_of_birth']
    grouped = customer_orders_payments.groupby(customer_keys)
    grouped_agg = grouped.agg({'amount': 'sum', 'payment_id': 'nunique', 'order_date': 'max'})

    # date_of_birth is a grouping key, so its per-group value is exactly the
    # index level; copy it into a column rather than aggregating the key.
    grouped_agg['date_of_birth'] = grouped_agg.index.get_level_values('date_of_birth')

    # Despite the column name, this is the amount paid per distinct payment
    # (sum of amounts divided by the number of unique payment ids).
    grouped_agg['total_order_value'] = grouped_agg['amount'] / grouped_agg['payment_id']

    # CLV = (value per payment) x (payments per year) x (lifespan in years).
    # "Lifespan" here is the time elapsed since date_of_birth; the yearly rate
    # is measured over the time since the latest order. Both use whole days,
    # so a latest order less than one day before `today` gives a zero divisor.
    grouped_agg['average_customer_lifespan'] = (today - grouped_agg['date_of_birth']).dt.days / 365
    grouped_agg['number_of_orders_per_year'] = grouped_agg['payment_id'] / (
        (today - grouped_agg['order_date']).dt.days / 365
    )
    grouped_agg['clv'] = (
        grouped_agg['total_order_value']
        * grouped_agg['number_of_orders_per_year']
        * grouped_agg['average_customer_lifespan']
    )

    return grouped_agg

In [13]:
transform_data(customers, orders, payments)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,amount,payment_id,order_date,date_of_birth,total_order_value,average_customer_lifespan,number_of_orders_per_year,clv
first_name,last_name,email,country,gender,date_of_birth,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
Emmanuel,Mukiza,emmanuel.mukiza@example.com,Rwanda,Male,1987-03-15,250,1,2023-01-05,1987-03-15,250.0,36.167123,3.016529,27274.793388
Grace,Uwase,grace.uwase@example.com,Rwanda,Female,1995-12-31,100,1,2023-01-04,1995-12-31,100.0,27.364384,2.991803,8186.885246
Jane,Smith,jane.smith@example.com,USA,Female,1992-05-12,700,1,2023-01-02,1992-05-12,700.0,31.00274,2.943548,63880.645161
John,Doe,john.doe@example.com,USA,Male,1990-01-01,500,1,2023-01-01,1990-01-01,500.0,33.364384,2.92,48712.0
Paul,Mukasa,paul.mukasa@example.com,Rwanda,Male,1985-07-28,1000,1,2023-01-03,1985-07-28,1000.0,37.79726,2.96748,112162.601626


In [None]:
def load_data(grouped_agg):
    """Write the aggregated per-customer rows to the Postgres table.

    Connects to Postgres, creates ``payments_customers_orders`` if it does not
    exist, and inserts one row per row of ``grouped_agg``.

    Connection settings are read from the environment variables
    ``POSTGRES_ADDRESS``, ``POSTGRES_PORT``, ``POSTGRES_USERNAME``,
    ``POSTGRES_PASSWORD`` and ``POSTGRES_DBNAME``; each falls back to the value
    that used to be hard-coded here.

    Args:
        grouped_agg: DataFrame whose index levels are (first_name, last_name,
            email, country, gender, date_of_birth) and whose columns include
            amount, payment_id, order_date, total_order_value,
            average_customer_lifespan, number_of_orders_per_year and clv.

    Returns:
        None. Database errors (``psycopg2.Error``) are logged and the function
        returns early; the connection is closed on every path.
    """
    import os

    # psycopg2 is used throughout this function but is not imported at file level.
    import psycopg2

    # Column order shared by the INSERT statement and the row tuples.
    insert_columns = (
        'first_name',
        'last_name',
        'email',
        'country',
        'gender',
        'date_of_birth',
        'amount',
        'payment_id',
        'order_date',
        'total_order_value',
        'average_customer_lifespan',
        'number_of_orders_per_year',
        'clv',
    )

    try:
        # NOTE(review): the fallback values keep existing setups working, but a
        # password default in source should be removed once the environment
        # variables are provisioned.
        conn = psycopg2.connect(
            database=os.environ.get("POSTGRES_DBNAME", "telecommunications_data"),
            user=os.environ.get("POSTGRES_USERNAME", "postgres"),
            password=os.environ.get("POSTGRES_PASSWORD", "password"),
            host=os.environ.get("POSTGRES_ADDRESS", "35.237.226.12"),
            port=os.environ.get("POSTGRES_PORT", "5432"),
        )
    except psycopg2.Error as e:
        logging.error("Unable to connect to the database: %s", e)
        return

    try:
        # Create the target table if it does not already exist.
        # NOTE(review): total_order_value is declared INTEGER although the
        # value written is a quotient — confirm the intended column type.
        with conn.cursor() as cur:
            try:
                cur.execute("""
                CREATE TABLE IF NOT EXISTS payments_customers_orders (
                    first_name VARCHAR(50),
                    last_name VARCHAR(50),
                    email VARCHAR(255),
                    country VARCHAR(50),
                    gender VARCHAR(10),
                    date_of_birth DATE,
                    amount INTEGER,
                    payment_id INTEGER,
                    order_date DATE,
                    total_order_value INTEGER,
                    average_customer_lifespan FLOAT,
                    number_of_orders_per_year FLOAT,
                    clv FLOAT
                )
            """)
                conn.commit()
            except psycopg2.Error as e:
                logging.error("Error creating table: %s", e)
                return

        # The customer fields live in the index, not in the columns, so
        # iterating the columns alone yields too few values for the 13
        # placeholders. Drop any column that duplicates an index level
        # (date_of_birth) so reset_index() can move the levels into columns.
        duplicated = [name for name in grouped_agg.index.names if name in grouped_agg.columns]
        flat = grouped_agg.drop(columns=duplicated).reset_index()
        rows = list(flat[list(insert_columns)].itertuples(index=False, name=None))

        insert_sql = "INSERT INTO payments_customers_orders ({}) VALUES ({})".format(
            ", ".join(insert_columns),
            ", ".join(["%s"] * len(insert_columns)),
        )

        with conn.cursor() as cur:
            try:
                cur.executemany(insert_sql, rows)
                conn.commit()
            except psycopg2.Error as e:
                logging.error("Error inserting data into table: %s", e)
                return
    finally:
        # Runs on success and on every early return above, so the connection
        # is never leaked; the cursors are already closed by their `with` blocks.
        conn.close()

In [None]:
# Register the three pipeline steps on the DAG. Operators created inside the
# `with dag:` block are attached to `dag` without passing it explicitly.
# NOTE(review): transform_data and load_data declare required positional
# parameters, but no op_args/op_kwargs are supplied to their operators —
# confirm how their inputs are meant to be provided at run time.
with dag:
    extract_task = PythonOperator(task_id='extract_data', python_callable=extract_data)
    transform_task = PythonOperator(task_id='transform_data', python_callable=transform_data)
    load_task = PythonOperator(task_id='load_data', python_callable=load_data)

    # Execution order: extract, then transform, then load.
    extract_task >> transform_task >> load_task
