In [10]:
import gc
import os

# from cryptography.fernet import Fernet
# import hashlib
import pandas as pd
import pyarrow.parquet as pq
import sqlite3
from sqlite3 import Error

from CFG import Config
from security_config import SecurityConfig

In [11]:
# Build the project configuration object and pull out the settings mapping.
# `config` is indexed by string keys in later cells (e.g. config['data_dir']).
config_ref = Config()
config = config_ref.get_config()


In [16]:

# Name of the table to work with; also the stem of the parquet file to load.
table_name = 'business'
# Parquet source file: <data_dir>/<table_name>.parquet
data_file_path = os.path.join(config['data_dir'], f'{table_name}.parquet')
# SQLite database file: <data_db_storage_dir>/<data_db_name>
db_path = os.path.join(config['data_db_storage_dir'], config['data_db_name'])


In [65]:

def get_db_connection(db_path):
    """
    Open a SQLite connection to the database file at ``db_path``.

    Args:
        db_path (str): Path of the SQLite database file.

    Returns:
        sqlite3.Connection: The open connection, or None when connecting
            raised a sqlite3 error (the error is printed, not re-raised).
    """
    connection = None
    try:
        connection = sqlite3.connect(db_path)
    except Error as exc:
        # Best-effort: report the problem and let the caller handle None.
        print(exc)
    return connection


def create_db_table(conn, table_name, recreate=False, data_path=None):
    """
    Create a SQLite table from a parquet file.

    The parquet file is read into a pandas DataFrame, the 'attributes' and
    'hours' columns are dropped when present, and the frame is written to
    the database with ``DataFrame.to_sql``.

    Args:
        conn (sqlite3.Connection): Open database connection.
        table_name (str): Name of the table to create.
        recreate (bool): When the table already exists, drop and rebuild it
            if True; leave it untouched (and skip the parquet read) if False.
        data_path (str, optional): Parquet file to load. Defaults to the
            notebook-level ``data_file_path``.

    Returns:
        None
    """
    cursor = conn.cursor()

    # sqlite_master is a system table that stores metadata about the database schema.
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    )
    table_exists = cursor.fetchone() is not None

    if table_exists and not recreate:
        # to_sql defaults to if_exists='fail', so writing again would raise.
        print("Table already exists. Skipping creation.")
        return

    if table_exists:
        print("Table already exists. Dropping...")
        # Identifiers cannot be bound as parameters; quote the name instead
        # of concatenating it raw.
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        cursor.execute(f"DROP TABLE IF EXISTS {quoted_name}")
        print("Table dropped successfully.")

    source_path = data_file_path if data_path is None else data_path

    print("Reading data from parquet file...")
    df = pq.read_table(source_path).to_pandas()

    # Temporary fix for the issue with the data
    # 'attributes', 'hours' columns contain dictionary data type values.
    # sqlite3 does not support dictionary data type.
    # errors='ignore' keeps this working for tables without those columns.
    df = df.drop(columns=['attributes', 'hours'], errors='ignore')

    print(f"Creating table {table_name}...")
    df.to_sql(table_name, conn)
    print("Table created successfully.")

    # Release the (potentially large) frame right away.
    del df
    gc.collect()


def read_db_table(conn, table_name, sample_size=None, verbose=False):
    """
    Read all rows, or the first ``sample_size`` rows, of a table.

    Args:
        conn (sqlite3.Connection): Open database connection.
        table_name (str): One of 'business', 'checkin', 'review', 'tip',
            'user'. The name is interpolated into the SQL text, which is
            why it is restricted to this fixed list.
        sample_size (int, optional): Maximum number of rows to return.
            None returns every row.
        verbose (bool): Print each returned row when True. Nothing is
            printed when False.

    Returns:
        list: Row tuples from the table.

    Raises:
        AssertionError: If ``sample_size`` is not None or a positive int,
            or ``table_name`` is not in the allowed list.
    """
    assert sample_size is None or (isinstance(sample_size, int) and sample_size > 0), \
                "Sample size must be a positive integer."

    assert table_name in ['business', 'checkin', 'review', 'tip', 'user'], \
                "Invalid table name."

    cursor = conn.cursor()
    if sample_size:
        # The row limit is a value, so it can be bound as a parameter.
        cursor.execute(f"SELECT * FROM {table_name} LIMIT ?", (sample_size,))
    else:
        cursor.execute(f"SELECT * FROM {table_name}")
    result = cursor.fetchall()

    # Print only on request, and only once per row.
    if verbose:
        for row in result:
            print(row)

    return result


