In [6]:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

import pandas as pd
import os


"""
visits_dag_func

"""
# Directory that holds the visit workbooks; it is passed to the loader
# functions and to the extract task's op_kwargs in this file.
folder_path = '/root/airflow_final/visits_folder'

# A function that scans tables from Excel workbooks
def extract_visits_data(folder_path):
    """
    Load every sheet of every ``.xlsx`` workbook located directly in ``folder_path``.

    The result is a dictionary whose keys have the form
    ``"<file name without .xlsx>_<sheet name>"`` and whose values are the
    DataFrames read from the corresponding sheets.
    """
    sheets_by_key = {}
    for entry in os.listdir(folder_path):
        if not entry.endswith('.xlsx'):
            continue
        stem = entry[:-5]  # strip the five characters of '.xlsx'
        # sheet_name=None makes pandas return {sheet name: DataFrame} for the whole workbook.
        workbook_sheets = pd.read_excel(
            os.path.join(folder_path, entry),
            sheet_name=None,
            engine='openpyxl',
        )
        sheets_by_key.update(
            (f'{stem}_{title}', frame) for title, frame in workbook_sheets.items()
        )
    return sheets_by_key

def load_workbooks(folder_path):
    """
    Read every macro-enabled workbook (.xlsm) in a folder into one wide DataFrame each.

    For each workbook the 'Main Info' sheet is read as-is. Every other sheet is
    read with its 'Facility Name' column as the index and left-joined onto
    'Main Info' (matching on 'Facility Name'; clashing column names from the
    sheet get the suffix '_<sheet name>'). The 'Main Info' frame and all joined
    frames are then concatenated side by side (axis=1).

    Args:
    - folder_path: A string with the path of the folder containing the workbooks.

    Returns:
    - A list with one DataFrame per workbook, ordered by file name. The list is
      empty when the folder contains no .xlsm files.

    Raises:
    - ValueError if folder_path is None or empty, if a workbook has no
      'Main Info' sheet, or if a data sheet cannot be indexed by 'Facility Name'
      (the message names the workbook and the sheet).
    - OSError if the folder cannot be listed or a workbook cannot be opened.
    """
    # os.listdir(None) would silently scan the current working directory.
    if not folder_path:
        raise ValueError('The folder_path argument is empty.')

    # os.listdir returns entries in arbitrary order; sort for a reproducible result.
    excel_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.xlsm'))

    workbooks = []
    for file in excel_files:
        file_path = os.path.join(folder_path, file)
        # The context manager closes the underlying file handle even on errors.
        with pd.ExcelFile(file_path) as excel_file:
            main_info_df = pd.read_excel(excel_file, 'Main Info')
            workbook_dfs = [main_info_df]
            for sheet_name in excel_file.sheet_names:
                if sheet_name == 'Main Info':
                    continue
                try:
                    df = pd.read_excel(excel_file, sheet_name, index_col='Facility Name')
                except ValueError as err:
                    # pandas only reports "Index Facility Name invalid"; add the location.
                    raise ValueError(
                        f"Sheet {sheet_name!r} of workbook {file_path!r} could not be "
                        f"read with 'Facility Name' as its index: {err}"
                    ) from err
                joined_df = main_info_df.join(
                    df, on='Facility Name', how='left', rsuffix='_' + sheet_name
                )
                workbook_dfs.append(joined_df)
        # NOTE(review): every joined frame already contains the 'Main Info' columns,
        # so they repeat once per sheet in the concatenated result. Kept as in the
        # original so the returned column layout is unchanged.
        workbooks.append(pd.concat(workbook_dfs, axis=1))
    return workbooks



In [7]:
# Manual check of load_workbooks: print the combined DataFrame of each workbook.
# Earlier attempt, left commented out:
# result = load_workbooks(folder_path)
# main_info_df = result.workbook_df[0].loc[:, 'Main Info']
try:
    workbooks = load_workbooks(folder_path)
    for workbook in workbooks:
        print(workbook)
except Exception as e:
    # Every exception is caught here and only its message is printed (no traceback).
    print(f"Error: {str(e)}")


Error: Index Facility Name invalid


In [22]:
# DAG 'visits_dag_func': scheduled daily, no catch-up of past runs.

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 4, 10),
}

dag = DAG(
    dag_id='visits_dag_func',
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
)


# Task 1: call extract_visits_data(folder_path=folder_path).

extract_data = PythonOperator(
    dag=dag,
    task_id='extract_data',
    op_kwargs={'folder_path': folder_path},
    python_callable=extract_visits_data,
)

