In [11]:
import os
import logging
import pandas as pd
from dotenv import load_dotenv
# from engine import create_connection_string, get_engine
# from query_loader import load_queries, load_sql_file
# from update_or_append import update_or_append
from sqlalchemy import exc as sqlalchemy_exc


In [12]:
import os
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

def create_connection_string(server: str, driver: str = '{SQL Server}') -> str:
    """Build an ODBC connection string for one of the configured servers.

    Settings come from environment variables prefixed with the server key:
    ``<server>_ADDRESS`` and ``<server>_DATABASE`` for every server, plus
    ``<server>_USERNAME`` / ``<server>_PASSWORD`` for SQL-authenticated servers.

    Args:
        server: 'DW' (Windows integrated authentication, ``Trusted_Connection``)
            or 'QN' / 'DM' (SQL authentication with UID/PWD).
        driver: ODBC driver name including its braces. Defaults to the legacy
            '{SQL Server}' driver used previously.

    Returns:
        The semicolon-delimited ODBC connection string.

    Raises:
        ValueError: if ``server`` is not one of the known keys.
    """
    trusted_servers = ('DW',)
    sql_auth_servers = ('QN', 'DM')
    if server not in trusted_servers and server not in sql_auth_servers:
        raise ValueError(f"Server {server} configuration not found")

    setting_names = ['ADDRESS', 'DATABASE']
    if server in sql_auth_servers:
        setting_names += ['USERNAME', 'PASSWORD']
    settings = {name: os.getenv(f'{server}_{name}') for name in setting_names}

    # os.getenv returns None for unset variables, which would otherwise end up as
    # the literal text "None" in the connection string. Warn (names only, never
    # values) rather than raise, so an unused server does not abort the run.
    missing = [f'{server}_{name}' for name, value in settings.items() if value is None]
    if missing:
        logging.warning(f"Missing environment variables for server {server}: {', '.join(missing)}")

    base = f"DRIVER={driver};SERVER={settings['ADDRESS']};DATABASE={settings['DATABASE']};"
    if server in trusted_servers:
        return base + 'Trusted_Connection=yes;'
    return base + f"UID={settings['USERNAME']};PWD={settings['PASSWORD']};"

def get_engine(connection_string: str):
    """Return a SQLAlchemy engine for SQL Server using the pyodbc dialect.

    The raw ODBC string is handed to pyodbc through the ``odbc_connect`` query
    parameter, and the engine is created with pyodbc's ``fast_executemany``
    option enabled.
    """
    odbc_url = URL.create(
        drivername="mssql+pyodbc",
        query={"odbc_connect": connection_string},
    )
    engine = create_engine(odbc_url, fast_executemany=True)
    return engine


In [13]:
import json
from typing import Dict

def load_queries(filepath: str, encoding: str = 'utf-8-sig') -> Dict[str, Dict]:
    """Load the query catalogue from a JSON file.

    Args:
        filepath: Path to the JSON file.
        encoding: Text encoding used to read the file. The default
            'utf-8-sig' reads plain UTF-8 and also strips a leading UTF-8 BOM,
            which json.load would otherwise reject. It also avoids depending on
            the platform's default encoding (e.g. cp1252 on Windows).

    Returns:
        The parsed JSON object: query name -> query settings dict.
    """
    with open(filepath, encoding=encoding) as json_file:
        return json.load(json_file)

def load_sql_file(filepath: str, encoding: str = 'utf-8-sig') -> str:
    """Load the text of a SQL query from a file.

    Args:
        filepath: Path to the .sql file.
        encoding: Text encoding used to read the file. The default
            'utf-8-sig' strips a leading UTF-8 BOM (often written by Windows
            editors), which would otherwise be sent to the server as part of the
            query. It also avoids depending on the platform default encoding.

    Returns:
        The file contents as a string.
    """
    with open(filepath, encoding=encoding) as sql_file:
        return sql_file.read()

In [14]:
import smartsheet
import pandas as pd
from typing import Any
import logging
import os
from dotenv import load_dotenv

# Populate environment variables from a local .env file (python-dotenv) so the
# os.getenv('SMARTSHEET_API_KEY') lookup in update_or_append() can find the key.
load_dotenv()

