In [4]:
import pandas as pd
from database_utils import DatabaseConnector
import re 
from sqlalchemy import create_engine, text, insert 
from sqlalchemy.inspection import inspect
from sqlalchemy.exc import SQLAlchemyError

In [3]:
#START HERE FROM 28th JULY 
# update the shared column names so that they are primary keys 

# I've started creating a new function, but the explanation of what it does is in ChatGPT

In [13]:
# CURRENT CODE  


# HELPER FUNCTIONS 

def run_cleaning_operations():
    """Clean ``dim_products.EAN`` and ``dim_products.product_code`` and print a sample.

    Builds its own engine through ``DatabaseConnector.init_my_db_engine()`` and
    runs both cleaning steps in a single transaction on a fresh connection.
    Any ``SQLAlchemyError`` is reported with ``print`` and not re-raised, so
    callers continue running after a failure.
    """
    connector = DatabaseConnector()
    engine = connector.init_my_db_engine()

    with engine.connect() as connection:
        # The try wraps the transaction block (rather than sitting inside it)
        # so that a failing statement leaves the block through the exception
        # and is rolled back, instead of being swallowed and followed by a
        # commit attempt on a failed transaction.
        try:
            with connection.begin():
                clean_numbers(connection, 'dim_products', 'EAN')
                clean_store_or_product_codes(connection, 'dim_products', 'product_code')

                # Verify results by fetching data again. Both messages show the
                # same full-row sample, so the query only needs to run once.
                sample_rows = fetch_data(connection, 'dim_products', 10)
                print(f"Sample EAN data after cleaning: {sample_rows}")
                print(f"Sample product_code data after cleaning: {sample_rows}")
        except SQLAlchemyError as e:
            print(f"An error occurred: {e}")

    print('End of cleaning operations')





# Create function to fetch data from the table in my local SQL server so that I can get the data to perform checks on it 
def fetch_data(connection, table_name, limit=5):
    """Return up to ``limit`` rows of ``table_name`` as a list of result rows."""
    statement = text(f"SELECT * FROM {table_name} LIMIT {limit};")
    return connection.execute(statement).fetchall()

# Function to check the column data type to check if the column type has been correctly converted 
def check_column_type(connection, table_name, column_name):
    """Look up a column in ``information_schema.columns``.

    Returns the first ``(column_name, data_type)`` row for the given table and
    column, or whatever ``fetchone()`` yields when there is no match.
    """
    type_lookup = text(f"""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = '{table_name}' AND column_name = '{column_name}';
    """)
    return connection.execute(type_lookup).fetchone()

# Function to determine the maximum length of values in the column
def get_max_length(connection, table_name, column_name):
    """Return the longest text length found in ``column_name``.

    The column is cast to TEXT before measuring. Falls back to 255 (with a
    printed message) when the query yields no non-null length or when any
    exception is raised while running it.
    """
    try:
        max_length_sql = f"""
        SELECT MAX(LENGTH(CAST("{column_name}" AS TEXT))) AS max_length
        FROM {table_name};
        """
        row = connection.execute(text(max_length_sql)).fetchone()

        if not row or row[0] is None:
            print(f"Warning: {table_name}.{column_name} has no non-null values or an error occurred.")
            return 255  # Default length
        return row[0]
    except Exception as e:
        print(f"Error retrieving max length for {table_name}.{column_name}: {e}")
        return 255  # Default length in case of error


def remove_pound_symbol(connection, table_name, column_name):
    """Remove every '£' from values of ``column_name`` that begin with '£'."""
    strip_pound_statement = text(f"""
    UPDATE {table_name}
    SET {column_name} = REPLACE({column_name}, '£', '')
    WHERE {column_name} LIKE '£%';
    """)
    connection.execute(strip_pound_statement)

def add_weight_categories(connection, table_name, weight_column, new_column):
    """Add a VARCHAR(20) category column derived from a numeric weight column.

    Args:
        connection: open SQLAlchemy connection used to execute the statements.
        table_name: table to alter and update.
        weight_column: column holding the weight that the category is based on.
        new_column: name of the category column to create and populate.

    Categories: ``< 2`` 'Light', ``2 to < 40`` 'Mid_Sized', ``40 to < 140``
    'Heavy', ``>= 140`` 'Truck_Required', anything else 'Unknown'.
    """
    # IF NOT EXISTS keeps the step re-runnable: without it a second run of the
    # notebook fails because the column is already there.
    add_column = f"""
    ALTER TABLE {table_name}
    ADD COLUMN IF NOT EXISTS {new_column} VARCHAR(20)
    """
    connection.execute(text(add_column))

    # Every bound uses {weight_column}; the upper bounds previously hard-coded
    # "weight_in_kg", which broke the function for any other column name.
    update_weights = f"""
    UPDATE {table_name}
    SET {new_column} = CASE
        WHEN {weight_column} < 2 THEN 'Light'
        WHEN {weight_column} >= 2 AND {weight_column} < 40 THEN 'Mid_Sized'
        WHEN {weight_column} >= 40 AND {weight_column} < 140 THEN 'Heavy'
        WHEN {weight_column} >= 140 THEN 'Truck_Required'
        ELSE 'Unknown'
    END;
    """
    connection.execute(text(update_weights))

def add_primary_key(connection, table_name, column_name):
    """Make ``column_name`` the primary key of ``table_name`` if the table has none.

    Steps: skip when a PRIMARY KEY constraint is already listed for the table;
    otherwise delete rows where the column is NULL, check for duplicate values,
    and add a constraint named ``<table_name>_pk`` only when none are found.
    Note that the NULL rows are deleted even if duplicates then block the key.
    """
    # Look for an existing PRIMARY KEY constraint on the table
    check_pk_sql = f"""
    SELECT constraint_name
    FROM information_schema.table_constraints
    WHERE table_name = '{table_name}' AND constraint_type = 'PRIMARY KEY';
    """
    existing_pk = connection.execute(text(check_pk_sql)).fetchone()
    if existing_pk is not None:
        print(f"Primary key already exists for {table_name}, skipping addition.")
        return

    # A primary key column cannot hold NULLs, so drop those rows first
    remove_nulls_sql = f"""
    DELETE FROM {table_name}
    WHERE {column_name} IS NULL;
    """
    connection.execute(text(remove_nulls_sql))

    # Collect every value that occurs more than once
    check_unique_sql = f"""
    SELECT {column_name}, COUNT(*) 
    FROM {table_name}
    GROUP BY {column_name} 
    HAVING COUNT(*) > 1; 
    """
    duplicate_rows = connection.execute(text(check_unique_sql))
    duplicates = [duplicate_row[0] for duplicate_row in duplicate_rows]

    if not duplicates:
        constraint_name = f"{table_name}_pk"

        add_pk_sql = f"""
        ALTER TABLE {table_name}
        ADD CONSTRAINT {constraint_name} PRIMARY KEY ({column_name});
        """
        connection.execute(text(add_pk_sql))

        print(f"Primary key added to {table_name} on column {column_name}.")
    else:
        print("Duplicates found, cannot add primary key.")

def find_and_report_orphans(connection, table_name, column_name, referenced_table, referenced_column):
    """Find and print non-null values of a column that are missing from a referenced column.

    Args:
        connection: open SQLAlchemy connection used to run the query.
        table_name / column_name: the referencing table and column to check.
        referenced_table / referenced_column: where matching values must exist.

    Returns:
        ``(orphan_count, orphans)`` where ``orphans`` is the list of fetched rows.
    """
    # Both sides are cast to text, matching the DELETE in remove_orphans, so the
    # rows reported here are the rows that would be removed and the comparison
    # does not fail when the two columns have different types.
    find_orphans_sql = f"""
    SELECT {column_name}
    FROM {table_name}
    WHERE {column_name} IS NOT NULL
    AND {column_name}::text NOT IN (SELECT {referenced_column}::text FROM {referenced_table});
    """
    orphans = connection.execute(text(find_orphans_sql)).fetchall()
    orphan_count = len(orphans)

    if orphan_count > 0:
        print(f"Found {orphan_count} orphaned records in {table_name} and column {column_name}:")
        for orphan in orphans:
            print(orphan)
    else:
        print(f"No orphaned records found in {table_name} and column {column_name}.")

    return orphan_count, orphans

