In [31]:
import boto3
import time
import json
import psycopg2
from datetime import datetime
import configparser
import pandas as pd
from sqlalchemy import create_engine

import binascii
from cryptography.fernet import Fernet


In [32]:

def decrypt_with_password(key, encrypted_data):
    """
    Decrypt a hex-encoded Fernet token and return the plaintext as text.

    Despite the function name, ``key`` is handed straight to ``Fernet`` as
    the encryption key; no password-based derivation happens here.

    Args:
        key: Value passed unchanged to the ``Fernet`` constructor.
        encrypted_data: Hexadecimal representation of the Fernet token.

    Returns:
        The decrypted payload decoded from bytes to ``str``.
    """
    # Undo the hex encoding first so malformed hex fails before any
    # cipher object is constructed.
    raw_token = binascii.unhexlify(encrypted_data)
    plaintext_bytes = Fernet(key).decrypt(raw_token)
    return plaintext_bytes.decode()

In [36]:
def main():
    """
    Interactive query loop against Postgres that prints results with the
    masked columns decrypted.

    Reads the connection settings (section ``postgres``) and the hex-encoded
    Fernet key (section ``hash``, option ``key``) from ``cred.ini``, then
    repeatedly prompts for a SQL query, runs it, and prints the resulting
    DataFrame with two extra columns: ``ip`` (decrypted ``masked_ip``) and
    ``device_id`` (decrypted ``masked_device_id``). Typing ``exit`` quits.

    Query errors, results lacking the masked columns, and decryption
    failures are reported and the prompt is shown again rather than
    terminating the loop.

    Raises:
        configparser.Error: if ``cred.ini`` lacks a required section or
            option, including ``hash.key``.
        binascii.Error: if the configured key is not valid hexadecimal.
    """
    # Imported locally so this cell does not depend on edits to the
    # notebook's import cell; both packages are already used by this file.
    from sqlalchemy.exc import SQLAlchemyError
    from cryptography.fernet import InvalidToken

    config = configparser.ConfigParser()
    config.read('cred.ini')

    # Fail fast when the key is absent instead of hitting an unbound local
    # on the first decryption attempt.
    key = binascii.unhexlify(config.get('hash', 'key'))

    db_params = {
        "host": config.get('postgres', 'host'),
        "database": config.get('postgres', 'db'),
        "user": config.get('postgres', 'user_id'),
        "password": config.get('postgres', 'password'),
        "port": config.get('postgres', 'port')
    }

    # NOTE: credentials are interpolated verbatim, so characters such as
    # '@' or '/' in the user/password must already be URL-escaped in cred.ini.
    engine = create_engine(f'postgresql+psycopg2://{db_params["user"]}:{db_params["password"]}@{db_params["host"]}:{db_params["port"]}/{db_params["database"]}')

    required_columns = ("masked_ip", "masked_device_id")

    try:
        while True:
            query = input("Enter the query to run on postgres user_logins table having both the fields masked_device_id and masked_ip\n\nType exit to quit\n")
            print("\nEntered query : \n%s"%query)

            command = query.strip()
            if command.lower() == "exit":
                break
            if not command:
                # Nothing to execute; ask again.
                continue

            try:
                df = pd.read_sql_query(query, engine)
            except (SQLAlchemyError, psycopg2.Error) as e:
                # SQLAlchemy wraps driver errors in its own exception types,
                # so catching psycopg2.Error alone would miss them.
                print("Error:", e)
                continue

            missing = [col for col in required_columns if col not in df.columns]
            if missing:
                print("Error: query result is missing required column(s):", ", ".join(missing))
                continue

            try:
                df['ip'] = df['masked_ip'].apply(lambda x: decrypt_with_password(key, x))
                df['device_id'] = df['masked_device_id'].apply(lambda x: decrypt_with_password(key, x))
            except (InvalidToken, ValueError, TypeError) as e:
                # ValueError covers malformed hex (binascii.Error);
                # TypeError covers NULL / non-string cells.
                print("Error: could not decrypt masked columns:", repr(e))
                continue

            print(df)
    finally:
        # Release pooled connections exactly once, however the loop ends.
        engine.dispose()




In [37]:
# Start the interactive prompt only when this code runs as the top-level
# program (not when it is imported as a module).
if __name__ == "__main__":
    main()

Enter the query to run on postgres user_logins table having both the fields masked_device_id and masked_ip

Type exit to quit
select * from user_logins;

Entered query : 
select * from user_logins;
                                 user_id device_type  \
0   424cdd21-063a-43a7-b91b-7ca1a833afae     android   
1   c0173198-76a8-4e67-bfc2-74eaa3bbff57         ios   
2   66e0635b-ce36-4ec7-aa9e-8a8fca9b83d4         ios   
3   181452ad-20c3-4e93-86ad-1934c9248903     android   
4   60b9441c-e39d-406f-bba0-c7ff0e0ee07f     android   
..                                   ...         ...   
94  2ad5985b-dc70-44ca-b3ed-f4246a42f611     android   
95  ca322b6c-97f8-4075-9663-42cebeb1d26e     android   
96  d4f4f380-349e-48ac-bdc3-5ee4e29b57c1         ios   
97  792e3e1f-bf84-409e-925c-d653fab4b6be         ios   
98  4e216e8b-1682-4bc0-9b47-0777b4a77a9e         ios   

                                            masked_ip  \
0   674141414141426c455550393952514b5234477352546e...   
1   67414141414