def get_table_columns(conn, table_name, verbose=False):
    """
    Return the column names of a table.

    Args:
        conn (sqlite3.Connection): Open database connection.
        table_name (str): Name of the table; interpolated into the PRAGMA
            statement as-is.
        verbose (bool): Print the column list when True.

    Returns:
        list: Column names; empty when the PRAGMA yields no rows.
    """
    cursor = conn.cursor()
    # PRAGMA table_info is a SQLite statement that yields one row per
    # column of the table; the column name is the second field of each row.
    cursor.execute(f"PRAGMA table_info({table_name})")

    columns = []
    for info_row in cursor.fetchall():
        columns.append(info_row[1])

    if verbose:
        print("columns: ", columns)

    return columns


def fetch_column_from_table(conn, table_name, column_name, sample_size=None):
    """
    Fetch the values of one column of a table.

    Args:
        conn (sqlite3.Connection): Open database connection.
        table_name (str): One of 'business', 'checkin', 'review', 'tip', 'user'.
        column_name (str): Column to read; must be a column of the table.
        sample_size (int, optional): Maximum number of rows to return.
            None returns every row.

    Returns:
        list: One 1-tuple per row holding the column value.

    Raises:
        AssertionError: If any argument fails the checks above.
    """
    size_is_valid = sample_size is None or (isinstance(sample_size, int) and sample_size > 0)
    assert size_is_valid, "Sample size must be a positive integer."

    allowed_tables = ('business', 'checkin', 'review', 'tip', 'user')
    assert table_name in allowed_tables, "Invalid table name."

    assert column_name in get_table_columns(conn, table_name), \
        f"Invalid column name. {column_name} does not exist in the table {table_name}."

    base_query = f"SELECT {column_name} FROM {table_name}"
    cursor = conn.cursor()

    # No sample requested: return the whole column.
    if not sample_size:
        cursor.execute(base_query)
        return cursor.fetchall()

    cursor.execute(f"{base_query} LIMIT {sample_size}")
    return cursor.fetchmany(sample_size)

Table already exists. Dropping...
Table dropped successfully.
Reading data from parquet file...


Creating table business...
Table created successfully.


In [66]:

# Table to inspect; valid choices are listed in the trailing comment.
table_name = 'business' # 'business', 'checkin', 'review', 'tip', 'user'
# Number of rows to read back from the table.
sample_size = 5
verbose = True
# Keep row printing on only for small samples: a sample size must be set
# and must be below 100.
verbose = verbose and sample_size is not None and sample_size < 100


