<a href="https://colab.research.google.com/github/pedroafleite/data_etl/blob/main/data_etl_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

from google.colab import drive
drive.mount('/content/drive')

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=bb91adee1cb59c268f4c73db8502e5ad859d882a09d77a7c9e0d1d34f919f64f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Mounted at /content/drive


In [3]:
import pandas as pd
from pyspark.sql import SparkSession
import json

from pyspark.sql.types import StructType, StructField, ArrayType
from pyspark.sql.types import StringType, DoubleType, BooleanType

from pyspark.sql.functions import *

In [4]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [5]:
# Schema for infomix.tsv: three nullable string columns, in file order.
schema_infomix = StructType(
    [StructField(column, StringType(), True) for column in ("cnpj", "gtin", "category")]
)

In [6]:
# Load the tab-separated infomix file using the explicit schema.
# NOTE: mode="DROPMALFORMED" discards rows that fail to parse, without raising.
infomix = spark.read.csv(
    "/content/drive/MyDrive/Colab Notebooks/data_etl/data_etl_test/infomix.tsv",
    sep="\t",
    schema=schema_infomix,
    header=True,
    mode="DROPMALFORMED",
)

In [7]:
# Preview the first 20 rows; long cell values are truncated.
infomix.show(n=20, truncate=True)

+--------------+--------------+--------------------+
|          cnpj|          gtin|            category|
+--------------+--------------+--------------------+
|08540603000198|06890101502534|     massas e molhos|
|71322150004408|07898605250417|            cervejas|
|43235985000732|07896000724465|              cabelo|
|47508411134004|07896102589122|conservas e enlat...|
|47508411135833|07895000474646|          bomboniere|
|09379531000101|05893202502558|     massas e molhos|
|47508411135833|07891008188271|          bomboniere|
|39346861026802|07891025112983|iogurtes e sobrem...|
|47508411134004|07896262304368|          bomboniere|
|41185455001501|07896085393488|               leite|
|75315333008002|07896036095089|     massas e molhos|
|06057223023112|07898909755533|  doces e sobremesas|
|06057223023112|07896102503708|     massas e molhos|
|75315333008002|07896038308057|               arroz|
|06057223023112|07898027658112|  doces e sobremesas|
|00063960023575|07896005277195|     massas e m

In [8]:
# Schema for the CNPJ JSON-lines records: three top-level string fields plus a
# nested `response` struct. All fields are nullable; every leaf is a string
# except the two booleans under `billing`.
schema_cnpj = (
    StructType()
    .add("cnpj", StringType(), True)
    .add("created_at", StringType(), True)
    .add("id", StringType(), True)
    .add(
        "response",
        StructType()
        .add("abertura", StringType(), True)
        # Main activity: array of {code, text} structs.
        .add(
            "atividade_principal",
            ArrayType(
                StructType()
                .add("code", StringType(), True)
                .add("text", StringType(), True)
            ),
            True,
        )
        # Secondary activities: same element layout as the main activity.
        .add(
            "atividades_secundarias",
            ArrayType(
                StructType()
                .add("code", StringType(), True)
                .add("text", StringType(), True)
            ),
            True,
        )
        .add("bairro", StringType(), True)
        .add(
            "billing",
            StructType()
            .add("database", BooleanType(), True)
            .add("free", BooleanType(), True),
            True,
        )
        .add("capital_social", StringType(), True)
        .add("cep", StringType(), True)
        .add("cnpj", StringType(), True)
        .add("complemento", StringType(), True)
        .add("data_situacao", StringType(), True)
        .add("data_situacao_especial", StringType(), True)
        .add("efr", StringType(), True)
        .add("email", StringType(), True)
        .add("fantasia", StringType(), True)
        .add("google_place_id", StringType(), True)
        .add("latitude", StringType(), True)
        .add("logradouro", StringType(), True)
        .add("longitude", StringType(), True)
        .add("motivo_situacao", StringType(), True)
        .add("municipio", StringType(), True)
        .add("natureza_juridica", StringType(), True)
        .add("nome", StringType(), True)
        .add("numero", StringType(), True)
        # Array of {nome, qual} structs.
        .add(
            "qsa",
            ArrayType(
                StructType()
                .add("nome", StringType(), True)
                .add("qual", StringType(), True)
            ),
            True,
        )
        .add("situacao", StringType(), True)
        .add("situacao_especial", StringType(), True)
        .add("status", StringType(), True)
        .add("telefone", StringType(), True)
        .add("tipo", StringType(), True)
        .add("uf", StringType(), True)
        .add("ultima_atualizacao", StringType(), True),
        True,
    )
)

