#### Bibliotecas

In [0]:
# Importando os pacotes a serem utilizados
import pandas as pd
import numpy as np

### DADOS DIRETORIO MNT APÓS INGESTÃO

In [0]:
%fs ls "dbfs:/mnt/raw/pctgroup/olist"

path,name,size,modificationTime
dbfs:/mnt/raw/pctgroup/olist/customers.csv,customers.csv,9457154,1710773352000
dbfs:/mnt/raw/pctgroup/olist/geolocation.csv,geolocation.csv,48740107,1710776459000
dbfs:/mnt/raw/pctgroup/olist/order_items.csv,order_items.csv,16810742,1710777362000
dbfs:/mnt/raw/pctgroup/olist/order_payments.csv,order_payments.csv,6581977,1710777483000
dbfs:/mnt/raw/pctgroup/olist/order_reviews.csv,order_reviews.csv,12231296,1710778616000


In [0]:
# LENDO ARQUIVO CSV E CRIANDO UM DATA FRAME

df_customers_dataset = spark.read.csv('dbfs:/mnt/raw/pctgroup/olist/customers*.csv', sep=',', header=True)
df_geolocation_dataset = spark.read.csv('dbfs:/mnt/raw/pctgroup/olist/geolocation*.csv', sep=',', header=True)
df_order_items_dataset = spark.read.csv('dbfs:/mnt/raw/pctgroup/olist/order_items*.csv', sep=',', header=True)
df_order_payments_dataset = spark.read.csv('dbfs:/mnt/raw/pctgroup/olist/order_payments*.csv', sep=',', header=True)
df_order_reviews_dataset = spark.read.csv('dbfs:/mnt/raw/pctgroup/olist/order_reviews*.csv', sep=',', header=True)


### GRAVAR EM PARQUET

In [0]:
# GRAVANDO EM PARQUET

df_customers_dataset.write.mode("overwrite").parquet("/mnt/bs-stage/pctgroup/customers.parquet")
df_geolocation_dataset.write.mode("overwrite").parquet("/mnt/bs-stage/pctgroup/geolocation.parquet")
df_order_items_dataset.write.mode("overwrite").parquet("/mnt/bs-stage/pctgroup/order_items.parquet")
df_order_payments_dataset.write.mode("overwrite").parquet("/mnt/bs-stage/pctgroup/order_payments.parquet")
df_order_reviews_dataset.write.mode("overwrite").parquet("/mnt/bs-stage/pctgroup/order_reviews.parquet")


In [0]:
%fs ls "dbfs:/mnt/bs-stage/pctgroup"

path,name,size,modificationTime
dbfs:/mnt/bs-stage/pctgroup/customers.parquet/,customers.parquet/,0,0
dbfs:/mnt/bs-stage/pctgroup/geolocation.parquet/,geolocation.parquet/,0,0
dbfs:/mnt/bs-stage/pctgroup/order_items.parquet/,order_items.parquet/,0,0
dbfs:/mnt/bs-stage/pctgroup/order_payments.parquet/,order_payments.parquet/,0,0
dbfs:/mnt/bs-stage/pctgroup/order_reviews.parquet/,order_reviews.parquet/,0,0


### LER ARQUIVO PARQUET E CRIAR TABELA TEMPORARIA

##### customers

In [0]:
# load data into a python dataframe
ds_customers = spark.read.parquet("/mnt/bs-stage/pctgroup/customers.parquet")

# register as a temporary view [sql]
ds_customers.createOrReplaceTempView("py_ds_customers")


##### geolocation

In [0]:
# load data into a python dataframe
ds_geolocation = spark.read.parquet("/mnt/bs-stage/pctgroup/geolocation.parquet")

# register as a temporary view [sql]
ds_geolocation.createOrReplaceTempView("py_ds_geolocation")

##### order_items

In [0]:
# load data into a python dataframe
ds_order_items = spark.read.parquet("/mnt/bs-stage/pctgroup/order_items.parquet")

