# Etapa 5

Conhecendo um pouco mais do arquivo antes de fazer o script de teste para entender o seu formato e os dados contidos no arquivo nomes.csv

In [10]:
import pandas as pd

df = pd.read_csv('nomes.csv')

df.head()

Unnamed: 0,nome,sexo,total,ano
0,Jennifer,F,54336,1983
1,Jessica,F,45278,1983
2,Amanda,F,33752,1983
3,Ashley,F,33292,1983
4,Sarah,F,27228,1983


In [11]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825433 entries, 0 to 1825432
Data columns (total 4 columns):
 #   Column  Dtype 
---  ------  ----- 
 0   nome    object
 1   sexo    object
 2   total   int64 
 3   ano     int64 
dtypes: int64(2), object(2)
memory usage: 55.7+ MB


In [12]:
df.shape

(1825433, 4)

Testando o codigo localmente

In [1]:
import os

os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk1.8.0_202"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "\\bin;" + os.environ["PATH"]

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TestarFiltro1934") \
    .getOrCreate()

caminho_csv = "nomes.csv"
caminho_saida = "saida_parquet"

df = spark.read.csv(caminho_csv, header=True, sep=",", inferSchema=True)

df.printSchema()

df_1934 = df.filter(df["ano"] == 1934)

df_1934.show(5)





root
 |-- nome: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- total: integer (nullable = true)
 |-- ano: integer (nullable = true)

+-------+----+-----+----+
|   nome|sexo|total| ano|
+-------+----+-----+----+
|   Mary|   F|56918|1934|
|  Betty|   F|31079|1934|
|Barbara|   F|29232|1934|
|Shirley|   F|22836|1934|
|Dorothy|   F|21280|1934|
+-------+----+-----+----+
only showing top 5 rows



Código utilizado na AWS: 

Inicialmente, configurei os contextos necessários, depois li o arquivo CSV armazenado no S3 como um DynamicFrame e o converti para DataFrame para facilitar a aplicação de filtros. Apliquei um filtro para manter apenas as linhas em que o valor da coluna ano fosse igual a 1934, após aplicar o filtro, converti o DataFrame novamente para DynamicFrame e escrevi o resultado no S3 em formato Parquet, utilizando os parâmetros definidos para origem e destino no próprio Glue Job, finalizei a execução com o job.commit().

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_INPUT_PATH', 'S3_TARGET_PATH'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


df_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args['S3_INPUT_PATH']]},
    format="csv",
    format_options={"withHeader": True, "separator": ","}
)

df = df_dyf.toDF()

df_filtrado = df.filter(df["ano"] == 1934)

df_final = DynamicFrame.fromDF(df_filtrado, glueContext, "df_final")

 glueContext.write_dynamic_frame.from_options(
    frame=df_final,
    connection_type="s3",
    connection_options={"path": args['S3_TARGET_PATH']},
    format="parquet"
)

job.commit()


# Etapa 5.2

Testando o código localmente

 iniciei o processamento local no Jupyter Notebook criando uma sessão Spark com as configurações do Java, depois realizei a leitura do arquivo nomes.csv, utilizando inferência automática de schema. Após verificar a estrutura com o método .printSchema(), apliquei uma transformação na coluna nome, convertendo todos os valores para letras maiúsculas usando a função upper(). Por fim, visualizei as 5 primeiras linhas para validar a leitura e transformação.

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col

os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk1.8.0_202"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "\\bin;" + os.environ["PATH"]

spark = SparkSession.builder \
    .appName("Etapa7_Parte1") \
    .getOrCreate()

caminho_csv = "nomes.csv"

df = spark.read.csv(caminho_csv, header=True, sep=",", inferSchema=True)

df = df.withColumn("nome", upper(col("nome")))

df.printSchema()

df.show(5)



root
 |-- nome: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- total: integer (nullable = true)
 |-- ano: integer (nullable = true)

+--------+----+-----+----+
|    nome|sexo|total| ano|
+--------+----+-----+----+
|JENNIFER|   F|54336|1983|
| JESSICA|   F|45278|1983|
|  AMANDA|   F|33752|1983|
|  ASHLEY|   F|33292|1983|
|   SARAH|   F|27228|1983|
+--------+----+-----+----+
only showing top 5 rows



Para fazer a contagem do total de linhas presentes realizei o agrupamento por ano e sexo, somando o total de registros para cada grupo, o resultado foi ordenado pelo ano de forma decrescente, mostrando primeiro os dados mais recentes e finalizei exibindo as primeiras linhas do agrupamento.

In [4]:
total_linhas = df.count()
print(f"Total de linhas no dataframe: {total_linhas}")

df_agrupado = df.groupBy("ano", "sexo").sum("total")
df_agrupado = df_agrupado.withColumnRenamed("sum(total)", "total_registros")
df_agrupado = df_agrupado.orderBy(col("ano").desc())

