In [1]:
import os
import pandas as pd
from utils.database import DatabaseConnection
from utils.logger import Logger


def read_msp_data(data_group: str, data_source: str, data_type: str, bank_code: str, start_period: str, end_period: str) -> dict:
    """
    Reads MSP data from the specified data source and returns a result dictionary.

    Args:
        data_group (str): The data group (e.g., "MSP2").
        data_source (str): The data source (e.g., "BSIS" or "EDI").
        data_type (str): The type of data to fetch.
        bank_code (str): Bank code to filter data. Use '*' for all banks.
        start_period (str): Start date of the period (YYYY-MM-DD).
        end_period (str): End date of the period (YYYY-MM-DD).

    Returns:
        dict: Contains Info, Debug, Contains SQL query, and column names.
    """
    logger = Logger()
    feedback = {"info": "", "debug": "", "df": pd.DataFrame()}
    result = {"info": "", "debug": "", "sql_query": "", "columns_names": []}

    
    try:
        with DatabaseConnection(data_source) as conn:
            #schema = "BSIS_DEV" if data_source in ["BSIS", "EDI"]
            schema = "BSIS_DEV" if data_source in ["BSIS", "EDI"] else ""
                 
            #Define SQL query 
            condition = "1=1" if bank_code == "*" else f"INSTITUTIONCODE = '{bank_code}'" 
            source =  f"""
                {schema}.MSP2_{data_type}
            """            
            sql = f"""
                SELECT * FROM {source}
                WHERE {condition}
                AND TRUNC(REPORTINGDATE) BETWEEN '{start_period}' AND '{end_period}'
            """
            result['sql_query']= sql
            
            #Fetch data
            data = conn.execute_query(sql)
            logger.info("Connected to data source and executed query.")
            
            #Define columns based on data_type
            columns_mapping = {
                "01": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "AMOUNT"],
                "02": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "AMOUNT", "YR_TO_DATE_AMOUNT"],
                "03": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "SECTOR", "BORROWERS", "OUTSTANDING_AMOUNT",
                       "CURRENT_AMOUNT", "ESM", "SUBSTANDARD", "DOUBTFUL", "LOSS", "WRITTENOFF"],
                "04": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "BORROWERS",
                       "OUTSTANDING_AMOUNT", "WA_IRSLA", "NIRSLA_LOWEST", "NIRSLA_HIGHEST", "WA_IRRBA",
                       "NIRRBA_LOWEST", "NIRRBA_HIGHEST"],
                "05": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "AMOUNT"],
                "06": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "NUMBER_COMPLAINTS",
                       "VALUE_COMPLAINTS", "COMPLAINTS_IR", "COMPLAINTS_AGREEMENT", "COMPLAINTS_REPAYMENTS",
                       "COMPLAINTS_LOAN_ST", "COMPLAINTS_LOAN_PROC", "COMPLAINTS_OTHERS"],
                "07": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "DEPOSIT_TZS",
                       "DEPOSIT_FOREIGN_EQV_TZS", "DEPOSIT_TOTAL", "LOAN_TZS", "LOAN_FOREIGN_EQV_TZS", "LOAN_TOTAL"],
                "08": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "AMOUNT"],
                "09": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "LOAN_FEMALE_NUMBER",
                       "LOAN_FEMALE_AMOUNT", "LOAN_MALE_NUMBER", "LOAN_MALE_AMOUNT", "LOAN_NUMBER", "LOAN_AMOUNT"],
                "10": ["INSTITUTIONCODE", "REPORTINGDATE", "DESCRIPTIONNO", "PARTICULARS", "BRANCHES", "EMPLOYEES",
                       "COMPULSORY_SAVINGS", "BORROWERS_TO35YRS_F", "BORROWERS_TO35YRS_M", "BORROWERS_ABOVE35YRS_F",
                       "BORROWERS_ABOVE35YRS_M", "LOANS_TO35YRS_F", "LOANS_TO35YRS_M", "LOANS_ABOVE35YRS_F",
                       "LOANS_ABOVE35YRS_M", "AMOUNT_TO35YRS_F", "AMOUNT_TO35YRS_M", "AMOUNT_ABOVE35YRS_F",
                       "AMOUNT_ABOVE35YRS_M"]
    
            }
            columns = columns_mapping.get(data_type)
            if not columns:
                raise ValueError(f"Invalid data_type '{data_type}'. No column mapping found.")
                
            #Construct DataFrame
            result["columns_names"] = columns 
            result["df"] = pd.DataFrame(data, columns=columns)
            logger.info("Data successfully retrieved and packed into a DataFrame.")

            
    except Exception as e:
        error_message = f"Error: {str(e)}"
        logger.error(error_message)
        result["debug"] = error_message

    return result





ImportError: cannot import name 'read_msp_data' from partially initialized module 'utils.data_reader' (most likely due to a circular import) (C:\_rwey\pers\PDP\MSC\UDSM\shule\Semester I\IS671_Py\mycodes\mycodes\MSPUsersDev\utils\data_reader.py)

