In [13]:
import os
import boto3
import json
import time
import jwt
import requests
import pandas as pd
from io import StringIO
import argparse

In [14]:
import json

# Cargar configuración
with open('config/parameters.json', 'r') as f:
    config = json.load(f)

# Construir argumentos
import sys
sys.argv.extend([
    '--input-s3-bucket', config['input_s3_bucket'],
    '--s3-key-path', config['s3_key_path'],
    '--output-s3-bucket', config['output_s3_bucket'],
    '--secret-name', config['secret_name'],
    '--region-name', config['region_name']
])

In [15]:
def parse_arguments():
    """Parsear argumentos de línea de comandos"""
    parser = argparse.ArgumentParser(description='NetSuite Data Extractor')
    parser.add_argument('--input-s3-bucket', required=True, help='S3 bucket name for input')
    parser.add_argument('--s3-key-path', required=True, help='S3 key path for private key')
    parser.add_argument('--output-s3-bucket', required=True, help='S3 bucket name for output')
    parser.add_argument('--secret-name', required=True, help='AWS Secrets Manager secret name')
    parser.add_argument('--region-name', default='us-east-1', help='AWS region name')
    
    return parser.parse_args()

In [16]:
session = boto3.Session(profile_name="arclad")

In [17]:
def get_secret_credentials(secret_name, region_name):
    """Obtener credenciales desde Secrets Manager"""
    try:
        secrets_client = session.client("secretsmanager", region_name=region_name)
        secret_value = secrets_client.get_secret_value(SecretId=secret_name)
        return json.loads(secret_value["SecretString"])
    except Exception as e:
        print(f"Error obteniendo credenciales: {e}")
        raise

In [18]:
def get_private_key_from_s3(bucket_name, key_name):
    """Descargar clave privada desde S3"""
    try:
        s3_client = session.client("s3")
        pem_obj = s3_client.get_object(Bucket=bucket_name, Key=key_name)
        return pem_obj["Body"].read().decode("utf-8")
    except Exception as e:
        print(f"Error descargando clave privada: {e}")
        raise

In [19]:
def get_access_token(account_id, client_id, certificate_id, private_key, scope="rest_webservices"):
    """Crear JWT y obtener access token"""
    now = int(time.time())
    payload = {
        "iss": client_id,
        "scope": scope,
        "aud": f"https://{account_id}.suitetalk.api.netsuite.com/services/rest/auth/oauth2/v1/token",
        "iat": now,
        "exp": now + 300
    }
    
    headers = {
        "alg": "PS256",
        "typ": "JWT",
        "kid": certificate_id
    }
    
    client_assertion = jwt.encode(
        payload,
        private_key,
        algorithm="PS256",
        headers=headers
    )
    
    token_url = f"https://{account_id}.suitetalk.api.netsuite.com/services/rest/auth/oauth2/v1/token"
    
    data = {
        "grant_type": "client_credentials",
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": client_assertion
    }
    
    resp = requests.post(token_url, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"})
    
    if resp.status_code == 200:
        access_token = resp.json()["access_token"]
        print("Token obtenido con éxito")
        return access_token
    else:
        print(f"Error al obtener token: {resp.status_code} {resp.text}")
        raise Exception(f"Failed to get access token: {resp.text}")

In [20]:
def extract_table_data(table, account_id, access_token, entity_tables):
    """Extraer datos de una tabla específica"""
    record_url = f"https://{account_id}.suitetalk.api.netsuite.com/services/rest/query/v1/suiteql"
    df_table = pd.DataFrame()
    limit = 1000
    offset = 0
    
    if table in entity_tables:
        sql = {"q": f"SELECT * FROM {table}"}
    else:
        transaction_fields = [
            "id","tranId","tranDate","entity","status","memo",
            "createdDate","lastModifiedDate","source","currency","location"
        ]
        select_fields = ",".join(transaction_fields)
        sql = {"q": f"SELECT {select_fields} FROM transaction WHERE type='{table}'"}
    
    print(f"SQL Query: {sql}")
    start_table = time.time()
    
    while True:
        print(f"Consultando tabla {table} | limit={limit}&offset={offset}")
        
        res = requests.post(record_url, headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "Prefer": "transient"
        }, json=sql, params={"limit": limit, "offset": offset})

        if res.status_code == 200:
            data = res.json()
            cant_records = data.get("count", 0)
            
            if cant_records > 0:
                records = data.get("items", [])
                print(f"Datos obtenidos de la tabla {table}: {cant_records} registros.")
                
                df_page = pd.DataFrame(records)
                df_table = pd.concat([df_table, df_page], ignore_index=True)
            
            if data.get("hasMore", False):
                offset += limit
            else:
                print(f"No hay más páginas en la tabla {table}.")
                break
                
        elif res.status_code == 401 or "INVALID_LOGIN" in res.text:
            print("Token expirado, se necesita renovar...")
            raise Exception("Token expired - need to regenerate")
        else:
            print(f"Error SuiteQL: {res.status_code} {res.text}")
            break

    end_table = time.time()
    print(f"Tiempo de procesamiento de la tabla {table}: {end_table - start_table:.2f} seg")
    
    return df_table