df_agrupado.show()


Total de linhas no dataframe: 1825433
+----+----+---------------+
| ano|sexo|total_registros|
+----+----+---------------+
|2014|   M|        1901376|
|2014|   F|        1768775|
|2013|   F|        1745339|
|2013|   M|        1881463|
|2012|   F|        1753922|
|2012|   M|        1889414|
|2011|   F|        1753500|
|2011|   M|        1893230|
|2010|   M|        1913851|
|2010|   F|        1772738|
|2009|   M|        1979303|
|2009|   F|        1832925|
|2008|   M|        2036289|
|2008|   F|        1887234|
|2007|   F|        1919408|
|2007|   M|        2072139|
|2006|   M|        2052377|
|2006|   F|        1898463|
|2005|   F|        1845379|
|2005|   M|        1994841|
+----+----+---------------+
only showing top 20 rows



Utilizei a função Window para particionar os dados por sexo e ordenar pelo total de registros em ordem decrescente, apliquei a função row_number para numerar os registros dentro de cada grupo e filtrei para obter o nome com maior registro para cada sexo,assim, pude indentificar o nome feminino e masculino mais popular, juntamente com o ano em que ocorreram seus maiores registros.

In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("sexo").orderBy(desc("total"))
df_with_rank = df.withColumn("rank", row_number().over(window_spec))
top_names = df_with_rank.filter(col("rank") == 1).select("sexo", "nome", "ano", "total")

top_names.show()


+----+-----+----+-----+
|sexo| nome| ano|total|
+----+-----+----+-----+
|   F|LINDA|1947|99680|
|   M|JAMES|1947|94755|
+----+-----+----+-----+



Realizei o agrupamento dos dados pela coluna ano, somando os valores da coluna total para obter a quantidade total de registros  por ano, oresultado foi ordenado de forma crescente e exibi as 10 primeiras linhas.

In [6]:
from pyspark.sql.functions import sum as _sum

total_por_ano = df.groupBy("ano").agg(_sum("total").alias("total_registros"))
total_por_ano.orderBy("ano").show(10)


+----+---------------+
| ano|total_registros|
+----+---------------+
|1880|         201484|
|1881|         192699|
|1882|         221538|
|1883|         216950|
|1884|         243467|
|1885|         240855|
|1886|         255319|
|1887|         247396|
|1888|         299480|
|1889|         288950|
+----+---------------+
only showing top 10 rows



Código utilizado na AWS: 

No script do AWS Glue, iniciei o job capturando os argumentos de entrada para identificar os caminhos no S3 e o nome do job, criei os contextos necessários  e realizei a leitura do arquivo nomes.csv no S3 como DynamicFrame, convertendo-o para DataFrame para aplicar as transformações. Verifiquei o schema e converti os nomes para letras maiúsculas com a função upper(), depois contei o total de registros e agrupei os dados por ano e sexo, somando os totais e ordenando os anos de forma decrescente. Utilizei uma janela de partição para identificar o nome mais frequente por sexo e ano, após isso, agrupei os dados novamente por ano, somando os totais e ordenando os 10 primeiros anos de forma crescente. Por fim, converti o DataFrame final de volta para DynamicFrame e escrevi os dados no S3 no formato JSON, particionando pelas colunas sexo e ano.

In [8]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, upper, sum as _sum, row_number
from pyspark.sql.window import Window

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_INPUT_PATH', 'S3_TARGET_PATH'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df_dynamic = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args['S3_INPUT_PATH']]},
    format="csv",
    format_options={"withHeader": True, "separator": ","}
)

df = df_dynamic.toDF()

df.printSchema()

df_upper = df.withColumn("nome", upper(col("nome")))

print("Total de linhas no dataframe:", df_upper.count())

df_grouped = df_upper.groupBy("ano", "sexo").agg(_sum("total").alias("total_registros"))
df_grouped.orderBy(col("ano").desc()).show()

window_spec = Window.partitionBy("sexo").orderBy(col("total").desc())
df_ranked = df_upper.withColumn("rank", row_number().over(window_spec))
df_ranked.filter(col("rank") == 1).select("sexo", "nome", "ano", "total").show()

df_total_ano = df_upper.groupBy("ano").agg(_sum("total").alias("total_registros"))
df_total_ano.orderBy("ano").show(10)

df_final = DynamicFrame.fromDF(df_upper, glueContext, "df_final")

glueContext.write_dynamic_frame.from_options(
    frame=df_final,
    connection_type="s3",
    connection_options={
        "path": args['S3_TARGET_PATH'],
        "partitionKeys": ["sexo", "ano"]
    },
    format="json"
)

job.commit()

ModuleNotFoundError: No module named 'awsglue'