def remove_orphans(connection, table_name, column_name, referenced_table, referenced_column):
    """Delete rows whose ``column_name`` value has no match in the referenced column.

    Orphans are first located (and printed) by ``find_and_report_orphans``; the
    DELETE only runs when at least one was found. Values are compared as text.
    """
    orphan_count, _orphans = find_and_report_orphans(
        connection, table_name, column_name, referenced_table, referenced_column
    )

    if orphan_count <= 0:
        print(f"No orphaned records to remove from {table_name}.")
        return

    # Remove orphaned records
    remove_orphans_sql = f"""
        DELETE FROM {table_name}
        WHERE {column_name} IS NOT NULL
        AND {column_name}::text NOT IN (SELECT {referenced_column}::text FROM {referenced_table});
        """
    connection.execute(text(remove_orphans_sql))
    print(f"Orphaned records removed from {table_name}.")

def add_foreign_key(connection, table_name, column_name, referenced_table, referenced_column):
    """Add a foreign key from ``table_name.column_name`` to the referenced column.

    Skips when a FOREIGN KEY constraint already covers the column. Otherwise
    deletes rows where the column is NULL, removes orphaned rows through
    ``remove_orphans``, and adds a constraint named
    ``<table_name>_<column_name>_fk``.
    """
    # Is there already a FOREIGN KEY constraint on this specific column?
    check_fk_sql = f"""
    SELECT tc.constraint_name
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
    ON tc.constraint_name = kcu.constraint_name
    WHERE tc.table_name = '{table_name}' AND kcu.column_name = '{column_name}' AND tc.constraint_type = 'FOREIGN KEY';
    """
    existing_fk = connection.execute(text(check_fk_sql)).fetchone()
    if existing_fk is not None:
        print(f"Foreign key already exists for {table_name}.{column_name}, skipping addition.")
        return

    # Drop rows with no value in the referencing column
    remove_nulls_sql = f"""
    DELETE FROM {table_name}
    WHERE {column_name} IS NULL;
    """
    connection.execute(text(remove_nulls_sql))

    # Drop rows pointing at values that do not exist in the referenced table
    remove_orphans(connection, table_name, column_name, referenced_table, referenced_column)

    constraint_name = f"{table_name}_{column_name}_fk"
    add_fk_sql = f"""
    ALTER TABLE {table_name}
    ADD CONSTRAINT {constraint_name} FOREIGN KEY ({column_name}) REFERENCES {referenced_table} ({referenced_column});
    """
    connection.execute(text(add_fk_sql))

    print(f"Foreign key added to {table_name}.{column_name} referencing {referenced_table}.{referenced_column}.")


def get_primary_keys(connection, table_name):
    """Return the ``constrained_columns`` of the table's primary-key constraint."""
    pk_constraint = inspect(connection).get_pk_constraint(table_name)
    return pk_constraint['constrained_columns']

def get_foreign_keys(connection, table_name):
    """Return the inspector's foreign-key description for ``table_name``."""
    return inspect(connection).get_foreign_keys(table_name)


def find_null_values_in_table(connection, table_name):
    """Print every row of ``table_name`` that has a NULL in at least one column.

    Column names are read from the SQLAlchemy inspector and combined into a
    single ``col IS NULL OR ...`` filter. Nothing is returned; the result is
    reported with ``print``.
    """
    inspector = inspect(connection)
    columns = [column['name'] for column in inspector.get_columns(table_name)]

    # Quote each inspected column name: unquoted identifiers are folded to
    # lowercase by PostgreSQL, so a mixed-case column such as "EAN" would
    # otherwise be looked up as "ean" and the query would fail.
    quoted_columns = ['"' + column.replace('"', '""') + '"' for column in columns]
    where_clause = " OR ".join(f"{column} IS NULL" for column in quoted_columns)
    find_nulls_sql = f"""
    SELECT *
    FROM {table_name}
    WHERE {where_clause};
    """

    null_rows = connection.execute(text(find_nulls_sql)).fetchall()

    if null_rows:
        print(f"Rows with NULL values in table '{table_name}':")
        for row in null_rows:
            print(row)
    else:
        print(f"No NULL values found in table '{table_name}'.")


    

# CLEANING FUNCTIONS 

# Create function to clean uuid with regex #working regex:   WHERE TRIM(CAST({column_name} AS TEXT)) !~* '^[a-f0-9\\-]+$';
def clean_uuid(connection, table_name, column_name):
    """Set ``column_name`` to NULL wherever its trimmed text is not an 8-4-4-4-12 hex UUID.

    The match is case-insensitive (``!~*``).
    """
    null_invalid_uuids = text(f"""
    UPDATE {table_name}
    SET {column_name} = NULL
    WHERE TRIM(CAST({column_name} AS TEXT)) !~* '^[a-f0-9]{{8}}-[a-f0-9]{{4}}-[a-f0-9]{{4}}-[a-f0-9]{{4}}-[a-f0-9]{{12}}$';
    """)
    connection.execute(null_invalid_uuids)

# Create function to clean numeric data (both integer and floats) data with regex by ensuring they are numbers 
def clean_numbers(connection, table_name, column_name):
    """Set ``column_name`` to NULL wherever its text form is not a plain number.

    Accepted form: optional leading '-', digits, optional '.', then at least
    one digit.
    """
    null_non_numeric = text(f"""
    UPDATE "{table_name}"
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^[-]?[0-9]*\\.?[0-9]+$';
    """)
    connection.execute(null_non_numeric)

def clean_card_number(connection, table_name, column_name):
    """Report, then NULL out, values of ``column_name`` that are not all digits.

    Prints the number of offending non-null values (and the values themselves
    when there are any) before running the UPDATE.
    """
    # Count and collect the invalid values before touching the data
    count_invalid_card_number_sql = f"""
    SELECT COUNT(*), ARRAY_AGG("{column_name}")
    FROM "{table_name}"
    WHERE "{column_name}" IS NOT NULL AND CAST("{column_name}" AS TEXT) !~ '^[0-9]+$';
    """
    summary = connection.execute(text(count_invalid_card_number_sql)).fetchone()
    count_invalid = summary[0]
    invalid_values = summary[1]

    print(f"Invalid card_number count: {count_invalid}")
    if count_invalid > 0:
        print(f"Invalid card_number values: {invalid_values}")

    # Replace every invalid value with NULL
    clean_card_number_sql = f"""
    UPDATE "{table_name}"
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^[0-9]+$';
    """
    connection.execute(text(clean_card_number_sql))


def clean_ean(connection, table_name, column_name):
    """Report, then NULL out, values of ``column_name`` that are not all digits.

    Prints the number of offending non-null values (and the values themselves
    when there are any) before running the UPDATE.
    """
    # Count and collect the invalid values before touching the data
    count_invalid_ean_sql = f"""
    SELECT COUNT(*), ARRAY_AGG("{column_name}")
    FROM "{table_name}"
    WHERE "{column_name}" IS NOT NULL AND CAST("{column_name}" AS TEXT) !~ '^[0-9]+$';
    """
    summary = connection.execute(text(count_invalid_ean_sql)).fetchone()
    count_invalid = summary[0]
    invalid_values = summary[1]

    print(f"Invalid EAN count: {count_invalid}")
    if count_invalid > 0:
        print(f"Invalid EAN values: {invalid_values}")

    # Replace every invalid value with NULL
    clean_ean_sql = f"""
    UPDATE "{table_name}"
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^[0-9]+$';
    """
    connection.execute(text(clean_ean_sql))


