In [1]:
import boto3
import os
import pandas as pd

In [2]:
os.environ["AWS_PROFILE"] = "men"   
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

#### Revisión y actualización de metadatos en S3 desde DynamoDB

1. **Fuente de datos**: Extraer los objetos de la tabla `prod_objects_url` en **DynamoDB** donde la columna `description` tenga el valor `"(nan)"` o `"..."`.  

2. **Objetivo**: Actualizar el valor del metadato `description` en S3 para estos archivos.  

3. **Método de procesamiento**:
   - **Archivos `.txt`**: Pasarlos por un **prompt** para generar una nueva descripción.  
   - **Archivos `.pdf`**: Extraer el contenido usando **Amazon Textract** antes de generar la descripción.  


In [10]:
import boto3
import pandas as pd

# Crear cliente de DynamoDB
dynamodb = boto3.resource('dynamodb')

# Nombre de la tabla
table_name = 'prod_objects_url'
table = dynamodb.Table(table_name)

# Escanear la tabla con paginación, pero solo almacenar filas con description == "(nan)" o "..."
data = []
response = table.scan()

data.extend([item for item in response.get('Items', []) if item.get('description') in ["(nan)", "..."]])
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend([item for item in response.get('Items', []) if item.get('description') in ["(nan)", "..."]])

# Convertir los datos en un DataFrame
df = pd.DataFrame(data)

# Crear cliente de S3
s3_client = boto3.client('s3')

# Función para verificar si un objeto existe en S3
def check_s3_object_exists(s3_path):
    try:
        # Extraer el bucket y el key del path tipo "s3://bucket/key"
        s3_parts = s3_path.replace("s3://", "").split("/", 1)
        bucket, key = s3_parts[0], s3_parts[1]
        
        # Intentar obtener metadata del objeto
        s3_client.head_object(Bucket=bucket, Key=key)
        return True  # Existe
    except s3_client.exceptions.ClientError as e:
        return False  # No existe

# Aplicar la verificación a la columna de paths
df["exists_in_s3"] = df["s3_object_path"].apply(check_s3_object_exists)

# Verificar existencia de la versión con ".metadata.json"
df["exists_metadata_json"] = df["s3_object_path"].apply(lambda x: check_s3_object_exists(x + ".metadata.json"))

# Mostrar los primeros registros
df.head()

Unnamed: 0,description,web_url,s3_object_path,exists_in_s3,exists_metadata_json
0,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True
1,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True
2,...,https://especiales.colombiaaprende.edu.co/teji...,s3://men-prod-clean-data/contenidos_formativos...,True,True
3,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True
4,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False


In [22]:
df["description"].value_counts()

description
(nan)    948
...      587
Name: count, dtype: int64

In [23]:
# Extraer la extensión del archivo desde la columna s3_object_path
df["file_extension"] = df["s3_object_path"].str.extract(r'\.([a-zA-Z0-9]+)$')

df["file_extension"].value_counts()

file_extension
pdf    823
txt    707
PDF      5
Name: count, dtype: int64

In [24]:
df_no_metadata = df[
    (df["exists_in_s3"] == False) | 
    (df["exists_metadata_json"] == False)
]

df_no_metadata

Unnamed: 0,description,web_url,s3_object_path,exists_in_s3,exists_metadata_json,file_extension,base_name
4,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
5,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
35,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
36,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
43,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
...,...,...,...,...,...,...,...
1513,...,https://colab.colombiaaprende.edu.co/experienc...,s3://men-prod-clean-data/contenidos_formativos...,False,True,pdf,s3://men-prod-clean-data/contenidos_formativos...
1519,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
1530,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,s3://men-prod-clean-data/contenidos_formativos...
1532,...,https://eco.colombiaaprende.edu.co/masterclass/,s3://men-prod-clean-data/contenidos_formativos...,False,True,pdf,s3://men-prod-clean-data/contenidos_formativos...


In [3]:
#df.to_csv("datos_procesados.csv", sep=";", index=False, encoding="utf-8-sig")

import pandas as pd

df = pd.read_csv("datos_procesados.csv", sep=";", encoding="utf-8-sig")

# Textract PDFS

In [8]:
df_pdf = df[df["file_extension"] == "pdf"].copy()
df_pdf.head()

