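## SECTION 1: API - Main

> Run the cells in **SECTION 1: API - Functions** (below) before the cells in this section. `main` calls helper functions that are defined there, so a top-to-bottom *Restart & Run All* otherwise fails with a `NameError`.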

In [76]:
# --- departments.csv -> departments table ---
path_departments = "departments.csv"
table_departments = "departments"
columns_departments = ["id_departments", "department"]

# --- hired_employees.csv -> employees table ---
path_hired_employees = "hired_employees.csv"
table_employees = "employees"
columns_employees = ["id_hired_employees", "hired_employees", "hired_date", "department", "job"]

# --- jobs.csv -> jobs table ---
path_jobs = "jobs.csv"
table_jobs = "jobs"
columns_jobs = ["id_job", "job"]

# --- settings shared by every load ---
batch_size = 100  # rows per INSERT batch (validated to 1-1000 by the loader)
delimiter = ","   # field separator of the input CSV files

In [74]:
def main(path: str, delimiter: str, table_name: str, column_names: list, batch_size: int = 1000) -> None:
    """
    Read a CSV file with Spark and load its rows into a database table.

    The target table is emptied before the new rows are inserted.

    Parameters:
    path (str): The path to the CSV file to read.
    delimiter (str): The delimiter used in the CSV file.
    table_name (str): The name of the database table where the data will be inserted.
    column_names (list): Names assigned, in order, to the CSV fields. They are also
        used as the target column names of the INSERT, so they must match the table.
    batch_size (int): The number of rows to insert in each batch (1-1000). Default is 1000.

    Returns:
    None
    """
    # The reader signals a missing/unreadable file by raising, not by returning None.
    try:
        df = _read_csv_with_error_handling(path, delimiter)
    except FileNotFoundError as error:
        print(f"Data was not read from the CSV file due to an error: {error}")
        return

    try:
        _data_to_db(table_name, df, batch_size, column_names)
    except Exception as error:
        print(f"Error inserting data into the database: {error}")

In [81]:
main(
    path=path_jobs,
    delimiter=delimiter,
    table_name=table_jobs,
    column_names=columns_jobs,
    batch_size=batch_size,
)

Table successfully truncated.
Data inserted into the database successfully!


## SECTION 1: API - Functions

### 1. Extracción de datos

In [42]:
import logging
import os

import pandas as pd
import psycopg2
import psycopg2.extras as extras
from psycopg2 import sql
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException

In [43]:
def _read_csv_with_error_handling(file_path: str, separator: str) -> DataFrame:
    """
    Read a CSV file into a Spark DataFrame.

    The file is read with header=False, so the first line is treated as data rather
    than as column names. Column types are inferred from the data (inferSchema=True).

    Args:
        file_path (str): The path to the CSV file.
        separator (str): The delimiter used in the CSV file to separate fields.
    Returns:
        DataFrame: A Spark DataFrame containing the content of the CSV file.
    Raises:
        FileNotFoundError: If Spark raises an AnalysisException while reading the path
            (for example, the file does not exist). The original AnalysisException is
            chained as the cause so its details are not lost.
    """
    # getOrCreate returns the already-active session if there is one.
    logging.info("Initializing Spark session")
    spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

    try:
        return spark.read.csv(file_path, header=False, inferSchema=True, sep=separator)
    except AnalysisException as e:
        raise FileNotFoundError(f"File not found at path: {file_path}") from e


### 2. Carga de datos

In [47]:
def _db_connection():
    """
    Open a connection to the PostgreSQL database.

    Connection settings are read from the standard libpq environment variables
    (PGHOST, PGUSER, PGPASSWORD, PGDATABASE, PGPORT). Host, user, database and port
    fall back to local defaults. The password has no default and must not be
    hardcoded: when PGPASSWORD is unset, the argument is omitted and libpq's own
    mechanisms (e.g. a ~/.pgpass file) are used.

    Returns:
        conn (psycopg2.extensions.connection): PostgreSQL database connection object.
    """
    # psycopg2 drops keyword arguments whose value is None, so an unset
    # PGPASSWORD simply means "no password given here".
    conn = psycopg2.connect(
        host=os.environ.get("PGHOST", "localhost"),
        user=os.environ.get("PGUSER", "postgres"),
        password=os.environ.get("PGPASSWORD"),
        database=os.environ.get("PGDATABASE", "postgres"),
        port=os.environ.get("PGPORT", "5432"),
    )

    return conn

In [48]:
def _truncate_table(table_name: str) -> None:
    """
    Truncate (empty) an existing table in the database and commit immediately.

    Args:
        table_name (str): The name of the table to be truncated. A dotted name such
            as "schema.table" is treated as schema-qualified.
    Returns:
        None
    """
    # Quote the name as an identifier instead of formatting it into the SQL text,
    # so it cannot inject SQL. Quoted identifiers are case-sensitive in PostgreSQL.
    truncate_query = sql.SQL("TRUNCATE TABLE {}").format(
        sql.Identifier(*table_name.split("."))
    )

    conn = _db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(truncate_query)
        conn.commit()
        print("Table successfully truncated.")
    finally:
        # Close the connection even if TRUNCATE fails, so it is not leaked.
        conn.close()

In [80]:
def _data_to_db(table_name: str, df_data: DataFrame, batch_size: int, column_names: list) -> None:
    """
    Replace the contents of a database table with the rows of a Spark DataFrame.

    The table is truncated first (the truncation is committed on its own). Rows are
    then inserted in batches, and each batch is committed separately. A failing batch
    is rolled back, reported, and skipped, so the remaining batches still load.

    Parameters:
    table_name (str): The name of the database table where the data will be inserted.
        A dotted name such as "schema.table" is treated as schema-qualified.
    df_data (DataFrame): Spark DataFrame with the data. Its columns are renamed
        positionally to `column_names`.
    batch_size (int): Rows per INSERT statement and per commit; must be 1-1000.
    column_names (list): Target column names, in the same order as the DataFrame columns.

    Returns:
    None

    Raises:
    ValueError: If batch_size is outside 1-1000.
    """
    if not (1 <= batch_size <= 1000):
        raise ValueError("batch_size must be between 1 and 1000.")

    df_named = df_data.toDF(*column_names)

    # collect() yields Row tuples with plain Python values and None for nulls.
    # psycopg2 adapts these directly. The pandas round-trip could produce NaN for
    # nulls and numpy scalar types that psycopg2 does not adapt by default.
    rows = [tuple(row) for row in df_named.collect()]

    # Compose identifiers safely instead of formatting them into the SQL string.
    query = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
        sql.Identifier(*table_name.split(".")),
        sql.SQL(", ").join(sql.Identifier(col) for col in df_named.columns),
    )

    # Empty the existing table
    _truncate_table(table_name)

    failed_batches = 0
    conn = _db_connection()
    try:
        with conn.cursor() as cursor:
            for start in range(0, len(rows), batch_size):
                batch = rows[start:start + batch_size]
                try:
                    # page_size=batch_size sends each batch as one INSERT; the
                    # execute_values default (100) would split larger batches.
                    extras.execute_values(cursor, query, batch, page_size=batch_size)
                    conn.commit()
                except Exception as error:
                    # Best effort: undo this batch only and continue with the rest.
                    print("Error: %s" % error)
                    conn.rollback()
                    failed_batches += 1
    finally:
        # Always release the connection, even if a non-batch step raises.
        conn.close()

    if failed_batches:
        print(f"Data insertion finished with {failed_batches} failed batch(es); those rows were not inserted.")
    else:
        print("Data inserted into the database successfully!")