###  Olist dataset

##### Consumers dataset
https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?select=olist_customers_dataset.csv

##### Sellers dataset
https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?select=olist_sellers_dataset.csv

##### Geolocation dataset
https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?select=olist_geolocation_dataset.csv

**OBJETIVO**

Ao final desse notebook uma tabela de geolocalizacao deve ser gerada no Hive com todos os seus dados atualizados. 


### B) Validação do geolocation dataset

- carregar os dados do geolocation dataset para dataframe **(geolocation_df)** 

- validar se existe CEPs nulos em **(geolocation_df)**
    - caso seja encontrado, analisar o percentual em relação ao total de registros. 
        - Remover esses registros dos dataframes por questão de facilidade e celeridade por se tratar de trabalho academico. Entretanto em projeto real, isso seria validado de alguma maneira (talvez analisando o percentual de casos em relacao ao total de registros e vendo uma forma de obter os valores corretos vindos da fonte)

- validar se existe CEPs com menos de 5 digitos no **(geolocation_df)**
    - caso seja encontrado, analisar o percentual em relação ao total de registros. 
        - colocar em dataframe separado para analisar se vai levar ou não em consideração.




In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, udf
from pyspark.sql.types import StructType, StringType, DoubleType
import time

spark = SparkSession.builder\
    .master("local") \
    .appName("update_geolocalization")\
    .enableHiveSupport()\
    .getOrCreate()

In [2]:
schema = StructType() \
      .add("geolocation_zip_code_prefix",StringType(),True) \
      .add("geolocation_lat",DoubleType(),True) \
      .add("geolocation_lng",DoubleType(),True) \
      .add("geolocation_city",StringType(),True) \
      .add("geolocation_state",StringType(),True) 


geolocation_df = spark.read.option("header",True) \
                       .option("delimiter",",") \
                       .schema(schema) \
                       .csv("hdfs://namenode:8020/datalake/olist/raw/csv/olist_geolocation_dataset.csv")

print(geolocation_df.count())
print(geolocation_df.printSchema())
geolocation_df.cache()

1000163
root
 |-- geolocation_zip_code_prefix: string (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)

None


DataFrame[geolocation_zip_code_prefix: string, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string]

In [3]:
geoclocation_cep_distinct_df = geolocation_df.select("geolocation_zip_code_prefix").distinct()
geoclocation_cep_distinct_df.cache()

print(geoclocation_cep_distinct_df.count())


19015


In [4]:
def formatCep(cep):
    cep_formatado = cep
    
    cep_formatado.zfill(5)
    
    if(len(cep_formatado) == 5):
        cep_formatado = cep_formatado + "-000"
        
    return cep_formatado

formatCepUDF = udf(lambda z:formatCep(z),StringType())   


In [5]:
geolocation_formattedCep_distinct_df = geoclocation_cep_distinct_df.select(formatCepUDF(col("geolocation_zip_code_prefix")).alias("geolocation_zip_code_prefix"))\
                                                                .sort("geolocation_zip_code_prefix")

geolocation_formattedCep_distinct_df.cache()


DataFrame[geolocation_zip_code_prefix: string]

In [6]:
geolocation_formattedCep_distinct_df.show()

+---------------------------+
|geolocation_zip_code_prefix|
+---------------------------+
|                  01001-000|
|                  01002-000|
|                  01003-000|
|                  01004-000|
|                  01005-000|
|                  01006-000|
|                  01007-000|
|                  01008-000|
|                  01009-000|
|                  01010-000|
|                  01011-000|
|                  01012-000|
|                  01013-000|
|                  01014-000|
|                  01015-000|
|                  01016-000|
|                  01017-000|
|                  01018-000|
|                  01019-000|
|                  01020-000|
+---------------------------+
only showing top 20 rows



In [34]:
! pip install pycep-correios

Collecting pycep-correios
  Downloading pycep_correios-5.0.0-py2.py3-none-any.whl (7.1 kB)
Collecting zeep>=2.0.0
  Downloading zeep-4.1.0-py2.py3-none-any.whl (100 kB)
