# CRC Development #

In [1039]:
# Libraries
import os
import re
from urllib.parse import quote_plus

import yaml #handling yaml files. 
import pandas as pd
from psycopg2 import sql
from sqlalchemy import create_engine
from sqlalchemy import types as sqltypes
from dotenv import load_dotenv
from directory_config import DATA_CONFIGURATION_DIRECTORY, DATABASE_CONFIGURATION_DIRECTORY #import root directory and configuraiton.json file. 

# Lookup table: SQL column type as written in the YAML configs -> pandas dtype.
# "Int64" (capital I) is pandas' nullable integer dtype, so missing values are
# kept as <NA> instead of forcing the whole column to float.
SQL_TO_PANDAS_DTYPES = dict(
    [
        ("FLOAT", "float64"),
        ("INTEGER", "Int64"),
        ("VARCHAR(50)", "string"),
    ]
)


## Step 1: Defining functions.   ##

### 1.1 Function to load a YAML configuration file (e.g. irish_data.yaml or irish_db.yaml) ###

In [1040]:
def load_config(config_file_path):
    """Load a YAML configuration file and return its parsed contents.

    Args:
        config_file_path: Path of the YAML file to read (e.g. irish_db.yaml or
            irish_data.yaml).

    Returns:
        The parsed YAML document. An empty file gives ``{}`` instead of
        ``None`` so callers fail with a clear KeyError rather than a TypeError
        on ``None[...]``.
    """
    # yaml.safe_load only constructs plain Python objects (no arbitrary tags).
    with open(config_file_path, "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    # Announce success only after parsing worked, and show the path instead of
    # the file object's repr (<_io.TextIOWrapper ...>).
    print(f"Following Configuration files has been loaded successfully: \n{config_file_path}")
    return {} if config is None else config
        

### 1.2 Function to create the database tables from the database configuration file (e.g. irish_db.yaml) ###

A connection to the PostgreSQL database is made through a SQLAlchemy engine. The function then iterates over the tables defined in the configuration file ```irish_db.yaml```, extracts each table's column definitions, and creates the corresponding table in PostgreSQL (an existing table with the same name is replaced).

In [1041]:

def _sqlalchemy_type(sql_type):
    """Translate a YAML SQL type string (e.g. "FLOAT", "VARCHAR(50)") into a SQLAlchemy type instance.

    Returns None when the type is not recognised, so the caller can let pandas
    infer the column type instead.
    """
    # Base name (may contain spaces, e.g. "DOUBLE PRECISION") plus optional "(length)".
    match = re.fullmatch(r"\s*([A-Za-z][A-Za-z ]*?)\s*(?:\(\s*(\d+)\s*\))?\s*", str(sql_type))
    if match is None:
        return None
    base, length = match.group(1).upper(), match.group(2)
    if base in ("VARCHAR", "CHARACTER VARYING"):
        return sqltypes.String(int(length)) if length else sqltypes.String()
    simple_types = {
        "FLOAT": sqltypes.Float,
        "DOUBLE PRECISION": sqltypes.Float,
        "INTEGER": sqltypes.Integer,
        "INT": sqltypes.Integer,
        "BIGINT": sqltypes.BigInteger,
        "TEXT": sqltypes.Text,
        "BOOLEAN": sqltypes.Boolean,
        "DATE": sqltypes.Date,
    }
    type_class = simple_types.get(base)
    return type_class() if type_class is not None else None


def create_database(database_config):
    """Create (or replace) the empty tables described in the database configuration.

    Each entry under ``database_config["tables"]`` is created with
    ``DataFrame.to_sql`` so the tables line up with what ``write_to_postgres``
    later appends. An existing table with the same name is dropped and
    recreated (``if_exists="replace"``).

    Args:
        database_config: Parsed database YAML with ``database`` ->
            {user, password, host, port, dbname} and ``tables`` ->
            {table_name: {"columns": {column_name: SQL type string}}}.

    Errors are printed rather than raised, matching the rest of this notebook.
    """
    engine = None
    try:
        db = database_config["database"]
        # URL-quote the credentials so characters such as '@', ':' or '/' in a
        # password cannot break the connection string.
        engine = create_engine(
            f"postgresql://{quote_plus(str(db['user']))}:{quote_plus(str(db['password']))}@"
            f"{db['host']}:{db['port']}/{db['dbname']}"
        )
        # create_engine() is lazy; open a connection so the message below is true.
        with engine.connect():
            pass
        print("Database connected successfully!")

        for table_name, table_data in database_config["tables"].items():
            columns = table_data["columns"]

            # Honour the SQL types declared in the YAML. Without an explicit
            # dtype, an empty object-typed frame makes every column TEXT.
            # Unrecognised types are omitted and left for pandas to infer.
            declared_types = {}
            for column, sql_type in columns.items():
                sa_type = _sqlalchemy_type(sql_type)
                if sa_type is not None:
                    declared_types[column] = sa_type

            # An empty frame carrying the configured column names is enough for
            # to_sql to emit CREATE TABLE.
            df_schema = pd.DataFrame(columns=list(columns))
            df_schema.to_sql(table_name, engine, if_exists="replace", index=False, dtype=declared_types)

            print(f"Table {table_name} has been created successfully.")

        print("All tables have been successfully created using df.to_sql.")
    except Exception as e:
        print(f"Error creating database tables: {e}")
    finally:
        # Release the pooled connections held by this engine.
        if engine is not None:
            engine.dispose()


### 1.3 Functions to import the datasets into DataFrames and preprocess them ###

In [1042]:
def convert_column_type(df, column_types):
    """Return a copy of ``df`` with columns cast to the dtypes from the YAML config.

    Args:
        df: DataFrame whose columns were read as text.
        column_types: Mapping of column name -> pandas dtype name
            (``"float64"``, ``"Int64"`` or a name starting with ``"string"``).
            Columns missing from ``df`` and unrecognised dtypes are skipped.

    Returns:
        A new DataFrame; the caller's frame is left unmodified.

    A column that cannot be converted keeps its original values and a warning
    is printed.
    """
    converted = df.copy()
    for col, dtype in column_types.items():
        if col not in converted.columns:
            continue
        try:
            if dtype == "float64":
                # to_numeric infers int64 for integer-looking text such as "160",
                # so cast explicitly to honour the declared FLOAT type.
                # Invalid values are coerced to NaN.
                converted[col] = pd.to_numeric(converted[col], errors="coerce").astype("float64")
            elif dtype == "Int64":
                # Nullable integer: invalid/missing values become <NA> instead of
                # failing the NaN -> int conversion.
                converted[col] = pd.to_numeric(converted[col], errors="coerce").astype("Int64")
            elif dtype.startswith("string"):
                # Missing strings are stored as empty strings rather than NULL.
                converted[col] = converted[col].astype("string").fillna("")
        except Exception as e:
            print(f"⚠️ Warning: Could not convert column {col} to {dtype}. Error: {e}")

    return converted


In [1043]:
def import_dataset(data_config):
    """Read every dataset listed in the data configuration into a DataFrame.

    For each entry under ``data_config["datasets"]``:

    - the file is read with all values as strings;
    - column names are normalised (lower-case, '.' -> '_');
    - a ``file_origin`` column holding the configured file name is added;
    - columns are cast to the pandas dtypes derived from the configured SQL
      types via ``SQL_TO_PANDAS_DTYPES`` and ``convert_column_type``.

    Args:
        data_config: Parsed data YAML. Each dataset entry needs ``file_name``,
            ``path``, ``file_type`` ("csv" or "excel") and ``columns``
            ({column: SQL type}). ``delimiter`` is only used for CSV files and
            defaults to ",".

    Returns:
        Dict mapping dataset name -> DataFrame, or ``None`` if loading any
        dataset fails (the error is printed).
    """

    def normalise(name):
        # Lower-case and replace '.' with '_' (e.g. "Sepal.Length" -> "sepal_length").
        return str(name).lower().replace(".", "_")

    def pandas_dtype(sql_type):
        # Look up only the first word ("INTEGER NOT NULL" -> "INTEGER"),
        # case-insensitively. Unknown or empty types are kept as text.
        words = str(sql_type).upper().split()
        return SQL_TO_PANDAS_DTYPES.get(words[0], "string") if words else "string"

    try:
        datasets = {}  # dataset name -> preprocessed DataFrame

        for dataset_name, dataset_info in data_config["datasets"].items():
            file_name = dataset_info["file_name"]
            file_path = dataset_info["path"]
            file_type = dataset_info["file_type"]

            # Read every value as a string; typed conversion happens below.
            if file_type == "csv":
                df = pd.read_csv(file_path, delimiter=dataset_info.get("delimiter", ","), dtype=str)
            elif file_type == "excel":
                df = pd.read_excel(file_path, engine="openpyxl", dtype=str)
            else:
                raise ValueError(f"Unsupported file type: {file_type}")

            df.columns = [normalise(col) for col in df.columns]
            df["file_origin"] = file_name  # record which file each row came from
            print(f"Following is the preprocessed column name:{df.columns}\n")

            # YAML keys get the same normalisation so they match the headers above.
            column_types = {
                normalise(col): pandas_dtype(sql_type)
                for col, sql_type in (dataset_info.get("columns") or {}).items()
            }
            print(f"🔹 Applying data types for {dataset_name}: {column_types}\n")

            df = convert_column_type(df, column_types)

            datasets[dataset_name] = df
            print(f"Loaded dataset: {dataset_name} | Shape: {df.shape}\n")

        return datasets

    except Exception as e:
        print(f"Error loading data file: {e}")
        return None

### 1.4 Function to write the DataFrames to PostgreSQL ###

In [1044]:
def write_to_postgres(db_config, datasets_dict, chunksize=1000):
    """Append each loaded DataFrame to the PostgreSQL table of the same name.

    Only tables listed under ``db_config["tables"]`` are written. A configured
    table without a matching dataset is reported and skipped. All inserts run
    in one transaction, so a failure leaves no table partially appended.

    Args:
        db_config: Parsed database YAML with ``database`` connection settings
            and ``tables``.
        datasets_dict: Mapping of table name -> DataFrame (as returned by
            ``import_dataset``). ``None`` or empty means there is nothing to write.
        chunksize: Rows per multi-row INSERT statement, so a large table is not
            sent as one huge statement.

    Errors are printed rather than raised, matching the rest of this notebook.
    """
    if not datasets_dict:
        # import_dataset returns None on failure; don't open a connection for nothing.
        print("No datasets to write to PostgreSQL.")
        return

    engine = None
    try:
        db = db_config["database"]
        # URL-quote the credentials so characters such as '@', ':' or '/' in a
        # password cannot break the connection string.
        engine = create_engine(
            f"postgresql://{quote_plus(str(db['user']))}:{quote_plus(str(db['password']))}@"
            f"{db['host']}:{db['port']}/{db['dbname']}"
        )

        # engine.begin() commits on success and rolls back if any insert fails.
        with engine.begin() as connection:
            print("Connected to the database successfully!")

            for table_name in db_config["tables"]:
                df = datasets_dict.get(table_name)
                if df is None:
                    print(f"No dataset found for table {table_name}; skipping.")
                    continue

                df.to_sql(
                    table_name,
                    connection,
                    if_exists="append",
                    index=False,
                    method="multi",
                    chunksize=chunksize,
                )
                print(f"Bulk inserted data into {table_name} successfully.")

        print("Data written to PostgreSQL successfully.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")
    finally:
        # Release the pooled connections held by this engine.
        if engine is not None:
            engine.dispose()


## Step 2: Defining ```main``` function ##

Here, ```load_config``` is called to load the database and dataset configuration files, and the database credentials are read from the ```.env``` file. ```create_database``` is then called to create the database tables, and ```import_dataset``` is called to read and preprocess the datasets. Finally, the data is written to PostgreSQL using ```write_to_postgres```.

In [1045]:
def main():
    """Run the pipeline: load configs, create tables, import datasets, write them.

    Database credentials come from the ``DB_USER`` and ``DB_PASSWORD``
    environment variables (loaded from a ``.env`` file if present). They
    overwrite the user/password entries of the database YAML.

    Raises:
        RuntimeError: If either credential variable is not set.
    """
    # Load the database (irish_db.yaml) and dataset (irish_data.yaml) configurations.
    db_config = load_config(DATABASE_CONFIGURATION_DIRECTORY)
    data_config = load_config(DATA_CONFIGURATION_DIRECTORY)
    print(db_config)
    print(data_config)

    # Load the .env file and fail fast if credentials are missing. Otherwise
    # os.getenv() returns None, which would end up as "None" in the connection URL.
    load_dotenv()
    missing = [name for name in ("DB_USER", "DB_PASSWORD") if os.getenv(name) is None]
    if missing:
        raise RuntimeError(f"Missing database credentials in environment/.env: {', '.join(missing)}")
    db_config["database"]["user"] = os.getenv("DB_USER")
    db_config["database"]["password"] = os.getenv("DB_PASSWORD")

    # Create the database tables described in the database configuration.
    create_database(db_config)

    # Import and preprocess the datasets. import_dataset returns None on failure.
    datasets_dict = import_dataset(data_config)
    if datasets_dict is None:
        print("Skipping PostgreSQL write because the datasets could not be loaded.")
        return

    # Write the data to PostgreSQL.
    write_to_postgres(db_config, datasets_dict)

In [1046]:
# Run the full pipeline. In a Jupyter kernel __name__ is also "__main__",
# so executing this cell runs main() (see the output below).
if __name__ == "__main__":
    main()

Following Configuration files has been loaded successfully: 
<_io.TextIOWrapper name='./config/irish_db.yaml' mode='r' encoding='UTF-8'>
Following Configuration files has been loaded successfully: 
<_io.TextIOWrapper name='./config/irish_data.yaml' mode='r' encoding='UTF-8'>
{'database': {'host': 'localhost', 'port': 5444, 'dbname': 'irish_db', 'user': '${DB_USER}', 'password': '${DB_PASSWORD}'}, 'tables': {'irish': {'columns': {'file_origin': 'VARCHAR(50)', 'sepal_length': 'FLOAT', 'sepal_width': 'FLOAT', 'petal_length': 'FLOAT', 'petal_width': 'FLOAT', 'variety': 'VARCHAR(50)'}}, 'mtcars': {'columns': {'file_origin': 'VARCHAR(50)', 'model': 'VARCHAR(50)', 'mpg': 'FLOAT', 'cyl': 'INTEGER', 'disp': 'FLOAT', 'hp': 'INTEGER', 'drat': 'FLOAT', 'wt': 'FLOAT', 'qsec': 'FLOAT', 'vs': 'INTEGER', 'am': 'INTEGER', 'gear': 'INTEGER', 'carb': 'INTEGER'}}, 'weather': {'columns': {'file_origin': 'VARCHAR(50)', 'mintemp': 'FLOAT', 'maxtemp': 'FLOAT', 'rainfall': 'FLOAT', 'evaporation': 'FLOAT', 'sun

## Step 3: Run and verify the pipeline ##

Before running the pipeline, make sure you are connected to the VM that hosts the PostgreSQL database and that an SSH tunnel (```ssh```) to it is open, so that the tables and data created here are written to that database.