In [None]:
"""
DATA PULL GUIDE
==============

To pull individual data reports use the following function 
   data = read_data(data_group, data_source, data_type, institution_code, start_period, end_period)   
   example: data = read_data('MSP','BSIS','01','M100','31-MAY-2024','31-JUL-2024')

To pull institution profile use the following function  
   data = read_profile('MSP','BSIS','M100')
   example:  data = read_profile(data_group, data_source,institution_code)
To extract 
   data frame use data['df']
   error use data['error']
   information use data['ínfo']
To save 
   example: data['df'].to_csv("data.csv")

   
Example:
Data Groups: MSP, BANK, ITRS, NPS, PF, FUNDS, MORGAGE, LEASING, TMS, FXCFMIS, CBR, DERP-DATA
Data Source:      BSIS, EDI
Data Type or ReportName: 
   MSP: 01,02,03,04,05,06,07,08,09,10
   ITRS:
Institution Code:
   MSP: M100
Start or end period should be in format 'DD-MON-YYYY' eg '24-DEC-2024'
"""

In [44]:
import pandas as pd
import numpy as np
#import sys
#import os

#sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), '../MSPUsersDev')))

#from mspLibrary import readData,cfg
from utils import read_data, read_profile


#DATA
res = read_profile('MSP','BSIS','M100')
#res = read_data('MSP','BSIS','01','M100','31-MAY-2024','31-JUL-2024')

#res = read_data('MSP','BSIS','01','M100','31-MAY-2024','31-DEC-2024')

Please log in to continue.
Username: RIBARONGO
Password: ········


10-01-2025 18:17:28 [INFO] Domain validation completed successfully


In [28]:
import requests
from utils.logger import Logger

LOGIN_URL = 'https://help.bot.go.tz:9090/'  # Ensure this is the correct login endpoint
CERT_PATH = r"C:\_rwey\pers\PDP\MSC\UDSM\shule\Semester I\IS671_Py\mycodes\mycodes\MSPUsersDev\certificate\_.bot.go.tz.crt"

