# Transform_from_Bronze_to_Silver
##### es un proceso en la arquitectura de datos que implica la transformación y limpieza de datos desde una capa inicial de almacenamiento (Bronze) hacia una capa más refinada y estructurada (Silver).


In [1]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.sql.types import StringType
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
import re
from awsglue.job import Job
from pyspark.context import SparkContext
import boto3

# Obtener los argumentos del trabajo
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Crear contexto de Glue
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session


# Crear el objeto Job
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 97cbb2b7-60ab-4ec5-84d1-90fa7011284e
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 97cbb2b7-60ab-4ec5-84d1-90fa7011284e to get into ready status...
Session 97cbb2b7-60ab-4ec5-84d1-90fa7011284e has been created.
GlueArgumentError: the following arguments are required: --JOB_NAME


In [2]:


# Definir el esquema basado en el objeto JSON
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("url", StringType(), True),
    StructField("image_url", StringType(), True),
    StructField("news_site", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("published_at", TimestampType(), True),
    StructField("updated_at", TimestampType(), True)
])

# Rutas de los archivos CSV en S3
csv_path1 = "s3://spaceflightbuckes2025/Bronze/data_articules.csv"
csv_path2 = "s3://spaceflightbuckes2025/Bronze/data_blogs.csv"
csv_path3 = "s3://spaceflightbuckes2025/Bronze/data_reports.csv"


# Leer los archivos CSV con el esquema definido
df1 = spark.read.option("header", True).schema(schema).csv(csv_path1)
df2 = spark.read.option("header", True).schema(schema).csv(csv_path2)
df3 = spark.read.option("header", True).schema(schema).csv(csv_path3)

df1_final = df1.filter(df1["id"].isNotNull())

# Identificar todas las columnas únicas
all_columns = set(df1.columns).union(df2.columns).union(df3.columns)

# Función para alinear columnas faltantes
def align_columns(df, all_columns):
    for col in all_columns:
        if col not in df.columns:
            df = df.withColumn(col, lit(None))  # Agregar columna con valores nulos
    return df.select(sorted(all_columns))  # Ordenar columnas alfabéticamente

# Alinear las columnas de los DataFrames
df1_aligned = align_columns(df1_final, all_columns)
df2_aligned = align_columns(df2, all_columns)
df3_aligned = align_columns(df3, all_columns)

# Unir los DataFrames
data = df1_aligned.unionByName(df2_aligned).unionByName(df3_aligned)

# Eliminar las filas donde la columna 'id' es null
#data_cleaned = data.filter(data["id"].isNotNull())

data_cleaned = data.dropna()





In [4]:

# Limpieza de datos: Columnas principales
#cleaned_data = data_cleaned.select("id", "title", "summary", "news_site", "published_at") \
                  # .withColumn("published_date", to_date("published_at"))
cleaned_data = data_cleaned
#medoto para eliminar si alguna fila esta duplicada en el conjunto de datos 
cleaned_data = cleaned_data.dropDuplicates()




In [6]:
# Escribir datos S3
output_path = "s3://spaceflightbuckes2025/Silver /"

#crear archivo para la fact 
# Guardar el archivo Parquet con el mismo nombre, sobrescribiendo siempre el archivo
cleaned_data.coalesce(1).write.mode("overwrite").parquet(output_path + "/cleaned_data")




In [None]:
# Realizar commit del job al final
job.commit()