# <font color='black'>Work at Olist Data</font>

# <font color='black'>Rodrigo Romanzini</font>

# Teste - Engenheiro de dados
<ul>
<li>Gostaríamos de analisar suas habilidades com SQL, modelagem dimensional e integração de dados. Mostre seus conhecimento em processos de ETL e conceitos de Data Warehouse? Que tal replicar nossos datasets, remodelar em um banco de dados e apresentar as melhorias realizadas em sua criação?</li>
<li>É possível utilizar o modelo proposto em um ambiente cloud? Quais plataformas ou serviços você utilizaria? Quais as vantagens do modelo escolhido em questões de performance?</li>
<li>Alguns membros do time dizem que a atual modelagem do banco de dados é adequada para o uso dos cientistas de dados e analistas de BI, porém, outros dizem que existem formas de modelar bancos de dados que trarão mais eficiência. Qual é a sua opinião sobre isso?</li>
<li>Estamos preocupados com o vertiginoso aumento do volume em nosso banco de dados atual? Você consideraria uma opção mais escalável ou devemos manter a estrutura existente?</li>
<li>Nossa ferramenta de visualização de dashboards está lenta e o nosso time detectou que o problema está na infraestrutura de dados. Como você abordaria esta situação do ponto vista de arquitetura de dados?</li>
<li>Nosso banco de dados está hospedado na nuvem e nossas ferramentas de análise de dados são "on premisses". Você manteria este arranjo ou faria mudanças visando mais performance?</li>
<li>Nossa área operacional necessita de informações em tempo real, porém os diretores da empresa, que acompanham somente informações de KPIs mensais, alegam que isso é desnecessário e acarretaria custos. Qual é o seu posicionamento sobre isso?</li>
<li>Nosso time que está focado em Governança de Dados alega que documentar os processos é mais importante do que refatorar os mais de 500 scripts que estão funcionando com lentidão. Como você atuaria neste impasse, se tivesse que priorizar o trabalho?</li>
<li>Aqui no olist, somos muito mão na massa! Como Engenheiro(a) de dados, mostre pra gente o que você consegue fazer na prática com esse nosso banco de dados. (Sabemos que é uma amostra, mas imagine que o todo pode ser petabytes de dados)</li>
<li>O que acha de escrever um relatório ou slides sobre a sua abordagem na solução de alguns desses problemas?</li>
<li>Fique livre para criar sua própria abordagem, caso considere que as dicas anteriores não sejam pertinentes.</li>
</ul>

# Spark SQL

O Spark SQL é usado para acessar dados estruturados com Spark.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row

In [2]:
# Spark Session - usada quando se trabalha com Dataframes no Spark
spSession = SparkSession.builder.master("local").appName("Work-At-Olist-Data").config("spark.some.config.option", "some-value").getOrCreate()

In [3]:
# Criando o SQL Context para trabalhar com Spark SQL
sqlContext = SQLContext(sc)

In [4]:
# Importando o arquivo e criando um DF
customerDF = spSession.read.csv('datasets/olist_customers_dataset.csv',inferSchema =True, header = True)
geolocationDF = spSession.read.csv('datasets/olist_geolocation_dataset.csv',inferSchema =True, header = True)
orderItemDF = spSession.read.csv('datasets/olist_order_items_dataset.csv',inferSchema =True, header = True)
orderDF = spSession.read.csv('datasets/olist_orders_dataset.csv',inferSchema =True, header = True)
productDF = spSession.read.csv('datasets/olist_products_dataset.csv',inferSchema =True, header = True)
sellerDF = spSession.read.csv('datasets/olist_sellers_dataset.csv',inferSchema =True, header = True)

In [None]:
# Verificando o schema 

In [5]:
customerDF.dtypes

[('customer_id', 'string'),
 ('customer_unique_id', 'string'),
 ('customer_zip_code_prefix', 'int'),
 ('customer_city', 'string'),
 ('customer_state', 'string')]

In [6]:
geolocationDF.dtypes

[('geolocation_zip_code_prefix', 'int'),
 ('geolocation_lat', 'double'),
 ('geolocation_lng', 'double'),
 ('geolocation_city', 'string'),
 ('geolocation_state', 'string')]

In [7]:
orderItemDF.dtypes

