In [86]:
import csv
import os
import random
from faker import Faker

class TPCH_RBAC_Simulator:
    def __init__(self):
        self.fake = Faker()
        self.policy_id = self.get_last_policy_id()  # Initialize policy ID from existing data
        self.initialize_scenarios()

    def get_last_policy_id(self):
        """ Retrieve the last policy ID from the policy.csv file if it exists, else start from 0 """
        try:
            with open('policy.csv', 'r') as file:
                last_line = list(csv.reader(file))[-1]
                return int(last_line[0]) + 1
        except (FileNotFoundError, IndexError):
            return 0

    def initialize_scenarios(self):
        self.scenarios = {
        "Financial Health Overview": {
            "roles": {
                "Financial Analyst": {
                    "permissions": {
                        "ORDERS": ["SELECT o_totalprice", "AVG o_totalprice"],
                        "LINEITEM": ["SELECT l_extendedprice"]
                    },
                    "row_conditions": "o_orderdate >= '2022-01-01'"
                }
            }
        },
        "Quarterly Revenue Trends": {
            "roles": {
                "Financial Analyst": {
                    "permissions": {
                        "ORDERS": ["SUM o_totalprice", "COUNT o_orderid"],
                        "LINEITEM": ["AVG l_discount"]
                    },
                    "row_conditions": "o_orderdate between '2022-01-01' and '2022-03-31'"
                }
            }
        },
        "Inventory Analysis and Optimization": {
            "roles": {
                "Inventory Manager": {
                    "permissions": {
                        "PART": ["SELECT p_partkey"],
                        "PARTSUPP": ["AVG ps_supplycost"]
                    },
                    "row_conditions": "p_type = 'critical'"
                }
            }
        },
        "Critical Stock Levels": {
            "roles": {
                "Inventory Manager": {
                    "permissions": {
                        "PARTSUPP": ["SELECT ps_availqty", "SELECT ps_partkey"],
                        "PART": ["SELECT p_name"]
                    },
                    "row_conditions": "ps_availqty < 50"
                }
            }
        },
        "Market Segmentation Analysis": {
            "roles": {
                "Marketing Analyst": {
                    "permissions": {
                        "CUSTOMER": ["SELECT c_mktsegment", "COUNT c_custkey"],
                        "ORDERS": ["SELECT o_orderkey"]
                    },
                    "row_conditions": "c_mktsegment = 'AUTOMOBILE'"
                }
            }
        },
        "High-Value Customer Identification": {
            "roles": {
                "Marketing Analyst": {
                    "permissions": {
                        "CUSTOMER": ["SELECT c_name", "SELECT c_revenue"],
                        "ORDERS": ["SELECT o_totalprice"]
                    },
                    "row_conditions": "c_revenue > 10000"
                }
            }
        },
        "Supplier Financial Health Assessment": {
            "roles": {
                "Procurement Specialist": {
                    "permissions": {
                        "SUPPLIER": ["SELECT s_acctbal", "SUM s_acctbal"],
                        "PARTSUPP": ["SELECT ps_supplycost"]
                    },
                    "row_conditions": "s_acctbal > 5000"
                }
            }
        },
        "Supplier Reliability Check": {
            "roles": {
                "Procurement Specialist": {
                    "permissions": {
                        "SUPPLIER": ["SELECT s_performance"],
                        "PARTSUPP": ["SELECT ps_partkey"]
                    },
                    "row_conditions": "s_performance >= 8"
                }
            }
        },
        "Regional Performance and Tax Analysis": {
            "roles": {
                "Tax Analyst": {
                    "permissions": {
                        "ORDERS": ["SELECT o_region", "SUM o_tax"],
                        "CUSTOMER": ["SELECT c_region"]
                    },
                    "row_conditions": "o_region = 'EUROPE'"
                }
            }
        },
        "End of Year Sales Review": {
            "roles": {
                "Regional Sales Manager": {
                    "permissions": {
                        "ORDERS": ["SUM o_totalprice", "SELECT o_orderdate"],
                        "LINEITEM": ["SELECT l_quantity"]
                    },
                    "row_conditions": "o_orderdate between '2022-10-01' and '2022-12-31'"
                }
            }
        },
        "Global Demand Forecasting": {
            "roles": {
                "Market Research Analyst": {
                    "permissions": {
                        "ORDERS": ["SELECT o_shippriority", "COUNT o_orderid"],
                        "CUSTOMER": ["SELECT c_nationkey"]
                    },
                    "row_conditions": "o_orderdate >= '2022-01-01'"
                }
            }
        },
        "Operational Efficiency Review": {
            "roles": {
                "Operations Manager": {
                    "permissions": {
                        "LINEITEM": ["SUM l_quantity", "AVG l_extendedprice"],
                        "ORDERS": ["COUNT o_orderid"]
                    },
                    "row_conditions": "l_shipdate <= '2022-12-31'"
                }
            }
        },
        "Compliance Audit": {
            "roles": {
                "Compliance Officer": {
                    "permissions": {
                        "SUPPLIER": ["SELECT s_comment", "SELECT s_name"],
                        "PART": ["SELECT p_name"]
                    },
                    "row_conditions": "s_acctbal < 0"
                }
            }
        },
        "Product Profitability Analysis": {
            "roles": {
                "Product Manager": {
                    "permissions": {
                        "PART": ["SELECT p_partkey", "AVG ps_supplycost"],
                        "PARTSUPP": ["SELECT ps_partkey"]
                    },
                    "row_conditions": "p_size between 1 and 50"
                }
            }
        },
        "Customer Loyalty Program Review": {
            "roles": {
                "Customer Relationship Manager": {
                    "permissions": {
                        "CUSTOMER": ["SELECT c_name", "SUM c_acctbal"],
                        "ORDERS": ["COUNT o_orderid"]
                    },
                    "row_conditions": "c_acctbal > 5000"
                }
            }
        },
        "Resource Allocation Planning": {
            "roles": {
                "Resource Manager": {
                    "permissions": {
                        "PARTSUPP": ["SELECT ps_supplycost", "SELECT ps_availqty"],
                        "SUPPLIER": ["SELECT s_nationkey"]
                    },
                    "row_conditions": "ps_availqty > 100"
                }
            }
        },
        "Quality Control Analysis": {
            "roles": {
                "Quality Assurance Manager": {
                    "permissions": {
                        "LINEITEM": ["SELECT l_quality", "AVG l_quality"],
                        "ORDERS": ["SELECT o_orderstatus"]
                    },
                    "row_conditions": "l_quality < 'B'"
                }
            }
        },
        "Revenue Stream Analysis": {
            "roles": {
                "Chief Financial Officer": {
                    "permissions": {
                        "ORDERS": ["SUM o_totalprice"],
                        "CUSTOMER": ["SELECT c_credit"]
                    },
                    "row_conditions": "o_totalprice > 10000"
                }
            }
        },
        "Strategic Planning Session": {
            "roles": {
                "Strategy Analyst": {
                    "permissions": {
                        "NATION": ["SELECT n_name", "COUNT n_regionkey"],
                        "REGION": ["SELECT r_name"]
                    },
                    "row_conditions": "n_regionkey =r_regionkey"
                }
            }
        },
        "Supply Chain Disruption Analysis": {
            "roles": {
                "Supply Chain Manager": {
                    "permissions": {
                        "SUPPLIER": ["SELECT s_name", "COUNT s_suppkey"],
                        "PARTSUPP": ["SELECT ps_partkey"]
                    },
                    "row_conditions": ".s_status = 'active'"
                }
            }
        }
    }


    def generate_random_name(self):
        """Generates a random name from predefined first and last names."""
        return self.fake.name()

    def record_exists(self, filename, entry):
        """ Check if an entry already exists in a file """
        try:
            with open(filename, 'r', newline='') as file:
                reader = csv.reader(file)
                for row in reader:
                    if entry in row:
                        return True
        except FileNotFoundError:
            pass
        return False

    def append_to_csv(self, filename, row):
        """Append a row to a CSV file if not already present."""
        if not self.record_exists(filename, row[0]):
            with open(filename, 'a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow(row)
                




    def simulate_scenario(self):
        """Simulate a random scenario with detailed role-based permissions."""
        scenario_name, scenario_details = random.choice(list(self.scenarios.items()))
        print(f"Scenario: {scenario_name}")

        # Randomly select one role from the chosen scenario to limit the execution to one role per run
        role, details = random.choice(list(scenario_details["roles"].items()))
        user = self.generate_random_name()
        print(f"\nExecuting User: {user} (Role: {role})")
        print("Access Consideration: Details for the scenario.")

        
        self.append_to_csv('user.csv', [user ])

    
        self.append_to_csv('role.csv', [role ])

        
        self.append_to_csv('assignment.csv', [user , role ])

        # Process only the first permission found for the randomly selected role
        for table, operations in details["permissions"].items():
            operation = operations[0]  # Select only the first permission
            # Splitting the operation to get the function (if any) and the column name
            parts = operation.split()
            if len(parts) > 1:
                column_name = parts[1]  # assuming the column name is always after the function (e.g., "SELECT o_totalprice")
            else:
                column_name = parts[0]  # in case there's no function prefix (e.g., just a column name)

            permission = f"{operation} on {table}"
            condition = details["row_conditions"] if details["row_conditions"] else ","  # Ensuring the comma for empty conditions
            # Format the output to meet the specific requirement
            policy_entry = [str(self.policy_id) , role , column_name , condition]

            # Append to policy.csv, format: id, role, column_name, condition,
            self.append_to_csv('policy.csv', policy_entry)

            # Increment policy ID for unique identifiers
            self.policy_id += 1
            break  # Ensure only one policy is output per execution by breaking out of the loop

        # Since we are controlling the execution to only one policy per run, no further return is needed after the break

# Initialize the simulator and run the simulation
simulator = TPCH_RBAC_Simulator()
simulator.simulate_scenario()



Scenario: Product Profitability Analysis

Executing User: Michael Burke (Role: Product Manager)
Access Consideration: Details for the scenario.


In [83]:
import csv
from IPython.display import FileLink, display

def print_csv_file(filename):
    """Print the contents of a CSV file."""
    try:
        with open(filename, newline='') as file:
            reader = csv.reader(file)
            for row in reader:
                print(', '.join(row))
    except FileNotFoundError:
        print(f"File {filename} not found.")

def create_download_link(filename):
    """Create a download link in the Jupyter Notebook for the given file."""
    try:
        # Display the file link
        display(FileLink(filename, result_html_prefix=f"Click here to download {filename}: "))
    except FileNotFoundError:
        print(f"Download link could not be created for {filename}. File does not exist.")

# Filenames of the CSV files
filenames = ['user.csv', 'role.csv', 'assignment.csv', 'policy.csv']

# Process each file to print contents and create a download link
for filename in filenames:
    print(f"\nContents of {filename}:")
    print_csv_file(filename)
    create_download_link(filename)



Contents of user.csv:
name;age
Alice;24
Bob;29



Contents of role.csv:
role
admin
user



Contents of assignment.csv:
user;role
Alice;admin
Bob;user



Contents of policy.csv:
policy_id;role;column_name;condition
1;admin;username;active=yes


In [84]:
import os

def clear_csv_files(filenames):
    for filename in filenames:
        # Ensure the directory exists, if it does not exist, create it
        os.makedirs(os.path.dirname(filename) or '.', exist_ok=True)
        # Open each file in write mode to clear it
        with open(filename, 'w') as file:
            pass  # Opening in 'w' mode and closing the file clears it

# List of filenames to clear
filenames = [
   'user.csv', 'role.csv', 'assignment.csv', 'policy.csv'
]

# Clear CSV files
clear_csv_files(filenames)

print("All CSV files have been cleared.")


All CSV files have been cleared.


In [87]:
import csv
from IPython.display import FileLink, display

def print_csv_file(filename):
    """Print the contents of a CSV file using a semicolon as the delimiter."""
    try:
        with open(filename, newline='') as file:
            reader = csv.reader(file, delimiter=';')  # Use semicolon as the delimiter
            for row in reader:
                print('; '.join(row))  # Join using semicolon to visually match the input format
    except FileNotFoundError:
        print(f"File {filename} not found.")

def create_download_link(filename):
    """Create a download link in the Jupyter Notebook for the given file."""
    try:
        # Display the file link
        display(FileLink(filename, result_html_prefix=f"Click here to download {filename}: "))
    except FileNotFoundError:
        print(f"Download link could not be created for {filename}. File does not exist.")

# Filenames of the CSV files
filenames = ['user.csv', 'role.csv', 'assignment.csv', 'policy.csv']

# Process each file to print contents and create a download link
for filename in filenames:
    print(f"\nContents of {filename}:")
    print_csv_file(filename)
    create_download_link(filename)



Contents of user.csv:
Michael Burke



Contents of role.csv:
Product Manager



Contents of assignment.csv:
Michael Burke,Product Manager



Contents of policy.csv:
0,Product Manager,p_partkey,p_size between 1 and 50