def update_or_append(sheet_id: str, df: pd.DataFrame, primary_column_name: str) -> bool:
    """
    Update or append rows to a Smartsheet based on the primary column value.

    Args:
        sheet_id (str): ID of the Smartsheet to update/append.
        df (pd.DataFrame): DataFrame containing the data to be updated/appended.
        primary_column_name (str): Name of the primary column in the DataFrame and Smartsheet.

    Returns:
        bool: True when all required update/add requests succeeded (or nothing
        needed changing); False when validation failed or any API call raised.

    A DataFrame row whose primary-column value matches an existing sheet row
    (the first such row, top to bottom) is sent as an update. It is skipped if
    every DataFrame column already holds the same value. Rows with no match are
    appended to the bottom of the sheet. Errors are logged for troubleshooting.
    """
    try:
        api_key = os.getenv('SMARTSHEET_API_KEY')

        # Initialize client
        smartsheet_client = smartsheet.Smartsheet(api_key)
        # By default the SDK returns Error objects from failed calls instead of
        # raising. Without this, a failed update_rows/add_rows would still be
        # logged as a success and the function would return True.
        smartsheet_client.errors_as_exceptions(True)

        # Load the entire sheet
        sheet = smartsheet_client.Sheets.get_sheet(sheet_id)
        if not sheet:
            logging.error(f"Smartsheet ID {sheet_id} not found.")
            return False
        logging.info(f"Loaded Smartsheet {sheet.name} with ID {sheet_id}")

        # Sheet column title -> Smartsheet column ID
        column_map = {col.title: col.id for col in sheet.columns}

        # Every DataFrame column must exist in the sheet
        if not all(col in column_map for col in df.columns):
            logging.error(f"Not all columns in the DataFrame are in the Smartsheet. Please check the column names.")
            return False

        # The primary column is the match key, so it must exist on both sides.
        primary_column_id = column_map.get(primary_column_name)
        if primary_column_id is None or primary_column_name not in df.columns:
            logging.error(
                f"Primary column {primary_column_name!r} must be present in both the "
                f"DataFrame and Smartsheet ID {sheet_id}."
            )
            return False

        # Index existing rows once rather than rescanning the whole sheet for
        # every DataFrame row (previously O(rows_in_df * rows_in_sheet)).
        existing_rows = _index_rows_by_column(sheet.rows, primary_column_id)

        rows_to_update = []
        rows_to_add = []

        for _, row in df.iterrows():
            existing_row = existing_rows.get(row[primary_column_name])

            new_smartsheet_row = smartsheet.models.Row()
            new_smartsheet_row.cells = [smartsheet.models.Cell({
                'column_id': column_map[col],
                'value': row[col]
            }) for col in df.columns]

            if existing_row is not None:
                # Skip rows whose sheet values already equal the DataFrame values
                if all(existing_row.get_column(column_map[col]).value == row[col] for col in df.columns):
                    logging.info(f"{sheet.name}: Skipped row with {primary_column_name} value {row[primary_column_name]}")
                    continue
                new_smartsheet_row.id = existing_row.id
                rows_to_update.append(new_smartsheet_row)
            else:
                new_smartsheet_row.to_bottom = True
                rows_to_add.append(new_smartsheet_row)

        # Update existing rows
        if rows_to_update:
            smartsheet_client.Sheets.update_rows(sheet_id, rows_to_update)
            logging.info(f"Updated {len(rows_to_update)} rows in Smartsheet ID {sheet_id}")

        # Add new rows
        if rows_to_add:
            smartsheet_client.Sheets.add_rows(sheet_id, rows_to_add)
            logging.info(f"Added {len(rows_to_add)} new rows to Smartsheet ID {sheet_id}")

        return True

    except Exception as e:
        logging.error(f"An unexpected error occurred in update_or_append: {e}", exc_info=True)
        return False


def _index_rows_by_column(rows, column_id) -> dict:
    """Map each sheet row's value in ``column_id`` to the first row holding that value."""
    index = {}
    for sheet_row in rows:
        # setdefault keeps the FIRST row per value, matching the original
        # top-to-bottom linear search semantics when the key is duplicated.
        index.setdefault(sheet_row.get_column(column_id).value, sheet_row)
    return index

In [15]:

# Load variables from a local .env file so the os.getenv() lookups made by the
# connection-string and Smartsheet helpers can see values defined there.
load_dotenv()

# force=True (Python 3.8+) replaces any handlers already attached to the root
# logger. Without it, basicConfig silently does nothing when the kernel or a
# previous run has already configured logging, and nothing reaches logs.txt.
logging.basicConfig(filename='logs.txt', level=logging.INFO, format='%(asctime)s:%(levelname)s:%(message)s', force=True)

def app():
    """Run every configured query and push each result set to its Smartsheet.

    Reads the query catalogue from 'queries/queries.json' and builds one engine
    per server key ('DW', 'QN', 'DM'). Each query is then processed in turn. A
    failure in one query is logged with its traceback and does not stop the
    remaining queries. The engines are disposed when the run finishes, so pooled
    connections are released even when the notebook cell is re-run.
    """
    queries = load_queries('queries/queries.json')

    # Create database engines
    dw_engine = get_engine(create_connection_string('DW'))
    qnxt_engine = get_engine(create_connection_string('QN'))
    dm_engine = get_engine(create_connection_string('DM'))

    try:
        # Iterate over queries
        for name, query in queries.items():
            try:
                process_query(name, query, dw_engine, qnxt_engine, dm_engine)
            except Exception as e:
                # exc_info keeps the traceback in the log file for troubleshooting.
                logging.error(f"Error processing {name}: {e}", exc_info=True)
    finally:
        for engine in (dw_engine, qnxt_engine, dm_engine):
            engine.dispose()

def process_query(name: str, query: dict, dw_engine, qnxt_engine, dm_engine):
    """Run one configured query and sync its result to Smartsheet.

    Args:
        name: Query name (used only in log messages).
        query: Query settings. Keys read here: 'database' ('DW' | 'QN' | 'DM'),
            'sql' (file name under 'queries/'), 'id' (Smartsheet ID) and
            'primary_column' (match key column).
        dw_engine, qnxt_engine, dm_engine: Engines for the 'DW', 'QN' and 'DM'
            servers respectively.

    Raises:
        ValueError: if query['database'] is not one of the known server keys.
            Previously this surfaced as an UnboundLocalError.

    SQLAlchemy errors while reading are logged rather than raised.
    """
    engines = {'DW': dw_engine, 'QN': qnxt_engine, 'DM': dm_engine}
    database = query['database']
    if database not in engines:
        raise ValueError(
            f"Query {name}: unknown database {database!r}; expected one of {sorted(engines)}"
        )
    engine = engines[database]

    sql = load_sql_file(f'queries/{query["sql"]}')

    try:
        df = pd.read_sql(sql, engine)
        # update_or_append signals failure via its return value, not an exception.
        if not update_or_append(query['id'], df, query['primary_column']):
            logging.warning(f"Smartsheet sync failed for query {name}")
    except sqlalchemy_exc.SQLAlchemyError as e:
        logging.error(f"SQLAlchemyError in query {name}: {e}")



In [16]:
# Entry point: run every configured query and sync the results to Smartsheet.
# Progress and per-query errors are reported through the logging module.
app()