In [1]:
from fastapi import FastAPI, HTTPException
import sqlite3
import pandas as pd
from typing import *
from pydantic import BaseModel
from fastapi.responses import JSONResponse
import numpy as np
import networkx as nx
from decimal import *
import time , datetime
import csv
import random
import os, sys
import yaml
from faker import Faker
from os.path import dirname, join
from re import sub, compile
import string
import tempfile
import sqlite3
from sdv.metadata import MultiTableMetadata
from sdv.multi_table import HMASynthesizer # Hierarchical Model Synthesizer

In [2]:
app = FastAPI()

In [3]:
database = "data/all_data_sqllite.db"

In [4]:
print(database)

data/all_data_sqllite.db


In [5]:
preset_autogen_data_path = "data/output/autogen/"

In [6]:
# 1. @app.get("/presets")
@app.get("/presets")
def get_preset_function():
    """
    Retrieve distinct preset names from the database.
    """
    try:
        conn = sqlite3.connect(database)
        df_meta_table_defn = pd.read_sql("SELECT DISTINCT preset_name FROM meta_table_defn;", conn)
        conn.close()
        return df_meta_table_defn['preset_name'].tolist()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving presets: {str(e)}")

In [7]:
get_preset_function()

['Standard Claims']

In [8]:
# 2. @app.get("/preset/{preset_name}/tables")
@app.get("/preset/{preset_name}/tables")
def get_table_name(preset_name: str):
    """
    Retrieve the list of tables associated with a given preset.
    """
    try:
        conn = sqlite3.connect(database)
        query = f"SELECT DISTINCT TableName FROM meta_table_defn WHERE preset_name='{preset_name}';"
        df_tables = pd.read_sql(query, conn)
        conn.close()
        return df_tables['TableName'].tolist()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving tables for preset {preset_name}: {str(e)}")


In [10]:
get_table_name('Standard Claims')

['claims', 'members', 'providers']

In [11]:
# 3. @app.get("/preset/{preset_name}/table/{table_name}/definition")
@app.get("/preset/{preset_name}/table/{table_name}/definition")
def get_table_defination(preset_name: str, table_name: str):
    """
    Retrieve the metadata (schema, datatypes) for a selected table.
    """
    try:
        conn = sqlite3.connect(database)
        query = f"SELECT * FROM meta_table_defn WHERE preset_name='{preset_name}' AND TableName='{table_name}';"
        df_table_def = pd.read_sql(query, conn)
        conn.close()
        return df_table_def.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving definition for table {table_name}: {str(e)}")


In [12]:
get_table_defination('Standard Claims' , 'members') 

