In [2]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
import json
from pyspark.sql import functions as F

### Entregavel 2

A primeira avaliação feita foi para buscar dados faltantes no dataset.
Neste caso temos um dado de desconto faltante na transação 4, constatado isso, tomei a decisão de usar a função fillna para substituir o valor por 0 pois não afetaria nosso cálculo posterior.

Como não temos nenhuma linha com o dado de total_bruto faltante não foi necessário fazer um filtro para isso. Caso existisse minha indicação seria criar uma validação para esse dado e um alerta na pipeline para que o possível erro possa ser investigado.

In [10]:
transacoes = [
    {'transacao_id':1, 'total_bruto':3000, 'percentual_desconto': 6.99},
    {'transacao_id':2, 'total_bruto':57989, 'percentual_desconto': 1.45},
    {'transacao_id':4, 'total_bruto':1, 'percentual_desconto': None},
    {'transacao_id':5, 'total_bruto':34, 'percentual_desconto': 0.0}
    ]

df_transacoes = spark.createDataFrame(transacoes)

df_transacoes = df_transacoes.fillna(0)
df_transacoes = df_transacoes.withColumn('res', df_transacoes['total_bruto'] - (df_transacoes['total_bruto'] * (df_transacoes['percentual_desconto']/100)))

lucro = round(df_transacoes.select(F.sum('res')).collect()[0][0],2)

print(f"Lucro: R$ {lucro}")



Lucro: R$ 59973.46


### Entregavel 3.1

In [5]:
data = [
   {
      "CreateDate":"2021-05-24T20:21:34.79",
      "EmissionDate":"2021-05-24T00:00:00",
      "Discount":0.0,
      "NFeNumber":501,
      "NFeID":1,
      "ItemList":[
         {
            "ProductName":"Rice",
            "Value":35.55,
            "Quantity":2
         },
         {
            "ProductName":"Flour",
            "Value":11.55,
            "Quantity":5
         },
         {
            "ProductName":"Bean",
            "Value":27.15,
            "Quantity":7
         }
      ]
   },
   {
      "CreateDate":"2021-05-24T20:21:34.79",
      "EmissionDate":"2021-05-24T00:00:00",
      "Discount":0.0,
      "NFeNumber":502,
      "NFeID":2,
      "ItemList":[
         {
            "ProductName":"Tomate",
            "Value":12.25,
            "Quantity":10
         },
         {
            "ProductName":"Pasta",
            "Value":7.55,
            "Quantity":5
         }
      ]
   },
   {
      "CreateDate":"2021-05-24T20:21:34.79",
      "EmissionDate":"2021-05-24T00:00:00",
      "Discount":0.0,
      "NFeNumber":503,
      "NFeID":3,
      "ItemList":[
         {
            "ProductName":"Beer",
            "Value":9.00,
            "Quantity":6
         },
         {
            "ProductName":"French fries",
            "Value":10.99,
            "Quantity":2
         },
         {
            "ProductName":"Ice cream",
            "Value":27.15,
            "Quantity":1
         }
      ]
   }
]

No primeiro entregável do item 3 optei por usar um modelo parecido com o modelo de OBT usando a quantidade de itens na lista para construir o DataFrame.
minha decisão foi essa por entender que apenas usar o explode na coluna ItemList eu teria um resultado muito similar ao segundo entregavel descrito abaixo.

In [6]:
df = spark.createDataFrame(data)
df_tmp = df.withColumn('tamanho', F.size(df.ItemList))
colunas = df_tmp.select(F.max(df_tmp.tamanho)).take(1)[0][0]

for i in range(colunas):
  df = df.withColumn(f'ValorProduto{i}', df.ItemList[i]['Value'])
  df = df.withColumn(f'NomeProduto{i}', df.ItemList[i]['ProductName'])
  df = df.withColumn(f'QuantityProduto{i}', df.ItemList[i]['Quantity'])

df = df.drop(df.ItemList)

df.show()






+--------------------+--------+-------------------+-----+---------+-------------+------------+----------------+-------------+------------+----------------+-------------+------------+----------------+
|          CreateDate|Discount|       EmissionDate|NFeID|NFeNumber|ValorProduto0|NomeProduto0|QuantityProduto0|ValorProduto1|NomeProduto1|QuantityProduto1|ValorProduto2|NomeProduto2|QuantityProduto2|
+--------------------+--------+-------------------+-----+---------+-------------+------------+----------------+-------------+------------+----------------+-------------+------------+----------------+
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    1|      501|        35.55|        Rice|               2|        11.55|       Flour|               5|        27.15|        Bean|               7|
|2021-05-24T20:21:...|     0.0|2021-05-24T00:00:00|    2|      502|        12.25|      Tomate|              10|         7.55|       Pasta|               5|         null|        null|            null|


### Entregavel 3.2

Para o segundo entregável do item 3 optei por fazer a operação de explode na coluna ItemList e usar a função getItem para acessar os dados dentro do dicionário resultante.

Com isso temos 2 tabelas como resultado do entregável

- df_itens:
  - CreateDate
  - NFeNumber
  - Value
  - Quantity
  - ProductName

- df_nfe:
  - CreateDate
  - EmissionDate
  - NFeNumber
  - Discount
  - EmissionDate

O join entre as tabelas pode ser feito através das colunas CreateDate e NFeNumber, optei por manter duas colunas para o join pois em um ambiente de muitas requisições podemos ter duas requisições no mesmo timestamp e neste cenário fazer o join apenas pela coluna de CreateDate geraria um problema

In [7]:
df = spark.createDataFrame(data)
df_itens = df.select(df.CreateDate, df.NFeNumber, F.explode(df.ItemList))

df_itens = df_itens.withColumn("Value", df_itens.col.getItem("Value")) \
        .withColumn("Quantity", df_itens.col.getItem("Quantity")) \
        .withColumn("ProductName", df_itens.col.getItem("ProductName")) \
        .drop("col")

df_itens.show(truncate=False)



+----------------------+---------+-----+--------+------------+
|CreateDate            |NFeNumber|Value|Quantity|ProductName |
+----------------------+---------+-----+--------+------------+
|2021-05-24T20:21:34.79|501      |35.55|2       |Rice        |
|2021-05-24T20:21:34.79|501      |11.55|5       |Flour       |
|2021-05-24T20:21:34.79|501      |27.15|7       |Bean        |
|2021-05-24T20:21:34.79|502      |12.25|10      |Tomate      |
|2021-05-24T20:21:34.79|502      |7.55 |5       |Pasta       |
|2021-05-24T20:21:34.79|503      |9.0  |6       |Beer        |
|2021-05-24T20:21:34.79|503      |10.99|2       |French fries|
|2021-05-24T20:21:34.79|503      |27.15|1       |Ice cream   |
+----------------------+---------+-----+--------+------------+



In [9]:
df_nf = df.select(df.CreateDate, df.NFeNumber, df.NFeID, df.Discount, df.EmissionDate)
df_nf.show(10)

+--------------------+---------+-----+--------+-------------------+
|          CreateDate|NFeNumber|NFeID|Discount|       EmissionDate|
+--------------------+---------+-----+--------+-------------------+
|2021-05-24T20:21:...|      501|    1|     0.0|2021-05-24T00:00:00|
|2021-05-24T20:21:...|      502|    2|     0.0|2021-05-24T00:00:00|
|2021-05-24T20:21:...|      503|    3|     0.0|2021-05-24T00:00:00|
+--------------------+---------+-----+--------+-------------------+



### Entregavel 4