In [9]:
# Empty DataFrame carrying schema_cnpj, used only to inspect the schema.
# The row collection is an empty list; a dict is not a collection of rows.
df = spark.createDataFrame(data=[], schema=schema_cnpj)

In [10]:
# Print the schema tree of `df` to check the nested field layout.
df.printSchema()

root
 |-- cnpj: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- response: struct (nullable = true)
 |    |-- abertura: string (nullable = true)
 |    |-- atividade_principal: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- atividades_secundarias: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- billing: struct (nullable = true)
 |    |    |-- database: boolean (nullable = true)
 |    |    |-- free: boolean (nullable = true)
 |    |-- capital_social: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cnpj: string (nullable = true)
 |    |-- complemento: string (nullable = true)
 |    |-- 

In [11]:
# Parse the JSON-lines file into a list with one parsed object per record.
# The encoding is explicit so the file decodes the same way on any platform,
# and blank lines (such as a trailing newline) are skipped instead of making
# json.loads raise.
with open(
    "/content/drive/MyDrive/Colab Notebooks/data_etl/data_etl_test/cnpjs_receita_federal.jl",
    encoding="utf-8",
) as f:
    data = [json.loads(line) for line in f if line.strip()]

In [12]:
# Build the CNPJ DataFrame from the parsed records, applying the nested schema.
cnpj = spark.createDataFrame(data, schema=schema_cnpj)

cnpj.show(n=20, truncate=True)