[{'TableName': 'members',
  'Columns': 'Member_Id',
  'Datatype': 'serial_no',
  'MinValue': '100000000',
  'MaxValue': '900000000',
  'CustomDirectListValue': '',
  'externalMapFileName': None,
  'externalMapColumnName': None,
  'preset_name': 'Standard Claims'},
 {'TableName': 'members',
  'Columns': 'Member_Dep_Num',
  'Datatype': 'integer',
  'MinValue': '001',
  'MaxValue': '099',
  'CustomDirectListValue': '',
  'externalMapFileName': None,
  'externalMapColumnName': None,
  'preset_name': 'Standard Claims'},
 {'TableName': 'members',
  'Columns': 'Member_First_Name',
  'Datatype': 'individual:first_name',
  'MinValue': '',
  'MaxValue': '',
  'CustomDirectListValue': '',
  'externalMapFileName': None,
  'externalMapColumnName': None,
  'preset_name': 'Standard Claims'},
 {'TableName': 'members',
  'Columns': 'Member_Last_Name',
  'Datatype': 'individual:last_name',
  'MinValue': '',
  'MaxValue': '',
  'CustomDirectListValue': '',
  'externalMapFileName': None,
  'externalMapCol

In [13]:
def create_table_dependency_df(df):

    """
    The function create_table_dependency_df takes a DataFrame containing table dependency information and processes it to:
    1. Ensure required columns exist,
    2. Remove duplicates and cyclic dependencies (self-references),
    3. Return a clean DataFrame that can be used to analyze table dependencies.
    If anything goes wrong (e.g., missing columns or other errors), the function handles the error and returns 
    an empty DataFrame to prevent breaking the workflow.
    """

    try:
        # Check if required columns are present in the dataframe
        if not all(col in df.columns for col in ['TableName', 'externalMapFileName']):
            raise KeyError("DataFrame must contain 'TableName' and 'externalMapFileName' columns.")
        df = df[['TableName', 'externalMapFileName']].copy()
        df = df.drop_duplicates(subset=['TableName', 'externalMapFileName'])
        df = df.loc[df["TableName"] != df["externalMapFileName"]]  # Remove cyclic relationships
        return df

    except KeyError as ke:  # Handle the missing column error (could return None, raise another exception, or return an empty DataFrame)
        print(f"KeyError: {str(ke)}")
        return pd.DataFrame()

    except Exception as e:  # Handle any other unexpected errors
        print(f"An error occurred: {str(e)}")
        return pd.DataFrame()

In [15]:
def create_table_dependency_for_nx_graph(df):

    """
    The create_table_dependency_for_nx_graph function converts table dependency data from a DataFrame into
    a directed graph.Each node in the graph represents a table, and edges indicate dependencies between tables.
    
    Validation:
    1. Ensures the input DataFrame contains the necessary columns.
    2. Validates that TableName and externalMapFileName values are neither NaN nor empty strings.

    Graph Construction:
    1. Initializes an empty directed graph.
    2. Iterates through each row to add edges representing dependencies.
    3. If a table has dependencies (externalMapFileName), an edge is added.
    4. If a table has no dependencies (externalMapFileName is NaN), it's added as an isolated node.
    """

    try:
        # Check if required columns are present in the dataframe
        if not all(col in df.columns for col in ['TableName', 'externalMapFileName']):
            raise KeyError("DataFrame must contain 'TableName' and 'externalMapFileName' columns.")
        nx_graph_table_dependency = nx.DiGraph()

        # Add edges to the graph based on the DataFrame
        for index, row in df.iterrows():
            table_name = row['TableName']
            external_map_file_name = row['externalMapFileName']

            # Check for missing or invalid values in 'TableName' and 'externalMapFileName'
            if pd.isna(table_name) or (isinstance(table_name, str) and table_name.strip() == ""):
                raise ValueError(f"Invalid 'TableName' value at index {index}: {table_name}")

            if pd.notna(external_map_file_name):  # Add edges only if 'externalMapFileName' is not NaN
                if isinstance(external_map_file_name, str) and external_map_file_name.strip() == "":
                    raise ValueError(f"Invalid 'externalMapFileName' value at index {index}: {external_map_file_name}")
                nx_graph_table_dependency.add_edge(table_name, external_map_file_name)
            else:
                # If externalMapFileName is NaN, add the table_name as an isolated node
                nx_graph_table_dependency.add_node(table_name)

        return nx_graph_table_dependency

    except KeyError as ke:    # Handle missing columns, return an empty graph
        print(f"KeyError: {str(ke)}")
        return nx.DiGraph()

    except ValueError as ve:  # Handle invalid values, return an empty graph
        print(f"ValueError: {str(ve)}")
        return nx.DiGraph()

    except Exception as e:    # Catch any other unforeseen errors and return an empty graph
        print(f"An unexpected error occurred: {str(e)}")
        return nx.DiGraph()

In [16]:
def nx_get_downstream_nodes(graph, start_nodes):
    """
    Finds all downstream nodes from a set of start nodes in a directed graph,
    including their levels. Ensures no duplicates with the highest level retained.
    Args:
      graph: A NetworkX directed graph.
      start_nodes: A list of start nodes.
    Returns:
      A DataFrame containing the downstream nodes and their levels.
    """
    try:
        if not isinstance(graph, nx.DiGraph): # Ensure the input graph is a directed graph
            raise TypeError("Input graph must be a directed NetworkX graph (DiGraph).")

        if not isinstance(start_nodes, list): # Ensure the start_nodes is a list
            raise TypeError("start_nodes must be a list of nodes.")

        node_levels = {}  # Initialize a dictionary to store node levels
        queue = [(node, 0) for node in start_nodes]  # Initialize queue with start nodes and level 0
        
        while queue:
            current_node, current_level = queue.pop(0)
            if current_node not in graph.nodes:  # Check if current_node exists in the graph
                raise ValueError(f"Node '{current_node}' does not exist in the graph.")
            
            # Update node_levels only if the node is either not present or has a lower level
            if current_node not in node_levels or current_level > node_levels[current_node]:
                node_levels[current_node] = current_level
                for neighbor in graph.successors(current_node):  # Iterate through the neighbors (successors) of the current node
                    queue.append((neighbor, current_level + 1))
        downstream_nodes = [(node, level) for node, level in node_levels.items()]    # Convert the node levels into a DataFrame
        df_table_dependency_with_levels = pd.DataFrame(downstream_nodes, columns=['TableName', 'Level'])
        return df_table_dependency_with_levels

    except TypeError as te: # Handle type errors such as wrong graph type or non-list start nodes
        print(f"TypeError: {str(te)}")
        return pd.DataFrame()

    except ValueError as ve:  # Handle cases where nodes are missing or invalid
        print(f"ValueError: {str(ve)}") 
        return pd.DataFrame()

    except Exception as e: # Catch any other unforeseen errors
        print(f"An unexpected error occurred: {str(e)}")  
        return pd.DataFrame()


In [17]:
# 4. @app.post("/table/dependencies")
@app.post("/table/dependencies")
def post_table_dependency_calculation(selected_tables: List[str]):
    """
    Calculate table dependencies and return the ordered list of tables based on their dependencies.
    """
    try:
        # Fetch metadata for selected tables
        conn = sqlite3.connect(database)
        df_meta_table_defn_selected_table_all = pd.read_sql("SELECT * FROM meta_table_defn;", conn)
        conn.close()

        """
        This code keeps only the rows from df_meta_table_defn_selected_table_all where the 
        TableName is part of the selected_tables collection, effectively filtering the DataFrame
        based on the table names you're interested in.
        .isin(selected_tables): This checks if each value in the TableName column is in the selected_tables 
        collection (which could be a list, set, or other iterable).
        """

        df_meta_table_defn_selected_table_all = df_meta_table_defn_selected_table_all[
            df_meta_table_defn_selected_table_all['TableName'].isin(selected_tables)
        ]

        """
        This code processes table metadata to generate a list of tables in the order of their dependencies. 
        It first constructs a graph of table relationships, identifies downstream dependencies from a set of selected tables, 
        sorts them by dependency depth, and then returns the table names in that order. The result (final_listOftableOrder)
        is a list of tables ordered from the most dependent to the least dependent.
        """
        df_dependent_tables = create_table_dependency_df(df_meta_table_defn_selected_table_all)
        nx_table_dependency_graph = create_table_dependency_for_nx_graph(df_dependent_tables)
        df_total_table_dependency = nx_get_downstream_nodes(nx_table_dependency_graph, selected_tables)
    
        set_total_table_dependency = set(df_total_table_dependency["TableName"])
        df_total_table_dependency_sorted = df_total_table_dependency.sort_values(by='Level', ascending=False)
        table_names_final_listOftableOrder = df_total_table_dependency_sorted['TableName']
        final_listOftableOrder = table_names_final_listOftableOrder.tolist()

        # Return ordered list of tables and dependencies
        return {
            "table_order": final_listOftableOrder,
            "dependencies": list(set_total_table_dependency)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error calculating table dependencies: {str(e)}")

In [19]:
post_table_dependency_calculation(['claims', 'members', 'providers'])

{'table_order': ['members', 'providers', 'claims'],
 'dependencies': ['claims', 'providers', 'members']}

In [20]:
def createVariableNameFromFileName(filename):
    """
    Generates a set of variable names based on a given filename.
    Args:
        filename (str): The input file name or file path.
    Returns:
        tuple: A tuple containing the randomIndex_variableName, len_variableName, and variableName.
    """
    try:
        if not isinstance(filename, str):  # Ensure the filename is a string
            raise TypeError("Filename must be a string.")

        if not os.path.exists(filename): # Ensure the file exists (optional, if needed)
            raise FileNotFoundError(f"The file '{filename}' does not exist.")

        baseFileName = os.path.basename(filename) # Extract the base filename from the full path

        # Use regex to create a valid variable name by replacing non-alphanumeric characters with '_'
        variableName = re.sub(r'\W+', '_', baseFileName)

        # Construct variable names
        variableName = "df_" + variableName
        len_variableName = "len_" + variableName
        randomIndex_variableName = "random_" + variableName
        return randomIndex_variableName, len_variableName, variableName

    except TypeError as te:
        print(f"TypeError: {str(te)}")
        return "", "", ""    # Return empty strings to indicate failure

    except FileNotFoundError as fnf:
        print(f"FileNotFoundError: {str(fnf)}")
        return "", "", ""   # Return empty strings to indicate failure

    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        return "", "", ""    # Return empty strings to indicate failure
    

In [21]:
def isNan(x):
    # Returns True if a value is Nan else False
    return (x != x)


In [22]:
def getMinMaxValue(userMin, userMax, configMin, configMax):
    """
    Returns the minimum and maximum values based on user inputs and configuration defaults.
    Args:
        userMin: User-defined minimum value (can be NaN or None).
        userMax: User-defined maximum value (can be NaN or None).
        configMin: Configuration-defined minimum value.
        configMax: Configuration-defined maximum value.
    Returns:
        tuple: A tuple containing the determined minimum and maximum values.
    """
    try:
        # Default return values are from the config
        returnMin = configMin
        returnMax = configMax

        # If userMin is not NaN, use it instead of configMin
        if not isNan(userMin):
            returnMin = userMin
        
        # If userMax is not NaN, use it instead of configMax
        if not isNan(userMax):
            returnMax = userMax

        return returnMin, returnMax

    except TypeError as te:  # Handle invalid input types by returning the config values
        print(f"TypeError: {str(te)} - Invalid input types.")
        return configMin, configMax

    except Exception as e:  # Handle any other unexpected errors
        print(f"An unexpected error occurred: {str(e)}")
        return configMin, configMax

In [23]:
class Timer(object):
    def __init__(self, total):
        self.start = datetime.datetime.now()
        self.total = total

    def remains(self, done):
        now = datetime.datetime.now()
        # print(now-start)  # elapsed time
        left = (self.total - done) * (now - self.start) / (done+0.000000000001)
        sec = int(left.total_seconds())
        if sec < 60:
            return "{} seconds".format(sec)
        else:
            return "{} minutes".format(int(sec / 60))

In [24]:
def flatten_list(nested_list):
    """
    Flattens a nested list into a single list.
    Args:
        nested_list (list): A list containing nested lists.
    Returns:
        list: A flattened list containing all items from the nested lists.
    """
    try:   # Ensure that the input is a list
        if not isinstance(nested_list, list):
            raise TypeError("Input must be a list.")
        list_of_lists = []
        for item in nested_list:   # Iterate through each item in the list
            if not isinstance(item, list):   # Ensure each item is a list
                raise TypeError(f"Each element of the nested list must be a list, but got {type(item)}.")
            list_of_lists.extend(item)  # Extend the flattened list with the current item
        return list_of_lists

    except TypeError as te: # Return an empty list in case of error
        print(f"TypeError: {str(te)}")
        return []

    except Exception as e: # Return an empty list in case of unforeseen errors
        print(f"An unexpected error occurred: {str(e)}")
        return []


In [25]:
def get_externalMapFileName(externalMapFileName, preset_autogen_data_path):
    """
    Processes the externalMapFileName by ensuring it ends with a .csv extension.
    Args:
        externalMapFileName (str): The filename or path of the external map file.
        preset_autogen_data_path (str): A preset path for automatically generated data files.
    Returns:
        str: The processed external map file name.
    """
    try:
        
        if not isinstance(externalMapFileName, str):   # Ensure externalMapFileName is a string
            raise TypeError("externalMapFileName must be a string.")

        if not isinstance(preset_autogen_data_path, str):  # Ensure preset_autogen_data_path is a string
            raise TypeError("preset_autogen_data_path must be a string.")

        if externalMapFileName.endswith(('.csv', '.CSV')):   # Check if the file name ends with .csv or .CSV
            new_externalMapFileName = externalMapFileName
        else:
            new_externalMapFileName = preset_autogen_data_path + externalMapFileName + ".csv"
        
        return new_externalMapFileName

    except TypeError as te:
        print(f"TypeError: {str(te)}")
        return ""

    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        return ""

In [26]:
def get_tablename_from_filepath(filepath):
    """
    Extracts the table name from a file path by removing its extension 
    and replacing spaces and dots with underscores.

    Args:
        filepath (str): The full path to the file.
    Returns:
        str: The extracted table name.
    """
    try:
        if not isinstance(filepath, str):   # Ensure that the input is a string
            raise TypeError("Input filepath must be a string.")

        filenamewithext = os.path.basename(filepath)   # Get the filename with extension
        filename, ext = os.path.splitext(filenamewithext)  # Split the filename into name and extension
        tablename = filename.replace(' ', '_').replace('.', '_')  # Replace spaces and dots with underscores
        return tablename

    except TypeError as te:
        print(f"TypeError: {str(te)}")
        return ""

    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        return ""

In [27]:
def load_table_into_db(database, tablename, df_generated_data):
    try:
        # Connect to the database
        conn = sqlite3.connect(database)
        cur = conn.cursor()

        # Check if the table exists
        listOfTables = cur.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{tablename}';").fetchall()

        if listOfTables == []:
            print('Table not found!')
            print(f"Saving generated data to {tablename}...")
            df_generated_data.to_sql(name=tablename, con=conn, index=False)
            print(f"Data saved to {tablename}.")
        else:
            print('Table found!')
            print(f"Dropping table {tablename}...")
            conn.execute(f"DROP TABLE {tablename}")
            print(f"Saving generated data to {tablename}...")
            df_generated_data.to_sql(name=tablename, con=conn, index=False)
            print(f"Data saved to {tablename}.")

    except sqlite3.Error as e:
        print(f"An error occurred: {e}")

    finally:
        # Ensure the connection is closed
        if conn:
            conn.close()