In [11]:
from datetime import datetime, timedelta
import datetime
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import boto3 
from dotenv import load_dotenv
from io import StringIO
import os


def _read_csv_from_s3(s3, bucket, key):
    """Fetch the object ``key`` from ``bucket`` and parse it as CSV into a DataFrame."""
    body = s3.get_object(Bucket=bucket, Key=key)['Body']
    try:
        return pd.read_csv(body)
    finally:
        # Close the response stream explicitly once parsing is done (or failed).
        body.close()


def import_raw(bucket = 'tpprogra'):
    """Load the three raw CSV inputs from an S3 bucket.

    Credentials are read from the ``ACCESS_KEY`` and ``SECRET_KEY``
    environment variables after ``load_dotenv()`` has been called.

    Args:
        bucket: Name of the S3 bucket that holds ``ads_views.csv``,
            ``advertiser_ids.csv`` and ``product_views.csv``.

    Returns:
        Tuple ``(df_ads_views, df_adverts, df_products)`` with one DataFrame
        per object, in that order.
    """
    load_dotenv()  # Load environment variables from the .env file

    s3 = boto3.client(
        's3',
        region_name='us-east-1',
        aws_access_key_id=os.getenv('ACCESS_KEY'),
        aws_secret_access_key=os.getenv('SECRET_KEY'),
    )

    # Same download order as before: ads views, advertiser ids, product views.
    df_ads_views = _read_csv_from_s3(s3, bucket, 'ads_views.csv')
    df_adverts = _read_csv_from_s3(s3, bucket, 'advertiser_ids.csv')
    df_products = _read_csv_from_s3(s3, bucket, 'product_views.csv')

    return df_ads_views, df_adverts, df_products

In [12]:
#Definimos las tareas
# tarea nro 1 filtrar los datos 

def filter_raw(bucket='tpprogra'):
    """Keep only the view rows whose advertiser appears in the advertiser list.

    Loads the raw frames through ``import_raw(bucket)`` and filters both the
    product-views and ads-views frames on their ``advertiser_id`` column.

    Args:
        bucket: S3 bucket to read the raw CSVs from. Defaults to the same
            bucket ``import_raw`` uses by default, so calling ``filter_raw()``
            with no arguments behaves as before.

    Returns:
        Tuple ``(active_product_views, active_ads_views)``: the rows of each
        frame whose ``advertiser_id`` is present in the advertiser frame's
        ``advertiser_id`` column.
    """
    df_ads_views, df_adverts, df_products = import_raw(bucket)

    listed_ids = df_adverts["advertiser_id"]
    active_product_views = df_products[df_products["advertiser_id"].isin(listed_ids)]
    active_ads_views = df_ads_views[df_ads_views["advertiser_id"].isin(listed_ids)]

    return active_product_views, active_ads_views


def save_to_bucket(bucket = 'tpprogra'):
    """Filter the raw data and upload the two filtered frames to S3 as CSV.

    Runs ``filter_raw()`` and writes its results to ``bucket`` under the keys
    ``active_product_views.csv`` and ``active_ads_views.csv``.

    Args:
        bucket: Destination S3 bucket for the two CSV objects.
            NOTE(review): the input data is read by ``filter_raw()`` called
            without arguments, so this parameter only affects where the
            results are written.

    Returns:
        None.
    """
    # Load the .env file BEFORE reading the credentials. Previously the first
    # load_dotenv() call happened inside filter_raw() -> import_raw(), i.e.
    # after these lookups, so on a fresh process they could both be None.
    load_dotenv()
    ACCESS_KEY = os.getenv('ACCESS_KEY')
    SECRET_KEY = os.getenv('SECRET_KEY')

    active_product_views, active_ads_views = filter_raw()

    # Serialize each DataFrame to CSV text in memory (no index column).
    active_product_views_csv = StringIO()
    active_product_views.to_csv(active_product_views_csv, index=False)

    active_ads_views_csv = StringIO()
    active_ads_views.to_csv(active_ads_views_csv, index=False)

    # Upload both CSV payloads to the bucket as UTF-8 bytes.
    client = boto3.client(
        's3',
        region_name='us-east-1',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )

    client.put_object(Body=active_product_views_csv.getvalue().encode('utf-8'), Bucket=bucket, Key='active_product_views.csv')
    client.put_object(Body=active_ads_views_csv.getvalue().encode('utf-8'), Bucket=bucket, Key='active_ads_views.csv')


# Daily DAG with a single task that filters the raw data and stores the result.
with DAG(
    dag_id="analisis_datos_publicidad",
    # `datetime` is the module here: `import datetime` comes after
    # `from datetime import datetime, timedelta` and rebinds the name.
    start_date=datetime.datetime(2024, 4, 29),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:

    # The previous callable name, `filtrar_datos`, is not defined in this file
    # and raised NameError while the DAG was being parsed. save_to_bucket runs
    # the filter step (via filter_raw) and uploads the filtered CSVs.
    filtrar_datos_task = PythonOperator(
        task_id='filtrar_datos',
        python_callable=save_to_bucket,
    )

In [13]:
# Run the filter-and-upload step once, using the default bucket, when this cell executes.
# NOTE(review): if this file is also loaded by Airflow as a DAG module, this call
# would run at import time as well — confirm that is intended.
save_to_bucket()