## Creating Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()


## Check tables

In [2]:

spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [3]:
df1=spark.sql("SELECT count (*) FROM ecommerce.hue__tmp_olist_geolocation_dataset")



AnalysisException: "Table or view not found: `ecommerce`.`hue__tmp_olist_geolocation_dataset`; line 1 pos 22;\n'Aggregate [unresolvedalias(count(1), None)]\n+- 'UnresolvedRelation `ecommerce`.`hue__tmp_olist_geolocation_dataset`\n"

In [None]:
df1.show()


In [None]:
df1.printSchema()

## Reading CSV

### customers

In [None]:
df_customers=spark.read.format("csv").option("header", True).option("inferSchema", True).option("delimiter", ",").load("/user/hive/warehouse/loading_zone/olist_customers/olist_customers_dataset.csv")

In [None]:
df_customers.show()

### geolocation

In [4]:
df_geolocation=spark.read.format("csv").option("header", True).option("inferSchema", True).option("delimiter", ",").load("/user/hive/warehouse/loading_zone/olist_geolocation/olist_geolocation_dataset.csv")

In [5]:
df_geolocation.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6

In [None]:
df_geolocation.count()

In [None]:
df_geolocation.head(3)

In [24]:
from pyspark.sql.functions import when

df_geolocation2 = df_geolocation.withColumn("geolocation_city", when(df_geolocation.geolocation_city == "sao paulo", "São Paulo")\
.when(df_geolocation.geolocation_city == "são paulo", "São Paulo")\
.otherwise(df_geolocation.geolocation_city))



In [25]:
df_geolocation2.select("geolocation_city").distinct().count()

8010

In [26]:
df_geolocation2.select("geolocation_city").distinct().show()

+--------------------+
|    geolocation_city|
+--------------------+
|   redencao da serra|
|  aguas de sao pedro|
|            itirapuã|
|           indiaporã|
|         paulistânia|
|        capão bonito|
|              taguaí|
|                iepe|
|         teresópolis|
|              bacaxa|
|divino de sao lou...|
|divino das laranj...|
|            vermelho|
|            caçarema|
|          cacaratiba|
|                pote|
|            camacari|
|           itaberaba|
|              gloria|
|           arapiraca|
+--------------------+
only showing top 20 rows



In [27]:
df_geolocation2.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       São Paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       São Paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       São Paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       São Paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       São Paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       São Paulo|               SP|
|                       1047|-23.546273112412678| -46.6

In [28]:
from pyspark.sql.functions import initcap, col
df_geolocation2.select("*", initcap(col('geolocation_city'))).head(50)