def clean_exp_date(connection, table_name, column_name):
    """Set ``column_name`` to NULL wherever its text is not two digits, '/', two digits.

    Args:
        connection: open SQLAlchemy connection used to run the UPDATE.
        table_name: table to update.
        column_name: column holding the expiry dates.
    """
    # The quantifier braces are doubled so the f-string emits a literal "{2}".
    # A single pair was being evaluated as the expression 2, which produced the
    # pattern '^\d2/\d2$' and nulled out most well-formed expiry dates.
    clean_exp_date_sql = f"""
    UPDATE {table_name}
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^\\d{{2}}/\\d{{2}}$';
    """
    connection.execute(text(clean_exp_date_sql))

# Create function to clean store_code and product_code data with regex (e.g. store_code 'BL-8387506C', product_code 'R7-3126933h')
def clean_store_or_product_codes(connection, table_name, column_name):
    """Set ``column_name`` to NULL wherever it is not ``<alphanumerics>-<alphanumerics>``."""
    null_invalid_codes = text(f"""
    UPDATE "{table_name}"
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^[A-Za-z0-9]+-[A-Za-z0-9]+$';
    """)
    connection.execute(null_invalid_codes)

def clean_product_code(connection, table_name, column_name):
    """Report, then NULL out, product codes that do not match the expected shape.

    Expected shape: two alphanumeric characters, a hyphen, then one or more
    alphanumeric characters. Prints the number of offending non-null values
    (and the values themselves when there are any) before running the UPDATE.
    """
    # Count and collect the invalid values before touching the data
    count_invalid_product_code_sql = f"""
    SELECT COUNT(*), ARRAY_AGG("{column_name}")
    FROM "{table_name}"
    WHERE "{column_name}" IS NOT NULL AND CAST("{column_name}" AS TEXT) !~ '^[a-zA-Z0-9][a-zA-Z0-9]-[a-zA-Z0-9]+$';
    """
    summary = connection.execute(text(count_invalid_product_code_sql)).fetchone()
    count_invalid = summary[0]
    invalid_values = summary[1]

    print(f"Invalid product_code count: {count_invalid}")
    if count_invalid > 0:
        print(f"Invalid product_code values: {invalid_values}")

    # Replace every invalid value with NULL
    clean_product_code_sql = f"""
    UPDATE "{table_name}"
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^[a-zA-Z0-9][a-zA-Z0-9]-[a-zA-Z0-9]+$';
    """
    connection.execute(text(clean_product_code_sql))



# cleaning text data  
def clean_text_data(connection, table_name, column_name):
    """Set ``column_name`` to NULL wherever it contains anything but letters and underscores."""
    null_non_text = text(f"""
    UPDATE "{table_name}"
    SET "{column_name}" = NULL
    WHERE CAST("{column_name}" AS TEXT) !~ '^[A-Za-z_]+$';
    """)
    connection.execute(null_non_text)

def clean_date_data(connection, table_name, column_name):
    """Rewrite values shaped like ``(Y, M, D, h, m)`` as dates.

    Only rows whose text matches the five-number tuple pattern are touched:
    the first three numbers are joined as ``Y-M-D`` and parsed with
    ``TO_DATE(..., 'YYYY-MM-DD')``.
    """
    tuple_to_date_statement = text(f"""
    UPDATE {table_name}
    SET {column_name} = 
    TO_DATE(
        REGEXP_REPLACE(
            CAST({column_name} AS TEXT), 
            '\\((\\d+), (\\d+), (\\d+), \\d+, \\d+\\)', 
            '\\1-\\2-\\3'
        ), 
        'YYYY-MM-DD'
    )
    WHERE CAST({column_name} AS TEXT) ~ '\\(\\d+, \\d+, \\d+, \\d+, \\d+\\)';
    """)
    connection.execute(tuple_to_date_statement)

# CONVERTING FUNCTIONS  

# Create function to convert a specified column to UUID 
def convert_to_uuid(connection, table_name, column_name):
    """Change the type of ``column_name`` to UUID, casting existing values with ``::UUID``."""
    alter_to_uuid = text(f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE UUID
        USING {column_name}::UUID;
        """)
    connection.execute(alter_to_uuid)

# Function to convert the column to VARCHAR with the determined maximum length // goes after USING CAST: ALTER COLUMN {column_name} DROP NOT NULL;

def convert_to_varchar(connection, table_name, column_name, length):
    """Change ``column_name`` to ``VARCHAR(length)`` and drop its NOT NULL constraint.

    Both alterations are issued in a single ALTER TABLE statement.
    """
    alter_to_varchar = text(f"""
    ALTER TABLE {table_name}
    ALTER COLUMN "{column_name}" TYPE VARCHAR({length}) USING CAST("{column_name}" AS VARCHAR({length})),
    ALTER COLUMN "{column_name}" DROP NOT NULL;
    """)
    connection.execute(alter_to_varchar)

# Create function to convert bigint to smallint
def convert_to_smallint(connection, table_name, column_name):
    """Change the type of ``column_name`` to SMALLINT, casting existing values."""
    alter_to_smallint = text(f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE SMALLINT
        USING CAST({column_name} AS SMALLINT);
        """)
    connection.execute(alter_to_smallint)

# Create function to convert to date 
def convert_to_date(connection, table_name, column_name):
    """Change the type of ``column_name`` to DATE, casting existing values with ``::DATE``."""
    alter_to_date = text(f"""
    ALTER TABLE {table_name}
    ALTER COLUMN {column_name} TYPE DATE
    USING {column_name}::DATE;
    """)
    connection.execute(alter_to_date)

# Create function to convert a column to float
def convert_to_float(connection, table_name, column_name):
    """Change the type of ``column_name`` to FLOAT, casting existing values with ``::FLOAT``."""
    alter_to_float = text(f"""
    ALTER TABLE {table_name}
    ALTER COLUMN {column_name} TYPE FLOAT
    USING {column_name}::FLOAT;
    """)
    connection.execute(alter_to_float)

def convert_to_boolean(connection, table_name, column_name, new_column_name, condition_1, condition_2):
    """Populate a BOOLEAN column from two marker values of an existing column.

    ``new_column_name`` is created when ``information_schema.columns`` has no
    entry for it. It is then set to TRUE where ``column_name`` equals
    ``condition_1``, FALSE where it equals ``condition_2``, and NULL otherwise.
    """
    # Does the target column already exist?
    check_column_sql = f"""
    SELECT column_name 
    FROM information_schema.columns 
    WHERE table_name = '{table_name}' AND column_name = '{new_column_name}';
    """
    existing_column = connection.execute(text(check_column_sql)).fetchone()

    if not existing_column:
        # Create it on first use
        add_column_sql = f"""
        ALTER TABLE {table_name}
        ADD COLUMN {new_column_name} BOOLEAN;
        """
        connection.execute(text(add_column_sql))

    fill_boolean_statement = text(f"""
    UPDATE {table_name}
    SET {new_column_name} = CASE 
        WHEN {column_name} = '{condition_1}' THEN TRUE 
        WHEN {column_name} = '{condition_2}' THEN FALSE 
        ELSE NULL 
    END; 
    """)
    connection.execute(fill_boolean_statement)


# FUNCTIONS OF FUNCTIONS 

# clean and convert an integer or float (must be a number with no extra characters) to varchar
def num_to_varchar_any(connection, table_name, column_name):
    """Null out non-numeric values, then convert the column to a fitted VARCHAR.

    The VARCHAR length is whatever ``get_max_length`` reports after cleaning.
    """
    clean_numbers(connection, table_name, column_name)
    fitted_length = get_max_length(connection, table_name, column_name)
    convert_to_varchar(connection, table_name, column_name, fitted_length)