+--------------+-------------------+--------------------+--------------------+
|          cnpj|         created_at|                  id|            response|
+--------------+-------------------+--------------------+--------------------+
|68093095000179|2018-10-19 21:36:11|99068e55-1018-47e...|{01/07/1992, [{10...|
|13492669000190|2018-10-20 17:17:03|c475b934-eef1-488...|{17/03/2011, [{46...|
|74683392000177|2018-11-21 08:21:30|e013ecef-c884-415...|{29/04/1994, [{47...|
|12423658000195|2018-10-19 21:20:15|03bc0063-69d6-438...|{20/08/2010, [{10...|
|00588458000103|2018-10-15 12:36:16|07e1f38f-cef6-49d...|{31/03/1995, [{10...|
|61365557000110|2018-10-18 21:15:05|58dbdec7-2502-43e...|{29/07/1966, [{10...|
|75315333008274|2018-09-26 10:02:18|17266223-0c0b-4d8...|{12/09/2008, [{47...|
|33033028000184|2018-08-24 01:25:49|717e6f70-7058-49b...|{24/01/1973, [{82...|
|04939545000119|2018-10-18 21:52:18|0c116d6a-1708-47a...|{04/03/2002, [{47...|
|46842894000591|2018-10-19 22:08:45|6551306a-b8b8-47

In [13]:
# Flatten: keep the top-level cnpj plus three fields taken from `response`.
df_cnpj = cnpj.select(
    ["cnpj", "response.nome", "response.municipio", "response.uf"]
)
# Show full cell contents (no truncation).
df_cnpj.show(truncate=False)

+--------------+----------------------------------------------------------------------------+------------------------+---+
|cnpj          |nome                                                                        |municipio               |uf |
+--------------+----------------------------------------------------------------------------+------------------------+---+
|68093095000179|FROOTY COMERCIO E INDUSTRIA DE ALIMENTOS S.A.                               |ATIBAIA                 |SP |
|13492669000190|ESTRELLA DE GALICIA IMPORTACAO E COMERCIALIZACAO DE BEBIDAS E ALIMENTOS LTDA|SAO PAULO               |SP |
|74683392000177|DROGARIA NOVA SABARA LTDA                                                   |SAO PAULO               |SP |
|12423658000195|DJ INDUSTRIA E COMERCIO DE ALIMENTOS LTDA                                   |CARPINA                 |PE |
|00588458000103|FUGINI ALIMENTOS LTDA                                                       |MONTE ALTO              |SP |
|61365557000110|

In [14]:
# Schema for descricoes_externas.tsv: four nullable string columns.
schema_descricoes = (
    StructType()
    .add("id", StringType(), True)
    .add("gtin", StringType(), True)
    .add("description", StringType(), True)
    .add("flag_infoprice", StringType(), True)
)

In [15]:
# Load the tab-separated external descriptions using the explicit schema.
# NOTE: mode="DROPMALFORMED" discards rows that fail to parse, without raising.
descricoes = (
    spark.read
    .schema(schema_descricoes)
    .options(delimiter="\t", header=True, mode="DROPMALFORMED")
    .csv("/content/drive/MyDrive/Colab Notebooks/data_etl/data_etl_test/descricoes_externas.tsv")
)

descricoes.show(n=20, truncate=True)

+-------+--------------+--------------------+--------------+
|     id|          gtin|         description|flag_infoprice|
+-------+--------------+--------------------+--------------+
|2482686|07896512911933|SABONETE INFANTIL...|          true|
|2482685|07896512911933|SAB GRANADO BEBE ...|         false|
|2482684|07896512911933|SAB LIQ.GRAN.BEBE...|         false|
|2482683|07896512911933|SAB GRANADO GLICL...|         false|
|2482682|07896512911933|SAB BABY GRA LA 2...|         false|
|2482681|07896512911933|SABONETE LIQ GRAN...|         false|
|2057830|07896005286579|MACARRAO COM OVOS...|          true|
|2057829|07896005286579|M.D.BENTA ESPAGUE...|         false|
|2057828|07896005286579|MAC ESPAG DONA BE...|         false|
|2057827|07896005286579|ESPAGUETE DONA BE...|         false|
|2057826|07896005286579|MAC ESP C OVOS DO...|         false|
|2057825|07896005286579|MAC D.BENTA C/OVO...|         false|
|3133507|07898142855823|CHOCOLATE BOMBOM ...|          true|
|3133506|07898142855823|

In [16]:
# Schema for the Cosmos JSON-lines records: three top-level string fields plus
# a nested `response` struct. All fields are nullable strings.
schema_cosmos = (
    StructType()
    .add("created_at", StringType(), True)
    .add("gtin", StringType(), True)
    .add("id", StringType(), True)
    .add(
        "response",
        StructType()
        # NOTE(review): spelling "autited" kept as-is; the recorded output shows
        # this field populated, so it presumably matches the key in the data.
        .add("autited", StringType(), True)
        .add("brand", StringType(), True)
        .add("description", StringType(), True)
        .add(
            "gcp",
            StructType()
            .add("code", StringType(), True)
            .add("description", StringType(), True),
            True,
        )
        .add("gtin", StringType(), True)
        .add("image", StringType(), True)
        .add(
            "ncm",
            StructType()
            .add("code", StringType(), True)
            .add("description", StringType(), True),
            True,
        )
        .add("status", StringType(), True)
        # Array of {gtin, packing_size, type} structs.
        .add(
            "units",
            ArrayType(
                StructType()
                .add("gtin", StringType(), True)
                .add("packing_size", StringType(), True)
                .add("type", StringType(), True)
            ),
            True,
        ),
        True,
    )
)

In [17]:
# Parse the Cosmos JSON-lines file into a list with one parsed object per record.
# The encoding is explicit so accented text decodes the same way on any
# platform, and blank lines (such as a trailing newline) are skipped instead
# of making json.loads raise.
with open(
    "/content/drive/MyDrive/Colab Notebooks/data_etl/data_etl_test/cosmos.jl",
    encoding="utf-8",
) as f:
    data_cosmos = [json.loads(line) for line in f if line.strip()]

In [18]:
# Build the Cosmos DataFrame from the parsed records, applying the nested schema.
cosmos = spark.createDataFrame(data_cosmos, schema=schema_cosmos)

cosmos.show(n=20, truncate=True)

+-------------------+--------------+--------------------+--------------------+
|         created_at|          gtin|                  id|            response|
+-------------------+--------------+--------------------+--------------------+
|2018-10-22 19:24:28|07896040700276|16bde255-988c-466...|{, null, LAVA ROU...|
|2018-10-20 00:37:14|00000078909434|96f61c2b-5057-439...|{AUDITADO, FERRER...|
|2018-10-25 04:44:33|07898917945100|8ee42cd3-4200-45d...|{null, null, null...|
|2018-10-23 05:23:08|27702018072396|bda1d82f-cfad-482...|{null, null, null...|
|2019-02-07 21:53:39|07896005279595|427b4261-d73d-4c1...|{null, null, null...|
|2018-10-22 19:32:16|07896221600036|826540b0-f18e-449...|{NÃO AUDITADO, nu...|
|2018-10-21 16:09:29|07891025113157|5a1bf587-0362-468...|{null, null, null...|
|2018-10-21 15:25:41|07896259412885|286a0add-c333-4b2...|{null, null, null...|
|2018-10-21 15:24:02|07891150057937|097235d0-51e5-4c8...|{null, null, null...|
|2018-10-23 03:08:26|00891454037366|629e26c3-2c08-40

In [19]:
# Keep the gtin and the nested Cosmos description. The description is renamed
# so it does not clash with the `description` column of `descricoes` when the
# tables are joined.
df_cosmos = cosmos.select(
    "gtin",
    cosmos["response.description"].alias("description_cosmos"),
)
df_cosmos.show(n=20, truncate=True)

+--------------+--------------------+
|          gtin|  description_cosmos|
+--------------+--------------------+
|07896040700276|LAVA ROUPAS COQUE...|
|00000078909434|BOMBOM FERRERO RO...|
|07898917945100|                null|
|27702018072396|                null|
|07896005279595|                null|
|07896221600036|AGUA SANITARIA DR...|
|07891025113157|                null|
|07896259412885|                null|
|07891150057937|                null|
|00891454037366|                null|
|07891150038554|REFIL LIMPADOR CI...|
|07500435125772|                null|
|08076809571944|                null|
|04896240121041|                null|
|07891080402166|     MARG PRIMOR 3KG|
|07891150060722|                null|
|08480017017840|RACAO DIA BIFINHO...|
|07898525450072|                null|
|07891050002747|                null|
|07896108300226|VINAGRE REGINA 75...|
+--------------+--------------------+
only showing top 20 rows



In [20]:
# Schema for the GS1 JSON-lines records: only the gtin and the nested
# response.status are declared.
schema_gs1 = (
    StructType()
    .add("gtin", StringType(), True)
    .add(
        "response",
        StructType().add("status", StringType(), True),
        True,
    )
)

In [21]:
# Parse the GS1 JSON-lines file into a list with one parsed object per record.
# The encoding is explicit so the file decodes the same way on any platform,
# and blank lines (such as a trailing newline) are skipped instead of making
# json.loads raise.
with open(
    "/content/drive/MyDrive/Colab Notebooks/data_etl/data_etl_test/gs1.jl",
    encoding="utf-8",
) as f:
    data_gs1 = [json.loads(line) for line in f if line.strip()]

In [23]:
# Build the GS1 DataFrame from the parsed records, applying the schema.
gs1 = spark.createDataFrame(data_gs1, schema=schema_gs1)

# Preview the gtin next to its flattened status.
gs1.select(["gtin", "response.status"]).show(n=20, truncate=True)

+--------------+------+
|          gtin|status|
+--------------+------+
|07891000258491|    OK|
|07898142863118|    OK|
|07896005213018|    OK|
|08008245003420|    OK|
|07898043230798|    OK|
|07891000107508|    OK|
|07898292888436|    OK|
|07896051115014|    OK|
|07896281700745|    OK|
|07790070225580|    OK|
|07896037916338|    OK|
|07898943163110|    OK|
|05000299225028|    OK|
|07895800412794|    OK|
|07506295369523|    OK|
|07891132001019|    OK|
|07896022201746|    OK|
|07891203063359|    OK|
|07891737243456|    OK|
|07898403780062|    OK|
+--------------+------+
only showing top 20 rows



In [24]:
# Enrich each infomix (cnpj, gtin) row with company data, external
# descriptions, Cosmos descriptions and the GS1 record. Every join is an inner
# join, so a row missing from any source is dropped. The right-hand key is
# removed after each join so only one cnpj / gtin column remains.
df = (
    infomix
    .join(df_cnpj, infomix['cnpj'] == df_cnpj['cnpj'], "inner").drop(df_cnpj['cnpj'])
    .join(descricoes, infomix['gtin'] == descricoes['gtin'], "inner").drop(descricoes['gtin'])
    .join(df_cosmos, infomix['gtin'] == df_cosmos['gtin'], "inner").drop(df_cosmos['gtin'])
    .join(gs1, infomix['gtin'] == gs1['gtin'], "inner").drop(gs1['gtin'])
)

# Validation consists of checking that the status in the gs1 table is OK.
df = df.filter(df['response.status'] == "OK")

# Original author's note: dropDuplicates was applied here on the assumption
# that it causes no problems.
# NOTE(review): descricoes holds several descriptions per gtin, so which one
# survives deduplication is presumably not deterministic; confirm this is acceptable.
df = df.dropDuplicates(["cnpj", "gtin"])

# Prefer the external description and fall back to the Cosmos one. The result
# is aliased directly rather than renamed afterwards by matching the column
# name Spark auto-generates for the coalesce expression.
df = df.select(
    "cnpj",
    "gtin",
    "nome",
    "municipio",
    "uf",
    coalesce(df['description'], df['description_cosmos']).alias("description"),
    "category",
)
df.show()

+--------------+--------------+--------------------+--------------------+---+--------------------+--------------------+
|          cnpj|          gtin|                nome|           municipio| uf|         description|            category|
+--------------+--------------+--------------------+--------------------+---+--------------------+--------------------+
|00063960005917|00000078905320|WAL MART BRASIL LTDA|SAO JOSE DO RIO P...| SP|CERVEJA SERRAMALT...|            cervejas|
|00063960005917|00000078905351|WAL MART BRASIL LTDA|SAO JOSE DO RIO P...| SP|CERVEJA ANTARTICA...|            cervejas|
|00063960005917|00084380957345|WAL MART BRASIL LTDA|SAO JOSE DO RIO P...| SP|     GELE ST.STRA 28|  doces e sobremesas|
|00063960005917|00084380957543|WAL MART BRASIL LTDA|SAO JOSE DO RIO P...| SP|ST DALFOUR GELEIA...|  doces e sobremesas|
|00063960005917|03045320518757|WAL MART BRASIL LTDA|SAO JOSE DO RIO P...| SP|GELEIA FRA BONNE ...|  doces e sobremesas|
|00063960005917|04006894111804|WAL MART 

In [26]:
# Persist the final table as Parquet. The default save mode raises if the
# target path already exists, which breaks re-running the notebook;
# "overwrite" replaces the previous output instead.
df.write.mode("overwrite").parquet("/content/drive/MyDrive/Colab Notebooks/data_etl/output/df.parquet")