In [37]:
import pandas as pd
import numpy as np
import fastapi as fast
from database_conntecion_miware import connect_to_postgresql
from dotenv import load_dotenv
import os
load_dotenv()
import psycopg2
import logging
from psycopg2.extras import execute_values
from fastapi import FastAPI, HTTPException

In [36]:


# Set up logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    filename="app.log",  # Save logs to a file
    filemode="w"         # Overwrite the file each time (use "a" for appending)
)

logger = logging.getLogger(__name__)



def create_table_if_not_exists(conn, df: pd.DataFrame, table_name: str) -> None:
    """Create table if it doesn't exist using pure SQL."""
    columns = [f"{col} TEXT" for col in df.columns]
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns)}
    )
    """
    
    with conn.cursor() as cur:
        try:
            cur.execute(create_table_sql)
            conn.commit()
            logger.info(f"Table {table_name} is ready")
        except Exception as e:
            conn.rollback()
            logger.error(f"Error creating table: {str(e)}")
            raise

def insert_data_in_batches(conn, df: pd.DataFrame, table_name: str, batch_percentage: int) -> None:
    """Insert data using psycopg2 execute_values in percentage-based batches."""
    # Convert all data to strings
    df = df.astype(str)
    
    # Calculate batch size based on percentage
    total_rows = len(df)
    batch_size = int(total_rows * (batch_percentage / 100))
    num_batches = (total_rows + batch_size - 1) // batch_size  # Round up division
    
    logger.info(f"Total rows: {total_rows}")
    logger.info(f"Batch size ({batch_percentage}%): {batch_size} rows")
    logger.info(f"Number of batches: {num_batches}")
    
    # Prepare the insert SQL
    columns = df.columns.tolist()
    insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES %s"
    
    with conn.cursor() as cur:
        for batch_num in range(num_batches):
            try:
                # Calculate start and end indices for this batch
                start_idx = batch_num * batch_size
                end_idx = min((batch_num + 1) * batch_size, total_rows)
                
                # Get the batch data
                batch_df = df.iloc[start_idx:end_idx]
                
                # Convert batch to list of tuples
                batch_values = [tuple(x) for x in batch_df.values]
                
                # Insert batch
                execute_values(cur, insert_sql, batch_values)
                conn.commit()
                
                logger.info(f"Batch {batch_num + 1}/{num_batches} inserted successfully ({start_idx} to {end_idx} rows)")
                
            except Exception as e:
                conn.rollback()
                logger.error(f"Error inserting batch {batch_num + 1}: {str(e)}")
                raise

def read_data(filename: str) -> pd.DataFrame:
    """Read data from CSV or Excel files."""
    if not os.path.exists(filename):
        raise FileNotFoundError(f"File not found: {filename}")
        
    try:
        if filename.endswith(".csv"):
            data = pd.read_csv(filename)
            logger.info(f"Reading CSV file: {filename}")
        elif filename.endswith(".xlsx"):
            data = pd.read_excel(filename)
            logger.info(f"Reading Excel file: {filename}")
        elif filename.endswith(".xls"):
            data = pd.read_excel(filename, engine="xlrd")
            logger.info(f"Reading legacy Excel file: {filename}")
        else:
            raise ValueError("Unsupported file format!")
        return data
    except Exception as e:
        logger.error(f"Error reading file {filename}: {str(e)}")
        raise

def process_excel_to_postgres(filename: str, table_name: str, database_url: str) -> None:
    """Main function to process file and load it into CockroachDB."""
    try:
        # Read the file
        df = read_data(filename)
        
        if df.empty:
            raise ValueError("The file is empty")
            
        # Connect to database
        conn = connect_to_postgresql(database_url)
        
        # Create table if needed
        create_table_if_not_exists(conn, df, table_name)
        
        # Insert data
        insert_data_in_batches(conn, df, table_name,30)
        
        # Close connection
        conn.close()
        logger.info("Process completed successfully")
        
    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        raise

if __name__ == "__main__":
    try:
        # Get database credentials
        Username = os.getenv("Username_database")
        Host = os.getenv("Host")
        Port = os.getenv("Port")
        password = os.getenv("Password")
        Database = os.getenv("Database")
        
        if not all([Username, Host, Port, password, Database]):
            raise ValueError("Missing required environment variables")
            
        # Construct database URL
        database_url = f"postgresql://{Username}:{password}@{Host}:{Port}/{Database}?sslmode=require"
        
        # Process file
        process_excel_to_postgres(
            filename=r"C:\Users\DELL\Desktop\MI-ware\Development\seed_data\customer_transactions.csv",
            table_name="customer_transaction",
            database_url=database_url
        )
        
    except Exception as e:
        logger.error(f"Failed to process data: {e}")

INFO:__main__:Reading CSV file: C:\Users\DELL\Desktop\MI-ware\Development\seed_data\customer_transactions.csv


INFO:__main__:Successfully connected to CockroachDB
INFO:__main__:Table customer_transaction is ready
INFO:__main__:Total rows: 75
INFO:__main__:Batch size (30%): 22 rows
INFO:__main__:Number of batches: 4
INFO:__main__:Batch 1/4 inserted successfully (0 to 22 rows)
INFO:__main__:Batch 2/4 inserted successfully (22 to 44 rows)
INFO:__main__:Batch 3/4 inserted successfully (44 to 66 rows)
INFO:__main__:Batch 4/4 inserted successfully (66 to 75 rows)
INFO:__main__:Process completed successfully


In [None]:

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from pydantic import BaseModel
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
import logging
import os
from typing import Optional
import shutil
from datetime import datetime

# Set up logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    filename="app.log",
    filemode="w"
)
logger = logging.getLogger(__name__)

app = FastAPI(title="Excel to CockroachDB Loader")

class DatabaseParams(BaseModel):
    username: str
    password: str
    host: str
    port: int
    database: str
    table_name: str
    batch_percentage: int = 30

def connect_to_postgresql(database_url: str):
    """Create database connection using psycopg2."""
    try:
        conn = psycopg2.connect(database_url)
        logger.info("Successfully connected to CockroachDB")
        return conn
    except Exception as e:
        logger.error(f"Connection error: {str(e)}")
        raise

def create_table_if_not_exists(conn, df: pd.DataFrame, table_name: str) -> None:
    """Create table if it doesn't exist using pure SQL."""
    columns = [f"{col} TEXT" for col in df.columns]
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns)}
    )
    """
    
    with conn.cursor() as cur:
        try:
            cur.execute(create_table_sql)
            conn.commit()
            logger.info(f"Table {table_name} is ready")
        except Exception as e:
            conn.rollback()
            logger.error(f"Error creating table: {str(e)}")
            raise

def insert_data_in_batches(conn, df: pd.DataFrame, table_name: str, batch_percentage: int) -> dict:
    """Insert data using psycopg2 execute_values in percentage-based batches."""
    status_updates = []
    
    # Convert all data to strings
    df = df.astype(str)
    
    # Calculate batch size based on percentage
    total_rows = len(df)
    batch_size = int(total_rows * (batch_percentage / 100))
    num_batches = (total_rows + batch_size - 1) // batch_size
    
    logger.info(f"Total rows: {total_rows}")
    logger.info(f"Batch size ({batch_percentage}%): {batch_size} rows")
    logger.info(f"Number of batches: {num_batches}")
    
    # Prepare the insert SQL
    columns = df.columns.tolist()
    insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES %s"
    
    with conn.cursor() as cur:
        for batch_num in range(num_batches):
            try:
                start_idx = batch_num * batch_size
                end_idx = min((batch_num + 1) * batch_size, total_rows)
                batch_df = df.iloc[start_idx:end_idx]
                batch_values = [tuple(x) for x in batch_df.values]
                
                execute_values(cur, insert_sql, batch_values)
                conn.commit()
                
                status_message = f"Batch {batch_num + 1}/{num_batches} inserted ({start_idx} to {end_idx} rows)"
                logger.info(status_message)
                status_updates.append(status_message)
                
            except Exception as e:
                conn.rollback()
                error_msg = f"Error inserting batch {batch_num + 1}: {str(e)}"
                logger.error(error_msg)
                raise HTTPException(status_code=500, detail=error_msg)
    
    return {
        "total_rows": total_rows,
        "batches_completed": num_batches,
        "status_updates": status_updates
    }

@app.post("/upload/")
async def upload_file(
    file: UploadFile = File(...),
    db_params: DatabaseParams = Form(...)
):
    try:
        # Save uploaded file temporarily
        temp_file_path = f"temp_{datetime.now().strftime('%Y%m%d_%H%M%S')}{os.path.splitext(file.filename)[1]}"
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        
        # Construct database URL
        database_url = f"postgresql://{db_params.username}:{db_params.password}@{db_params.host}:{db_params.port}/{db_params.database}?sslmode=require"
        
        # Read the file
        df = pd.read_csv(temp_file_path) if temp_file_path.endswith('.csv') else pd.read_excel(temp_file_path)
        
        if df.empty:
            raise HTTPException(status_code=400, detail="The uploaded file is empty")
        
        # Connect to database
        conn = connect_to_postgresql(database_url)
        
        # Create table if needed
        create_table_if_not_exists(conn, df, db_params.table_name)
        
        # Insert data and get status
        result = insert_data_in_batches(conn, df, db_params.table_name, db_params.batch_percentage)
        
        # Clean up
        conn.close()
        os.remove(temp_file_path)
        
        return {
            "status": "success",
            "message": "Data processed successfully",
            "details": result
        }
        
    except Exception as e:
        # Clean up temp file if it exists
        if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
            
        logger.error(f"Error processing upload: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "healthy"}