[('order_id', 'string'),
 ('order_item_id', 'int'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('shipping_limit_date', 'timestamp'),
 ('price', 'double'),
 ('freight_value', 'double')]

In [8]:
orderDF.dtypes

[('order_id', 'string'),
 ('customer_id', 'string'),
 ('order_status', 'string'),
 ('order_purchase_timestamp', 'timestamp'),
 ('order_approved_at', 'timestamp'),
 ('order_delivered_carrier_date', 'timestamp'),
 ('order_delivered_customer_date', 'timestamp'),
 ('order_estimated_delivery_date', 'timestamp')]

In [9]:
productDF.dtypes

[('product_id', 'string'),
 ('product_category_name', 'string'),
 ('product_name_lenght', 'int'),
 ('product_description_lenght', 'int'),
 ('product_photos_qty', 'int'),
 ('product_weight_g', 'int'),
 ('product_length_cm', 'int'),
 ('product_height_cm', 'int'),
 ('product_width_cm', 'int')]

In [10]:
sellerDF.dtypes

[('seller_id', 'string'),
 ('seller_zip_code_prefix', 'int'),
 ('seller_city', 'string'),
 ('seller_state', 'string')]

In [11]:
# Conhecendo os dados dos datasets

In [12]:
customerDF.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [13]:
geolocationDF.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6

In [14]:
orderItemDF.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [16]:
orderDF.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [17]:
productDF.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [18]:
sellerDF.show()

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
|c240c4061717ac180...|                 20920|   rio de janeiro|          RJ|
|e49c26c3edfa46d22...|                 55325|           brejao|          PE|
|1b938a7ec6ac5061a...|                 16304|        penapolis|          SP|
|768a86e36ad6aae3d...|                  1529|        sao paulo|          SP|
|ccc4bbb5f32a6ab2b...|                 80310|         curitiba|          PR|

In [19]:
# Registrando os dataframe como uma Temp Table
customerDF.createOrReplaceTempView("customerTB")
geolocationDF.createOrReplaceTempView("geolocationTB")
orderItemDF.createOrReplaceTempView("orderItemTB")
orderDF.createOrReplaceTempView("orderTB")
productDF.createOrReplaceTempView("productTB")
sellerDF.createOrReplaceTempView("sellerTB")

In [28]:
# Criando novas tabelas
customerST = spSession.sql("select customer_id, customer_unique_id from customerTB")
productST = spSession.sql("select product_id, product_category_name, product_name_lenght, product_description_lenght, product_photos_qty, product_weight_g, product_length_cm, product_height_cm, product_width_cm from productTB")
sellerST = spSession.sql("select seller_id from sellerTB")
geolocationST = spSession.sql("select geolocation_zip_code_prefix as geolocation_id, geolocation_lat, geolocation_lng, geolocation_city, geolocation_state from geolocationTB")
spSession.sql("select date_format(order_purchase_timestamp, 'yMMdd') as date_id, \
               date_format(order_purchase_timestamp, 'y-MM-dd') as order_purchase, \
               dayofmonth(order_purchase_timestamp) as day_of_month, \
               dayofweek(order_purchase_timestamp) as day_of_week, \
               dayofyear(order_purchase_timestamp) as day_of_year, \
               month(order_purchase_timestamp) as month, \
               quarter(order_purchase_timestamp) as quarter, \
               weekofyear(order_purchase_timestamp) as week_of_year, \
               year(order_purchase_timestamp) as year from orderTB").createOrReplaceTempView("dateTB")
saleST = spSession.sql("select orderTB.order_id, orderTB.customer_id, order_status, dateTB.date_id, \
                       product_id, orderItemTB.seller_id, price, freight_value as freight, \
                       customer_zip_code_prefix as geolocation_customer_id, \
                       seller_zip_code_prefix as geolocation_saller_id \
                       from orderTB \
                       join dateTB on dateTB.date_id = date_format(orderTB.order_purchase_timestamp, 'yMMdd') \
                       join orderItemTB on orderTB.order_id = orderItemTB.order_id \
                       join customerTB on orderTB.customer_id = customerTB.customer_id \
                       join sellerTB on orderItemTB.seller_id = sellerTB.seller_id ")
dateST = spSession.sql("select * from dateTB")

In [32]:
saleST.dtypes

[('order_id', 'string'),
 ('customer_id', 'string'),
 ('order_status', 'string'),
 ('date_id', 'string'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('price', 'double'),
 ('freight', 'double'),
 ('geolocation_customer_id', 'int'),
 ('geolocation_saller_id', 'int')]

In [30]:
# Save DataFrame
dateST.write.save("./d_date.parquet", format="parquet")
customerST.write.save("./d_customer.parquet", format="parquet")
productST.write.save("./d_product.parquet", format="parquet")
sellerST.write.save("./d_seller.parquet", format="parquet")
geolocationST.write.save("./d_geolocation.parquet", format="parquet")
saleST.write.save("./f_sale.parquet", format="parquet")