# Ejercicio 1

Construcción de un proceso ETL.

Para la construcciòn del ETL se opto por el uso de AWS GLUE. 
Servicio tipo serverless que facilita la integración de datos.

## Crawler
El primer paso es crear un crawler. El cual es el enlace con nuestro DataStore. El crawler permite inferir el schema de la información persistida.

1. En la consola de AWS buscamos AWS Glue.
2. Damos clic en la parte de crawler.
3. Clic en Add crawler.
4. En crawler source type escogemos Data Stores y Crawl all folders.
5. Como Data Store escogemos un bucket S3.
6. Dejamos vacia la parte de connection.
7. Seleccionamos Specified path in my account.
8. En la parte path buscamos nuestro bucket-test-data y seleccionamos la carpeta raw.
9. En Add Another Data Store seleccionamos No.
10. Escogemos un IAM role adecuado. (AmazonS3FullAccess, AWSGlueServiceRole, AWSGlueConsoleFullAccess)
11. En frequency por tema del ejercicio usaremos Run On Demand.
12. El output del crawler será una base de datos en AWS GLUE. Seleccionamos Add Database.
13. Le damos como nombre test-db.
14. Damos Next.
15. Damos Finish.
16. Seleccionamos el crawler creado y la opción Run Crawler.
17. Al terminar el status, navegamos a Databases/tables
18. Veremos la tabla raw lo que indica que el crawler fue ejecutado con éxito.

## ETL
El segundo paso es crear un ETL Job.
1. Damos clic en ETL jobs.
2. Clic en Add job.
3. Damos un nombre al job, y escogemos el mismo rol usado previamente.
4. Type Spark, Glue Version Spark 2.4, Python 3(Glue Version 2.0)
5. En this job runs seleccionamos A proposed script generated by AWS Glue.
6. Damos un nombre al script y una ruta S3 donde se almacenará. s3://anoc001-test-rodrigoruiz/AWS-Glue/glue-scripts
7. Para el temporal s3://anoc001-test-rodrigoruiz/AWS-Glue/glue-tmp
8. Escogemos un Data Source. En nuestro caso el de raw.
9. Escogemos Change Schema.
10. Escogemos la opción Create tables in your data target con los valores.
    a. Data Store: S3
    b. Format: Parquet
    c. Target Path: s3://anoc001-test-data-rodrigoruiz/processed
11. Se visualiza un mapeo de la información Source y Target. Damos en Save job and edit script.
12. Se visualiza el script generado de forma automática el cual lee nuestra información en el bucket S3 y la transforma para persistirla en nuestro S3 destino.

La sección anterior nos da un ETL listo para ejecutar. Sin embargo reemplazaremos el script por el proporcionado en el bucket en la misma sección. s3://anoc001-test-rodrigoruiz/AWS-Glue/glue-scripts/glue-test-job

A continuación se explica el código agregado de forma manual.

Al revisar la información y entender que solo se quiere los montos referentes a las ventas. Se hace un filtrado en los registros dada la condición status en paid o pending_payment. Entendiendo que los demás conceptos se requieren pero para un balance general.

In [None]:
# Filtramos los registros que son de tipo paid o pending_payment
## @type: Filter
## @args: [f = lambda x: x["status"] in ['paid',"pending_payment"]]
## @return: filtersales
## @inputs: [frame = resolvechoice2]
filtersales = Filter.apply(frame = resolvechoice2, f = lambda x: x["status"] in ['paid',"pending_payment"])

Para la limpieza de los datos se crearon 3 funciones.

In [None]:
# Funcion que completa el campo name
def completeName(row):
    if row["name"] is None or row["name"] == "":
        if row["company_id"] == "cbf1c8b09cd5b549416d49d220a40cbd317f952e":
            row["name"] = "MiPasajefy"
        elif row["company_id"] == "8f642dc67fccf861548dfe1c761ce22f795e91f0":
            row["name"] = "Muebles chidos"
    return row

# Funcion que completa el campo company_id            
def completeCompanyId(row):
    if row["company_id"] is None or row["company_id"] == "":
        if row["name"] == "MiPasajefy":
            row["company_id"] = "cbf1c8b09cd5b549416d49d220a40cbd317f952e"
        elif row["name"] == "Muebles chidos":
            row["company_id"] = "8f642dc67fccf861548dfe1c761ce22f795e91f0"
    return row
    
# Funcion que limpia y estandariza el campo created_at
def cleanCreatedAt(row):
    if len(row["created_at"])<9:
        row["created_at"]=row["created_at"][6:]+"/"+row["created_at"][4:6]+"/"+row["created_at"][:4]
    if "-" in row["created_at"]:
        row["created_at"]=row["created_at"][8:10]+"/"+row["created_at"][5:7]+"/"+row["created_at"][:4]
    return row

Aplicamos las funciones a nuestro dynamicframe.

In [None]:
# Aplicamos la función completeName    
## @type: Map
## @args: [f = completeName, transformation_ctx = "completedName"]
## @return: completedName
## @inputs: [frame = <frame>]
completedName = Map.apply(frame = filtersales, f = completeName, transformation_ctx = "completedName")

# Aplicamos la función completeCompanyId    
## @type: Map
## @args: [f = completeCompanyId, transformation_ctx = "completeCompanyId"]
## @return: completedCompanyId
## @inputs: [frame = completedName]
completedCompanyId = Map.apply(frame = completedName, f = completeCompanyId, transformation_ctx = "completeCompanyId")

# Aplicamos la función cleanCreatedAt    
## @type: Map
## @args: [f = cleanCreatedAt, transformation_ctx = "cleanCreatedAt"]
## @return: cleanedCreatedAt
## @inputs: [frame = completedCompanyId]
cleanedCreatedAt = Map.apply(frame = completedCompanyId, f = cleanCreatedAt, transformation_ctx = "cleanCreatedAt")

Por requisito se requiere persistir solo el sumarizado de ventas, agrupando por los campos name y created_at.
Por lo que convertiremos nuestro dynamicframe en un dataframe para poderlo explotar de manera facil con SQL.

In [None]:
# Creamos un df para crear una vista temporal y agrupar via SQL las ventas por name y created_at
df = cleanedCreatedAt.toDF()
df.createOrReplaceTempView("salesTable")
result_sql_df=spark.sql("SELECT name, created_at, SUM(amount) FROM salesTable GROUP BY name, created_at")
# Lo pasamos a dynamicFrame para persistirlo
result_dyf = DynamicFrame.fromDF(result_sql_df, glueContext, "result_sql_df")
# Este coalesce es opcional y es para que quede un unico archivo en el procesamiento. Sin embargo puede afectar temas de performance.
result_dyf_with_less_partitions=result_dyf.coalesce(1)

Finalmente Persistimos la información en nuestro S3. El cual será insumo de nuestro dashboard.

In [None]:
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://anoc001-test-data-rodrigoruiz/processed"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = result_dyf_with_less_partitions]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = result_dyf_with_less_partitions, connection_type = "s3", connection_options = {"path": "s3://anoc001-test-data-rodrigoruiz/processed"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

In [None]:
Para ejecutar el job basta con Dar clic en Run job.
Al terminar la ejecución podremos visualizar en el S3 el insumo requerido.