# Processo de ETL
### Transformação de Dados Usando PySpark
---

### Inicializa a sessão do AWS Glue


In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# Resolve the job arguments; only "JOB_NAME" is requested, and it is consumed
# by job.init() below.
# NOTE(review): getResolvedOptions presumably fails when --JOB_NAME is absent
# from sys.argv — confirm this cell also works in the interactive session and
# not only when deployed as a Glue job.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; later cells use it to read from the Glue
# Data Catalog and to write to S3.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext (not referenced by the later cells).
spark = glueContext.spark_session
# Job handle: init() starts the run; job.commit() in the last cell finishes it.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

### Cria um DynamicFrame e o converte para um DataFrame do PySpark
O DynamicFrame é criado a partir da tabela `raw` do banco de dados `wecode-db` no Glue Data Catalog.

In [None]:
# Load the "raw" table of the "wecode-db" Data Catalog database as a Glue
# DynamicFrame, then expose it as a Spark DataFrame for the transformations.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="wecode-db",
    table_name="raw",
)
df = dyf.toDF()

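### Executa as transformações necessárias nos dados
Extrai o nome de usuário (parte do e-mail antes do `@`) e agrupa as mensagens por `postId`, calculando a quantidade de mensagens, a última mensagem (`last`) e o nome de usuário (`first`) de cada post. Observação: sem uma ordenação explícita, `first`/`last` dependem da ordem das linhas, que o Spark não garante após o `groupBy`.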


In [None]:
from pyspark.sql.functions import count, first, last, split

# Local part of each e-mail address (everything before the first "@").
username_col = split(df["email"], "@").getItem(0)
with_username = df.withColumn("username", username_col)

# One row per postId: number of messages, the "last" body and the "first"
# username seen in the group.
# NOTE(review): first()/last() are order-dependent and no ordering is applied
# before the groupBy, so "ultima_mensagem" may not be the chronologically last
# message — confirm whether an ordering column (id/timestamp?) exists and
# consider max_by(...) instead.
df = with_username.groupBy("postId").agg(
    count("*").alias("count_mensagens"),
    last("body").alias("ultima_mensagem"),
    first("username").alias("username"),
)

### Envia os dados transformados para o S3 (formato CSV)


In [None]:
from awsglue.dynamicframe import DynamicFrame

# Wrap the aggregated Spark DataFrame back into a Glue DynamicFrame named
# "parsed_df" so it can be handed to glueContext.write_dynamic_frame.
DyF = DynamicFrame.fromDF(
    dataframe=df,
    glue_ctx=glueContext,
    name="parsed_df",
)

In [None]:
# Write the DynamicFrame as CSV under s3://teste-wecode/parsed/, with no
# partitioning (empty partitionKeys).
# NOTE(review): nothing here clears the target prefix beforehand, so re-running
# the job presumably adds new part files next to earlier output — confirm
# whether overwrite semantics are wanted.
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=DyF,
    connection_type="s3",
    connection_options=dict(
        path="s3://teste-wecode/parsed/",
        partitionKeys=[],
    ),
    format="csv",
    transformation_ctx="S3bucket_node3",
)


In [None]:
# Finish the Glue job run started by job.init() in the first code cell.
# NOTE(review): commit() is presumably what persists job-bookmark state; it
# only matters if bookmarks are enabled for this job.
job.commit()