(0, 'Pns2l4eNsfO8kk83dixA6A', 'Abby Rappoport, LAC, CMQ', '1616 Chapala St, Ste 2', 'Santa Barbara', 'CA', '93101', 34.42667770385742, -119.71119689941406, 5.0, 7, 0, 'Doctors, Traditional Chinese Medicine, Naturopathic/Holistic, Acupuncture, Health & Medical, Nutritionists')
(1, 'mpf3x-BjTdTEA3yCZrAYPw', 'The UPS Store', '87 Grasso Plaza Shopping Center', 'Affton', 'MO', '63123', 38.551124572753906, -90.335693359375, 3.0, 15, 1, 'Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services')
(2, 'tUFrWirKiKi_TAnsVWINQQ', 'Target', '5255 E Broadway Blvd', 'Tucson', 'AZ', '85711', 32.223236083984375, -110.88045501708984, 3.5, 22, 0, 'Department Stores, Shopping, Fashion, Home & Garden, Electronics, Furniture Stores')
(3, 'MTSW4McQd7CbVtyjqoe9mw', 'St Honore Pastries', '935 Race St', 'Philadelphia', 'PA', '19107', 39.95550537109375, -75.15556335449219, 4.0, 80, 1, 'Restaurants, Food, Bubble Tea, Coffee & Tea, Bakeries')
(4, 'mWMc6_wTdE0EUBKIGXDVfA', 'Perkiomen Valley Br

In [None]:

# Open the database; get_db_connection returns None if connecting failed.
conn = get_db_connection(db_path)
# Load the parquet data into SQLite; recreate=False means an existing table is not dropped.
create_db_table(conn, table_name, recreate=False)
# Column names of the table, as reported by PRAGMA table_info.
table_columns = get_table_columns(conn, table_name)
# Read back `sample_size` rows of the table.
table = read_db_table(conn, table_name, sample_size=sample_size, verbose=verbose)


['index', 'business_id', 'name', 'address', 'city', 'state', 'postal_code', 'latitude', 'longitude', 'stars', 'review_count', 'is_open', 'categories']


In [68]:


# Fetch the 'categories' column for every row (sample_size=None applies no LIMIT).
column_data = fetch_column_from_table(conn, table_name=table_name, 
                        column_name='categories', sample_size=None)


Doctors, Traditional Chinese Medicine, Naturopathic/Holistic, Acupuncture, Health & Medical, Nutritionists
Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services
Department Stores, Shopping, Fashion, Home & Garden, Electronics, Furniture Stores
Restaurants, Food, Bubble Tea, Coffee & Tea, Bakeries
Brewpubs, Breweries, Food


In [74]:
# query = f"SELECT * FROM {table_name} WHERE 'Restaurants' IN (categories) LIMIT 5"
# cursor.execute(query)
# result = cursor.fetchmany(5)
# for row in result:
#     print(row)



(3655, 'x2J-YIFeGZ-nsezzooVA9g', 'Twenty 21', '2005 Market St', 'Philadelphia', 'PA', '19103', 39.95410919189453, -75.17466735839844, 3.0, 8, 0, 'Restaurants')
(5873, 'VlrSuulqTFeQfV2PToJGvg', 'Fung Garden', '5118 Gall Blvd', 'Zephyrhills', 'FL', '33542', 28.232059478759766, -82.1803970336914, 3.5, 8, 0, 'Restaurants')
(6856, 'mFE9V6LPpsDRUQLEBsBRRA', 'Pearl of East', '2049 W Oregon Ave', 'Philadelphia', 'PA', '19145', 39.918636322021484, -75.18241882324219, 3.0, 17, 1, 'Restaurants')
(9801, 'BXUqeFDqvTK2uL6sQd5YnQ', 'Crazy D’s Hot Chicken', '101 University Ter', 'Reno', 'NV', '89503', 39.53628921508789, -119.81822967529297, 3.5, 37, 1, 'Restaurants')
(11413, 'Bf1cdbdHXi8Omlkc7KShkg', 'Siam Cafe', '435 Esplanade Ave', 'New Orleans', 'LA', '70116', 29.96196746826172, -90.05735778808594, 4.0, 9, 0, 'Restaurants')


In [75]:
# query = f"SELECT * FROM {table_name} WHERE 'Restaurants' IN (categories) AND city = 'Philadelphia' LIMIT 5"
# cursor.execute(query)
# result = cursor.fetchall()
# for row in result:
#     print(row)


(3655, 'x2J-YIFeGZ-nsezzooVA9g', 'Twenty 21', '2005 Market St', 'Philadelphia', 'PA', '19103', 39.95410919189453, -75.17466735839844, 3.0, 8, 0, 'Restaurants')
(6856, 'mFE9V6LPpsDRUQLEBsBRRA', 'Pearl of East', '2049 W Oregon Ave', 'Philadelphia', 'PA', '19145', 39.918636322021484, -75.18241882324219, 3.0, 17, 1, 'Restaurants')
(14327, 'TCdNYh5tdjoNmA1WMx3ZPg', "Roselena's", '1623 E Passyunk Ave', 'Philadelphia', 'PA', '19148', 39.92937469482422, -75.1638412475586, 4.0, 10, 0, 'Restaurants')
(25351, 'x2xQUz1YfOzy5A9FRuCpMw', 'Passage To India', '1320 Walnut St', 'Philadelphia', 'PA', '19107', 39.948951721191406, -75.16304016113281, 3.0, 23, 0, 'Restaurants')
(40722, 'qaPADjsU6c0Ufho0zdL6Yw', 'Azure', '931 N 2nd St', 'Philadelphia', 'PA', '19123', 39.964820861816406, -75.1402359008789, 3.5, 22, 0, 'Restaurants')


In [69]:
# conn.close()

In [None]:
class DatabaseDDLManager:
    '''
        Database Data Definition Language (DDL) Manager for a SQLite database.
            Create, Drop, Alter, Truncate, Rename, Comment, etc.

        Every operation prints the sqlite3 error and returns False on
        failure instead of raising. Table names and caller-supplied SQL are
        placed into the statement text as-is, so they must come from
        trusted code.

        Attributes:
            db_name (str): The database file name/path passed to sqlite3.connect.
            conn (sqlite3.Connection): The open connection, or None if
                connecting failed.
    '''
    def __init__(self, db_name):
        self.db_name = db_name
        self.conn = self.open_connection()

    def open_connection(self):
        """
        Opens a connection to the database named by ``self.db_name``.

        Returns:
            sqlite3.Connection: The open connection (also stored on
                ``self.conn``), or None if connecting failed.
        """
        try:
            self.conn = sqlite3.connect(self.db_name)
            return self.conn
        except Error as e:
            print(e)

        return None

    def close_connection(self):
        """
        Closes the connection to the database, if one was opened.
        """
        if self.conn is not None:
            self.conn.close()

    def _run_statement(self, sql):
        """
        Executes one SQL statement and commits it.

        Args:
            sql (str): The SQL statement to execute.

        Returns:
            Boolean: True if the statement ran and was committed, False if
                there is no connection or sqlite3 reported an error.
        """
        if self.conn is None:
            print("No open database connection.")
            return False

        try:
            c = self.conn.cursor()
            c.execute(sql)
            # Commit so the change survives close_connection(); statements
            # such as DELETE open a transaction that is otherwise discarded.
            self.conn.commit()
            return True
        except Error as e:
            print(e)

        return False

    def create_table(self, create_table_sql):
        """
        Creates a table in the database using the provided SQL statement.

        Args:
            create_table_sql (str): The SQL statement to create the table.
            Example:
                create_table_sql = 
                    CREATE TABLE IF NOT EXISTS employees (
                        id integer PRIMARY KEY,
                        name text NOT NULL,
                        salary real,
                        department_id integer,
                        FOREIGN KEY (department_id) REFERENCES departments (id)
                    );

        Returns:
            Boolean: True if the table was created successfully, False
                otherwise (the sqlite3 error is printed, not raised).
        """
        return self._run_statement(create_table_sql)

    def drop_table(self, table_name):
        """
        Drops a table from the database; a missing table is not an error.

        Args:
            table_name (str): The name of the table to drop.

        Returns:
            Boolean: True if the statement succeeded, False otherwise
                (the sqlite3 error is printed, not raised).
        """
        return self._run_statement(f"DROP TABLE IF EXISTS {table_name}")

    def alter_table(self, table_name, alter_table_sql):
        """
        Alters a table in the database using the provided SQL statement.

        Args:
            table_name (str): The name of the table to alter. Not used by
                the implementation; the statement itself names the table.
            alter_table_sql (str): The SQL statement to alter the table.
            Example:
                alter_table_sql = 
                    ALTER TABLE employees
                    ADD COLUMN email text;

        Returns:
            Boolean: True if the table was altered successfully, False
                otherwise (the sqlite3 error is printed, not raised).
        """
        return self._run_statement(alter_table_sql)

    def truncate_table(self, table_name):
        """
        Truncates a table in the database by deleting all of its rows.

        Args:
            table_name (str): The name of the table to truncate.

        Returns:
            Boolean: True if the rows were deleted and the deletion was
                committed, False otherwise (the sqlite3 error is printed).
        """
        return self._run_statement(f"DELETE FROM {table_name}")

    def rename_table(self, table_name, new_table_name):
        """
        Renames a table in the database.

        Args:
            table_name (str): The name of the table to rename.
            new_table_name (str): The new name for the table.

        Returns:
            Boolean: True if the table was renamed successfully, False
                otherwise (the sqlite3 error is printed, not raised).
        """
        return self._run_statement(
            f"ALTER TABLE {table_name} RENAME TO {new_table_name}")

    def comment_table(self, table_name, comment):
        """
        Comments a table in the database.

        SQLite has no COMMENT ON TABLE statement, so this operation cannot
        succeed on a sqlite3 connection. The method reports that and
        returns False rather than sending a statement that always fails
        with a syntax error.

        Args:
            table_name (str): The name of the table to comment.
            comment (str): The comment for the table.

        Returns:
            Boolean: Always False for SQLite.
        """
        print(f"Cannot comment table {table_name}: "
              "SQLite does not support COMMENT ON TABLE.")
        return False

    def _has_primary_key(self, table_name):
        """
        Checks if a primary key exists for a table in the database.

        Args:
            table_name (str): The name of the table to check.

        Returns:
            Boolean: True if the primary key exists, False otherwise.
        """
        try:
            c = self.conn.cursor()
            c.execute(f"PRAGMA table_info({table_name})")
            # Field 5 of each row is 0 for columns outside the primary key
            # and the 1-based position within the key otherwise.
            return any(column[5] > 0 for column in c.fetchall())
        except Error as e:
            print(e)
            return False

    def sanity_check(self, table_name, export_to_file=False, file_name=None):
        """
        NOTE: WILL LATER MOVE THIS FUNCTION TO THE TESTING MODULE.
        NOT DEVELOPING ANY FURTHER AS OF NOW.

        Performs a sanity check on a table in the database. Only the
        primary-key existence check is implemented so far.

        Args:
            table_name (str): The name of the table to perform the sanity check on.
            export_to_file (bool): Whether to export the sanity check results to a file. Default is False.
            file_name (str): The name of the file to export the sanity check results to. Only applicable if export_to_file is True.
                Defaults to "<table_name>_sanity_check_results.txt".

        Returns:
            None
        """
        # check if primary key exists
        has_primary_key = self._has_primary_key(table_name)

        # check if primary key columns are constrained to be non-null
        # check if primary key columns are constrained to be unique

        # check if foreign key columns exist in the referenced table
        # check if foreign key columns are constrained to be non-null
        # check if foreign key columns are constrained to be unique
        # check if foreign key columns are constrained to be primary keys

        # check if unique key columns are constrained to be non-null

        # check if check constraints are valid
        # check if default values are valid
        # check if column data types are valid

        # check if table is commented
        # check if column names are commented
        # check if column names follow a naming convention with regex

        if export_to_file:
            # Export the sanity check results to a file
            if file_name is None:
                file_name = f"{table_name}_sanity_check_results.txt"
            with open(file_name, "w") as file:
                file.write("Sanity Check Results:\n")
                file.write(f"Table Name: {table_name}\n")
                file.write(f"Primary Key Exists: {has_primary_key}\n")

    

In [None]:
class DatabaseDMLManager:
    '''
    A class that manages a SQLite database connection and provides methods
    for reading, inserting, updating and deleting rows.

    Table and column names are interpolated into the SQL text (identifiers
    cannot be bound as parameters), so they must come from trusted code.
    Row values are always bound with ``?`` placeholders.

    Attributes:
        db_name (str): The name of the database.
        conn (sqlite3.Connection): The SQLite database connection, or None
            if connecting failed.
    '''
    # Tables accepted by fetch_column_from_table and read_db_table.
    _KNOWN_TABLES = ('business', 'checkin', 'review', 'tip', 'user')

    def __init__(self, db_name):
        self.db_name = db_name
        self.conn = self.open_connection()

    def open_connection(self):
        '''
        Opens a connection to the database.
        
        Returns:
            sqlite3.Connection: The SQLite database connection; 
                Or None if failed.
        '''
        try:
            self.conn = sqlite3.connect(self.db_name)
            return self.conn
        except Error as e:
            print(e)
        
        return None

    # Keep the original private name available as an alias.
    __open_connection = open_connection
    
    def close_connection(self):
        '''
        Closes the database connection, if one was opened.
        '''
        if self.conn is not None:
            self.conn.close()
    
    def get_db_table_names(self):
        '''
        Retrieves the names of all tables in the database.
        
        Returns:
            list: One 1-tuple ``(name,)`` per table, as returned by
                ``cursor.fetchall()``.
        '''
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()
        return tables
    
    def get_table_columns(self, table_name):
        '''
        Retrieves the column names of a specific table.
        
        Args:
            table_name (str): The name of the table.
        
        Returns:
            list: A list of column names; empty when the PRAGMA yields no rows.
        '''
        cursor = self.conn.cursor()
        cursor.execute("PRAGMA table_info({})".format(table_name))
        columns = [column[1] for column in cursor.fetchall()]
        return columns
    
    def get_table_number_of_rows(self, table_name):
        '''
        Retrieves the number of rows in a specific table.
        
        Args:
            table_name (str): The name of the table.
        
        Returns:
            int: The number of rows in the table.
        '''
        cursor = self.conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        count = cursor.fetchone()
        return count[0]
    
    def fetch_column_from_table(self, table_name, column_name, sample_size=None):
        '''
        Retrieves a specific column from a table.
        
        Args:
            table_name (str): The name of the table; must be one of the
                known tables.
            column_name (str): The name of the column.
            sample_size (int, optional): The number of rows to retrieve. Defaults to None.
        
        Returns:
            list: One 1-tuple per row holding the value of the column.

        Raises:
            AssertionError: If any argument fails validation.
        '''
        assert sample_size is None or (isinstance(sample_size, int) and sample_size > 0), \
                    "Sample size must be a positive integer."
        
        assert table_name in self._KNOWN_TABLES, \
                    "Invalid table name."
        
        assert column_name in self.get_table_columns(table_name), \
                "Invalid column name. {} does not exist in the table {}.".\
                format(column_name, table_name)

        cursor = self.conn.cursor()
        if sample_size:
            cursor.execute(f"SELECT {column_name} FROM {table_name} LIMIT ?",
                           (sample_size,))
        else:
            cursor.execute(f"SELECT {column_name} FROM {table_name}")
        
        return cursor.fetchall()

    def read_db_table(self, table_name, sample_size=None, verbose=False):
        '''
        Retrieves all/requested number of rows from a specific table.
        
        Args:
            table_name (str): The name of the table; must be one of the
                known tables.
            sample_size (int, optional): The number of rows to retrieve. Defaults to None.
            verbose (bool, optional): Whether to print the retrieved rows. Defaults to False.
        
        Returns:
            list: A list of rows from the specified table.

        Raises:
            AssertionError: If any argument fails validation.
        '''
        assert sample_size is None or (isinstance(sample_size, int) and sample_size > 0), \
                    "Sample size must be a positive integer."   

        assert table_name in self._KNOWN_TABLES, \
                    "Invalid table name."
        
        cursor = self.conn.cursor()
        if sample_size:
            cursor.execute(f"SELECT * FROM {table_name} LIMIT ?", (sample_size,))
        else:
            cursor.execute(f"SELECT * FROM {table_name}")
        result = cursor.fetchall()
        
        # Print only on request, and only once per row.
        if verbose:
            for row in result:
                print(row)

        return result
    
    def insert_data(self, table_name, data_dict):
        '''
        Inserts data into the database.
        Leaving to SQL to generate errors for invalid values like that 
            not matching the data type or primary key or null values in non-null 
            columns or duplicate values in unique columns.
        
        Args:
            table_name (str): The name of the table.
            data_dict (dict): The data to insert, keyed by column name.

        Returns:
            Boolean: True if the data was inserted successfully, False otherwise.        
        '''
        cursor = self.conn.cursor()
        
        # Check if all columns in data_dict are valid columns in the table
        valid_columns = self.get_table_columns(table_name)
        invalid_columns = [column for column in data_dict.keys() if column not in valid_columns]
        if invalid_columns:
            print(f"Invalid columns: {', '.join(invalid_columns)}")
            return False
        
        # Create a string of placeholders for the values to insert
        # in order to prevent SQL injection attacks
        columns = ', '.join(data_dict.keys())
        placeholders = ', '.join(['?' for _ in range(len(data_dict))])
        values = tuple(data_dict.values())
        try:
            cursor.execute(
                f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})",
                values)
            self.conn.commit()
            return True
        except Exception as e:
            # For table constraints and other value errors
            print(f"Error inserting data into {table_name}: {e}")
            return False
    
    def update_data(self, table_name, data_dict, where_dict=None):
        '''
        Updates data in the database.

        Args:
            table_name (str): The name of the table.
            data_dict (dict): The new data to update, keyed by column name.
            where_dict (dict, optional): Column/value pairs a row must match
                (combined with AND) to be updated. When omitted, every row
                of the table is updated.

        Returns:
            Boolean: True if the data was updated successfully, False otherwise.
        '''
        cursor = self.conn.cursor()
        where_dict = where_dict or {}
        
        # Check if all columns in data_dict / where_dict are valid columns in the table
        valid_columns = self.get_table_columns(table_name)
        requested_columns = list(data_dict.keys()) + list(where_dict.keys())
        invalid_columns = [column for column in requested_columns if column not in valid_columns]
        if invalid_columns:
            print(f"Invalid columns: {', '.join(invalid_columns)}")
            return False

        # Create a string of placeholders for the values to update
        # in order to prevent SQL injection attacks
        assignments = ', '.join([f"{column} = ?" for column in data_dict.keys()])
        statement = f"UPDATE {table_name} SET {assignments}"
        values = tuple(data_dict.values())
        if where_dict:
            conditions = ' AND '.join([f"{column} = ?" for column in where_dict.keys()])
            statement += f" WHERE {conditions}"
            values += tuple(where_dict.values())

        try:
            cursor.execute(statement, values)
            self.conn.commit()
            return True
        except Exception as e:
            print(f"Error updating data in {table_name}: {e}")
            return False
    
    def __get_primary_key_columns(self, table_name):
        '''
        Retrieves the primary key column(s) of a table.

        Args:
            table_name (str): The name of the table.

        Returns:
            list: A list of primary key column(s).
        '''
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA table_info({table_name})")
        table_info = cursor.fetchall()

        # table_info structure in sqlite looks like:
        # (column_id, column_name, data_type, not_null, default_value, pk)
        
        # pk is 0 for columns outside the primary key and the 1-based
        # position within the key otherwise, so a composite key has
        # columns with pk = 1, 2, ...
        primary_key_columns = [column[1] for column in table_info if column[5] > 0]
        
        return primary_key_columns
            
    def __get_deletion_columns(self, table_name, deletion_dict):
        '''
        Retrieves the columns to use as the deletion key(s) from the deletion_dict.

        Args:
            table_name (str): The name of the table.
            deletion_dict (dict): Column/value pairs identifying the rows to delete.

        Returns:
            list: The deletion key columns, in deletion_dict order; or None
                when the table has a primary key and deletion_dict does not
                name exactly its columns.
        '''
        table_prim_key_columns = self.__get_primary_key_columns(table_name)
        if not table_prim_key_columns:
            # IMPORTANT NOTE: Ideally, we should not allow deletion without a primary key
            # But allowing it for now
            return list(deletion_dict.keys())

        # check if the deletion key(s) are keys of the table - primary or composite etc.
        deletion_columns = []
        for column in deletion_dict.keys():
            if column not in table_prim_key_columns:
                print(f"Column {column} is not a primary key column. "
                      "Cannot delete with this key.")
                return None
            deletion_columns.append(column)

        # ensure all columns that form the primary key are passed in the deletion_dict
        # to prevent accidental deletion of multiple rows in case of composite primary key
        if len(deletion_columns) != len(table_prim_key_columns):
            print("Not all primary key columns are passed in the deletion_dict.")
            print("Key deletion_columns: ", deletion_columns)
            print("table_prim_key_columns: ", table_prim_key_columns)
            return None

        return deletion_columns
        
    def delete_data(self, table_name, deletion_dict, return_deleted_data=False):
        '''
            Deletes data from the database based on the provided key-value 
            pairs in the deletion_dict.

            When the table has a primary key, deletion_dict must name
            exactly the primary key column(s). Tables without a primary
            key accept any valid columns.

        Args:
            table_name (str): The name of the table.
            deletion_dict (dict): A dictionary containing the key-value pairs 
                        to use as the deletion key-value pairs.
            return_deleted_data (bool, optional): Whether to return the deleted 
                    data. Defaults to False.

        Returns:
            tuple: A tuple containing a boolean flag indicating the success of 
                    the operation and the deleted data 
                    (if return_deleted_data is True, otherwise None).
        '''
        cursor = self.conn.cursor()

        # Check if every deletion key is a valid column in the table
        valid_columns = self.get_table_columns(table_name)
        for column in deletion_dict.keys():
            if column not in valid_columns:
                print(f"Invalid column: {column}")
                return False, None
        
        deletion_columns = self.__get_deletion_columns(table_name, deletion_dict)
        if deletion_columns is None:
            # Key validation failed; the reason has already been printed.
            return False, None
        if not deletion_columns:
            # Never build a DELETE without a WHERE condition.
            print("No deletion key-value pairs provided.")
            return False, None
        
        # Create a string of placeholders for the WHERE clause
        placeholders = ' AND '.join([f"{column} = ?" for column in deletion_columns])
        values = tuple(deletion_dict[column] for column in deletion_columns)
        
        try:
            # Select the data to be returned before deletion
            if return_deleted_data:
                cursor.execute(
                    f"SELECT * FROM {table_name} WHERE {placeholders}", values)
                deleted_data = cursor.fetchall()
            else:
                deleted_data = None

            # Delete the data based on the deletion key
            cursor.execute(
                f"DELETE FROM {table_name} WHERE {placeholders}", values)
            self.conn.commit()

            return True, deleted_data
        except Exception as e:
            print(f"Error deleting data from {table_name}: {e}")
            return False, None

    def read_data(self, table_name, key_val_dict):
        '''
        Reads data from the database based on the provided key-value pairs.
        Can handle multiple key-value pairs (combined with AND).

        Args:
            table_name (str): The name of the table.
            key_val_dict (dict): A dictionary containing the key-value pairs 
                    to use for reading the data.

        Returns:
            list: A list of matching rows; empty when nothing matches, when
                key_val_dict is empty or names an unknown column, or when
                the query fails.
        '''
        cursor = self.conn.cursor()

        if not key_val_dict:
            print(f"Error reading data from {table_name}: no key-value pairs provided.")
            return []

        # Column names go into the SQL text, so check them against the table first
        valid_columns = self.get_table_columns(table_name)
        invalid_columns = [column for column in key_val_dict.keys() if column not in valid_columns]
        if invalid_columns:
            print(f"Invalid columns: {', '.join(invalid_columns)}")
            return []
    
        # Create a string of placeholders for the WHERE clause
        placeholders = ' AND '.join([f"{column} = ?" for column in
                                     key_val_dict.keys()])
        values = tuple(key_val_dict.values())
        
        try:
            cursor.execute(f"SELECT * FROM {table_name} WHERE {placeholders}",
                           values)
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(f"Error reading data from {table_name}: {e}")
            return []
        