[K     |████████████████████████████████| 100 kB 5.3 MB/s ta 0:00:01
Collecting isodate>=0.5.4
  Downloading isodate-0.6.1-py2.py3-none-any.whl (41 kB)
[K     |████████████████████████████████| 41 kB 1.0 MB/s  eta 0:00:01
[?25hCollecting cached-property>=1.3.0
  Downloading cached_property-1.5.2-py2.py3-none-any.whl (7.6 kB)
Collecting lxml>=4.6.0
  Downloading lxml-4.8.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl (6.4 MB)
[K     |████████████████████████████████| 6.4 MB 13.5 MB/s eta 0:00:01
Collecting requests-file>=1.5.1
  Downloading requests_file-1.5.1-py2.py3-none-any.whl (3.7 kB)
Collecting requests-toolbelt>=0.7.1
  Downloading requests_toolbelt-0.9.1-py2.py3-none-any.whl (54 kB)
[K     |████████████████████████████████| 54 kB 5.1 MB/s  eta 0:00:01
[?25hCollecting platf

In [35]:
from pycep_correios import get_address_from_cep, WebService, exceptions


def addAddressByCorreios(row):    
    rowDict = row.asDict()
    cep = rowDict.get("geolocation_zip_code_prefix")
    print(cep)

    try:
        address_json = get_address_from_cep(str(cep), webservice=WebService.VIACEP)
        rowDict["geolocation_city"] = address_json["cidade"]
        rowDict["geolocation_state"] = address_json["uf"]
        rowDict["geolocation_street"] = address_json["logradouro"]
        
    except exceptions.CEPNotFound as ecnf:
        rowDict["geolocation_city"] = ''
        rowDict["geolocation_state"] = ''
        rowDict["geolocation_street"] = ''

    except exceptions.ConnectionError as errc:
        rowDict["geolocation_city"] = '-1'
        rowDict["geolocation_state"] = '-1'
        rowDict["geolocation_street"] = '-1'
        
    except exceptions.InvalidCEP as icnf:
        rowDict["geolocation_city"] = '-2'
        rowDict["geolocation_state"] = '-2'
        rowDict["geolocation_street"] = '-2'
    
    print(rowDict)
    return rowDict

In [36]:
geolocation_formattedCep_rows = geolocation_formattedCep_distinct_df.collect()

In [37]:
geolocation_formattedCep_address = [
    addAddressByCorreios(row) for row in geolocation_formattedCep_rows]


01001-000
{'geolocation_zip_code_prefix': '01001-000', 'geolocation_city': 'São Paulo', 'geolocation_state': 'SP', 'geolocation_street': 'Praça da Sé'}
01002-000
{'geolocation_zip_code_prefix': '01002-000', 'geolocation_city': 'São Paulo', 'geolocation_state': 'SP', 'geolocation_street': 'Rua Direita'}
01003-000
{'geolocation_zip_code_prefix': '01003-000', 'geolocation_city': 'São Paulo', 'geolocation_state': 'SP', 'geolocation_street': 'Rua José Bonifácio'}
01004-000
{'geolocation_zip_code_prefix': '01004-000', 'geolocation_city': 'São Paulo', 'geolocation_state': 'SP', 'geolocation_street': 'Rua Barão de Paranapiacaba'}
01005-000
{'geolocation_zip_code_prefix': '01005-000', 'geolocation_city': 'São Paulo', 'geolocation_state': 'SP', 'geolocation_street': 'Rua Benjamim Constant'}
01006-000
{'geolocation_zip_code_prefix': '01006-000', 'geolocation_city': 'São Paulo', 'geolocation_state': 'SP', 'geolocation_street': 'Rua Senador Feijó'}
01007-000
{'geolocation_zip_code_prefix': '01007-0

KeyboardInterrupt: 

##### ATENÇÃO!!! 

o codigo comentado acima funciona mas não foi executado até o final aqui devido ao tempo de processamento, pois não foi feita a implementação em bloco.

Como alternativa, a execução foi feita por outro colega e o resultado foi importado para a tabela no hive **olist_cleansed_db.geolocation_tb**

Entretanto, segue uma descrição sobre o objetivo da implementação acima:

Percebeu-se que alguns CEPS são invalidos, mesmo na base dos Correios. Esses casos foram mapeados pela exception InvalidCEP e marcamos para tratados posteriormente através da api geopy. A partir das coordenadas geográficas associadas no dataset geolocation, iriamos pegar o cep correto. 

Percebeu-se também que alguns CEPs não eram encontrados na pycep_correios via APICEP, mas era encontrado via VIACEP, por exemplo. Para esses casos, fizemos uma marcação para que fosse executado novamente apenas os casos que não foram encontrados para testar todas as opções do pycep. Caso nenhuma opção funcionasse para encontrar o CEP, a gente iria fazer o mesmo tratamento descrito anteriormente: usar a api geopy para pegar o cep correto. 

Fizemos um outro tratamento de exceção tb para os casos de problema de conexão. Marcamos os ceps com problema para que  pudessemos rodar novamente o pycep_correios apenas para os casos com problema. 


In [7]:
#CSV com o resultado do codigo acima

os.chdir(r'/mnt/notebooks/grupof/olist')

! hadoop fs -put geolocation_cep_correios.csv /datalake/olist/raw/csv


put: `/datalake/olist/raw/csv/geolocation_cep_correios.csv': File exists


In [64]:
schema = StructType() \
      .add("geo_zip_code_prefix",StringType(),True) \
      .add("geolocation_city",StringType(),True) \
      .add("geolocation_street",StringType(),True) \
      .add("geolocation_state",StringType(),True) 


geolocation_correios_df = spark.read.option("header",True) \
                       .option("delimiter",",") \
                       .schema(schema) \
                       .csv("hdfs://namenode:8020/datalake/olist/raw/csv/geolocation_cep_correios.csv")

In [14]:
#Exibição do conteudo que foi importado pelo Hive emm notebook anterior

from pyspark.sql import HiveContext

hive_context = HiveContext(spark)
geolocation_tb = hive_context.table("olist_cleansed_db.geolocation_tb")
geolocation_tb.show()


+---------------------------+--------------------+-----------------+--------------------+
|geolocation_zip_code_prefix|    geolocation_city|geolocation_state|  geolocation_street|
+---------------------------+--------------------+-----------------+--------------------+
|       geolocation_zip_c...|    geolocation_city|geolocation_state|  geolocation_street|
|                 02147-000 |São Paulo        ...|            SP   |Alameda Segundo-S...|
|                 02248-000 |São Paulo        ...|            SP   |Rua Cruz de Malta...|
|                 02240-000 |São Paulo        ...|            SP   |Avenida Doutor An...|
|                 02422-000 |São Paulo        ...|            SP   |Praça Doutor Poli...|
|                 02406-000 |São Paulo        ...|            SP   |Rua Dona Luiza To...|
|                 02407-000 |São Paulo        ...|            SP   |Rua Jerônima Dias...|
|                 02854-000 |São Paulo        ...|            SP   |Rua Rodolfo Perei...|
|         

In [23]:
df=spark.sql("show databases")
df.show()

+--------------------+
|        databaseName|
+--------------------+
|             default|
|   olist_cleansed_db|
|olist_consumption_db|
|   olist_enriched_db|
|    olist_trusted_db|
+--------------------+



In [63]:
df1=spark.sql("select * from olist_cleansed_db.geolocation_tb")
df1.show()

+---------------------------+--------------------+-----------------+--------------------+
|geolocation_zip_code_prefix|    geolocation_city|geolocation_state|  geolocation_street|
+---------------------------+--------------------+-----------------+--------------------+
|       geolocation_zip_c...|    geolocation_city|geolocation_state|  geolocation_street|
|                 02147-000 |São Paulo        ...|            SP   |Alameda Segundo-S...|
|                 02248-000 |São Paulo        ...|            SP   |Rua Cruz de Malta...|
|                 02240-000 |São Paulo        ...|            SP   |Avenida Doutor An...|
|                 02422-000 |São Paulo        ...|            SP   |Praça Doutor Poli...|
|                 02406-000 |São Paulo        ...|            SP   |Rua Dona Luiza To...|
|                 02407-000 |São Paulo        ...|            SP   |Rua Jerônima Dias...|
|                 02854-000 |São Paulo        ...|            SP   |Rua Rodolfo Perei...|
|         

In [50]:
geoclocation_cep_distinct_df.join(geolocation_df,geolocation_df.geolocation_zip_code_prefix == geoclocation_cep_distinct_df.geolocation_zip_code_prefix, "inner")\
.drop(geoclocation_cep_distinct_df["geolocation_zip_code_prefix"])\
.count()


1000163

In [51]:
geoclocation_cep_distinct_df.count()

19015

In [53]:
! pip install geopy

Collecting geopy
  Downloading geopy-2.2.0-py3-none-any.whl (118 kB)
[K     |████████████████████████████████| 118 kB 5.4 MB/s eta 0:00:01
[?25hCollecting geographiclib<2,>=1.49
  Downloading geographiclib-1.52-py3-none-any.whl (38 kB)
Installing collected packages: geographiclib, geopy
Successfully installed geographiclib-1.52 geopy-2.2.0


In [94]:
from geopy.geocoders import Nominatim

#adding new row with address provided by geopy
def getCepByLatLon(row):    
    rowDict = row.asDict()
    
    
    city = rowDict.get("geolocation_city")
    street = rowDict.get("geolocation_state")
    print(city)
    print(street)
    
    if(street!= "" and street!="-1" and street!="-2"):
        address = "{street} {city}"
        geolocator = Nominatim(user_agent="update_geolocation")
        location = geolocator.geocode(address.format(street=street,city=city))
        print(location.latitude)
        rowDict["geolocation_lat"] = str(location.latitude)
        rowDict["geolocation_lon"] = str(location.longitude)

    return rowDict

In [95]:
geolocation_valid_df = geolocation_correios_df.select("*").where(col("geo_zip_code_prefix")!="geolocation_zip_code_prefix")

In [96]:
geolocation_valid_df.cache()


DataFrame[geo_zip_code_prefix: string, geolocation_city: string, geolocation_street: string, geolocation_state: string]

In [97]:
geolocation_valid_df = geolocation_valid_df.select("*").where(col("geolocation_street").isNotNull())

geolocation_valid_df.show()



+-------------------+--------------------+------------------+--------------------+
|geo_zip_code_prefix|    geolocation_city|geolocation_street|   geolocation_state|
+-------------------+--------------------+------------------+--------------------+
|         02147-000 |São Paulo        ...|             SP   |Alameda Segundo-S...|
|         02248-000 |São Paulo        ...|             SP   |Rua Cruz de Malta...|
|         02240-000 |São Paulo        ...|             SP   |Avenida Doutor An...|
|         02422-000 |São Paulo        ...|             SP   |Praça Doutor Poli...|
|         02406-000 |São Paulo        ...|             SP   |Rua Dona Luiza To...|
|         02407-000 |São Paulo        ...|             SP   |Rua Jerônima Dias...|
|         02854-000 |São Paulo        ...|             SP   |Rua Rodolfo Perei...|
|         03279-000 |São Paulo        ...|             SP   |Rua Herwis       ...|
|         03820-000 |São Paulo        ...|             SP   |Rua Olho D'Água d...|
|   

In [98]:
geolocation_valid_rows = geolocation_valid_df.collect()

In [99]:
geolocation_latlon_address = [
        getCepByLatLon(row)
        for row in geolocation_valid_rows
     ]
geolocation_latlon_address_df = spark.createDataFrame(geolocation_latlon_address)
geolocation_latlon_address_df.show()

São Paulo                                                                                           
Alameda Segundo-Sargento Rubens Leite                                                                                                                                                                                                                     
-23.4988758
São Paulo                                                                                           
Rua Cruz de Malta                                                                                                                                                                                                                                         
-23.703249
São Paulo                                                                                           
Avenida Doutor Antônio Maria Laet                                                                                                                                           

AttributeError: 'NoneType' object has no attribute 'latitude'

#### ATENÇÃO:

O código acima tem como objetivo trazer as coordenadas geograficas atualizadas a partir das informações validas dos correios. 

Ocorreu algum erro que não deu tempo de ser tratado. 

Após essa atualização, a intenção era armazenar uma nova tabela de geolocation no Hive, atualizada, com as colunas: CEP, Latitude, Longitude, Cidade, Estado. Na base de dados CLEANSED.