- Script diseñado para la migración de usuarios de una aplicación web.
- En la aplicación web de origen, la gestión de usuarios se lleva a cabo a través de Keycloak.
- El propósito de este script es prescindir del uso de la API de la aplicación web, evitando así reenviar mensajes de bienvenida a los usuarios, verificar sus credenciales y asignarles roles.
- Para esto interactuaremos tanto con la base de datos de la apliacion web como con la base de datos de Keycloak.

In [None]:
import pandas as pd
import numpy as np

import pyodbc
import mysql.connector
from azure.storage.blob import BlobClient, ContainerClient, BlobServiceClient

import requests
import random
import string

import time

In [None]:
class UserMigration:
    
    def __init__(self):
        # Configuración de Keycloak
        self._keycloak_base_url = "https://website/auth"
        self._realm_name = "realm-name"
        self._client_id = "client-id"
        self._client_secret = "client-secret"
        
        # Configuración de Azure
        self._connect_string = "azure-connection-string"
        self._conn = None
        
        # Resultados
        self._results = pd.DataFrame(columns=['uuid', 'keycloak', 'bbdd', 'contraseña', 'usuario'])
        
    def azure_connection(self):
        # Establecer conexión con la base de datos Azure
        try:
            conn = pyodbc.connect(self._connect_string)
            self._conn = conn
            print('Conectado..')
        except pyodbc.Error as e:
            print(f'Error de pyodbc: {e}')
        except Exception as e:
            print(f'Ocurrió un error: {e}')
            
    def azure_close(self):
        # Cerrar la conexión con la base de datos Azure
        try:
            conn = self._conn.close()
            print('Conexion terminada..')
        except Exception as e:
            print('Error')
        
    @contextmanager
    def database_transaction(self):
        try:
            yield
            self._conn.commit()
        except pyodbc.Error as e:
            print(f'Error en la base de datos: {db_e}')
            self._conn.rollback()
            raise

    def get_access_token(self):
        # Obtener el token de acceso de Keycloak
        token_url = f'{self._keycloak_base_url}/realms/{self._realm_name}/protocol/openid-connect/token'
        token_data = {
            'grant_type': 'client_credentials',
            'client_id': self._client_id,
            'client_secret': self._client_secret
        }
        response = requests.post(token_url, data=token_data)

        if response.status_code == 200:
            return response.json().get('access_token')
        else:
            raise Exception(f'Error al obtener el token de acceso: {response.text}')
               
    def generate_password(self, length=12):
        # Generar una contraseña aleatoria
        characters = string.ascii_letters + string.digits + string.punctuation
        return ''.join(random.choice(characters) for i in range(length))
    
    def get_user_keycloak_id(self, username):
        # Obtener el ID del usuario en Keycloak a partir del nombre de usuario
        user_endpoint = f'{self._keycloak_base_url}/admin/realms/{self._realm_name}/users'

        headers = {
            'Authorization': f'Bearer {self.get_access_token()}',
            'Content-Type': 'application/json'
        }

        response = requests.get(user_endpoint, headers=headers, params={'username': username})

        if response.status_code == 200:
            user_data = response.json()
            if len(user_data) > 0:
                user_id = user_data[0]['id']
                return user_id
            else:
                print(f'No se encontró un usuario con el nombre de usuario {username}')
        else:
            print(f'Error al obtener el usuario: {response.status_code}: {response.text}')

    def update_dataframe_row(self, row, value):
        # Actualizar una fila en el DataFrame de resultados
        self._results.at[self._results.index[-1], row] = value
             
    def create_user(self, row):
        # Crear un nuevo usuario en Keycloak y almacenar la información en Azure
        password = self.generate_password()
        user_data = {
            'username': row.get('Email'),
            'firstName': row.get('FirstName'),
            'lastName': row.get('LastName'),
            'email': row.get('Email'),
            'enabled': True,
            'credentials': [{'type': 'password', 'value': password, 'temporary': False}],
            'emailVerified': True,
            'requiredActions': ['UPDATE_PASSWORD']
        }
        
        create_user_url = f'{self._keycloak_base_url}/admin/realms/{self._realm_name}/users'

        max_retries_keycloak = 5
        for _ in range(max_retries_keycloak):
            try:
                response = requests.post(create_user_url, json=user_data, headers={'Authorization': f'Bearer {self.get_access_token()}'}, timeout=5)
                break  # Salir del bucle si la solicitud tiene éxito
            except (requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
                print(f"Error en la solicitud a Keycloak: {e}. Reintentando...")
                time.sleep(50)  # Esperar antes de reintentar
                
        self._results = pd.concat([self._results, pd.DataFrame([{'uuid': None, 'usuario': None, 'keycloak': 'Éxito', 'bbdd': None, 'contraseña': password}])], ignore_index=True)       

        try:
            with self.database_transaction():
                if response.status_code == 201:
                    row['uuid'] = self.get_user_keycloak_id(row['Email'])
                    self.update_dataframe_row('uuid', row.get('uuid'))
                    self.update_dataframe_row('usuario', row.get('Email'))
                    print(f'Usuario {row["Email"]} insertado con éxito en Keycloak')

                    query = "INSERT INTO [User] (Id, FirstName, LastName, Email, CreatedAt, RoleId, Address, BirthDate, City, Country, Language, PostalCode, Province, PaymentId, ThumbnailId, SendEmailNotifications) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
                    values = (str(row['uuid']), row['FirstName'], row['LastName'], row['Email'], row['CreatedAt'], str(row['RoleId']), row['Address'], row['BirthDate'], row['City'], row['Country'], row['Language'], None, row['Province'], None, None, bool(row['SendEmailNotifications']))
                    self._conn.execute(query, values)

                    self.update_dataframe_row('bbdd', 'Éxito')

                    self.roles(row['uuid'], row['RoleId'])
                    print(f'Usuario {row["Email"]} insertado con éxito en Azure')

                else:
                    print(f'{row["Email"]} **No** Insertado en Keycloak {response.status_code}: {response.text}')
                    self.update_dataframe_row('usuario', row.get('Email'))
                    self.update_dataframe_row('keycloak', 'Fallo')
                    self.update_dataframe_row('contraseña', 'Fallo')
                    self.update_dataframe_row('bbdd', 'Fallo en Keycloak')

        except pyodbc.Error as db_error:
            print(f'Error en la base de datos: {db_error}')
            self.update_dataframe_row('bbdd', 'Fallo en la base de datos')
        except Exception as ex:
            print(f'Error general: {str(ex)}')
            self.update_dataframe_row('uuid', row.get('uuid'))
            self.update_dataframe_row('usuario', row.get('Email'))
            self.update_dataframe_row('keycloak', 'Fallo')
            self.update_dataframe_row('contraseña', 'Fallo')


    def get_results_and_passwords(self):
        # Obtener una copia del DataFrame de resultados y contraseñas
        return self._results.copy()
    
    def export_results(self, file_nmar):
        # Exportar los resultados a un archivo CSV
        self._results.to_csv(file_name)
    
    def delete_user(self, user_id):
        # Eliminar un usuario en Keycloak y la información asociada en Azure
        delete_user_url = f'{self._keycloak_base_url}/admin/realms/{self._realm_name}/users/{user_id}'
        headers = {'Authorization': f'Bearer {self.get_access_token()}'}

        try:
            response = requests.delete(delete_user_url, headers=headers)
            response.raise_for_status()  # Esto generará una excepción si hay un error HTTP (código de estado diferente de 2xx)

            print(f'Usuario con ID {user_id} eliminado con éxito.')

        except requests.exceptions.RequestException as e:
            print(f'Error al eliminar el usuario: {e}')
            
        try:
            # Eliminar la información del usuario en Azure
            query_delete = "DELETE FROM [User] WHERE Id = ?"
            values_delete = (user_id,)

            # Ejecutar la consulta
            result = self._conn.execute(query_delete, values_delete)

            # Obtener el número de filas afectadas
            rows_affected = result.rowcount

            # Imprimir el resultado
            print(f'ID: {user_id}, Filas afectadas: {rows_affected}')

            self._conn.commit()

        except pyodbc.Error as e:
            # Manejar la excepción de pyodbc.Error
            print(f'Error de pyodbc al ejecutar la consulta de eliminación: {str(e)}')

        except Exception as ex:
            # Manejar otras excepciones generales
            print(f'Error general al ejecutar la consulta de eliminación: {str(ex)}')
            
        finally:
            time.sleep(3)
            
    def roles(self, user_id, RoleId):
        # Asignar roles específicos en Keycloak
        user_roles_url = f'{self._keycloak_base_url}/admin/realms/{self._realm_name}/users/{user_id}/role-mappings/realm'
        
        headers = {
        'Authorization': f'Bearer {self.get_access_token()}',
        'Content-Type': 'application/json',
        }
        
        if RoleId ==  'c650065b-4df9-41e0-b94b-d72fe7ff8cae':
            # Asignar el rol 'Professional'
            requests.post(user_roles_url, headers=headers, json=[{'id': '8b85ae89-51e6-45e7-9529-fb4afd9d4fd7', 'name': 'Professional', 'description': 'Professional User', 'composite': False, 'clientRole': False, 'containerId': 'ostube'}])
        elif RoleId == '61b957fd-21bf-47a1-80af-d4aa170230e1':
            # Asignar el rol 'Premium'
            requests.post(user_roles_url, headers=headers, json=[{'id': 'c97cb7f6-0aba-4528-b2ab-e79ebd5c7b4f', 'name': 'Premium', 'description': 'Users with Premium subscription', 'composite': False, 'clientRole': False, 'containerId': 'ostube'}])
        else:
            pass