# register as a temporary view [sql]
ds_order_items.createOrReplaceTempView("py_ds_order_items")

##### order_payments

In [0]:
# load data into a python dataframe
ds_order_payments = spark.read.parquet("/mnt/bs-stage/pctgroup/order_payments.parquet")

# register as a temporary view [sql]
ds_order_payments.createOrReplaceTempView("py_ds_order_payments")

##### order_reviews

In [0]:
# load data into a python dataframe
ds_order_reviews = spark.read.parquet("/mnt/bs-stage/pctgroup/order_reviews.parquet")

# register as a temporary view [sql]
ds_order_reviews.createOrReplaceTempView("py_ds_order_reviews")

### TRATAMENTO DE DADOS

### precisa corrigir erro de cabeçalho para aplicar o tratamento dos codigos

##### Tratamento de Nulos

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("dataset", StringType(), nullable=False),
    StructField("n_rows", IntegerType(), nullable=False),
    StructField("n_cols", IntegerType(), nullable=False),
    StructField("null_amount", StringType(), nullable=False),
    StructField("qty_null_columns", IntegerType(), nullable=False),
    StructField("null_columns", StringType(), nullable=False)
])

data_info = spark.createDataFrame([], schema)

datasets = [ds_customers, ds_geolocation, ds_order_items, ds_order_payments, ds_order_reviews]
names = ['ds_customers', 'ds_geolocation', 'ds_order_items', 'ds_order_payments', 'ds_order_reviews']

for name, df in zip(names, datasets):
    n_rows = df.count()
    n_cols = len(df.columns)
    null_amount = sum([col(c).isNull().cast('int').alias(c) for c in df.columns])
    qty_null_columns = sum([col(c).isNull().cast('int') for c in df.columns])
    null_columns = ', '.join([c for c, qty_null in zip(df.columns, qty_null_columns.collect()) if qty_null > 0])
    data_info = data_info.union(spark.createDataFrame([(name, n_rows, n_cols, null_amount, len(null_columns.split(', ')), null_columns)], schema))

display(data_info)

#### Regra - Date

In [0]:
from pyspark.sql.functions import col, to_date

df_dt2 = ds_order_reviews.select(col("review_creation_date"), to_date(col("review_answer_timestamp"), "MM-dd-yyyy").alias("date2"))

df_dt2.head(10)

[Row(review_creation_date='2018-01-18 00:00:00', date2=None),
 Row(review_creation_date='2018-03-10 00:00:00', date2=None),
 Row(review_creation_date='2018-02-17 00:00:00', date2=None),
 Row(review_creation_date='2017-04-21 00:00:00', date2=None),
 Row(review_creation_date='2018-03-01 00:00:00', date2=None),
 Row(review_creation_date='2018-04-13 00:00:00', date2=None),
 Row(review_creation_date='2017-07-16 00:00:00', date2=None),
 Row(review_creation_date='2018-08-14 00:00:00', date2=None),
 Row(review_creation_date='2017-05-17 00:00:00', date2=None),
 Row(review_creation_date='2018-05-22 00:00:00', date2=None)]

#### Regra - Join

In [0]:
ds_order_items.head(10)