In [21]:
def save_to_s3(df, table, output_bucket):
    """Guardar DataFrame en S3"""
    if len(df) > 0:
        print('Guardando el DataFrame en formato Parquet en S3')
        s3_path = f"s3://{output_bucket}/{table}/{table}.parquet"
        df.to_parquet(s3_path, engine='pyarrow', compression='snappy', index=False)
        print(f"Archivo guardado correctamente en S3: {s3_path}")
        return True
    return False

In [22]:
def main():
    # Configurar credenciales AWS
    
    # Parsear argumentos
    args = parse_arguments()
    
    # Definir tablas a extraer
    entity_tables = ["customer", "customersubsidiaryrelationship", "vendor"]
    transactions_tables = [
        "AdvIjrnl", "AsmBld", "AsmUnBld", "BinTrnfr", "BinWksht", "BPO", "CashRfnd", 
        "CashSale", "Check", "CardChrg", "CardRfnd", "CustCrM", "CustDep", "CustPymt", 
        "CustRfnd", "Depst", "DepAppl", "Estimate", "ExpRept", "FulfReq", "InvTrn", 
        "CustInvc", "ItemShip", "ItemRcpt", "Journal", "Opprtnty", "PeJrnl", "PrchCntrct", 
        "PurchOrd", "PurchRqst", "RtnAuth", "SalesOrd", "TrnfrOrd", "VendBill", 
        "VendCred", "VendPymt", "VPrep", "VPrepApp", "WorkOrd"
    ]
    tables = entity_tables + transactions_tables
    
    try:
        # Obtener credenciales
        print("Obteniendo credenciales...")
        creds = get_secret_credentials(args.secret_name, args.region_name)
        
        # Obtener clave privada
        print("Descargando clave privada...")
        private_key = get_private_key_from_s3(args.input_s3_bucket, args.s3_key_path)
        
        # Obtener access token
        print("Obteniendo access token...")
        access_token = get_access_token(
            creds["ACCOUNT_ID"],
            creds["CLIENT_ID"], 
            creds["CERTIFICATE_ID"],
            private_key
        )
        
        start_job = time.time()
        successful_tables = 0
        
        # Procesar cada tabla
        for table in tables:
            try:
                print(f"\n--- Procesando tabla: {table} ---")
                df_table = extract_table_data(table, creds["ACCOUNT_ID"], access_token, entity_tables)
                
                cant = len(df_table.index)
                print(f"Cantidad de registros de la tabla {table}: {cant}")
                
                if save_to_s3(df_table, table, args.output_s3_bucket):
                    successful_tables += 1
                
                del df_table
                
            except Exception as e:
                print(f"Error procesando tabla {table}: {e}")
                # Continuar con la siguiente tabla
                continue
        
        end_job = time.time()
        print(f"\n=== Migración completada ===")
        print(f"Tablas procesadas exitosamente: {successful_tables}/{len(tables)}")
        print(f"Tiempo total: {end_job - start_job:.2f} segundos")
        
    except Exception as e:
        print(f"Error en la ejecución: {e}")
        return 1
    
    return 0

In [26]:
main()

usage: ipykernel_launcher.py [-h] --input-s3-bucket INPUT_S3_BUCKET
                             --s3-key-path S3_KEY_PATH --output-s3-bucket
                             OUTPUT_S3_BUCKET --secret-name SECRET_NAME
                             [--region-name REGION_NAME]
ipykernel_launcher.py: error: unrecognized arguments: --f=c:\Users\Usuario\AppData\Roaming\jupyter\runtime\kernel-v3d14ee4095c0578c8d59aa96f1c6e5f8cf1d6581a.json


SystemExit: 2

In [27]:
%tb

SystemExit: 2