# Task 2: branching task driven by the check_for_row_updates callable.
# NOTE(review): the callable must already be defined when this statement runs,
# and this assignment rebinds the same name to the operator, so the function
# is no longer reachable under that name afterwards.

check_for_row_updates = BranchPythonOperator(
    dag=dag,
    task_id='check_for_row_updates',
    python_callable=check_for_row_updates,
)




In [None]:
def update_facility_data(dataframe, table_name, key_column='facility_name'):
    """
    Upsert the rows of a pandas DataFrame into a database table.

    Each DataFrame row is inserted into ``table_name``; when a row with the same
    ``key_column`` value already exists, its other columns are overwritten with
    the DataFrame values (``INSERT ... ON CONFLICT (key) DO UPDATE``).

    Args:
    - dataframe: A pandas DataFrame holding one row per facility. It must provide
      every column of the target table; the key may be supplied either as a
      regular column or as the index (when the index is named ``key_column``).
    - table_name: A string with the name of the database table to update.
    - key_column: Name of the column carrying the unique/primary-key constraint
      used for conflict detection. Defaults to 'facility_name'.

    Returns:
    - A tuple ``(rows_updated, rows_added)``: how many DataFrame rows matched a
      key that was already present and how many introduced a new key.

    Raises:
    - ValueError if the DataFrame or table_name argument is None or empty, or if
      the table has no ``key_column`` column.
    - KeyError if the DataFrame lacks one of the table's columns.
    - Any database error is re-raised after the transaction has been rolled back.

    Notes:
    - The statement uses ``ON CONFLICT ... excluded`` syntax and qmark ('?')
      placeholders (as accepted by e.g. sqlite3).
    - Table and column names are interpolated into the SQL text because
      identifiers cannot be bound as parameters; they must come from trusted code.
    - Missing values (NaN/NaT) are sent as NULL.
    """
    if dataframe is None or dataframe.empty:
        raise ValueError('The dataframe argument is empty.')
    if not table_name:
        raise ValueError('The table_name argument is empty.')

    # Accept the key either as a column or as the (named) index.
    frame = dataframe
    if key_column not in frame.columns and frame.index.name == key_column:
        frame = frame.reset_index()

    # Connect to the database.
    # NOTE(review): the connection is not closed here because connect_to_database()
    # is defined elsewhere and may hand out a shared connection - confirm.
    connection = connect_to_database()
    cursor = connection.cursor()
    try:
        # Zero-row query whose only purpose is to populate cursor.description.
        # 'WHERE 1 = 0' replaces the T-SQL-only 'SELECT TOP 0', which the
        # databases that understand ON CONFLICT do not accept.
        cursor.execute(f'SELECT * FROM {table_name} WHERE 1 = 0')
        column_names = [column[0] for column in cursor.description]

        if key_column not in column_names:
            raise ValueError(
                f'Table {table_name!r} has no {key_column!r} column.'
            )
        missing = [name for name in column_names if name not in frame.columns]
        if missing:
            raise KeyError(
                f'The dataframe is missing table columns: {missing}'
            )

        # Plain Python objects in table-column order; NaN/NaT become None (NULL).
        records = frame[column_names].astype(object)
        records = records.where(records.notna(), None)
        values = list(records.itertuples(index=False, name=None))

        # Classify rows before writing so the documented counts can be returned;
        # cursor.rowcount after executemany does not separate updates from inserts.
        cursor.execute(f'SELECT {key_column} FROM {table_name}')
        known_keys = {row[0] for row in cursor.fetchall()}
        key_position = column_names.index(key_column)
        rows_updated = 0
        rows_added = 0
        for record in values:
            key = record[key_position]
            if key in known_keys:
                rows_updated += 1
            else:
                rows_added += 1
                # A later duplicate of this key in the same frame is an update.
                known_keys.add(key)

        columns = ', '.join(column_names)
        placeholders = ', '.join(['?'] * len(column_names))
        updates = ', '.join(
            f'{name} = excluded.{name}'
            for name in column_names
            if name != key_column
        )
        # A table consisting only of the key column has nothing to overwrite.
        conflict_action = f'DO UPDATE SET {updates}' if updates else 'DO NOTHING'
        sql = (
            f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) '
            f'ON CONFLICT ({key_column}) {conflict_action}'
        )

        cursor.executemany(sql, values)
        connection.commit()
    except Exception:
        # Leave no half-applied batch behind.
        connection.rollback()
        raise
    finally:
        cursor.close()

    return rows_updated, rows_added