[Row(geolocation_zip_code_prefix=1037, geolocation_lat=-23.54562128115268, geolocation_lng=-46.63929204800168, geolocation_city='São Paulo', geolocation_state='SP', initcap(geolocation_city)='São Paulo'),
 Row(geolocation_zip_code_prefix=1046, geolocation_lat=-23.546081127035535, geolocation_lng=-46.64482029837157, geolocation_city='São Paulo', geolocation_state='SP', initcap(geolocation_city)='São Paulo'),
 Row(geolocation_zip_code_prefix=1046, geolocation_lat=-23.54612896641469, geolocation_lng=-46.64295148361138, geolocation_city='São Paulo', geolocation_state='SP', initcap(geolocation_city)='São Paulo'),
 Row(geolocation_zip_code_prefix=1041, geolocation_lat=-23.5443921648681, geolocation_lng=-46.63949930627844, geolocation_city='São Paulo', geolocation_state='SP', initcap(geolocation_city)='São Paulo'),
 Row(geolocation_zip_code_prefix=1035, geolocation_lat=-23.541577961711493, geolocation_lng=-46.64160722329613, geolocation_city='São Paulo', geolocation_state='SP', initcap(geoloc

In [29]:
df_geolocation2.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       São Paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       São Paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       São Paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       São Paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       São Paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       São Paulo|               SP|
|                       1047|-23.546273112412678| -46.6

In [None]:
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

geolocator = Nominatim(user_agent="fiap_mba")
geocode = RateLimiter(geolocator.geocode, min_delay_seconds = 1,   return_value_on_exception = None) 
# adding 1 second padding between calls

In [None]:
location = geocode.reverse((lat, long))

## Transforming to parquet

In [None]:
customers_parquet= df_customers.write.mode("overwrite").format("parquet").save("/user/hive/warehouse/raw_zone/")

In [None]:
customers_parquet

## Using geopy

In [7]:
from geopy.geocoders import Nominatim

In [8]:
geolocator = Nominatim(user_agent="fiap_mba")

In [9]:
location = geolocator.reverse("-23.545884279214015, -46.643163191240035")

In [10]:
print(location)

Edifício Comandante Linneu Gomes, 86, Avenida São Luís, República, São Paulo, Região Imediata de São Paulo, Região Metropolitana de São Paulo, Região Geográfica Intermediária de São Paulo, São Paulo, Região Sudeste, 01046-000, Brasil


## Using pycep_correios

In [11]:
from pycep_correios import get_address_from_cep, WebService

address = get_address_from_cep('01037000', webservice=WebService.APICEP)

In [12]:
address

{'bairro': 'República',
 'cep': '01037-000',
 'cidade': 'São Paulo',
 'logradouro': 'Rua Conselheiro Crispiniano - lado par',
 'uf': 'SP',
 'complemento': ''}

In [13]:
import pandas as pd

In [14]:
df = pd.DataFrame([
[-87.548627, 41.728184],
[-87.737227, 41.749111],
[-87.743974, 41.924143],
[-87.659294, 41.869314],
 [-87.727808, 41.877007],
], columns=["Longitude", "Latitude"])

In [15]:
df

Unnamed: 0,Longitude,Latitude
0,-87.548627,41.728184
1,-87.737227,41.749111
2,-87.743974,41.924143
3,-87.659294,41.869314
4,-87.727808,41.877007


In [17]:
from tqdm import tqdm
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

tqdm.pandas()
geolocator = Nominatim(user_agent="fiap_mba2")
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)


In [18]:
df

Unnamed: 0,Longitude,Latitude
0,-87.548627,41.728184
1,-87.737227,41.749111
2,-87.743974,41.924143
3,-87.659294,41.869314
4,-87.727808,41.877007


In [19]:
df['Location'] = df.progress_apply(
lambda row: reverse((row['Latitude'], row['Longitude'])),
axis=1
)

100%|██████████| 5/5 [00:04<00:00,  1.07it/s]


In [20]:
df

Unnamed: 0,Longitude,Latitude,Location
0,-87.548627,41.728184,"(92nd Street & Baltimore, East 92nd Street, So..."
1,-87.737227,41.749111,"(79th Street & Kilpatrick EB, West 79th Street..."
2,-87.743974,41.924143,"(4701, West Fullerton Avenue, Belmont Cragin, ..."
3,-87.659294,41.869314,"(1301-1307, West Taylor Street, Illinois Medic..."
4,-87.727808,41.877007,"(Jackson & Karlov, West Jackson Boulevard, Wes..."


In [55]:
def parse_zipcode(location):
    if location and location.raw.get('address') and location.raw['address'].get('postcode'):
        return location.raw['address']['postcode']
    else:
        return None

df['Zipcode'] = df['Location'].apply(parse_zipcode)

In [30]:
df_geolocation3=df_geolocation2.toPandas()

In [31]:
print(df_geolocation3)

         geolocation_zip_code_prefix  geolocation_lat  geolocation_lng  \
0                               1037       -23.545621       -46.639292   
1                               1046       -23.546081       -46.644820   
2                               1046       -23.546129       -46.642951   
3                               1041       -23.544392       -46.639499   
4                               1035       -23.541578       -46.641607   
...                              ...              ...              ...   
1000158                        99950       -28.068639       -52.010705   
1000159                        99900       -27.877125       -52.224882   
1000160                        99950       -28.071855       -52.014716   
1000161                        99980       -28.388932       -51.846871   
1000162                        99950       -28.070104       -52.018658   

        geolocation_city geolocation_state  
0              São Paulo                SP  
1              São Pa

In [32]:
df_geolocation3['geolocation_city'] = df_geolocation3['geolocation_city'].str.capitalize()

In [33]:
print(df_geolocation3)

         geolocation_zip_code_prefix  geolocation_lat  geolocation_lng  \
0                               1037       -23.545621       -46.639292   
1                               1046       -23.546081       -46.644820   
2                               1046       -23.546129       -46.642951   
3                               1041       -23.544392       -46.639499   
4                               1035       -23.541578       -46.641607   
...                              ...              ...              ...   
1000158                        99950       -28.068639       -52.010705   
1000159                        99900       -27.877125       -52.224882   
1000160                        99950       -28.071855       -52.014716   
1000161                        99980       -28.388932       -51.846871   
1000162                        99950       -28.070104       -52.018658   

        geolocation_city geolocation_state  
0              São paulo                SP  
1              São pa

In [34]:
df_geolocation3 = df_geolocation3.head(20)

In [35]:
df_geolocation3

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
0,1037,-23.545621,-46.639292,São paulo,SP
1,1046,-23.546081,-46.64482,São paulo,SP
2,1046,-23.546129,-46.642951,São paulo,SP
3,1041,-23.544392,-46.639499,São paulo,SP
4,1035,-23.541578,-46.641607,São paulo,SP
5,1012,-23.547762,-46.635361,São paulo,SP
6,1047,-23.546273,-46.641225,São paulo,SP
7,1013,-23.546923,-46.634264,São paulo,SP
8,1029,-23.543769,-46.634278,São paulo,SP
9,1011,-23.54764,-46.636032,São paulo,SP


In [36]:
from tqdm import tqdm
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter


tqdm.pandas()
geolocator = Nominatim(user_agent="fiap_mba")
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)

In [37]:
df_geolocation3['address'] = df_geolocation3.progress_apply(
lambda row: reverse((row['geolocation_lat'], row['geolocation_lng'])),
axis=1
)

100%|██████████| 20/20 [00:19<00:00,  1.01it/s]


In [38]:
df_geolocation3

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state,address
0,1037,-23.545621,-46.639292,São paulo,SP,"(Casas Bahia, 131, Praça Ramos de Azevedo, Rep..."
1,1046,-23.546081,-46.64482,São paulo,SP,"(303, Avenida Ipiranga, Vila Buarque, Repúblic..."
2,1046,-23.546129,-46.642951,São paulo,SP,"(Confidence Cambio, Calçada Compartilhada Av S..."
3,1041,-23.544392,-46.639499,São paulo,SP,"(Galeria R Monteiro, 77, Rua 24 de Maio, Repúb..."
4,1035,-23.541578,-46.641607,São paulo,SP,"(816, Avenida São João, República, São Paulo, ..."
5,1012,-23.547762,-46.635361,São paulo,SP,"(Edifício George Loeb, 113, Rua da Quitanda, G..."
6,1047,-23.546273,-46.641225,São paulo,SP,"(Edifício Vicentina, Rua Doutor Bráulio Gomes,..."
7,1013,-23.546923,-46.634264,São paulo,SP,"(Edifício Canadá, Rua 15 de Novembro, Glicério..."
8,1029,-23.543769,-46.634278,São paulo,SP,"(Mosteiro de São Bento, Largo São Bento, Glicé..."
9,1011,-23.54764,-46.636032,São paulo,SP,"(30, Praça do Patriarca, Glicério, Sé, São Pau..."


In [39]:
def parse_zipcode(location):
    if location and location.raw.get('address') and location.raw['address'].get('postcode'):
        return location.raw['address']['postcode']
    else:
        return None

df_geolocation3['Zipcode'] = df_geolocation3['address'].progress_apply(parse_zipcode)

100%|██████████| 20/20 [00:00<00:00, 1357.07it/s]


In [40]:
df_geolocation3

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state,address,Zipcode
0,1037,-23.545621,-46.639292,São paulo,SP,"(Casas Bahia, 131, Praça Ramos de Azevedo, Rep...",01037-000
1,1046,-23.546081,-46.64482,São paulo,SP,"(303, Avenida Ipiranga, Vila Buarque, Repúblic...",01046-010
2,1046,-23.546129,-46.642951,São paulo,SP,"(Confidence Cambio, Calçada Compartilhada Av S...",01046-001
3,1041,-23.544392,-46.639499,São paulo,SP,"(Galeria R Monteiro, 77, Rua 24 de Maio, Repúb...",01041-001
4,1035,-23.541578,-46.641607,São paulo,SP,"(816, Avenida São João, República, São Paulo, ...",01036-100
5,1012,-23.547762,-46.635361,São paulo,SP,"(Edifício George Loeb, 113, Rua da Quitanda, G...",01012-010
6,1047,-23.546273,-46.641225,São paulo,SP,"(Edifício Vicentina, Rua Doutor Bráulio Gomes,...",01043-000
7,1013,-23.546923,-46.634264,São paulo,SP,"(Edifício Canadá, Rua 15 de Novembro, Glicério...",01013-001
8,1029,-23.543769,-46.634278,São paulo,SP,"(Mosteiro de São Bento, Largo São Bento, Glicé...",01029-010
9,1011,-23.54764,-46.636032,São paulo,SP,"(30, Praça do Patriarca, Glicério, Sé, São Pau...",01002-010


In [None]:
df_geolocation3.dtypes

In [None]:
s = pd.Series(df_geolocation3['Zipcode'] )


In [None]:
#from pycep_correios import get_address_from_cep, WebService

#df_geolocation3['new_address'] = df_geolocation3.progress_apply(
 #   lambda row: get_address_from_cep(row['Zipcode'], webservice=WebService.APICEP), axis=1
#)

#df.apply(lambda row: row['A'] + row['B'], axis=1)


In [None]:
df_geolocation3

In [None]:
df_geolocation3 = df_geolocation3.coalesce()

In [None]:
print(df_geolocation3.rdd.getNumPartitions())

In [75]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf


def general_f(reverse_method):
    @udf(returnType = StringType())
    def get_address(lat, lon):
        concat = lat + ', ' + lon
        reverse_method((concat))
        return ''
    return get_address
   

In [76]:
df_geolocation2.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+-------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|address|
+---------------------------+-------------------+-------------------+----------------+-----------------+-------+
|                       1037| -23.54562128115268| -46.63929204800168|       São Paulo|               SP|      1|
|                       1046|-23.546081127035535| -46.64482029837157|       São Paulo|               SP|      1|
|                       1046| -23.54612896641469| -46.64295148361138|       São Paulo|               SP|      1|
|                       1041|  -23.5443921648681| -46.63949930627844|       São Paulo|               SP|      1|
|                       1035|-23.541577961711493| -46.64160722329613|       São Paulo|               SP|      1|
|                       1012|-23.547762303364266| -46.63536053788448|       São Paulo|          

In [77]:
df_geolocation2 = df_geolocation2.withColumn('address', general_f(reverse)(col('geolocation_lat'), col('geolocation_lng')))

Traceback (most recent call last):
  File "/opt/spark/python/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/opt/anaconda3/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/opt/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/anaconda3/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/opt/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/opt/anaconda3/lib/p

PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

In [71]:
df_geolocation2.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)
 |-- address: string (nullable = true)



In [48]:
df_geolocation2.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+-------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|address|
+---------------------------+-------------------+-------------------+----------------+-----------------+-------+
|                       1037| -23.54562128115268| -46.63929204800168|       São Paulo|               SP|      1|
|                       1046|-23.546081127035535| -46.64482029837157|       São Paulo|               SP|      1|
|                       1046| -23.54612896641469| -46.64295148361138|       São Paulo|               SP|      1|
|                       1041|  -23.5443921648681| -46.63949930627844|       São Paulo|               SP|      1|
|                       1035|-23.541577961711493| -46.64160722329613|       São Paulo|               SP|      1|
|                       1012|-23.547762303364266| -46.63536053788448|       São Paulo|          