# clean and convert any text to varchar (no special characters in text)
def text_to_varchar_any(connection, table_name, column_name):
    """Null out non-text values, then convert the column to a fitted VARCHAR.

    The VARCHAR length is whatever ``get_max_length`` reports after cleaning.
    """
    clean_text_data(connection, table_name, column_name)
    fitted_length = get_max_length(connection, table_name, column_name)
    convert_to_varchar(connection, table_name, column_name, fitted_length)

def text_to_varchar_255(connection, table_name, column_name):
    """Null out non-text values, then convert the column to VARCHAR(255)."""
    clean_text_data(connection, table_name, column_name)
    convert_to_varchar(connection, table_name, column_name, 255)

# clean and convert any text UUID to an actual UUID
def text_uuid_to_uuid(connection, table_name, column_name):
    """Null out malformed UUID strings, then convert the column to the UUID type."""
    clean_uuid(connection, table_name, column_name)
    convert_to_uuid(connection, table_name, column_name)

# clean and convert expiry date to varchar
def exp_to_varchar_any(connection, table_name, column_name):
    """Null out malformed expiry dates, then convert the column to a fitted VARCHAR.

    The VARCHAR length is whatever ``get_max_length`` reports after cleaning.
    """
    clean_exp_date(connection, table_name, column_name)
    fitted_length = get_max_length(connection, table_name, column_name)
    convert_to_varchar(connection, table_name, column_name, fitted_length)

# clean and convert text dates to the DATE type
def text_date_to_date(connection, table_name, column_name):
    """Normalise tuple-style date text, then convert the column to the DATE type."""
    clean_date_data(connection, table_name, column_name)
    convert_to_date(connection, table_name, column_name)

def store_product_to_varchar(connection, table_name, column_name):
    """Null out malformed store/product codes, then convert the column to a fitted VARCHAR.

    The VARCHAR length is whatever ``get_max_length`` reports after cleaning.
    """
    clean_store_or_product_codes(connection, table_name, column_name)
    fitted_length = get_max_length(connection, table_name, column_name)
    convert_to_varchar(connection, table_name, column_name, fitted_length)

def bigint_to_smallint(connection, table_name, column_name):
    """Null out non-numeric values, then convert the column to SMALLINT."""
    clean_numbers(connection, table_name, column_name)
    convert_to_smallint(connection, table_name, column_name)

def text_to_float(connection, table_name, column_name):
    """Null out non-numeric values, then convert the column to FLOAT."""
    clean_numbers(connection, table_name, column_name)
    convert_to_float(connection, table_name, column_name)



# Function to run all operations
def run_all_operations():
    """Run the whole clean-and-convert pipeline, then add primary and foreign keys.

    Order of work:
      1. ``run_cleaning_operations()`` (uses its own engine and connection).
      2. In one transaction on a fresh connection: clean and convert columns of
         ``orders_table``, ``dim_users``, ``dim_store_details``,
         ``dim_products``, ``dim_date_times`` and ``dim_card_details``.
      3. Add primary keys to the dim tables and foreign keys to ``orders_table``.
      4. Print the primary and foreign keys of ``orders_table``.

    Any ``SQLAlchemyError`` raised inside the transaction rolls it back and is
    reported with ``print``; it is not re-raised.
    """
    instance = DatabaseConnector()
    engine = instance.init_my_db_engine()

    with engine.connect() as connection:
        # This helper opens its own connection and transaction, so it runs
        # before the transaction below is started.
        run_cleaning_operations()

        # The try wraps the transaction block so that a failing statement exits
        # the block through the exception and the work is rolled back, instead
        # of the error being swallowed inside the block before a commit attempt.
        try:
            with connection.begin():

                # ORDERS TABLE
                text_uuid_to_uuid(connection, 'orders_table', 'date_uuid')
                text_uuid_to_uuid(connection, 'orders_table', 'user_uuid')

                clean_card_number(connection, 'orders_table', 'card_number')
                max_length = get_max_length(connection, 'orders_table', 'card_number')
                convert_to_varchar(connection, 'orders_table', 'card_number', max_length)

                store_product_to_varchar(connection, 'orders_table', 'store_code')
                # This call previously repeated 'store_code', so product_code
                # was never cleaned or converted before its foreign key below.
                store_product_to_varchar(connection, 'orders_table', 'product_code')

                bigint_to_smallint(connection, 'orders_table', 'product_quantity')

                # DIM USERS TABLE
                text_to_varchar_255(connection, 'dim_users', 'first_name')
                text_to_varchar_255(connection, 'dim_users', 'last_name')
                text_date_to_date(connection, 'dim_users', 'date_of_birth')
                text_to_varchar_any(connection, 'dim_users', 'country_code')
                text_uuid_to_uuid(connection, 'dim_users', 'user_uuid')
                text_date_to_date(connection, 'dim_users', 'join_date')

                # DIM_STORE_DETAILS
                text_to_float(connection, 'dim_store_details', 'longitude')
                text_to_varchar_255(connection, 'dim_store_details', 'locality')
                store_product_to_varchar(connection, 'dim_store_details', 'store_code')
                bigint_to_smallint(connection, 'dim_store_details', 'staff_numbers')
                text_date_to_date(connection, 'dim_store_details', 'opening_date')
                text_to_varchar_255(connection, 'dim_store_details', 'store_type')
                text_to_float(connection, 'dim_store_details', 'latitude')
                text_to_varchar_any(connection, 'dim_store_details', 'country_code')
                text_to_varchar_255(connection, 'dim_store_details', 'continent')

                # DIM PRODUCTS
                # Removing pound from price column
                remove_pound_symbol(connection, 'dim_products', 'product_price')

                # adding weight categories
                add_weight_categories(connection, 'dim_products', 'weight_in_kg', 'weight_category')

                text_to_float(connection, 'dim_products', 'product_price')
                text_to_float(connection, 'dim_products', 'weight_in_kg')

                clean_ean(connection, 'dim_products', 'EAN')
                max_length = get_max_length(connection, 'dim_products', 'EAN')
                convert_to_varchar(connection, 'dim_products', 'EAN', max_length)

                clean_product_code(connection, 'dim_products', 'product_code')
                max_length = get_max_length(connection, 'dim_products', 'product_code')
                convert_to_varchar(connection, 'dim_products', 'product_code', max_length)

                text_date_to_date(connection, 'dim_products', 'date_added')
                text_uuid_to_uuid(connection, 'dim_products', 'uuid')

                # Cleaning then converting the removed column
                clean_text_data(connection, 'dim_products', 'removed')
                convert_to_boolean(connection, 'dim_products', 'removed', 'is_removed', 'Still_avaliable', 'Removed')

                # DIM DATE TIMES
                num_to_varchar_any(connection, 'dim_date_times', 'day')
                num_to_varchar_any(connection, 'dim_date_times', 'year')
                num_to_varchar_any(connection, 'dim_date_times', 'month')
                text_to_varchar_any(connection, 'dim_date_times', 'time_period')
                text_uuid_to_uuid(connection, 'dim_date_times', 'date_uuid')

                # DIM CARD DETAILS
                exp_to_varchar_any(connection, 'dim_card_details', 'expiry_date')

                clean_card_number(connection, 'dim_card_details', 'card_number')
                max_length = get_max_length(connection, 'dim_card_details', 'card_number')
                convert_to_varchar(connection, 'dim_card_details', 'card_number', max_length)

                text_date_to_date(connection, 'dim_card_details', 'date_payment_confirmed')

                # adding primary keys dim tables
                add_primary_key(connection, 'dim_card_details', 'card_number')
                add_primary_key(connection, 'dim_date_times', 'date_uuid')
                add_primary_key(connection, 'dim_products', 'product_code')
                add_primary_key(connection, 'dim_store_details', 'store_code')
                add_primary_key(connection, 'dim_users', 'user_uuid')

                # adding foreign keys
                add_foreign_key(connection, 'orders_table', 'card_number', 'dim_card_details', 'card_number')
                add_foreign_key(connection, 'orders_table', 'date_uuid', 'dim_date_times', 'date_uuid')
                add_foreign_key(connection, 'orders_table', 'product_code', 'dim_products', 'product_code')
                add_foreign_key(connection, 'orders_table', 'store_code', 'dim_store_details', 'store_code')
                add_foreign_key(connection, 'orders_table', 'user_uuid', 'dim_users', 'user_uuid')

                primary_keys = get_primary_keys(connection, 'orders_table')
                print(f"Primary keys for table 'orders_table': {primary_keys}")

                foreign_keys = get_foreign_keys(connection, 'orders_table')
                print(f"Foreign keys for table 'orders_table': {foreign_keys}")

        except SQLAlchemyError as e:
            print(f"An error occurred: {e}")

    print('End of call')