Unnamed: 0,description,web_url,s3_object_path,exists_in_s3,exists_metadata_json,file_extension
0,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True,pdf
1,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True,pdf
3,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True,pdf
6,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True,pdf
8,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,True,pdf


In [13]:
import boto3
import time
import pandas as pd

# Crear cliente de Textract
textract = boto3.client('textract')

# Definir tamaño del bache
batch_size = 25

# Dividir df_pdf en baches
num_batches = (len(df_pdf) // batch_size) + (1 if len(df_pdf) % batch_size != 0 else 0)

print(f"🔹 Total de PDFs: {len(df_pdf)}")
print(f"🔹 Tamaño del bache: {batch_size}")
print(f"🔹 Número total de baches: {num_batches}")

# Procesar cada bache
for batch_num in range(num_batches):
    start_idx = batch_num * batch_size
    end_idx = start_idx + batch_size
    df_batch = df_pdf.iloc[start_idx:end_idx].copy()

    # Inicializar la columna 'textract' con NaN
    df_batch["textract"] = pd.NA

    print(f"\n🚀 Procesando bache {batch_num + 1}/{num_batches} (PDFs {start_idx + 1} - {min(end_idx, len(df_pdf))})")

    # Procesar cada archivo PDF en el bache
    for index, row in df_batch.iterrows():
        s3_path = row["s3_object_path"]
        bucket_name, object_key = s3_path.replace("s3://", "").split("/", 1)

        print(f"\n📂 Procesando archivo {index + 1}/{len(df_batch)}: {object_key}")

        # Iniciar medición del tiempo
        start_time = time.time()

        # Iniciar análisis con Textract
        response = textract.start_document_text_detection(
            DocumentLocation={'S3Object': {'Bucket': bucket_name, 'Name': object_key}}
        )
        job_id = response['JobId']
        print(f"🚀 Job ID de Textract: {job_id}")

        # Esperar hasta que el análisis termine
        while True:
            result = textract.get_document_text_detection(JobId=job_id)
            status = result['JobStatus']

            if status == "SUCCEEDED":
                print("✅ Extracción completada.")
                break
            elif status == "FAILED":
                print("❌ Error en la extracción.")
                break
            else:
                print("⌛ Procesando... esperando 5 segundos.")
                time.sleep(5)

        # Calcular el tiempo total de extracción
        elapsed_time = round(time.time() - start_time, 2)
        print(f"⏳ Tiempo total de extracción: {elapsed_time} segundos")

        # Obtener el texto extraído
        extracted_text = ""
        for item in result.get('Blocks', []):
            if item['BlockType'] == 'LINE':
                extracted_text += item['Text'] + '\n'

        # Guardar el texto extraído en df_batch
        df_batch.at[index, "textract"] = extracted_text if extracted_text.strip() else pd.NA

    # Guardar el bache en un CSV
    file_name = f"df_pdf_bache_{batch_num + 1}.csv"
    df_batch.to_csv(file_name, sep=";", encoding="utf-8-sig", index=False)

    print(f"\n✅ Bache {batch_num + 1}/{num_batches} procesado y guardado en '{file_name}'")

print("\n🎉 Todos los baches han sido procesados exitosamente.")


🔹 Total de PDFs: 823
🔹 Tamaño del bache: 25
🔹 Número total de baches: 33

🚀 Procesando bache 1/33 (PDFs 1 - 25)

📂 Procesando archivo 1/25: contenidos_formativos/CAMPUS VIRTUAL/filedir/a5/00/MEN_PTA_C7_CDA.pdf
🚀 Job ID de Textract: 6c14170fcd3f6c88a3aaed8b6529fd0c2901dc728f10f760646f2db88f88108f
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
✅ Extracción completada.
⏳ Tiempo total de extracción: 53.66 segundos

📂 Procesando archivo 2/25: contenidos_formativos/CAMPUS VIRTUAL/filedir/8c/ca/udea_g10par_cna_m7_c2_inf06_V2 PDF.pdf
🚀 Job ID de Textract: b5d50aba84210efa1c36f8d0f5cd979f05dbc5abef7d9eec39eacf26df2285cf
⌛ Procesando... esperando 5 segundos.
⌛ Procesan

InvalidS3ObjectException: An error occurred (InvalidS3ObjectException) when calling the StartDocumentTextDetection operation: Unable to get object metadata from S3. Check object key, region and/or access permissions.

In [14]:
# Verificar si las credenciales de AWS están activas antes de continuar
import boto3

try:
    sts_client = boto3.client("sts")
    identity = sts_client.get_caller_identity()
    print(f"✅ Credenciales activas. Usuario ARN: {identity['Arn']}")
except Exception as e:
    print(f"⚠️ Error con las credenciales de AWS: {e}")
    print("🔄 Intenta reconectar las credenciales antes de continuar.")
    raise SystemExit

# Continuar con el procesamiento del bache 11 si las credenciales están bien
import time
import pandas as pd

# Crear cliente de Textract
textract = boto3.client('textract')

# Definir el tamaño del bache
batch_size = 25
batch_num = 11  # Número de bache a procesar

# Determinar los índices del bache 11
start_idx = (batch_num - 1) * batch_size
end_idx = start_idx + batch_size
df_batch = df_pdf.iloc[start_idx:end_idx].copy()

# Inicializar la columna 'textract' con NaN
df_batch["textract"] = pd.NA

print(f"\n🚀 Procesando bache {batch_num} (PDFs {start_idx + 1} - {min(end_idx, len(df_pdf))})")

# Procesar cada archivo PDF en el bache
for index, row in df_batch.iterrows():
    s3_path = row["s3_object_path"]
    bucket_name, object_key = s3_path.replace("s3://", "").split("/", 1)

    print(f"\n📂 Procesando archivo {index + 1}/{len(df_batch)}: {object_key}")

    # Iniciar medición del tiempo
    start_time = time.time()

    try:
        # Iniciar análisis con Textract
        response = textract.start_document_text_detection(
            DocumentLocation={'S3Object': {'Bucket': bucket_name, 'Name': object_key}}
        )
        job_id = response['JobId']
        print(f"🚀 Job ID de Textract: {job_id}")

        # Esperar hasta que el análisis termine
        while True:
            result = textract.get_document_text_detection(JobId=job_id)
            status = result['JobStatus']

            if status == "SUCCEEDED":
                print("✅ Extracción completada.")
                break
            elif status == "FAILED":
                print("❌ Error en la extracción.")
                break
            else:
                print("⌛ Procesando... esperando 5 segundos.")
                time.sleep(5)

        # Calcular el tiempo total de extracción
        elapsed_time = round(time.time() - start_time, 2)
        print(f"⏳ Tiempo total de extracción: {elapsed_time} segundos")

        # Obtener el texto extraído
        extracted_text = ""
        for item in result.get('Blocks', []):
            if item['BlockType'] == 'LINE':
                extracted_text += item['Text'] + '\n'

        # Guardar el texto extraído en df_batch
        df_batch.at[index, "textract"] = extracted_text if extracted_text.strip() else pd.NA

    except Exception as e:
        print(f"⚠️ Error procesando {object_key}: {e}")
        df_batch.at[index, "textract"] = pd.NA

# Guardar el bache en un CSV
file_name = f"df_pdf_bache_{batch_num}.csv"
df_batch.to_csv(file_name, sep=";", encoding="utf-8-sig", index=False)

print(f"\n✅ Bache {batch_num} procesado y guardado en '{file_name}'")


✅ Credenciales activas. Usuario ARN: arn:aws:sts::825765421732:assumed-role/AWSReservedSSO_PS-MLTraineeAccess_ee0d75838073a97a/mateo.grisales@nuvu.cc

🚀 Procesando bache 11 (PDFs 251 - 275)

📂 Procesando archivo 484/25: contenidos_formativos/CAMPUS VIRTUAL/filedir/c3/58/MEN_PTA_C3_ANEXO4_BINGO.pdf
🚀 Job ID de Textract: bec1a35002a4f03ac6f8fdb726a44ea9646c1f695bf0c79330b4f1977d166b4d
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
✅ Extracción completada.
⏳ Tiempo total de extracción: 17.56 segundos

📂 Procesando archivo 488/25: contenidos_formativos/CAMPUS VIRTUAL/filedir/cb/2b/Guia descarga App_Moviles_V2.pdf
🚀 Job ID de Textract: c0524447172be601ac6b433f98cbc03530b7a54274caa4e96e84d46262ae8205
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
✅ Extracción completada.
⏳ Tiempo total de extracción: 11.06 segundos

📂 Procesando archivo 489/25: contenidos_formativos/CAMPUS VIRTUAL/filedir/8d/59/Gu

In [15]:
import boto3
import time
import pandas as pd

# Crear cliente de Textract
textract = boto3.client('textract')

# Definir tamaño del bache y desde dónde continuar
batch_size = 25
start_batch = 12  # Iniciar desde el bache 12

# Calcular el número total de baches restantes
num_batches = (len(df_pdf) // batch_size) + (1 if len(df_pdf) % batch_size != 0 else 0)
remaining_batches = range(start_batch, num_batches + 1)

print(f"🔹 Total de PDFs: {len(df_pdf)}")
print(f"🔹 Tamaño del bache: {batch_size}")
print(f"🔹 Procesando desde bache {start_batch} hasta {num_batches}")

# Procesar cada bache restante
for batch_num in remaining_batches:
    start_idx = (batch_num - 1) * batch_size
    end_idx = start_idx + batch_size
    df_batch = df_pdf.iloc[start_idx:end_idx].copy()

    # Inicializar la columna 'textract' con NaN
    df_batch["textract"] = pd.NA

    print(f"\n🚀 Procesando bache {batch_num}/{num_batches} (PDFs {start_idx + 1} - {min(end_idx, len(df_pdf))})")

    # Procesar cada archivo PDF en el bache
    for index, row in df_batch.iterrows():
        s3_path = row["s3_object_path"]
        bucket_name, object_key = s3_path.replace("s3://", "").split("/", 1)

        print(f"\n📂 Procesando archivo {index + 1}/{len(df_batch)}: {object_key}")

        # Iniciar medición del tiempo
        start_time = time.time()

        try:
            # Iniciar análisis con Textract
            response = textract.start_document_text_detection(
                DocumentLocation={'S3Object': {'Bucket': bucket_name, 'Name': object_key}}
            )
            job_id = response['JobId']
            print(f"🚀 Job ID de Textract: {job_id}")

            # Esperar hasta que el análisis termine
            while True:
                result = textract.get_document_text_detection(JobId=job_id)
                status = result['JobStatus']

                if status == "SUCCEEDED":
                    print("✅ Extracción completada.")
                    break
                elif status == "FAILED":
                    print("❌ Error en la extracción.")
                    break
                else:
                    print("⌛ Procesando... esperando 5 segundos.")
                    time.sleep(5)

            # Calcular el tiempo total de extracción
            elapsed_time = round(time.time() - start_time, 2)
            print(f"⏳ Tiempo total de extracción: {elapsed_time} segundos")

            # Obtener el texto extraído
            extracted_text = ""
            for item in result.get('Blocks', []):
                if item['BlockType'] == 'LINE':
                    extracted_text += item['Text'] + '\n'

            # Guardar el texto extraído en df_batch
            df_batch.at[index, "textract"] = extracted_text if extracted_text.strip() else pd.NA

        except Exception as e:
            print(f"⚠️ Error procesando {object_key}: {e}")
            df_batch.at[index, "textract"] = pd.NA

    # Guardar el bache en un CSV
    file_name = f"df_pdf_bache_{batch_num}.csv"
    df_batch.to_csv(file_name, sep=";", encoding="utf-8-sig", index=False)

    print(f"\n✅ Bache {batch_num}/{num_batches} procesado y guardado en '{file_name}'")

print("\n🎉 Todos los baches restantes han sido procesados exitosamente.")


🔹 Total de PDFs: 823
🔹 Tamaño del bache: 25
🔹 Procesando desde bache 12 hasta 33

🚀 Procesando bache 12/33 (PDFs 276 - 300)

📂 Procesando archivo 538/25: contenidos_formativos/CAMPUS VIRTUAL/filedir/d6/80/Anexo 2 Conceptualizacio´n (1).pdf
🚀 Job ID de Textract: 5ea4ca034bcbfdf7e0423581fbca770f2bdbca762a18dd8c5eeb3e928824f8c5
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
⌛ Procesando... esperando 5 segundos.
✅ Extracción completada.
⏳ Tiempo total de extracción: 53.77 segundos

📂 Procesando archivo 540/25: contenidos_formativos/ENGLISH_FOR_COLOMBIA_ECOWEB/masterclass/MC # 3/MC3-Presentación/MC3-Presentación.pdf
⚠️ Error procesando contenidos_formativos/ENGLISH_FOR_COLOMBIA_ECOWEB/masterclass/MC #

# TXTs a csv

In [4]:
df_txt = df[df["file_extension"] == "txt"].copy()

In [6]:
import boto3
import pandas as pd

# Crear cliente de S3
s3_client = boto3.client('s3')

# Inicializar la columna 'textract' con NaN
df_txt["textract"] = pd.NA

# Procesar cada archivo .txt en df_txt
for index, row in df_txt.iterrows():
    s3_path = row["s3_object_path"]
    bucket_name, object_key = s3_path.replace("s3://", "").split("/", 1)

    print(f"\n📂 Procesando archivo TXT {index + 1}/{len(df_txt)}: {object_key}")

    try:
        # Descargar y leer el contenido del archivo .txt desde S3
        response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
        text_content = response["Body"].read().decode("utf-8")

        # Guardar el contenido en la columna 'textract'
        df_txt.at[index, "textract"] = text_content

    except Exception as e:
        print(f"⚠️ Error al leer {object_key}: {e}")
        df_txt.at[index, "textract"] = pd.NA

# Guardar el DataFrame en un CSV
df_txt.to_csv("df_txt.csv", sep=";", encoding="utf-8-sig", index=False)

print("\n✅ Archivo 'df_txt.csv' creado exitosamente con el contenido de los archivos TXT.")



📂 Procesando archivo TXT 3/707: contenidos_formativos/EDUSITIO_TEJIENDO_SABERES/archivos/MEFC1ALF.4.1.27.txt

📂 Procesando archivo TXT 5/707: contenidos_formativos/CAMPUS VIRTUAL/filedir/6b/06/MEN_ESC_C4_M3_VS02_Ver2_baja.txt

📂 Procesando archivo TXT 6/707: contenidos_formativos/CAMPUS VIRTUAL/filedir/8f/de/MEN_ESC_C2_M4_AN02_Ver2.txt

📂 Procesando archivo TXT 8/707: contenidos_formativos/APRENDER_DIGITAL_LIGERA_MOVIL/pages/_media_24_download.txt

📂 Procesando archivo TXT 10/707: contenidos_formativos/EDUSITIO_TEJIENDO_SABERES/archivos/MEFC1ALF.3.1.21.txt

📂 Procesando archivo TXT 15/707: contenidos_formativos/EDUSITIO_TEJIENDO_SABERES/archivos/MEFC1ALF.1.1.8.txt

📂 Procesando archivo TXT 16/707: contenidos_formativos/APRENDER_DIGITAL_LIGERA_MOVIL/pages/_media_174_download.txt

📂 Procesando archivo TXT 18/707: contenidos_formativos/APRENDER_DIGITAL_LIGERA_MOVIL/pages/_media_176_download.txt

📂 Procesando archivo TXT 19/707: contenidos_formativos/APRENDER_DIGITAL_LIGERA_MOVIL/pages/_m

In [7]:
df_txt.head()

Unnamed: 0,description,web_url,s3_object_path,exists_in_s3,exists_metadata_json,file_extension,textract
2,...,https://especiales.colombiaaprende.edu.co/teji...,s3://men-prod-clean-data/contenidos_formativos...,True,True,txt,A dónde va tan apurado don Paco? Pues para don...
4,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,toda persona tiene derecho a presentar peticio...
5,(nan),https://campus.colombiaaprende.edu.co/course/v...,s3://men-prod-clean-data/contenidos_formativos...,True,False,txt,el gran reto actual del Ministerio Nacional de...
7,...,https://movil.colombiaaprende.edu.co/media/24/...,s3://men-prod-clean-data/contenidos_formativos...,True,True,txt,
9,...,https://especiales.colombiaaprende.edu.co/teji...,s3://men-prod-clean-data/contenidos_formativos...,True,True,txt,A dónde va tan apurado Don Juanco? Pues para d...
