# Import librairie #

In [1]:
import psycopg2
import pandas as pd
from dotenv import load_dotenv
import os

## Charger les variables d'environnement ##

In [2]:
# Charger le .env depuis la racine du projet
from pathlib import Path
dotenv_path = r'C:\Penny\.env'
load_result = load_dotenv(dotenv_path=dotenv_path)
print(f"Chargement .env : {'[OK] Reussi' if load_result else '[ERREUR] Echec'}")

Chargement .env : ✓ Réussi


## Connexion Redshift Pennylane ##

In [3]:
conn = psycopg2.connect(
    host='pennylane-external.csqwamh5pldr.eu-west-1.redshift.amazonaws.com',
    port=5439,
    dbname='prod',
    user='u_289572',
    password=os.getenv('PENNYLANE_DATA_SHARING_KEY')
)

print("Connexion Redshift etablie")

Connexion Redshift etablie


## Fonction helper : Connexion auto-reconnect ##

In [4]:
def get_active_connection():
    """Retourne une connexion active, la recrée si nécessaire"""
    global conn
    try:
        # Test si la connexion est active
        pd.read_sql("SELECT 1;", conn)
        return conn
    except:
        # Reconnexion si fermée
        print("⟳ Reconnexion Redshift...")
        conn = psycopg2.connect(
            host='pennylane-external.csqwamh5pldr.eu-west-1.redshift.amazonaws.com',
            port=5439,
            dbname='prod',
            user='u_289572',
            password=os.getenv('PENNYLANE_DATA_SHARING_KEY')
        )
        print("✓ Reconnecté")
        return conn

# Utilisation : remplace "conn" par "get_active_connection()" dans tes requêtes
# Exemple :
df = pd.read_sql("SELECT * FROM pennylane.customers LIMIT 1;", get_active_connection())
print(f"Test réussi : {len(df)} ligne(s)")

  pd.read_sql("SELECT 1;", conn)
  df = pd.read_sql("SELECT * FROM pennylane.customers LIMIT 1;", get_active_connection())


Test réussi : 1 ligne(s)


## Liste table Transactions Pennylane ##

In [5]:
# Tables bank transactions Pennylane
tables_extended = [
    'bank_transactions'
]

accessible_tables = []

for table in tables_extended:
    try:
        query = f"SELECT * FROM pennylane.{table} LIMIT 1;"
        df = pd.read_sql(query, conn)
        accessible_tables.append({
            'table_name': table,
            'columns': len(df.columns),
            'column_list': ', '.join(df.columns.tolist()[:5]) + '...'
        })
        print(f"✓ {table} ({len(df.columns)} colonnes)")
    except:
        pass

print(f"\n\n=== RÉSUMÉ: {len(accessible_tables)} tables accessibles ===")
df_accessible = pd.DataFrame(accessible_tables)
print(df_accessible.to_string(index=False))

  df = pd.read_sql(query, conn)


✓ bank_transactions (18 colonnes)


=== RÉSUMÉ: 1 tables accessibles ===
       table_name  columns                                                      column_list
bank_transactions       18 id, execution_date, company_id, account_name, thirdparty_name...


## Liste colonnes Transactions Pennylane ##

In [6]:
# Afficher les colonnes bank transactions
tables_found = [
    'bank_transactions'
]

for table in tables_found:
    query = f"SELECT * FROM pennylane.{table} LIMIT 0;"  # LIMIT 0 = structure seulement
    df = pd.read_sql(query, conn)
    print(f"\n{'='*60}")
    print(f"Table: pennylane.{table}")
    print(f"{'='*60}")
    print(f"Colonnes ({len(df.columns)}):")
    for i, col in enumerate(df.columns, 1):
        print(f"  {i:2d}. {col}")


Table: pennylane.bank_transactions
Colonnes (18):
   1. id
   2. execution_date
   3. company_id
   4. account_name
   5. thirdparty_name
   6. thirdparty_id
   7. label
   8. amount_eur
   9. outstanding_balance
  10. currency
  11. currency_amount
  12. source
  13. is_potential_duplicate
  14. sftp_ebics_files_filename
  15. sftp_ebics_files_created_at
  16. accountants_view_status
  17. proof_status
  18. updated_at


  df = pd.read_sql(query, conn)


## Chargement des données transaction ##

In [7]:
# Charger transactions
query_gl = "SELECT * FROM pennylane.bank_transactions;"
df_bank_transactions = pd.read_sql(query_gl, get_active_connection())

print(f"✓ Données chargées : {len(df_bank_transactions)} lignes, {len(df_bank_transactions.columns)} colonnes")

  pd.read_sql("SELECT 1;", conn)


  df_bank_transactions = pd.read_sql(query_gl, get_active_connection())


✓ Données chargées : 324 lignes, 18 colonnes


## Affichage des colonnes disponibles ##

In [8]:
print("Colonnes disponibles :")
for i, col in enumerate(df_bank_transactions.columns, 1):
    print(f"  {i:2d}. {col}")

Colonnes disponibles :
   1. id
   2. execution_date
   3. company_id
   4. account_name
   5. thirdparty_name
   6. thirdparty_id
   7. label
   8. amount_eur
   9. outstanding_balance
  10. currency
  11. currency_amount
  12. source
  13. is_potential_duplicate
  14. sftp_ebics_files_filename
  15. sftp_ebics_files_created_at
  16. accountants_view_status
  17. proof_status
  18. updated_at