[Row(00010242fe8c5a6d1ba2dd792cb16214='a09e2bca86a90e8ab0bd8e504426e2e1', 1='1', 4244733e06e7ecb4970a6e2683c13e61='1a428b685ede76217c9efb550c4aaa59', 48436dade18ac8b2bce089ec2a041202='d2374cbcbb3ca4ab1086534108cc3ab7', 2017-09-19 09:45:35='2018-01-23 15:11:26', 58.9000015='22.8999996', 13.29='11.8500004'),
 Row(00010242fe8c5a6d1ba2dd792cb16214='a09e2bca86a90e8ab0bd8e504426e2e1', 1='2', 4244733e06e7ecb4970a6e2683c13e61='1a428b685ede76217c9efb550c4aaa59', 48436dade18ac8b2bce089ec2a041202='d2374cbcbb3ca4ab1086534108cc3ab7', 2017-09-19 09:45:35='2018-01-23 15:11:26', 58.9000015='22.8999996', 13.29='11.8500004'),
 Row(00010242fe8c5a6d1ba2dd792cb16214='a09e47caedbb9a7731f26449cc31e7a8', 1='1', 4244733e06e7ecb4970a6e2683c13e61='00636f9286f69c9f1bdabe76e670fb50', 48436dade18ac8b2bce089ec2a041202='cca3071e3e9bb7d12640c9fbe2301306', 2017-09-19 09:45:35='2017-03-10 20:21:48', 58.9000015='27.8999996', 13.29='10.96'),
 Row(00010242fe8c5a6d1ba2dd792cb16214='a09ed5d1761ac2a730a3447189e6eeea', 1='1', 

In [0]:
#TRATAMENTO DE VALORES NULOS




#### Removendo linhas duplicadas

In [0]:
# Removendo linhas duplicadas

ds_customers.count()

99440

In [0]:
ds_customers.drop_duplicates().count()

99440

COLUNA CACULADA

In [0]:
ds_geolocation.show(10)

+-----+-----------+-----------+--------------+---+
| 1037|-23.5456219|-46.6392937|     sao paulo| SP|
+-----+-----------+-----------+--------------+---+
|30160| -19.927597|-43.9381638|belo horizonte| MG|
|30170|  -19.93256|-43.9530449|belo horizonte| MG|
|30140| -19.928072|-43.9395065|belo horizonte| MG|
|30112|-19.9391117|-43.9366684|belo horizonte| MG|
|30170|-19.9306755|-43.9528389|belo horizonte| MG|
|30140|-19.9314022|-43.9260902|belo horizonte| MG|
|30140|-19.9239311|-43.9211273|belo horizonte| MG|
|30140|-19.9303627|-43.9301796|belo horizonte| MG|
|30112|-19.9333134|-43.9271278|belo horizonte| MG|
|30170|-19.9329128|-43.9517899|belo horizonte| MG|
+-----+-----------+-----------+--------------+---+
only showing top 10 rows



### GRAVANDO ARQUIVO EM FORMATO DELTA

In [0]:
ds_customers.write.mode("overwrite").format("delta").save("dbfs:/mnt/bs-production/delta/customers/")


### LEITURA TABELA DELTA E CRIANDO TABELA TEMPORARIA

In [0]:
# read delta table [querying from data lakehouse]
ds_customers_delta = spark.read.format("delta").load("dbfs:/mnt/bs-production/delta/customers/")

# make it available on sql engine
ds_customers_delta.createOrReplaceTempView("customers_delta")

In [0]:
%sql

select * from customers_delta limit 10

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
,,,,,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
,,,,,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
,,,,,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
,,,,,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
,,,,,879864dab9bc3047522c92c82e1212b8,4c93744516667ad3b8f1fb645a3116a4,89254,jaragua do sul,SC
,,,,,fd826e7cf63160e536e0908c76c3f441,addec96d2e059c80c30fe6871d30d177,4534,sao paulo,SP
,,,,,5e274e7a0c3809e14aba7ad5aae0d407,57b2a98a409812fe9618067b6b8ebe4f,35182,timoteo,MG
,,,,,5adf08e34b2e993982a47070956c5c65,1175e95fb47ddff9de6b2b06188f7e0d,81560,curitiba,PR
,,,,,4b7139f34592b3a31687243a302fa75b,9afe194fb833f79e300e37e580171f22,30575,belo horizonte,MG
,,,,,9fb35e4ed6f0a14a4977cd9aea4042bb,2a7745e1ed516b289ed9b29c7d0539a5,39400,montes claros,MG
