# Primeiro passo é importar as funções necessárias do framework Pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Em seguida, iniciamos a sessão Spark (SparkSession), montando-a, dando um nome para a aplicação e a configurando. Nesse momento, na parte de configurações, utilizamos o driver JDBC, que é um driver fornecido pela Microsoft que tem como função fazer a conexão entre o Pyspark e o SQL Server. Fornecemos o caminho local do arquivo nas configurações.

In [None]:
spark = SparkSession.builder \
    .appName("Calculo do Total Liquido") \
    .config("spark.jars", "CaminhoLocalDoDriver\mssql-jdbc-12.4.2.jre8.jar") \
    .getOrCreate()

# Agora, carregamos os dados do banco de dados criado com a Stored Procedure fornecida pelo repositório do teste técnico:

In [None]:
df_contrato = spark.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://localhost:XXXX;databaseName=desafio_engenheiro;integratedSecurity=true") \
    .option("dbtable", "dbo.contrato") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

df_transacao = spark.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://localhost:XXXX;databaseName=desafio_engenheiro;integratedSecurity=true") \
    .option("dbtable", "dbo.transacao") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

# Fazemos o "Join", ou seja, juntamos os dois dataframes para fins de efetuamos o cálculo do total líquido:

In [None]:
df_joined = df_transacao.join(df_contrato, df_transacao.contrato_id == df_contrato.contrato_id)

# Efetuamos o cálculo do total líquido da empresa através da lógica fornecida pelo teste técnico:


In [None]:
df_total_liquido = df_joined.withColumn("total_liquido", col("valor_total") - (col("valor_total") * (col("percentual_desconto") / 100)))
total_liquido = df_total_liquido.agg(sum("total_liquido")).collect()[0][0]

# Finalmente, mostramos o resultado final, isto é, o Total Líquido da empresa:


In [None]:
print(f"Total Líquido: {total_liquido}")

# Por fim, encerramos a Sessão Spark:


In [None]:
spark.stop()