In [38]:
import pandas as pd
from airflow import DAG
from datetime import datetime, timedelta

# Read the three input CSV files, in order, into df1, df2 and df3
df1, df2, df3 = (
    pd.read_csv(csv_path)
    for csv_path in ('dataset1.csv', 'dataset2.csv', 'dataset3.csv')
)



In [39]:
from airflow.operators.python_operator import PythonOperator

In [40]:
pip install apache-airflow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [41]:
# Drop every row that contains at least one missing value, in each dataset
df1, df2, df3 = (frame.dropna() for frame in (df1, df2, df3))


In [42]:
# Keep only rows whose monetary column is strictly below 100000
df1 = df1.loc[df1['total_amount_billed'].lt(100000)]
df2 = df2.loc[df2['amount_paid'].lt(100000)]
df3 = df3.loc[df3['refund_amount'].lt(100000)]



In [43]:
# Parse each dataset's date column into datetime values (assigned back in place)
for _frame, _date_column in (
    (df1, 'date_of_purchase'),
    (df2, 'date_of_payment'),
    (df3, 'date_of_refund'),
):
    _frame[_date_column] = pd.to_datetime(_frame[_date_column])


In [16]:
# Give the three datasets a shared vocabulary: 'amount', 'status', 'country'
df1 = df1.rename(
    {
        'total_amount_billed': 'amount',
        'payment_status': 'status',
        'country_of_purchase': 'country',
    },
    axis='columns',
)
df2 = df2.rename(
    {
        'amount_paid': 'amount',
        'payment_status': 'status',
        'country_of_payment': 'country',
    },
    axis='columns',
)
df3 = df3.rename(
    {
        'refund_amount': 'amount',
        'country_of_refund': 'country',
    },
    axis='columns',
)


In [18]:
# Outer-merge the three datasets; customer_id is the only key passed to merge.
# Columns that share a name across the two sides receive pandas' default
# '_x' / '_y' suffixes.
df = df1.merge(df2, how='outer', on=['customer_id'])
df_merged = df.merge(df3, how='outer', on=['customer_id'])


In [20]:
# Substitute the literal 'none' wherever promo_code is missing
df_merged['promo_code'] = df_merged['promo_code'].fillna(value='none')


In [25]:
# Stack the three datasets row-wise (original row indices are kept)
df_joined = pd.concat(objs=[df1, df2, df3], axis='index')


In [22]:
# Persisting a frame to a date-stamped CSV file
def load_data(df):
    """Write ``df`` to ``<YYYYMMDD>_cdr.csv`` in the current working directory.

    The date prefix is today's local date. The index is not written.

    Returns:
        The name of the file that was written.
    """
    date_stamp = datetime.today().strftime("%Y%m%d")
    filename = date_stamp + '_cdr.csv'
    df.to_csv(filename, index=False)
    return filename


In [None]:
# Automating the data
# Default arguments handed to the DAG (owner, retry policy, failure e-mails).
default_args = {
    'owner': 'telecom',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 4),
    'email': ['telecom@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# NOTE(review): no task is attached to this DAG in the visible cells, so the
# schedule currently triggers nothing — confirm where the tasks are defined.
dag = DAG(
    'cdr_pipeline',
    default_args=default_args,
    description='ETL pipeline for call detail records',
    schedule_interval=timedelta(days=1),
    # start_date lies in the past (2021-01-04); without this flag the scheduler
    # would create one run for every daily interval missed since then.
    catchup=False,
)


In [None]:
#Testing the pipeline
# Calculate the net amount (amount paid - late payment fee - refund amount).
#
# The rename cell maps total_amount_billed (df1), amount_paid (df2) and
# refund_amount (df3) all to 'amount'. In the first customer_id merge the
# df1/df2 columns collide and receive pandas' default suffixes: 'amount_x'
# (billed) and 'amount_y' (paid). df3's column no longer collides in the
# second merge and stays 'amount' (refund). The original names 'amount_paid'
# and 'refund_amount' therefore do not exist in df_merged.
#
# The outer merges leave NaN where a customer has no row on one side; a
# missing fee or refund is treated as 0 so it does not turn the whole net
# amount into NaN. A missing paid amount is left as NaN on purpose.
# NOTE(review): assumes late_payment_fee comes from exactly one dataset
# (otherwise it would be suffixed too) — confirm against the input schemas.
df_merged['net_amount'] = (
    df_merged['amount_y']
    - df_merged['late_payment_fee'].fillna(0)
    - df_merged['amount'].fillna(0)
)


In [31]:
# Export the merged frame to CSV, omitting the index column
df_merged.to_csv(path_or_buf="transformed_data.csv", index=False)