In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
!hdfs dfs -cat /user/silas/data/juros_selic/juros_selic

In [2]:
juros = spark.read.csv("/user/silas/data/juros_selic/juros_selic", sep=";", header="true")

In [None]:
#Alterar o formato do campo data para “MM/dd/yyyy”
juros.show(5)


In [3]:
juros_americano = juros.withColumn("data", unix_timestamp(col("data"), "dd/MM/yyyy"))
juros_americano.show(5)
juros_americano.printSchema()

+---------+-----+
|     data|valor|
+---------+-----+
|517968000| 1,27|
|520560000| 1,95|
|523238400| 2,57|
|525916800| 2,94|
|528508800| 1,96|
+---------+-----+
only showing top 5 rows

root
 |-- data: long (nullable = true)
 |-- valor: string (nullable = true)



In [4]:
juros_americano_new = juros_americano.withColumn("data", from_unixtime("data", "MM/dd/yyyy"))
#juros_americano_new.show(5)

In [5]:
#Com uso da função from_unixtime crie o campo “ano_unix”, com a informação do ano do campo data
juros_unixtime = juros_americano_new.withColumn("ano_unix",from_unixtime(unix_timestamp(col("data"),"dd/MM/yyy"),"yyyy"))
#juros_unixtime.show(3)

In [6]:
#Com uso de substring crie o campo “ano_str”, com a informação do ano do campo data
juros_substring = juros_unixtime.withColumn("ano_str", substring(col("data"),7,4))
#juros_substring.show(3)

In [7]:
#Com uso da função split crie o campo “ano_str”, com a informação do ano do campo data
juros_split = juros_substring.withColumn("ano_split", split(col("data"),"/").getItem(2))
#juros_split.show(3)

In [9]:
juros_split.write.csv("/user/silas/juros_selic", header="true")

In [11]:
#Criar um dataframe para ler o arquivo no HDFS /user/<nome/data/juros_selic/juros_selic
juros_selic = spark.read.csv("/user/silas/data/juros_selic/juros_selic", header="true", sep=";")
#juros_selic.show()

In [13]:
#Agrupar todas as datas pelo ano em ordem decrescente e salvar a quantidade de meses ocorridos, o valor médio, 
#mínimo e máximo do campo valor com a seguinte estrutura:
juros_ano = juros_selic.withColumn("ano", split(col("data"), "/").getItem(2))
juros_valor = juros_ano.withColumn("valor", regexp_replace(col("valor"),"\,","\.").cast(FloatType()))
juros_valor.printSchema()



root
 |-- data: string (nullable = true)
 |-- valor: float (nullable = true)
 |-- ano: string (nullable = true)



In [15]:
juros_relatorio = juros_valor.groupBy("ano").agg(count("ano").alias("Meses"),
                                                 format_number(avg("valor"),2).alias("Valor médio"),
                                                 min("valor").alias("Valor minimo"),
                                                 max("valor").alias("Valor máximo)")).sort(asc("ano"))
juros_relatorio.show(5)

+----+-----+-----------+------------+-------------+
| ano|Meses|Valor médio|Valor minimo|Valor máximo)|
+----+-----+-----------+------------+-------------+
|1986|    7|       2.65|        1.27|         5.47|
|1987|   12|      13.52|        7.99|        24.63|
|1988|   12|      22.73|       16.59|        30.24|
|1989|   12|      31.68|       11.43|        64.21|
|1990|   12|      25.40|        4.23|        82.04|
+----+-----+-----------+------------+-------------+
only showing top 5 rows



In [None]:
#Salvar no hdfs:///user/<nome>/relatorioAnual com compressão zlib e formato orc
#juros_relatorio.write.orc("user/silas/relatorio_anual", compression="zlib")

In [None]:
!hdfs dfs -ls hdfs://namenode:8020/user/root/user/silas/relatorio_anual