In [52]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.util.Properties
import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}

In [53]:
object Conexao {
  def criarSessaoSpark(): SparkSession = {
    val spark = SparkSession.builder()
      .appName("ETL")
      .master("spark://spark-master:7077")
      .config("spark.jars", "/opt/bitnami/spark/jars/mysql-connector-j-8.0.33.jar")
      .getOrCreate()
    spark
  }

  def propriedadesJDBC(): Properties = {
    val jdbcProperties = new Properties()
    jdbcProperties.setProperty("user", "sparkuser")
    jdbcProperties.setProperty("password", "sparkpass")
    jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    jdbcProperties
  }

  val jdbcUrl: String = "jdbc:mysql://mysql:3306/desafio"
}

defined object Conexao


In [54]:
val associados = spark.read.jdbc(Conexao.jdbcUrl, "associado", Conexao.propriedadesJDBC())

associados = [id: int, nome: string ... 3 more fields]


[id: int, nome: string ... 3 more fields]

In [55]:
val contas = spark.read.jdbc(Conexao.jdbcUrl, "conta", Conexao.propriedadesJDBC())

contas = [id: int, tipo_conta: string ... 2 more fields]


[id: int, tipo_conta: string ... 2 more fields]

In [56]:
val cartoes = spark.read.jdbc(Conexao.jdbcUrl, "cartao", Conexao.propriedadesJDBC())

cartoes = [id: int, num_cartao: int ... 3 more fields]


[id: int, num_cartao: int ... 3 more fields]

In [57]:
val movimentos = spark.read.jdbc(Conexao.jdbcUrl, "movimento", Conexao.propriedadesJDBC())

movimentos = [id: int, vlr_transacao: decimal(10,2) ... 3 more fields]


[id: int, vlr_transacao: decimal(10,2) ... 3 more fields]

In [58]:
val movimento_flat_join = associados
  .join(contas, associados("id") === contas("id_associado"), "inner")
  .join(cartoes, associados("id") === cartoes("id_associado") && contas("id") === cartoes("id_conta"), "inner")
  .join(movimentos, cartoes("id") === movimentos("id_cartao"), "inner")


movimento_flat_join = [id: int, nome: string ... 17 more fields]


[id: int, nome: string ... 17 more fields]

In [59]:
val movimento_flat_columns = movimento_flat_join.select(
    col("nome").as("nome_associado"),
    col("sobrenome").as("sobrenome_associado"),
    col("idade").as("idade_associado"),
    col("vlr_transacao")as("vlr_transacao_movimento"),
    col("des_transacao")as("des_transacao_movimento"),
    col("data_movimento"),
    col("num_cartao")as("numero_cartao"),
    col("nom_impresso")as("nome_impresso_cartao"),
    col("tipo_conta"),
    col("data_criacao")as("data_criacao_conta")
    )
    


movimento_flat_columns = [nome_associado: string, sobrenome_associado: string ... 8 more fields]


[nome_associado: string, sobrenome_associado: string ... 8 more fields]

In [65]:
val movimento_flat= movimento_flat_columns.select(
    col("nome_associado").cast("string").as("nome_associado"),
    col("sobrenome_associado").cast("string").as("sobrenome_associado"),
    col("idade_associado").cast("string").as("idade_associado"),
    col("vlr_transacao_movimento").cast("string").as("vlr_transacao_movimento"),
    col("des_transacao_movimento").cast("string").as("des_transacao_movimento"),
    col("data_movimento").cast("string").as("data_movimento"),
    col("numero_cartao").cast("string").as("numero_cartao"),
    col("nome_impresso_cartao").cast("string").as("nome_impresso_cartao"),
    col("tipo_conta").cast("string").as("tipo_conta"),
    col("data_criacao_conta").cast("string").as("data_criacao_conta")
)

lastException = null
movimento_flat = [nome_associado: string, sobrenome_associado: string ... 8 more fields]


[nome_associado: string, sobrenome_associado: string ... 8 more fields]

In [66]:
// movimento_flat_cast.show()

Syntax Error.: 

In [68]:
val nome = "movimento_flat_scala.csv"
val dir_destino = "csv"

val dadosProcessados = movimento_flat.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .option("encoding", "ISO-8859-1")
  .csv(dir_destino)

val dir_saida = new File(dir_destino)
val arquivo = dir_saida.listFiles()
  .filter(arquivo => arquivo.getName.startsWith("part-") && arquivo.getName.endsWith(".csv"))
  .head  // Pega o primeiro (e único) CSV

Files.move(
  arquivo.toPath,
  Paths.get(s"$dir_destino/$nome"),
  StandardCopyOption.REPLACE_EXISTING
)

println(s"Arquivo salvo como $dir_destino/$nome")

Arquivo salvo como csv/movimento_flat.csv


lastException = null
dir_destino = csv
dadosProcessados = ()
dir_saida = csv
arquivo = csv/part-00000-aa594702-4feb-4e01-98c0-da54800e16ad-c000.csv
nome = movimento_flat.csv


movimento_flat.csv