# In [None]:  (notebook cell marker, commented out so this file parses as Python)
import boto3
import json
from io import StringIO
from airflow import DAG
from airflow.models import Variable
from datetime import datetime, timedelta, time, date
from bs4 import BeautifulSoup
from airflow.operators.python_operator import PythonOperator
import requests
import pandas as pd
from ftplib import FTP

# Default arguments applied to every task in the DAG: one retry, five minutes
# after a failure.
default_args = {
    'owner': 'me',
    # First schedulable moment: 2023-01-20 at 23:00.
    'start_date': datetime(2023, 1, 20, 23),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Daily DAG: read CSVs from the website -> transform -> upload to S3.
#
# The tasks hand data to each other through fixed, global Airflow Variables and
# the first task deletes the source files it has read, so runs must never
# overlap or be replayed for past dates:
#   * catchup=False      -- do not backfill one run per day since start_date.
#   * max_active_runs=1  -- never execute two runs of this DAG concurrently.
dag = DAG(
    'read_file_from_website_and_upload_to_s3',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
    max_active_runs=1,
)

# Define a function that reads the file from the website
def read_file_from_website():
    """Read every CSV in the website upload folder and stage it for the next task.

    The folder's index page is scraped for links. Each linked ``*.csv`` file is
    read with pandas and stored, JSON-encoded, in two Airflow Variables:

    * ``df_dict``      -- keyed by the file name up to its first dot.
    * ``df_dict_hist`` -- the same data, keyed by that name plus ``-<today>``.

    The remote files are deleted over FTP only after both Variables have been
    written, so a failure while downloading or storing does not lose the
    source data and a retry can start over from the same files.

    The FTP login is read from the Airflow Variables ``webhost_ftp_user`` and
    ``webhost_ftp_password`` instead of being embedded in this file.

    Raises:
        KeyError: if one of the FTP credential Variables is not defined.
        requests.HTTPError: if the folder listing cannot be fetched.
        ftplib.all_errors: if the FTP login or a delete fails.
    """
    # URL of the website folder containing the CSV files
    folder_url = 'https://alphainsights.000webhostapp.com/uploads/'

    # Resolve the credentials first so a missing Variable fails the task
    # before any data has been fetched.
    ftp_user = Variable.get("webhost_ftp_user")
    ftp_password = Variable.get("webhost_ftp_password")

    # Make a request to the website to get the list of files in the folder
    response = requests.get(folder_url, timeout=60)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')

    # Keep only bare "*.csv" file names:
    #   * anchors without an href yield None and are skipped,
    #   * names containing "/" are skipped because the name is appended to both
    #     the folder URL and the FTP delete path,
    #   * dict.fromkeys de-duplicates while preserving order, so a file that is
    #     linked twice is not read (and deleted) twice.
    hrefs = (link.get('href') for link in soup.find_all('a'))
    csv_files = list(dict.fromkeys(
        href for href in hrefs
        if href and '/' not in href and href.lower().endswith('.csv')
    ))

    # Read each CSV file using pandas and serialise it for the Variables
    today = str(date.today())
    df_dict = {}
    df_dict_hist = {}
    for file_name in csv_files:
        df_json = pd.read_csv(folder_url + file_name).to_json()
        name = file_name.split('.')[0]
        df_dict[name] = df_json
        df_dict_hist[name + "-" + today] = df_json

    Variable.set("df_dict", json.dumps(df_dict))
    Variable.set("df_dict_hist", json.dumps(df_dict_hist))

    if not csv_files:
        return  # nothing was read, so there is nothing to delete

    # Delete the processed files from the server. The context manager closes
    # the connection even if a delete fails.
    with FTP("files.000webhost.com", timeout=60) as ftp:
        ftp.login(ftp_user, ftp_password)
        for file_name in csv_files:
            ftp.delete("/public_html/uploads/" + file_name)

# Define a function that performs the data transformation
def transform_data():
    """Clean the staged DataFrames and store the result for the upload task.

    Reads the ``df_dict`` Airflow Variable (a JSON object mapping a name to a
    JSON-encoded DataFrame), applies the first cleaning rule whose key fragment
    occurs in the name, and writes the cleaned frames, re-encoded the same way,
    to the ``df_dict_cleaned`` Variable under the rule's output name.

    Entries whose name matches no rule are dropped. If several entries match
    the same rule, the last one wins.

    Raises:
        KeyError: if ``df_dict`` is not defined, or a matched frame lacks one
            of the columns its rule removes (or ``won_date`` for closed deals).
    """
    # (fragment looked for in the key, output name, columns to remove).
    # Order matters: the first rule whose fragment is found is applied.
    cleaning_rules = (
        ('closed', 'closed_deals', ['has_company', 'has_gtin', 'average_stock',
                                    'declared_product_catalog_size',
                                    'declared_monthly_revenue']),
        ('sellers', 'sellers', ['seller_city', 'seller_state']),
        ('customers', 'customers', ['customer_city', 'customer_state']),
    )

    staged = json.loads(Variable.get("df_dict"))
    df_dict_cleaned = {}
    for key, df_json in staged.items():
        rule = next((r for r in cleaning_rules if r[0] in key), None)
        if rule is None:
            continue  # not a dataset this pipeline cleans
        _, name, unwanted_columns = rule

        # Wrap the payload in StringIO: handing read_json a literal JSON
        # string is deprecated in recent pandas releases.
        df = pd.read_json(StringIO(df_json)).drop(columns=unwanted_columns)

        if name == 'closed_deals':
            # Keep only the part of won_date before the first space.
            df['won_date'] = df['won_date'].astype(str).str.split(" ").str[0]

        df_dict_cleaned[name] = df.to_json()

    Variable.set("df_dict_cleaned", json.dumps(df_dict_cleaned))

# Define a function that uploads the file to S3
def _frames_from_variable(variable_name):
    """Rebuild the ``{name: DataFrame}`` mapping stored as JSON in an Airflow Variable."""
    stored = json.loads(Variable.get(variable_name))
    # StringIO wrapper: passing read_json a literal JSON string is deprecated
    # in recent pandas releases.
    return {name: pd.read_json(StringIO(df_json)) for name, df_json in stored.items()}


def _put_frames_as_csv(s3_res, bucket_name, frames):
    """Write each DataFrame to ``<name>.csv`` (without the index column) in the bucket."""
    for name, df in frames.items():
        s3_res.Object(bucket_name, name + ".csv").put(Body=df.to_csv(index=False))


def upload_to_s3():
    """Upload the cleaned and the historical DataFrames to S3 as CSV objects.

    * Frames from the ``df_dict_cleaned`` Variable go to the
      ``incremental-load-bucket`` bucket.
    * Frames from the ``df_dict_hist`` Variable go to the
      ``incremental-load-bucket-historicals`` bucket.

    AWS credentials are no longer embedded in this file. They are read from the
    Airflow Variables ``aws_access_key_id`` and ``aws_secret_access_key``. When
    those Variables are not defined, ``None`` is passed and boto3 resolves
    credentials through its default chain (environment, shared config, role).

    NOTE(security): the key pair that used to be hard-coded here has been
    exposed in source control and should be revoked and rotated.

    Raises:
        KeyError: if ``df_dict_cleaned`` or ``df_dict_hist`` is not defined.
    """
    df_dict = _frames_from_variable("df_dict_cleaned")
    df_dict_hist = _frames_from_variable("df_dict_hist")

    session = boto3.Session(
        aws_access_key_id=Variable.get("aws_access_key_id", default_var=None),
        aws_secret_access_key=Variable.get("aws_secret_access_key", default_var=None),
    )
    s3_res = session.resource("s3")

    _put_frames_as_csv(s3_res, "incremental-load-bucket", df_dict)
    _put_frames_as_csv(s3_res, "incremental-load-bucket-historicals", df_dict_hist)

# Create the Airflow tasks using the defined functions.
#
# provide_context is deliberately not set: none of the callables accepts
# parameters, so Airflow 1.10 would fail them with a TypeError when it passes
# the context as keyword arguments, and Airflow 2 deprecates and ignores the
# flag. Data moves between the tasks through Airflow Variables instead.
read_file_task = PythonOperator(
    task_id='read_file_from_website',
    python_callable=read_file_from_website,
    dag=dag,
)

transform_data_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

upload_to_s3_task = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_to_s3,
    dag=dag,
)

# Set the task dependencies: read -> transform -> upload
read_file_task >> transform_data_task >> upload_to_s3_task