def perform_domain_login(username, password):
    """Perform domain login and validate credentials."""
    logger = Logger()
    session = requests.Session()
    payload = {"username": username, "password": password, "submit": "Login"}
    
    try:
        # Send POST request to the login URL
        response = session.post(LOGIN_URL, data=payload, verify=CERT_PATH)
        
        # Log response status and content
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"Response content: {response.text[:200]}")  # Limit output for large responses
        
        # Check for successful login
        if response.status_code == 200:
            print(response.text)
            # Example: Check if the response indicates invalid credentials
            if "invalid credentials" in response.text.lower() or "error" in response.text.lower():
                logger.error("Login failed due to invalid credentials.")
                return False
            elif username[2:].lower() in response.text.lower() or  'successful' in response.text.lower():
                logger.info("Domain validation completed successfully. Login successful.")
                return True
            else:
                logger.warning("Login response ambiguous. Unable to determine success.")
                return False
        else:
            logger.error(f"Login failed with status code: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        # Handle any network-related errors
        logger.error(f"Domain login error: {str(e).splitlines()[-1].strip()}")
        return False


# Example usage
if __name__ == "__main__":
    username = "RIBARONGO2"
    password = "dd"
    login_successful = perform_domain_login(username, password)
    print(f"Login successful: {login_successful}")


10-01-2025 16:40:21 [INFO] Response status code: 200
10-01-2025 16:40:21 [INFO] Response content: 









































<script>

    function insertParam(key, value)
    {
        key = encodeURI(key); value = encodeURI(value);
        var newUrl;
        if (location.search.l
10-01-2025 16:40:21 [ERROR] Login failed due to invalid credentials.












































<script>

    function insertParam(key, value)
    {
        key = encodeURI(key); value = encodeURI(value);
        var newUrl;
        if (location.search.length > 0) {
            var kvp = document.location.search.substr(1).split('&');

            var i = kvp.length;
            var x;
            while (i--) {
                x = kvp[i].split('=');

                if (x[0] == key) {
                    x[1] = value;
                    kvp[i] = x.join('=');
                    break;
                }
            }

            if (i < 0) {
                kvp[kvp.length] = [key, value].join('=');
            }
            //this will reload the page, it's likely better to store this until finished
            newUrl = kvp.join('&');
        } else {
            newUrl = key+'='+value;
        }
        document.location.search = newUrl;
    }

</script>


<script>
    insertParam('navLanguage', navigator.language);
</script>


<!DOCTYPE

In [22]:
username = 'ribarongo'
username[2:]

'barongo'

In [17]:
import requests
from utils.logger import Logger
LOGIN_URL = 'https://help.bot.go.tz:9090/'
CERT_PATH = r"C:\_rwey\pers\PDP\MSC\UDSM\shule\Semester I\IS671_Py\mycodes\mycodes\MSPUsersDev\certificate\_.bot.go.tz.crt"
def perform_domain_login(username, password):
    """Placeholder for domain login logic."""
    logger = Logger()
    session = requests.Session()
    payload = {"username": username, "password": password, "submit": "Login"}
    
    try:
        response = session.post(LOGIN_URL, data=payload, verify=CERT_PATH)
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"Response content: {response.text}")
        if response.status_code == 200:
            #if "invalid credentials" in response.text.lower() or "error" in response.text.lower():
            #    logger.error("Login failed due to invalid credentials.")
            #    return False
            logger.info("Domain validation completed successfully")
            return True
        else:
            logger.error(f"Login failed with status code: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        # Convert the exception message to a string
        exception_message = str(e)    
        # Split the exception message into lines
        lines = exception_message.splitlines()    
        # Extract and display the last line
        if lines:
            last_line = lines[-1].strip()  # Strip removes leading/trailing whitespaces
            #logger.info(f"Domain login error: {last_line}")
        else:
            logger.info("Exception message is empty.")
        return False
   
  

perform_domain_login('RIBARONGO', 'xxx')



10-01-2025 16:07:16 [INFO] Response status code: 200
10-01-2025 16:07:16 [INFO] Response content: 









































<script>

    function insertParam(key, value)
    {
        key = encodeURI(key); value = encodeURI(value);
        var newUrl;
        if (location.search.length > 0) {
            var kvp = document.location.search.substr(1).split('&');

            var i = kvp.length;
            var x;
            while (i--) {
                x = kvp[i].split('=');

                if (x[0] == key) {
                    x[1] = value;
                    kvp[i] = x.join('=');
                    break;
                }
            }

            if (i < 0) {
                kvp[kvp.length] = [key, value].join('=');
            }
            //this will reload the page, it's likely better to store this until finished
            newUrl = kvp.join('&');
        } else {
            newUrl = key+'='+value;
        }
        document.location.search = newUrl;
   

10-01-2025 16:07:16 [INFO] Domain validation completed successfully


True

In [29]:
import requests
from utils.logger import Logger

LOGIN_URL = 'https://help.bot.go.tz:9090/'  # Ensure this is the correct login endpoint
CERT_PATH = r"C:\_rwey\pers\PDP\MSC\UDSM\shule\Semester I\IS671_Py\mycodes\mycodes\MSPUsersDev\certificate\_.bot.go.tz.crt"

def perform_domain_login(username, password):
    """Perform domain login and validate credentials."""
    logger = Logger()
    session = requests.Session()
    payload = {"username": username, "password": password, "submit": "Login"}
    
    try:
        # Send POST request to the login URL
        response = session.post(LOGIN_URL, data=payload, verify=CERT_PATH)
        
        # Log response status and content
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"Response snippet: {response.text[:200]}")  # Limit output for large responses
        
        # Check for successful login
        if response.status_code == 200:
            # Example: Check if the response indicates invalid credentials or successful login
            response_text_lower = response.text.lower()
            
            if "invalid credentials" in response_text_lower or "error" in response_text_lower:
                logger.error("Login failed: Invalid credentials.")
                return False
            
            # Check for keywords or personalized content indicating successful login
            if username[2:].lower() in response_text_lower or 'successful' in response_text_lower:
                logger.info("Domain validation completed successfully. Login successful.")
                return True
            
            # Ambiguous response
            logger.warning("Login response ambiguous. Unable to determine success.")
            return False
        else:
            logger.error(f"Login failed with unexpected status code: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        # Handle any network-related errors
        error_message = str(e).splitlines()[-1].strip()  # Get the last line of the exception message
        logger.error(f"Domain login error: {error_message}")
        return False


# Example usage
if __name__ == "__main__":
    username = input("Enter username: ")
    password = input("Enter password: ")
    login_successful = perform_domain_login(username, password)
    print(f"Login successful: {login_successful}")

Enter username: ribarongo
Enter password: Bank@4321


10-01-2025 16:43:40 [INFO] Response status code: 200
10-01-2025 16:43:40 [INFO] Response snippet: 









































<script>

    function insertParam(key, value)
    {
        key = encodeURI(key); value = encodeURI(value);
        var newUrl;
        if (location.search.l
10-01-2025 16:43:40 [ERROR] Login failed: Invalid credentials.


Login successful: False


In [36]:
def binary_convert(mask: str) -> str:
    """
    Convert a character into an 8-bit binary representation.
    :param mask: The input character to convert.
    :return: The binary representation as a string of 8 bits.
    """
    # Get ASCII value of the character
    value = ord(mask)
    
    # Convert to binary (8 bits) and return
    return format(value, '08b')


def cryption(mask: str, data: str) -> str:
    """
    Encrypt a string using the binary conversion and a comparison bit by bit.
    :param mask: The mask string used for encryption.
    :param data: The data string to encrypt.
    :return: The encrypted string result.
    """
    # Ensure both strings have the same length by padding the shorter one with spaces
    max_len = max(len(mask), len(data))
    mask = mask.ljust(max_len)
    data = data.ljust(max_len)
    
    result = ''
    
    # Iterate over each character in the mask and data strings
    for i in range(len(mask)):
        r1 = binary_convert(mask[i])  # Convert mask character to binary
        r2 = binary_convert(data[i])  # Convert data character to binary
        
        divisor = 128
        r3 = 0
        
        # Compare bit by bit
        for j in range(8):
            if int(r1[j]) + int(r2[j]) == 1:
                r3 += divisor
            divisor //= 2
        
        # Append the resulting character to the result string
        result += chr(r3)
    
    return result


# Example usage
if __name__ == "__main__":
    mask = "THEBSISBANKOFTANZANIADARESSALAAM"
    #data = "SECRETENCRYPTIONTEXT123"
    data = 'BSIS12WEMA'
    
    # Encrypt the data using the mask
    encrypted_data = cryption(mask, data)
    
    print("Encrypted Data:", encrypted_data)


Encrypted Data: b{koftanzaniadaressalaam


In [43]:
import re

def binary_convert(mask: str) -> str:
    """
    Convert a character into an 8-bit binary representation.
    :param mask: The input character to convert.
    :return: The binary representation as a string of 8 bits.
    """
    value = ord(mask)
    return format(value, '08b')


def cryption(mask: str, data: str) -> str:
    """
    Encrypt a string using binary conversion and a comparison bit by bit.
    :param mask: The mask string used for encryption.
    :param data: The data string to encrypt.
    :return: The encrypted string result.
    """
    # Ensure both strings have the same length by padding the shorter one with spaces
    max_len = max(len(mask), len(data))
    mask = mask.ljust(max_len)
    data = data.ljust(max_len)
    
    result = ''
    
    # Iterate over each character in the mask and data strings
    for i in range(len(mask)):
        r1 = binary_convert(mask[i])  # Convert mask character to binary
        r2 = binary_convert(data[i])  # Convert data character to binary
        
        divisor = 128
        r3 = 0
        
        # Compare bit by bit
        for j in range(8):
            if int(r1[j]) + int(r2[j]) == 1:
                r3 += divisor
            divisor //= 2
        
        # Append the resulting character to the result string
        result += chr(r3)
    
    return result


def remove_non_encrypted_words(data: str, encrypted_data: str) -> str:
    """
    Removes non-encrypted alphabetic characters that are similar to the data.
    :param data: The original data (potential plaintext).
    :param encrypted_data: The encrypted data.
    :return: The data with non-encrypted words removed.
    """
    # Compare the encrypted data with the original data and remove matching parts
    non_encrypted = re.sub(r'[a-zA-Z]+$', '', encrypted_data)
    return non_encrypted


# Example usage
if __name__ == "__main__":
    mask = "THEBSISBANKOFTANZANIADARESSALAAM"
    data = "BSIS12XJBQ"
    
    # Encrypt the data using the mask
    encrypted_data = cryption(mask, data)
    
    # Print the encrypted data
    print("Encrypted Data:", encrypted_data)
    
    # Remove non-encrypted words similar to the data
    cleaned_encrypted_data = remove_non_encrypted_words(data, encrypted_data)
    
    print("Cleaned Encrypted Data:", cleaned_encrypted_data)


Encrypted Data: b{koftanzaniadaressalaam
Cleaned Encrypted Data: b{


In [37]:
import oracledb

# Oracle Database connection details
username = 'BSIS_DEV'
password = 'bsis_dev'
dsn = '172.16.1.167:1521/BOT1DB'

# Connect to the Oracle database
connection = oracledb.connect(user=username, password=password, dsn=dsn)

# Create a cursor object
cursor = connection.cursor()

# Define the parameters for the PL/SQL procedure
mask = 'THEBSISBANKOFTANZANIADARESSALAAM'  # Example mask value
#data = 'SECRETENCRYPTIONTEXT123'           # Example data value
data = 'BSIS12WEMA'

# Define a variable to store the encrypted data (output parameter)
encrypted_data = cursor.var(str)  # Define a bind variable for output (encrypted data)

# Call the PL/SQL procedure
cursor.callproc('dt_data_cryption_procedure', [mask, data, encrypted_data])

# Commit if the procedure modifies the database
connection.commit()

# Print the encrypted data returned by the procedure
print("Encrypted Data:", encrypted_data.getvalue())

# Close the cursor and connection
cursor.close()
connection.close()

print("PL/SQL procedure called successfully.")


Encrypted Data: b{KOFTANZANIADARESSALAAM
PL/SQL procedure called successfully.


In [2]:
import oracledb

# Oracle Database connection details
username = 'BSIS_DEV'
password = 'bsis_dev'
dsn = '172.16.1.167:1521/BOT1DB'

# Connect to the Oracle database
connection = oracledb.connect(user=username, password=password, dsn=dsn)

# Create a cursor object
cursor = connection.cursor()

# Username and password to check
username_input = 'RIBARONGO'
password_input = 'BSIS12XJBQ'

# Define a variable to store the output (1 or 0)
match_result = cursor.var(int)

# Call the PL/SQL procedure
cursor.callproc('dt_match_user_password', [username_input, password_input, match_result])

# Fetch the result of the match (1 if passwords match, 0 if they do not)
print("Password match result:", match_result.getvalue())

# Commit if the procedure modifies the database (in this case, no DB modification happens)
connection.commit()

# Close the cursor and connection
cursor.close()
connection.close()

print("PL/SQL procedure called successfully.")


Password match result: 0
PL/SQL procedure called successfully.


In [38]:
import re

def remove_non_encrypted_words(data):
    # This regex will match any sequence of alphabetic characters (words) at the end of the string
    cleaned_data = re.sub(r'[a-zA-Z]+$', '', data)
    return cleaned_data

# Example input data
data = 'THEBSISBANKOFTANZANIADARESSALAAM'

# Call the function to remove the alphabetic words at the end
cleaned_data = remove_non_encrypted_words(data)

# Print the cleaned result
print("Cleaned Data:", cleaned_data)


Cleaned Data: 


In [11]:
res['debug']

'User authentication failed.'

In [3]:
res['df']

In [3]:
import os

In [4]:
import os

# Get the current working directory
current_directory = os.getcwd()

print("Current Directory:", current_directory)

Current Directory: C:\_rwey\pers\PDP\MSC\UDSM\shule\Semester I\IS671_Py\mycodes\mycodes\MSPUsersDev


In [None]:

certificate_path = os.path.join(os.getcwd(), "certificate", "_.bot.go.tz.crt")

In [9]:
import requests
import os

# Define the URL for login (You will need to inspect the page for the correct login URL)
login_url = "https://help.bot.go.tz:9090"  # Update with actual login URL if different

# Define your login credentials (replace with actual credentials)
username = "xxxx"
password = "xxxx"

# Path to the SSL certificate file you downloaded
#cert_path = "/path/to/help_bot_go_tz.crt"  # Update with the correct path to the certificate file
cert_path = os.path.join(os.getcwd(), "certificate", "_.bot.go.tz.crt")
# Create a session to maintain cookies across requests
session = requests.Session()

# Define the payload with the login form fields (update field names based on actual form)
payload = {
    "username": username,  # Field name for username (update with actual field name)
    "password": password,  # Field name for password (update with actual field name)
    "submit": "Login"  # Field name for submit button (update if needed)
}

# Send the POST request to the login URL with certificate verification
try:
    response = session.post(login_url, data=payload, verify=cert_path)

    # Check if login was successful by examining the response status or content
    if response.status_code == 200:
        print("Login successful!")
    else:
        print(f"Login failed with status code: {response.status_code}")
        print("Response content:", response.text)  # Optionally print the response content to debug

except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")



Login successful!


In [2]:
import requests

# Define the URL for login (You will need to inspect the page for the correct login URL)
login_url = "https://help.bot.go.tz/login"  # Update with actual login URL if different

# Define your login credentials (replace with actual credentials)
username = "ribarongo"
password = "Bank@4321"

# Create a session to maintain cookies across requests
session = requests.Session()

# Define the payload with the login form fields (update field names based on actual form)
payload = {
    "username": username,  # Field name for username (update with actual field name)
    "password": password,  # Field name for password (update with actual field name)
    "submit": "Login"  # Field name for submit button (update if needed)
}

# Send the POST request to the login URL
try:
    response = session.post(login_url, data=payload)

    # Check if login was successful by examining the response status or content
    if response.status_code == 200:
        print("Login successful!")
    else:
        print(f"Login failed with status code: {response.status_code}")
        print("Response content:", response.text)  # Optionally print the response content to debug

except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")


An error occurred: HTTPSConnectionPool(host='help.bot.go.tz', port=443): Max retries exceeded with url: /login (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:997)')))


In [1]:
#Check if BSIS is Reachable
import requests

# Define the URL
url = "http://172.16.2.93:8888/forms/frmservlet?config=bsisapp"

# Send an HTTP GET request
try:
    response = requests.get(url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        print("The application is reachable. Status code:", response.status_code)
    else:
        print(f"Request failed. Status code: {response.status_code}")

except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")


The application is reachable. Status code: 200


In [10]:
import requests
from requests.auth import HTTPBasicAuth

# Define the URL and credentials
url = "http://172.16.2.93:8888/forms/frmservlet?config=bsisapp"
username = "BSIS_DEV"  # Replace with your actual username
password = "bsis_dev"  # Replace with your actual password

# Send an HTTP GET request with basic authentication
try:
    response = requests.get(url, auth=HTTPBasicAuth(username, password))

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        print("Request succeeded!")
        print("Response content:", response.text[:500])  # Print first 500 characters of the response content
    else:
        print(f"Request failed with status code {response.status_code}.")
        print("Response content:", response.text[:500])  # Print first 500 characters of the response content

except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")

Request failed with status code 401.
Response content: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Draft//EN">
<HTML>
<HEAD>
<TITLE>Error 401--Unauthorized</TITLE>
</HEAD>
<BODY bgcolor="white">
<FONT FACE=Helvetica><BR CLEAR=all>
<TABLE border=0 cellspacing=5><TR><TD><BR CLEAR=all>
<FONT FACE="Helvetica" COLOR="black" SIZE="3"><H2>Error 401--Unauthorized</H2>
</FONT></TD></TR>
</TABLE>
<TABLE border=0 width=100% cellpadding=10><TR><TD VALIGN=top WIDTH=100% BGCOLOR=white><FONT FACE="Courier New"><FONT FACE="Helvetica" SIZE="3"><H3>From RFC 2068 <i>H


In [6]:
import requests
from requests.auth import HTTPBasicAuth

# Define the URL and credentials
url = "http://172.16.2.93:8888/forms/frmservlet?config=bsisapp"
username = "RIBARONGO"  # Replace with your actual username
password = "BSIS12OWEMA"  # Replace with your actual password

# Send an HTTP GET request with basic authentication
try:
    response = requests.get(url, auth=HTTPBasicAuth(username, password))

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        print("Request succeeded!")
        print("Response content:", response.text[:500])  # Print first 500 characters of the response content
    else:
        print(f"Request failed with status code {response.status_code}.")
        print("Response content:", response.text[:500])  # Print first 500 characters of the response content

except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")


Request failed with status code 401.
Response content: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Draft//EN">
<HTML>
<HEAD>
<TITLE>Error 401--Unauthorized</TITLE>
</HEAD>
<BODY bgcolor="white">
<FONT FACE=Helvetica><BR CLEAR=all>
<TABLE border=0 cellspacing=5><TR><TD><BR CLEAR=all>
<FONT FACE="Helvetica" COLOR="black" SIZE="3"><H2>Error 401--Unauthorized</H2>
</FONT></TD></TR>
</TABLE>
<TABLE border=0 width=100% cellpadding=10><TR><TD VALIGN=top WIDTH=100% BGCOLOR=white><FONT FACE="Courier New"><FONT FACE="Helvetica" SIZE="3"><H3>From RFC 2068 <i>H


In [4]:
import pandas as pd
from utils.data_reader import read_data

def main():
    # Define parameters
    data_source = "BSIS"
    data_type = "01"
    bank_code = "M100"
    start_period = "30-SEP-2024"
    end_period = "30-DEC-2024"

    # Fetch data
    result = read_data(data_source, data_type, bank_code, start_period, end_period)

    # Process result
    if not result['df'].empty:
        print("Data fetched successfully!")
        print(result['df'].head())
    else:
        print("No data returned.")
        print("Info log:", result['info'])
        print("Debug log:", result['debug'])

if __name__ == "__main__":
    main()


SyntaxError: f-string expression part cannot include a backslash (data_reader.py, line 38)

In [None]:
import pandas as pd
import numpy as np
from read_data import readData

# Access the cfg attribute from the module
#cfg = mspLibrary.cfg

# READING MSP2 DATA
# Create folder C:\MSP\dataFiles
# readData(dataSource, dataType, bankCode, startPeriod, endPeriod)
# dataSource: 'EDI', 'BSIS'
# dataType: '01' to '10'
# mspCode: MSP code eg 'M100' or '*' for all
# start/endPeriod: should be in format 'DD-MON-YYYY'
res = readData('BSIS', '01', 'M100', '30-SEP-2024', '30-DEC-2024')


In [8]:
res['df'].head(3)

Unnamed: 0,INSTITUTIONCODE,REPORTINGDATE,DESCRIPTIONNO,PARTICULARS,AMOUNT
0,M100,2024-09-30,1,1. CASH AND CASH EQUIVALENTS (sum a:d),7225920.09
1,M100,2024-09-30,2,(a) Cash in Hand,7100000.0
2,M100,2024-09-30,3,(b) Balances with Banks and Financial Ins...,125920.09


In [9]:
res['info']

''

In [10]:
res['debug']

''

In [11]:
res['df'].head(5)

Unnamed: 0,INSTITUTIONCODE,REPORTINGDATE,DESCRIPTIONNO,PARTICULARS,AMOUNT
0,M100,2024-09-30,1,1. CASH AND CASH EQUIVALENTS (sum a:d),7225920.09
1,M100,2024-09-30,2,(a) Cash in Hand,7100000.0
2,M100,2024-09-30,3,(b) Balances with Banks and Financial Ins...,125920.09
3,M100,2024-09-30,4,(i) Non-Agent Banking Balances,125920.09
4,M100,2024-09-30,5,(ii) Agent-Banking Balances,0.0


In [31]:
res['df'].shape

(143405, 5)

In [32]:
#export to csv
res['df'].to_csv(cfg.dataCollectionFolder + "data.csv")

In [7]:
df1 = res['df']

In [8]:
df1.columns

Index(['INSTITUTIONCODE', 'REPORTINGDATE', 'DESCRIPTIONNO', 'PARTICULARS',
       'AMOUNT'],
      dtype='object')

In [9]:
df1.shape

(143405, 5)

In [22]:
df1.dtypes

INSTITUTIONCODE            object
REPORTINGDATE      datetime64[ns]
DESCRIPTIONNO               int64
PARTICULARS                object
AMOUNT                    float64
dtype: object

In [23]:
mask = ((df1['DESCRIPTIONNO']==51) & (df1['AMOUNT'] < 20000000 ))


In [24]:
df1[mask]

Unnamed: 0,INSTITUTIONCODE,REPORTINGDATE,DESCRIPTIONNO,PARTICULARS,AMOUNT
718,MF08,2024-03-31,51,15. TOTAL CAPITAL (sum a:i),14513000.00
2310,M763,2024-03-31,51,15. TOTAL CAPITAL (sum a:i),15160250.00
2371,M470,2024-03-31,51,15. TOTAL CAPITAL (sum a:i),0.00
3064,ME81,2024-03-31,51,15. TOTAL CAPITAL (sum a:i),17444930.00
3305,MC94,2024-03-31,51,15. TOTAL CAPITAL (sum a:i),17217769.00
...,...,...,...,...,...
139360,MC35,2024-06-30,51,15. TOTAL CAPITAL (sum a:i),12163241.00
139375,MC35,2024-09-30,51,15. TOTAL CAPITAL (sum a:i),16693111.00
142113,MA50,2024-09-30,51,15. TOTAL CAPITAL (sum a:i),8013976.00
142351,M768,2024-09-30,51,15. TOTAL CAPITAL (sum a:i),11381080.31


In [2]:
from licenseKeys import encrypt_value, decrypt_value

In [8]:
# Inputs
keyword = "spectacular"  
date = "30-DEC-2024"
expire_days = 90

# Encrypt
encrypted = encrypt_value(keyword, date, expire_days)
print(f"Encrypted: {encrypted}")

# Decrypt
decrypted = decrypt_value(keyword, encrypted)
print(f"Decrypted: {decrypted}")

Encrypted: TP8hfxtI+9HR3tSFQSo5PEFsCeFz8vdvB41GXyl4BANybnIyb+PSKK5TqdRssEfj
Decrypted: {'value': '30-DEC-2024', 'expire_date': '30-Mar-2025'}


In [4]:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.padding import PKCS7
import base64
import os
from datetime import datetime, timedelta

# Constants
BACKEND = default_backend()
SALT = b'some_salt'  # Ensure this is unique and secure in real usage

# Encryption Key Derivation
def derive_key(keyword: str) -> bytes:
    """Derive a symmetric key from a keyword."""
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SALT,
        iterations=100_000,
        backend=BACKEND,
    )
    return kdf.derive(keyword.encode())

# Encryption Function
def encrypt_value(keyword: str, credentials: str, date: str, expire_date: str) -> str:
    """
    Encrypt a value and its expiration date.
    
    Args:
        keyword (str): The encryption key (user input).
        credentials (str): Colon-separated credentials.
        date (str): Date in 'DD-MON-YYYY' format.
        expire_date (str): Expiration date in 'DD-MON-YYYY' format.

    Returns:
        str: Encrypted string (base64 encoded).
    """
    key = derive_key(keyword)

    # Generate a random IV
    iv = os.urandom(16)

    # Create the cipher with the IV
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=BACKEND)
    encryptor = cipher.encryptor()

    # Prepare plaintext: credentials + date + expiration date
    plaintext = f"{credentials},{date},{expire_date}"
    padder = PKCS7(algorithms.AES.block_size).padder()
    padded_plaintext = padder.update(plaintext.encode()) + padder.finalize()

    # Encrypt
    ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()

    # Return base64-encoded ciphertext with IV
    return base64.b64encode(iv + ciphertext).decode()

# Decryption Function
def decrypt_value(keyword: str, encrypted_value: str) -> dict:
    """
    Decrypt an encrypted string to retrieve the original credentials, date, and expiration date.

    Args:
        keyword (str): The decryption key (user input).
        encrypted_value (str): The encrypted base64 string.

    Returns:
        dict: Dictionary containing `credentials`, `date`, and `expire_date`.
    """
    key = derive_key(keyword)
    encrypted_data = base64.b64decode(encrypted_value)

    # Extract IV and ciphertext
    iv = encrypted_data[:16]
    ciphertext = encrypted_data[16:]

    # Create the cipher with the IV
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=BACKEND)
    decryptor = cipher.decryptor()

    # Decrypt and unpad
    padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    unpadder = PKCS7(algorithms.AES.block_size).unpadder()
    plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()

    # Split credentials, date, and expiration date
    credentials, date, expire_date = plaintext.decode().split(",")
    return {"credentials": credentials, "date": date, "expire_date": expire_date}

# Example Usage
if __name__ == "__main__":
    # Inputs
    keyword = "spectacular"  
    credentials = "BSIS_dev:bsis_dev:EDI:edi"
    date = "30-DEC-2024"
    expire_date = "30-JAN-2025"

    # Encrypt
    encrypted = encrypt_value(keyword, credentials, date, expire_date)
    print(f"Encrypted: {encrypted}")

    # Decrypt
    decrypted = decrypt_value(keyword, encrypted)
    print(f"Decrypted: {decrypted}")


Encrypted: fddE8nbw8g1PiLmbLASS3S5ma9DleTGiIBeCInaK2eSfACV65n5VJMyb4Xj27Unqw4+bPxlxyeOKKWs+u1mQZZ3rWlf9X0vdhzyUICxXGVY=
Decrypted: {'credentials': 'BSIS_dev:bsis_dev:EDI:edi', 'date': '30-DEC-2024', 'expire_date': '30-JAN-2025'}


In [3]:
listOfCredentials = decrypted['credentials'].split(':')

In [4]:
listOfCredentials[0]

'BSIS_dev'

In [5]:
listOfCredentials[1]

'bsis_dev'

In [6]:
listOfCredentials[2]

'EDI'

In [7]:
listOfCredentials[3]

'edi'

In [5]:
keyword = "spectacular"  
credentials = "BI_E rkuSSDV:bi_e rkussdv:EIrkuD:eirkud"
date = "30-DEC-2024"
expire_date = "30-JAN-2025"
encrypt_value(keyword, credentials, date, expire_date)

't2I3yzGyFyo11MAbHupPPm8uTcDeIUo+sm9ekRxH795lnX94Bog30K+nGm6EMTYwPqXB4spaHEdNqQj99lGct179PaW2QSYFn/9jYcvLOMo='

In [6]:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.padding import PKCS7
import base64
import os
from datetime import datetime, timedelta

# Constants
BACKEND = default_backend()
SALT = b'some_salt'  # Ensure this is unique and secure in real usage

# Encryption Key Derivation
def derive_key(keyword: str) -> bytes:
    """Derive a symmetric key from a keyword."""
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SALT,
        iterations=100_000,
        backend=BACKEND,
    )
    return kdf.derive(keyword.encode())

# Encryption Function
def encrypt_value(keyword: str, credentials: str, date: str, days_to_expire: int) -> str:
    """
    Encrypt a value and calculate expiration based on days to expire.
    
    Args:
        keyword (str): The encryption key (user input).
        credentials (str): Colon-separated credentials.
        date (str): Date in 'DD-MON-YYYY' format.
        days_to_expire (int): Number of days until the expiration date.

    Returns:
        str: Encrypted string (base64 encoded).
    """
    key = derive_key(keyword)

    # Generate a random IV
    iv = os.urandom(16)

    # Calculate expiration date
    start_date = datetime.strptime(date, "%d-%b-%Y")
    expire_date = (start_date + timedelta(days=days_to_expire)).strftime("%d-%b-%Y")

    # Create the cipher with the IV
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=BACKEND)
    encryptor = cipher.encryptor()

    # Prepare plaintext: credentials + date + expiration date
    plaintext = f"{credentials},{date},{expire_date}"
    padder = PKCS7(algorithms.AES.block_size).padder()
    padded_plaintext = padder.update(plaintext.encode()) + padder.finalize()

    # Encrypt
    ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()

    # Return base64-encoded ciphertext with IV
    return base64.b64encode(iv + ciphertext).decode()

# Decryption Function
def decrypt_value(keyword: str, encrypted_value: str) -> dict:
    """
    Decrypt an encrypted string to retrieve the original credentials, date, and expiration date.

    Args:
        keyword (str): The decryption key (user input).
        encrypted_value (str): The encrypted base64 string.

    Returns:
        dict: Dictionary containing `credentials`, `date`, and `expire_date`.
    """
    key = derive_key(keyword)
    encrypted_data = base64.b64decode(encrypted_value)

    # Extract IV and ciphertext
    iv = encrypted_data[:16]
    ciphertext = encrypted_data[16:]

    # Create the cipher with the IV
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=BACKEND)
    decryptor = cipher.decryptor()

    # Decrypt and unpad
    padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    unpadder = PKCS7(algorithms.AES.block_size).unpadder()
    plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()

    # Split credentials, date, and expiration date
    credentials, date, expire_date = plaintext.decode().split(",")
    return {"credentials": credentials, "date": date, "expire_date": expire_date}

# Example Usage
if __name__ == "__main__":
    # Inputs
    keyword = "spectacular"  
    credentials = "BSIS_dev:bsis_dev:EDI:edi"
    date = "30-DEC-2024"
    days_to_expire = 30

    # Encrypt
    encrypted = encrypt_value(keyword, credentials, date, days_to_expire)
    print(f"Encrypted: {encrypted}")

    # Decrypt
    decrypted = decrypt_value(keyword, encrypted)
    print(f"Decrypted: {decrypted}")


Encrypted: 6ODMSV7vzyuKMXyatsiacLeeQALb7Sr8WNU272YX3BIzm3FG8YQJzenJL7IU23OknrqMzt8DOvaA2ZeIxtu/cYv6IElcbCy/a5zVZlrqupk=
Decrypted: {'credentials': 'BSIS_dev:bsis_dev:EDI:edi', 'date': '30-DEC-2024', 'expire_date': '29-Jan-2025'}


In [8]:
keyword = "spectacular"  
credentials = "BI_E rkuSSDV:bi_e rkussdv:EIrkuD:eirkud"
date = "30-DEC-2024"
expire_date = 30
encrypt_value(keyword, credentials, date, expire_date)

'qwq7uGfuAqeO1esUvLnw8+E03/oKA97gEV1SaiCOMHXRixKe0iYJBSclVu+4RMi9l+xPdbNwKlqNNYd+crb5jDCh5Gj+PnQpfnLOSSRWG5M='

In [9]:
decrypt_value(keyword,'qwq7uGfuAqeO1esUvLnw8+E03/oKA97gEV1SaiCOMHXRixKe0iYJBSclVu+4RMi9l+xPdbNwKlqNNYd+crb5jDCh5Gj+PnQpfnLOSSRWG5M=')

{'credentials': 'BI_E rkuSSDV:bi_e rkussdv:EIrkuD:eirkud',
 'date': '30-DEC-2024',
 'expire_date': '29-Jan-2025'}