## Suppresion de colonnes ##

In [9]:
# Colonnes à supprimer
columns_to_drop = ["id", "company_id","sftp_ebics_files_filename","sftp_ebics_files_created_at","created_at", "updated_at"]

# Vérifier quelles colonnes existent réellement
existing_cols = [col for col in columns_to_drop if col in df_bank_transactions.columns]
missing_cols = [col for col in columns_to_drop if col not in df_bank_transactions.columns]

print(f"Colonnes supprimées : {existing_cols}")
if missing_cols:
    print(f"Colonnes inexistantes (ignorées) : {missing_cols}")

# Supprimer les colonnes
df_bank_transactions = df_bank_transactions.drop(columns=existing_cols)

print(f"\n✓ Résultat : {len(df_bank_transactions.columns)} colonnes restantes")
print(f"Colonnes finales :\n{df_bank_transactions.columns.tolist()}")

Colonnes supprimées : ['id', 'company_id', 'sftp_ebics_files_filename', 'sftp_ebics_files_created_at', 'updated_at']
Colonnes inexistantes (ignorées) : ['created_at']

✓ Résultat : 13 colonnes restantes
Colonnes finales :
['execution_date', 'account_name', 'thirdparty_name', 'thirdparty_id', 'label', 'amount_eur', 'outstanding_balance', 'currency', 'currency_amount', 'source', 'is_potential_duplicate', 'accountants_view_status', 'proof_status']


## Affichage type clonne ##

In [10]:
print(f"Types de données des {len(df_bank_transactions.columns)} colonnes :\n")
for i, (col, dtype) in enumerate(df_bank_transactions.dtypes.items(), 1):
    print(f"  {i:2d}. {col:<30} → {dtype}")

Types de données des 13 colonnes :

   1. execution_date                 → object
   2. account_name                   → object
   3. thirdparty_name                → object
   4. thirdparty_id                  → float64
   5. label                          → object
   6. amount_eur                     → float64
   7. outstanding_balance            → float64
   8. currency                       → object
   9. currency_amount                → float64
  10. source                         → object
  11. is_potential_duplicate         → bool
  12. accountants_view_status        → object
  13. proof_status                   → object


## Conversion des types de colonnes ##

In [11]:
# Conversion des types
df_bank_transactions = df_bank_transactions.astype({
    'amount_eur': 'float64',
    'outstanding_balance': 'float64',
    'execution_date': 'datetime64[ns]',
    'currency_amount': 'float64'
})

# Convertir toutes les autres colonnes en string
columns_to_string = [col for col in df_bank_transactions.columns if col not in ['amount_eur','outstanding_balance','execution_date','currency_amount']]
df_bank_transactions[columns_to_string] = df_bank_transactions[columns_to_string].astype(str)

print("✓ Conversion effectuée\n")
print("Nouveaux types :")
for i, (col, dtype) in enumerate(df_bank_transactions.dtypes.items(), 1):
    print(f"  {i:2d}. {col:<30} → {dtype}")

✓ Conversion effectuée

Nouveaux types :
   1. execution_date                 → datetime64[ns]
   2. account_name                   → object
   3. thirdparty_name                → object
   4. thirdparty_id                  → object
   5. label                          → object
   6. amount_eur                     → float64
   7. outstanding_balance            → float64
   8. currency                       → object
   9. currency_amount                → float64
  10. source                         → object
  11. is_potential_duplicate         → object
  12. accountants_view_status        → object
  13. proof_status                   → object


## Connexion PostgreSQL local ##

In [12]:
# Connexion à PostgreSQL local (utilise les variables du .env)
conn_pg = psycopg2.connect(
    host=os.getenv('POSTGRES_HOST'),
    port=int(os.getenv('POSTGRES_PORT')),
    dbname=os.getenv('POSTGRES_DB'),
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PASSWORD')
)

print("✓ Connexion PostgreSQL établie")
print(f"  Database: {os.getenv('POSTGRES_DB')}")
print(f"  User: {os.getenv('POSTGRES_USER')}")
print(f"  Port: {os.getenv('POSTGRES_PORT')}")

✓ Connexion PostgreSQL établie
  Database: pennylane_data
  User: pennylane_user
  Port: 5433


## Import des données dans PostgreSQL avec SQLAlchemy ##

In [13]:
from sqlalchemy import create_engine

# Créer un engine SQLAlchemy
engine = create_engine(
    f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_DB')}"
)

print("✓ Engine SQLAlchemy créé")

✓ Engine SQLAlchemy créé


## Export du DataFrame vers PostgreSQL ##

In [None]:
# Paramètres d'import
table_name = 'bank_transactions'
schema_name = 'pennylane'

# Importer le DataFrame dans PostgreSQL
print(f"Import en cours de {len(df_bank_transactions)} lignes...")

df_bank_transactions.to_sql(
    name=table_name,
    con=engine,
    schema=schema_name,
    if_exists='replace',  # Options: 'fail', 'replace', 'append'
    index=False,
    method='multi',
    chunksize=1000
)

print(f"✓ Table '{schema_name}.{table_name}' importée avec succès !")
print(f"  - {len(df_bank_transactions)} lignes")
print(f"  - {len(df_bank_transactions.columns)} colonnes")