# Run the full pipeline: cleaning and type conversions, then primary and foreign keys.
run_all_operations()


init_my_db_engine is working
read_my_db_creds is working
Connection to the PostgreSQL database was successful!
init_my_db_engine is working
read_my_db_creds is working
Connection to the PostgreSQL database was successful!
Sample EAN data after cleaning: [(0, "FurReal Dazzlin' Dimples My Playful Dolphin", '£39.99', '1.6kg', 'toys-and-games', '7425710935115', '2005-12-02', '83dc0a69-f96f-4c34-bcb7-928acae19a94', 'Still_avaliable', 'R7-3126933h', 1.6), (1, "Tiffany's World Day Out At The Park", '£12.99', '0.48kg', 'toys-and-games', '487128731892', '2006-01-09', '712254d7-aea7-4310-aff8-8bcdd0aec7ff', 'Still_avaliable', 'C2-7287916l', 0.48), (2, "Tiffany's World Pups Picnic Playset", '£7.00', '590g', 'toys-and-games', '1945816904649', '1997-03-29', 'b089ef6f-b628-4e37-811d-fffe0102ba64', 'Still_avaliable', 'S7-1175877v', 0.59), (3, "Tiffany's World Wildlife Park Adventures", '£12.99', '540g', 'toys-and-games', '1569790890899', '2013-03-20', 'd55de422-8b98-47d6-9991-e4bc4c5c0cb0', 'Removed'

In [6]:
#THIS IS CODE SO THAT I CAN SEE WHAT THE TABLE LOOKS LIKE 
def view_data():
    """Display the column headers and the first ten rows of dim_store_details."""
    db_engine = DatabaseConnector().init_my_db_engine()

    # Preview query :: dim_store_details
    preview_sql = text("SELECT * FROM dim_store_details LIMIT 10;")

    # The with-block hands the connection back as soon as the preview is done.
    with db_engine.connect() as conn:
        rows = conn.execute(preview_sql)
        # Column headers first ...
        display(rows.keys())
        # ... then one display call per row.
        for record in rows:
            display(record)

view_data()

init_my_db_engine is working
read_my_db_creds is working
Connection to the PostgreSQL database was successful!


RMKeyView(['index', 'address', 'longitude', 'locality', 'store_code', 'staff_numbers', 'opening_date', 'store_type', 'latitude', 'country_code', 'continent'])

(2, 'Heckerstraße 4/5\n50491 Säckingen, Landshut', 48.52961, 'Landshut', 'LA-0772C7B9', 92, datetime.date(2013, 4, 12), 'Super Store', 12.16179, 'DE', 'Europe')

(3, '5 Harrison tunnel\nSouth Lydia\nWC9 2BE, Westbury', 51.26, 'Westbury', 'WE-1DE82CEE', 69, datetime.date(2014, 1, 2), 'Super Store', -2.1875, 'GB', 'Europe')

(4, 'Studio 6\nStephen landing\nSouth Simon\nB77 2WA, Belper', 53.0233, 'Belper', 'BE-18074576', 35, datetime.date(2019, 9, 9), 'Local', -1.48119, 'GB', 'Europe')

(5, 'Flat 92u\nChristian harbors\nPort Charlotte\nN57 8FJ, Gainsborough', 53.38333, 'Gainsborough', 'GA-CAD01AC2', 36, datetime.date(1995, 5, 15), 'Local', -0.76667, 'GB', 'Europe')

(6, '7 Gillian rue\nWest Robertside\nPH4 8NY, Rutherglen', 55.82885, 'Rutherglen', 'RU-C603E990', 92, datetime.date(2001, 1, 4), 'Super Store', -4.21376, 'GB', 'Europe')

(7, 'Lilija-Heß-Allee 660\n34566 Regensburg, Stuttgart', 48.78232, 'Stuttgart', 'ST-229D997E', 34, datetime.date(2000, 6, 1), 'Local', 9.17702, 'DE', 'Europe')

(8, '510 Jill Mill\nSouth Laura, FL 38723, Kaukauna', 44.27804, 'Kaukauna', 'KA-FA7ED3B8', 31, datetime.date(2022, 9, 5), 'Local', -88.27205, 'US', 'America')

(9, '3 Lee valleys\nWest Janetview\nDY4M 2RL, Hartley', 51.38673, 'Hartley', 'HA-974352FE', 20, datetime.date(2004, 9, 11), 'Local', 0.30367, 'GB', 'Europe')

(12, 'Flat 37\nBennett expressway\nNew Charlotte\nSY8R 5WE, Devizes', 51.35084, 'Devizes', 'DE-585399CF', 36, datetime.date(2014, 10, 11), 'Local', -1.99421, 'GB', 'Europe')

(14, 'Herma-Rädel-Gasse 29\n74557 Fulda, Halstenbek', 53.63333, 'Halstenbek', 'HA-39A446E2', 38, datetime.date(1994, 3, 8), 'Local', 9.85, 'DE', 'Europe')

In [66]:
#CHECK THAT ALL NAMES IN FIRST NAME COLUMN ARE ALPHABETIC 

def view_data():
    """Display the first_name values from dim_users that are not purely alphabetic.

    Runs the query below, displays the result's column headers, then displays
    the list of "odd" names: values that are not strings (for example NULL,
    which arrives as None) or that contain anything other than the ASCII
    letters A-Z / a-z.
    """
    # Create engine
    instance = DatabaseConnector()
    engine = instance.init_my_db_engine()

    # NOTE(review): LIMIT 5 means only five rows are inspected, so an empty
    # result does not show that ALL first names are alphabetic. Drop the LIMIT
    # to check the whole column.
    query = "SELECT first_name FROM dim_users LIMIT 5;"

    # One or more ASCII letters and nothing else
    pattern = re.compile('^[A-Za-z]+$')

    # Use a context manager to handle the connection
    with engine.connect() as connection:
        result = connection.execute(text(query))
        # Print the column headers
        display(result.keys())

        odd_names = []
        for row in result:
            # Each row is a one-element tuple holding the name
            name = row[0]
            # A NULL / non-string value cannot be regex-matched (it would raise
            # TypeError), so it is reported as odd instead.
            if not isinstance(name, str) or not pattern.match(name):
                odd_names.append(name)

        display(odd_names)
view_data()



init_my_db_engine is working
read_my_db_creds is working
Connection to the PostgreSQL database was successful!


RMKeyView(['first_name'])

[]

In [39]:
# Step 1: Initial import
from data_cleaning import DataCleaning
from data_extraction import DataExtractor

# Create the extractor and cleaner objects from the project modules imported above.
# cleaning_instance is only referenced by the commented-out call at the end of this cell.
extractor_instance = DataExtractor()
cleaning_instance = DataCleaning()

# Fetch the original, uncleaned store data; later cells display raw_df and
# use it as the input to the store-cleaning steps.
raw_df = extractor_instance.retrieve_stores_data() 

# Cleaning via the DataCleaning method is disabled here; the cleaning steps are
# run inline in a later cell instead.
#clean_store_df = cleaning_instance.cleaning_store_details()


retrieve_stores_data is working
read_api_key is working


In [40]:
display(raw_df)

Unnamed: 0,index,address,longitude,lat,locality,store_code,staff_numbers,opening_date,store_type,latitude,country_code,continent
0,0,,,,,WEB-1388012W,325,2010-06-12,Web Portal,,GB,Europe
1,1,"Flat 72W\nSally isle\nEast Deantown\nE7B 8EB, ...",51.62907,,High Wycombe,HI-9B97EE4E,34,1996-10-25,Local,-0.74934,GB,Europe
2,2,"Heckerstraße 4/5\n50491 Säckingen, Landshut",48.52961,,Landshut,LA-0772C7B9,92,2013-04-12,Super Store,12.16179,DE,Europe
3,3,"5 Harrison tunnel\nSouth Lydia\nWC9 2BE, Westbury",51.26,,Westbury,WE-1DE82CEE,69,2014-01-02,Super Store,-2.1875,GB,Europe
4,4,Studio 6\nStephen landing\nSouth Simon\nB77 2W...,53.0233,,Belper,BE-18074576,35,2019-09-09,Local,-1.48119,GB,Europe
...,...,...,...,...,...,...,...,...,...,...,...,...
446,446,"Täschestraße 25\n39039 Nördlingen, Kirchlengern",52.2,,Kirchlengern,KI-78096E8C,61,2005-05-12,Super Store,8.63333,DE,Europe
447,447,K0ODETRLS3,K8CXLZDP07,UXMWDMX1LC,3VHFDNP8ET,9D4LK7X4LZ,D23PCWSM6S,36IIMAQD58,NN04B3F6UQ,JZP8MIJTPZ,B3EH2ZGQAV,1WZB1TE1HL
448,448,"Studio 8\nMoss mall\nWest Linda\nM0E 6XR, High...",51.62907,,High Wycombe,HI-EEA7AE62,33,1998-05-14,Local,-0.74934,GB,Europe
449,449,"Baumplatz 6\n80114 Kötzting, Bretten",49.03685,,Bretten,BR-662EC74C,35,2020-10-17,Local,8.70745,DE,Europe


In [59]:
import numpy as np
from database_utils import DatabaseConnector

# LOGGING
print('Started cleaning_store_details')

# Work on a copy of the data retrieved from the stores API. Assigning
# `df = raw_df` would only alias the raw frame, so the column assignments below
# would silently modify raw_df and make this cell depend on how often it was run.
df = raw_df.copy()

display(df.head(4))

# Clean the latitude column: anything non-numeric becomes NaN, then drop those rows
df['latitude'] = pd.to_numeric(df['latitude'], errors='coerce')
df = df.dropna(subset=['latitude'])

# Dropping the 'lat' column as it's empty ('latitude' is the populated one and is kept)
df = df.drop(columns=['lat'])

# filtering out items in locality that aren't real place names or NULL
pattern = r'^[a-zA-Z\s-]+$'
# na=False makes a missing locality count as "no match"; without it the mask
# would contain NaN and boolean indexing would raise.
df = df[df['locality'].str.match(pattern, na=False)].copy()
# The literal string 'NULL' passes the letters-only pattern, so remove it explicitly
df['locality'] = df['locality'].replace('NULL', np.nan)
df = df.dropna(subset=['locality']).copy()

display(df.head(4))

# replacing incorrect spellings of continents
continent_replacements = {
    'eeEurope': 'Europe',
    'eeAmerica': 'America'
}

df['continent'] = df['continent'].replace(continent_replacements)

# LOGGING: checking everything has worked
#print(df['continent'].unique())
#print(df['store_type'].unique())
#print(df['country_code'].unique())

# converting opening date to datetime object; values that do not match
# YYYY-MM-DD become NaT
df['opening_date'] = pd.to_datetime(df['opening_date'], format='%Y-%m-%d', errors='coerce')

# Diagnostic: rows whose opening_date is missing or could not be parsed.
# This must run BEFORE the dropna below; afterwards no such rows remain and the
# check would always report zero.
unparsed_date_rows = df[df['opening_date'].isna()]

# Convert the filtered DataFrame to a list of rows
unparsed_date_list = unparsed_date_rows.values.tolist()

display(len(unparsed_date_list))
display(unparsed_date_list)

# dropping any rows which contain missing values
df = df.dropna(axis=0)

# LOGGING: checking the conversion worked
#print(pd.api.types.is_datetime64_any_dtype(df['opening_date']))
display('last', df.head(4))
df.info()

# Upload the cleaned frame to the dim_store_details table
databaseconnector_instance = DatabaseConnector()

databaseconnector_instance.upload_to_db(df, 'dim_store_details')

Started cleaning_store_details


Unnamed: 0,index,address,longitude,lat,locality,store_code,staff_numbers,opening_date,store_type,latitude,country_code,continent
0,0,,,,,WEB-1388012W,325,2010-06-12,Web Portal,,GB,Europe
1,1,"Flat 72W\nSally isle\nEast Deantown\nE7B 8EB, ...",51.62907,,High Wycombe,HI-9B97EE4E,34,1996-10-25,Local,-0.74934,GB,Europe
2,2,"Heckerstraße 4/5\n50491 Säckingen, Landshut",48.52961,,Landshut,LA-0772C7B9,92,2013-04-12,Super Store,12.16179,DE,Europe
3,3,"5 Harrison tunnel\nSouth Lydia\nWC9 2BE, Westbury",51.26,,Westbury,WE-1DE82CEE,69,2014-01-02,Super Store,-2.1875,GB,Europe


Unnamed: 0,index,address,longitude,locality,store_code,staff_numbers,opening_date,store_type,latitude,country_code,continent
1,1,"Flat 72W\nSally isle\nEast Deantown\nE7B 8EB, ...",51.62907,High Wycombe,HI-9B97EE4E,34,1996-10-25,Local,-0.74934,GB,Europe
2,2,"Heckerstraße 4/5\n50491 Säckingen, Landshut",48.52961,Landshut,LA-0772C7B9,92,2013-04-12,Super Store,12.16179,DE,Europe
3,3,"5 Harrison tunnel\nSouth Lydia\nWC9 2BE, Westbury",51.26,Westbury,WE-1DE82CEE,69,2014-01-02,Super Store,-2.1875,GB,Europe
4,4,Studio 6\nStephen landing\nSouth Simon\nB77 2W...,53.0233,Belper,BE-18074576,35,2019-09-09,Local,-1.48119,GB,Europe


0

[]

'last'

Unnamed: 0,index,address,longitude,locality,store_code,staff_numbers,opening_date,store_type,latitude,country_code,continent
1,1,"Flat 72W\nSally isle\nEast Deantown\nE7B 8EB, ...",51.62907,High Wycombe,HI-9B97EE4E,34,1996-10-25,Local,-0.74934,GB,Europe
2,2,"Heckerstraße 4/5\n50491 Säckingen, Landshut",48.52961,Landshut,LA-0772C7B9,92,2013-04-12,Super Store,12.16179,DE,Europe
3,3,"5 Harrison tunnel\nSouth Lydia\nWC9 2BE, Westbury",51.26,Westbury,WE-1DE82CEE,69,2014-01-02,Super Store,-2.1875,GB,Europe
4,4,Studio 6\nStephen landing\nSouth Simon\nB77 2W...,53.0233,Belper,BE-18074576,35,2019-09-09,Local,-1.48119,GB,Europe


<class 'pandas.core.frame.DataFrame'>
Index: 428 entries, 1 to 450
Data columns (total 11 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   index          428 non-null    int64         
 1   address        428 non-null    object        
 2   longitude      428 non-null    object        
 3   locality       428 non-null    object        
 4   store_code     428 non-null    object        
 5   staff_numbers  428 non-null    object        
 6   opening_date   428 non-null    datetime64[ns]
 7   store_type     428 non-null    object        
 8   latitude       428 non-null    float64       
 9   country_code   428 non-null    object        
 10  continent      428 non-null    object        
dtypes: datetime64[ns](1), float64(1), int64(1), object(8)
memory usage: 40.1+ KB
upload_to_db is working
init_my_db_engine is working
read_my_db_creds is working
Connection to the PostgreSQL database was successful!
Table 'dim_store_det

#Ignore below 


In [None]:
#BACKUP OF CHATGPT UUID CODE WITH CHECKS 

# Function to run all operations
def run_all_operations():
    """Clean orders_table.date_uuid and convert it to UUID, printing checks before and after.

    The two write statements run inside ``engine.begin()``, which commits when
    the block finishes without error and rolls back if either statement raises,
    so the clean-up and the type change are applied together or not at all.
    The before/after reads use their own connections so that a failed write
    cannot leave them running on an aborted transaction.
    """
    # Create instance of a DatabaseConnector
    instance = DatabaseConnector()
    # Create an engine by using the init_my_db_engine() method of DatabaseConnector
    engine = instance.init_my_db_engine()

    with engine.connect() as connection:
        # Fetch and print data before conversion
        print("Data before conversion:")
        for row in fetch_data(connection, 'orders_table'):
            print(row)

        # Check and print column type before conversion
        column_type_before = check_column_type(connection, 'orders_table', 'date_uuid')
        print(f"Column type before conversion: {column_type_before}")

    # Run the writes in one explicit transaction; errors are reported, not raised
    try:
        with engine.begin() as connection:
            clean_uuid(connection, 'orders_table', 'date_uuid')
            print('Clean UUID worked')
            convert_data_uuid(connection, 'orders_table', 'date_uuid')
            print('Convert to UUID worked')
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")

    with engine.connect() as connection:
        # Fetch and print data after conversion
        print("Data after conversion:")
        for row in fetch_data(connection, 'orders_table'):
            print(row)

        # Check and print column type after conversion
        column_type_after = check_column_type(connection, 'orders_table', 'date_uuid')
        print(f"Column type after conversion: {column_type_after}")

    print('End of call')


In [None]:
#THINK THIS CAN BE GOTTEN RID OF 
#def cast_data(): 
  
#UUID 
# Create function to clean uuid with regex 
def clean_uuid(connection, table_name, column_name):
    """Set a column's value to NULL wherever it is not a well-formed UUID string.

    Args:
        connection: open SQLAlchemy connection used to execute the statement.
        table_name: table to update (interpolated into the SQL as-is).
        column_name: column to check (interpolated into the SQL as-is).

    A value is kept only if it matches the 8-4-4-4-12 hexadecimal layout.
    """
    # The quantifiers are written {{8}}, {{4}}, {{12}} because this is an
    # f-string: a bare {8} is interpolated to the text "8", which produces a
    # pattern no real UUID matches and would NULL out the entire column.
    clean_uuid_sql = f"""
        UPDATE {table_name}
        SET {column_name} = NULL
        WHERE {column_name} !~ '^[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}$';
        """
    # Convert the SQL string to a TextClause object and execute the query
    connection.execute(text(clean_uuid_sql))

# Create function to update specified column to UUID 
def convert_data_uuid(connection, table_name, column_name):
    """Change the given column's type to UUID, casting existing values via USING."""
    # Build the TextClause in one step and hand it straight to the connection
    alter_statement = text(f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE UUID
        USING {column_name}::UUID;
        """)
    connection.execute(alter_statement)

# Function to run all operations
def run_all_operations():
    """Run the UUID clean-up on orders_table.date_uuid inside a committed transaction.

    ``engine.begin()`` commits when the block exits normally and rolls back if
    the statement raises; a SQLAlchemyError is printed rather than re-raised.
    """
    # Create instance of a DatabaseConnector
    instance = DatabaseConnector()
    # Create an engine by using the init_my_db_engine() method of DatabaseConnector
    engine = instance.init_my_db_engine()

    # put the attempt to run the functions in a try block
    try:
        with engine.begin() as connection:
            clean_uuid(connection, 'orders_table', 'date_uuid')
            print('clean uuid worked')
            #max_length = get_max_length(connection, table_name, column_name)
            #convert_numbers(connection, table_name, column_name, max_length)
            #print("Data cleaning and conversion completed successfully.")
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")

    print('end of call')
run_all_operations() 



In [None]:
# OLD CODE GET RID OF 
# Function to run all operations
def run_all_operations():
    """Clean orders_table.date_uuid and convert it to UUID, printing checks before and after.

    The writes run inside ``engine.begin()`` so they are committed together on
    success and rolled back together if either statement raises. The
    before/after reads use separate connections, so they still work after a
    failed write.
    """
    # Create instance of a DatabaseConnector
    instance = DatabaseConnector()
    # Create an engine by using the init_my_db_engine() method of DatabaseConnector
    engine = instance.init_my_db_engine()

    with engine.connect() as connection:
        # Fetch and print data before conversion
        print("Data before conversion:")
        data_before = fetch_data(connection, 'orders_table')
        # data_before is iterated row by row; each row is one record from the table
        for row in data_before:
            print(row)

        # Check and print column type before conversion
        column_type_before = check_column_type(connection, 'orders_table', 'date_uuid')
        print(f"Column type before conversion: {column_type_before}")

    # Run both writes in a single explicit transaction; errors are printed, not raised
    try:
        with engine.begin() as connection:
            clean_uuid(connection, 'orders_table', 'date_uuid')
            print('Clean UUID worked')
            convert_data_uuid(connection, 'orders_table', 'date_uuid')
            print('Convert to UUID worked')
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")

    with engine.connect() as connection:
        # Fetch and print data after conversion
        print("Data after conversion:")
        data_after = fetch_data(connection, 'orders_table')
        for row in data_after:
            print(row)

        # Check and print column type after conversion
        column_type_after = check_column_type(connection, 'orders_table', 'date_uuid')
        print(f"Column type after conversion: {column_type_after}")

    print('End of call')

run_all_operations()

In [24]:
# OLD CODE

#THIS WILL BE THE FUNCTION THAT RUNS ALL THE OTHER SUBFUNCTIONS. BUT NEED TO DECIDE IF IT"S EASIER TO PUT IT IN CLASS ('CASTING') WITH EACH SUB FUNCTION AS A METHOD

class DataCasting:
    """Placeholder for a class that would hold each casting sub-function as a method."""

    def __init__(self):
        # TODO: store the database connection here (connection = XXXX).
        # `pass` gives the method a body; a comment alone is a syntax error.
        pass
    
def cast_data():
    """Clean orders_table.date_uuid and convert the column to the UUID type.

    The function also defines the reusable per-type casting helpers that were
    being prototyped in this cell. Only the two UUID helpers are called; the
    VARCHAR / INTEGER / DATE helpers are defined but not yet applied.

    The writes run inside ``engine.begin()``, which commits when the block
    finishes without error and rolls back if a statement raises. A
    SQLAlchemyError is printed rather than re-raised.
    """
    # Create instance of a DatabaseConnector
    instance = DatabaseConnector()

    # Create an engine by using the init_my_db_engine() method of DatabaseConnector
    engine = instance.init_my_db_engine()

    # REUSABLE FUNCTIONS FOR EACH OF THE CAST TYPES

    #UUID
    def clean_uuid(connection, table_name, column_name):
        """Set the column to NULL wherever the value is not a well-formed UUID string."""
        # {{8}} etc. are escaped braces: inside an f-string a bare {8} becomes
        # the text "8" and the pattern would then reject every real UUID.
        clean_uuid_sql = f"""
            UPDATE {table_name}
            SET {column_name} = NULL
            WHERE {column_name} !~ '^[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}$';
            """
        connection.execute(text(clean_uuid_sql))

    def convert_data_uuid(connection, table_name, column_name):
        """Change the column's type to UUID, casting existing values via USING."""
        convert_date_uuid = f"""
            ALTER TABLE {table_name}
            ALTER COLUMN {column_name} TYPE UUID
            USING {column_name}::UUID;
            """
        # Convert the SQL string to a TextClause object and execute the query
        connection.execute(text(convert_date_uuid))

    #VARCHAR

    def clean_numbers(connection, table_name, column_name):
        """Set the column to NULL wherever the value is not made up of digits only."""
        clean_numbers_sql = f"""
        UPDATE {table_name}
        SET {column_name} = NULL
        WHERE {column_name} !~ '^[0-9]+$';
        """
        connection.execute(text(clean_numbers_sql))

    def get_max_length(connection, table_name, column_name):
        """Return MAX(LENGTH(column)) for the table, as reported by the database."""
        max_length_sql = f"""
        SELECT MAX(LENGTH({column_name}))
        FROM {table_name};
        """
        result = connection.execute(text(max_length_sql)).fetchone()
        return result[0]

    def convert_numbers(connection, table_name, column_name, max_length):
        """Change the column's type to VARCHAR(max_length)."""
        convert_numbers_sql = f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE VARCHAR({max_length})
        USING {column_name}::VARCHAR({max_length});
        """
        connection.execute(text(convert_numbers_sql))

    def run_all_operations(conn_string, table_name, column_name):
        """Clean a digits-only column and convert it to a fitted VARCHAR in one transaction."""
        varchar_engine = create_engine(conn_string)
        try:
            with varchar_engine.begin() as connection:
                clean_numbers(connection, table_name, column_name)
                max_length = get_max_length(connection, table_name, column_name)
                convert_numbers(connection, table_name, column_name, max_length)
            print("Data cleaning and conversion completed successfully.")
        except SQLAlchemyError as e:
            print(f"An error occurred: {e}")

    #SMALL INT
    # (no helper written yet)

    #THESE ARE SAMPLES FROM THE CHAT GPT CODE
    def cast_column_to_integer(connection, table_name, column_name):
        """Change the column's type to INTEGER, casting existing values via USING."""
        sql = f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE INTEGER
        USING {column_name}::INTEGER;
        """
        connection.execute(text(sql))

    def cast_column_to_date(connection, table_name, column_name):
        """Change the column's type to DATE, casting existing values via USING."""
        sql = f"""
        ALTER TABLE {table_name}
        ALTER COLUMN {column_name} TYPE DATE
        USING {column_name}::DATE;
        """
        connection.execute(text(sql))

    # Apply the UUID cast to orders_table.date_uuid through the helpers above.
    # These issue the same two statements the original inline SQL did.
    try:
        with engine.begin() as connection:
            clean_uuid(connection, 'orders_table', 'date_uuid')
            convert_data_uuid(connection, 'orders_table', 'date_uuid')
        print("Data type casting completed successfully.")
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")

In [None]:
# OLD CODE 
# THIS IS THE CHATGPT CODE THAT I AM TAKING BITS FROM - IT'S GOT IDEAS FOR CREATING REUSABLE FUNCTIONS FOR EACH OF THE TYPES OF CASTING I NEED TO DO 

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

def cast_column_to_integer(connection, table_name, column_name):
    """Change the column's type to INTEGER, casting existing values via USING."""
    statement = text(f"""
    ALTER TABLE {table_name}
    ALTER COLUMN {column_name} TYPE INTEGER
    USING {column_name}::INTEGER;
    """)
    connection.execute(statement)

def cast_column_to_date(connection, table_name, column_name):
    """Change the column's type to DATE, casting existing values via USING."""
    statement = text(f"""
    ALTER TABLE {table_name}
    ALTER COLUMN {column_name} TYPE DATE
    USING {column_name}::DATE;
    """)
    connection.execute(statement)

def cast_column_to_boolean(connection, table_name, column_name):
    """Change the column's type to BOOLEAN, casting existing values via USING."""
    statement = text(f"""
    ALTER TABLE {table_name}
    ALTER COLUMN {column_name} TYPE BOOLEAN
    USING {column_name}::BOOLEAN;
    """)
    connection.execute(statement)

# Add more functions as needed for different types

def run_all_casting_operations(conn_string):
    """Apply the example column casts to the 'users' table in one transaction.

    Args:
        conn_string: SQLAlchemy connection URL passed to create_engine.

    ``engine.begin()`` commits once all three casts have succeeded and rolls
    every one of them back if any cast raises. A SQLAlchemyError is printed
    rather than re-raised.
    """
    engine = create_engine(conn_string)
    try:
        with engine.begin() as connection:
            cast_column_to_integer(connection, 'users', 'age')
            cast_column_to_date(connection, 'users', 'birthdate')
            cast_column_to_boolean(connection, 'users', 'is_active')
            # Add more casting operations as needed
        # Reported only after the transaction has been committed
        print("All casting operations completed successfully.")
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")

# Example entry point from the sample code.
# NOTE(review): inside a notebook kernel __name__ is '__main__', so running this
# cell executes the casts against the placeholder URL below.
if __name__ == '__main__':
    # Placeholder connection URL (username/password/mydatabase are not real values)
    conn_string = 'postgresql://username:password@localhost:5432/mydatabase'
    run_all_casting_operations(conn_string)


In [None]:
cast_data()

In [None]:
#THIS IS OLD CODE THAT I PROBABLY WON'T USE 
def run_data_casting(conn_string):
    """
    Connects to the PostgreSQL database and performs data type casting
    on the 'age' column in the 'users' table.

    Non-numeric 'age' values are first set to NULL, then the column is altered
    to INTEGER. Both statements run in a single transaction opened with
    ``engine.begin()``: it is committed when both succeed and rolled back if
    either raises. A SQLAlchemyError is printed rather than re-raised.

    Args:
    conn_string (str): The connection string for the PostgreSQL database.
    """
    # Create an engine to connect to the database
    engine = create_engine(conn_string)

    # SQL query to clean data: anything that is not digits-only becomes NULL
    clean_data_query = text("""
            UPDATE users
            SET age = NULL
            WHERE age !~ '^[0-9]+$';
            """)

    # SQL query to alter the column data type
    alter_column_query = text("""
            ALTER TABLE users
            ALTER COLUMN age TYPE INTEGER
            USING age::INTEGER;
            """)

    try:
        # The context manager commits on success and rolls back on error
        with engine.begin() as connection:
            connection.execute(clean_data_query)
            connection.execute(alter_column_query)

        print("Data type casting completed successfully.")
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")

In [None]:

# #VARCHAR 

#     # Create function to clean card number data with regex
#     def clean_numbers(connection, table_name, column_name):
#         clean_numbers_sql = f"""
#         UPDATE {table_name}
#         SET {column_name} = NULL
#         WHERE {column_name} !~ '^[0-9]+$';
#         """  
#         connection.execute(text(clean_numbers_sql))

#     # Function to determine the maximum length of values in the column
#     def get_max_length(connection, table_name, column_name):
#         max_length_sql = f"""
#         SELECT MAX(LENGTH({column_name})) 
#         FROM {table_name};
#         """
#         result = connection.execute(text(max_length_sql)).fetchone()
#         return result[0]

#     # Function to convert the column to VARCHAR with the determined maximum length
#     def convert_numbers(connection, table_name, column_name, max_length): 
#         convert_numbers_sql = f"""
#         ALTER TABLE {table_name}
#         ALTER COLUMN {column_name} TYPE VARCHAR({max_length})
#         USING {column_name}::VARCHAR({max_length});
#         """
#         connection.execute(text(convert_numbers_sql))

  
