<a href="https://colab.research.google.com/github/marcellamj/soulcode-martech/blob/main/AD2_PySpark_Dados_Ecommerce_by_Olist.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark

## Importações

### Instalando os requisitos necessários

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

### Configurando as variáveis de ambiente do Java e do Spark (JAVA_HOME e SPARK_HOME)

In [None]:
import os

# Tell PySpark/findspark where the JDK and the Spark distribution installed above live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

### Criando a sessão local do PySpark (`local[*]`, usando todos os núcleos da máquina)

In [None]:
# Start a local Spark session (master "local[*]") and keep it in the global `spark`.
import findspark
# findspark.init() makes the Spark installation pointed to by SPARK_HOME (set in the
# previous cell) importable; it must run before the pyspark import below.
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# NOTE(review): regexp_replace is not used in the cells visible here — confirm before removing.
from pyspark.sql.functions import regexp_replace
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Eager evaluation: DataFrames render as friendlier HTML tables when displayed
# Last expression of the cell: echoes the session summary.
spark

In [None]:
import pandas as pd

## Leitura dos dados


Fonte: https://www.kaggle.com/code/harshggupta22/e-commerce-analysis-pyspark/notebook

In [None]:
def _read_olist_csv(file_name, base_dir="/content"):
    """Load one Olist CSV file as a Spark DataFrame.

    Args:
        file_name: CSV file name inside ``base_dir`` (e.g. "orders.csv").
        base_dir: directory that holds the CSV files; defaults to Colab's /content.

    Returns:
        A DataFrame read with ``header=true`` (first line holds column names) and
        ``inferSchema=true`` (Spark scans the data to guess column types).
    """
    return (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(os.path.join(base_dir, file_name))
    )


# Load every table of the Olist e-commerce dataset with the same read options.
df_orders = _read_olist_csv("orders.csv")
df_order_items = _read_olist_csv("order_items.csv")
df_order_payments = _read_olist_csv("order_payments.csv")
df_order_reviews = _read_olist_csv("order_reviews.csv")
df_products = _read_olist_csv("products.csv")
df_customers = _read_olist_csv("customers.csv")
df_geolocation = _read_olist_csv("geolocation.csv")
df_sellers = _read_olist_csv("sellers.csv")

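Alternativa, definindo o schema manualmente (em vez de usar `inferSchema`). O exemplo abaixo assume que existe uma variável `data_path` com o diretório dos dados: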
```
# Importando tipos primitivos
from pyspark.sql.types import StructType, StructField, \
    StringType, IntegerType

# Definindo schema
customers_schema = StructType([
    StructField("customer_id", StringType(), nullable=True, metadata={"description": "Id do cliente"}),
    StructField("customer_unique_id", StringType(), nullable=True, metadata={"description": "Id único do cliente"}),
    StructField("customer_zip_code_prefix", IntegerType(), nullable=True, metadata={"description": "Prefixo do CEP do cliente"}),
    StructField("customer_city", StringType(), nullable=True, metadata={"description": "Cidade do cliente"}),
    StructField("customer_state", StringType(), nullable=True, metadata={"description": "Estado do cliente"})
])

# Realizando a leitura dos dados
df_customers = spark.read.format("csv")\
    .option("header", "true")\
    .schema(customers_schema)\
    .load(os.path.join(data_path, "customers/"))

# Verificando amostra dos dados
df_customers.printSchema()
df_customers.show(5)
```

## Exemplos fictícios

In [None]:
from pyspark.sql.functions import col
# Column expressions are lazy: each line only builds a Column object, and only the
# last expression of the cell is echoed by Jupyter.
col("anomes") * 100 + 1
col("valor_A") > col("valor_B")

from pyspark.sql.functions import lpad, lower, split, expr
# Left-pad to 2 characters with "0"; an empty pad string would perform no padding.
lpad(col("moeda"), 2, "0")
lower(col("sigla"))
# Split a full name into its words; splitting on "" would split between every character.
split(col("nome_completo"), " ")

Column<'split(nome_completo, , -1)'>

## Manipulação dos dados

### Exibir

In [None]:
# Preview the first 2 rows of every raw table loaded above.
# df_customers was previously the only loaded table left out of this preview.
for df_raw in (
    df_orders,
    df_order_items,
    df_order_payments,
    df_order_reviews,
    df_products,
    df_customers,
    df_geolocation,
    df_sellers,
):
    df_raw.show(2)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
+--------------------+--------------------+--

In [None]:
# With spark.sql.repl.eagerEval.enabled, a DataFrame as the cell's last expression renders as an HTML table.
df_orders

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b...,9ef432eb625129730...,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
53cdb2fc8bc7dce0b...,b0830fb4747a6c6d2...,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
47770eb9100c2d0c4...,41ce2a54c0b03bf34...,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
949d5b44dbf5de918...,f88197465ea7920ad...,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
ad21c59c0840e6cb8...,8ab97904e6daea886...,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00
a4591c265e18cb1dc...,503740e9ca751ccdd...,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00
136cce7faa42fdb2c...,ed0271e0b7da060a3...,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
6514b8ad8028c9f2c...,9bdf08b4b3b52b552...,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00
76c6e866289321a7c...,f54a9f0e6b351c431...,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00
e69bfb5eb88e0ed6a...,31ad1d1b63eb99624...,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00


### Criando novo df de consulta

* colunas renomeadas e tipadas

In [None]:
# Cleaned orders table: Portuguese column names, timestamps truncated to dates,
# upper-cased status, days between purchase and delivery, and a 0/1 delivered flag.
orders_select_exprs = [
    "order_id AS id_pedido",
    "to_date(order_purchase_timestamp) AS dt_compra",
    "to_date(order_approved_at) AS dt_aprovacao",
    "to_date(order_delivered_customer_date) AS dt_entrega",
    "upper(order_status) AS status_pedido",
    "datediff(order_delivered_customer_date, order_purchase_timestamp) AS dias_para_entrega",
    "case when upper(order_status) = 'DELIVERED' then 1 else 0 end as flag_entregue",
]
df_orders_prep = df_orders.selectExpr(*orders_select_exprs)

# Preview the first 5 rows
df_orders_prep.show(5)

+--------------------+----------+------------+----------+-------------+-----------------+-------------+
|           id_pedido| dt_compra|dt_aprovacao|dt_entrega|status_pedido|dias_para_entrega|flag_entregue|
+--------------------+----------+------------+----------+-------------+-----------------+-------------+
|e481f51cbdc54678b...|2017-10-02|  2017-10-02|2017-10-10|    DELIVERED|                8|            1|
|53cdb2fc8bc7dce0b...|2018-07-24|  2018-07-26|2018-08-07|    DELIVERED|               14|            1|
|47770eb9100c2d0c4...|2018-08-08|  2018-08-08|2018-08-17|    DELIVERED|                9|            1|
|949d5b44dbf5de918...|2017-11-18|  2017-11-18|2017-12-02|    DELIVERED|               14|            1|
|ad21c59c0840e6cb8...|2018-02-13|  2018-02-13|2018-02-16|    DELIVERED|                3|            1|
+--------------------+----------+------------+----------+-------------+-----------------+-------------+
only showing top 5 rows



### Adicionando colunas em um df

In [None]:
# Add three derived columns to the raw payments table:
#   moeda          -> constant currency symbol 'R$' (expr("'R$'") is a SQL string literal)
#   vlr_pgto_moeda -> moeda concatenated with payment_value cast to text
#   tipo_pagamento -> first "_"-separated token of payment_type
from pyspark.sql.functions import expr
df_payments_prep = (
    df_order_payments
    .withColumn("moeda", expr("'R$'"))
    .withColumn("vlr_pgto_moeda", expr("concat(moeda, cast(payment_value AS string))"))
    .withColumn("tipo_pagamento", split(col("payment_type"), "_").getItem(0))
)

# Preview the first 5 rows
df_payments_prep.show(5)

+--------------------+------------------+------------+--------------------+-------------+-----+--------------+--------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|moeda|vlr_pgto_moeda|tipo_pagamento|
+--------------------+------------------+------------+--------------------+-------------+-----+--------------+--------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|   R$|       R$99.33|        credit|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|   R$|       R$24.39|        credit|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|   R$|       R$65.71|        credit|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|   R$|      R$107.78|        credit|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|   R$|      R$128.45|        

In [None]:
# Rename the raw columns of df_order_payments to Portuguese names, add derived
# columns, and store the result in df_payments_prep (replacing the previous cell's).
#   moeda          -> constant currency symbol 'R$'
#   vlr_pgto_moeda -> moeda concatenated with vlr_pgto cast to string (e.g. "R$99.33")
#   tipo_pagamento -> first "_"-separated token of tipo_pgto
df_payments_prep = (
    df_order_payments
    .withColumnRenamed("order_id", "id_pedido")
    .withColumnRenamed("payment_sequential", "parcela_pgto")
    .withColumnRenamed("payment_type", "tipo_pgto")
    .withColumnRenamed("payment_installments", "qtd_parcelas")
    .withColumnRenamed("payment_value", "vlr_pgto")
    .withColumn("moeda", expr("'R$'"))
    # Reference the vlr_pgto column directly in the SQL. Splicing a quoted Python
    # literal into the middle of the string only worked because Python joins
    # adjacent string literals, which hid the SQL actually sent to Spark.
    .withColumn("vlr_pgto_moeda", expr("concat(moeda, cast(vlr_pgto AS string))"))
    # col() references the existing tipo_pgto column; "credit_card" split on "_"
    # gives ["credit", "card"], and index 0 keeps "credit" (explanation by Bianka).
    .withColumn("tipo_pagamento", split(col("tipo_pgto"), "_")[0])
)

df_payments_prep.show(5)



+--------------------+------------+-----------+------------+--------+-----+--------------+--------------+
|           id_pedido|parcela_pgto|  tipo_pgto|qtd_parcelas|vlr_pgto|moeda|vlr_pgto_moeda|tipo_pagamento|
+--------------------+------------+-----------+------------+--------+-----+--------------+--------------+
|b81ef226f3fe1789b...|           1|credit_card|           8|   99.33|   R$|       R$99.33|        credit|
|a9810da82917af2d9...|           1|credit_card|           1|   24.39|   R$|       R$24.39|        credit|
|25e8ea4e93396b6fa...|           1|credit_card|           1|   65.71|   R$|       R$65.71|        credit|
|ba78997921bbcdc13...|           1|credit_card|           8|  107.78|   R$|      R$107.78|        credit|
|42fdf880ba16b47b5...|           1|credit_card|           2|  128.45|   R$|      R$128.45|        credit|
+--------------------+------------+-----------+------------+--------+-----+--------------+--------------+
only showing top 5 rows



Sobre a coluna `vlr_pgto_moeda`: a expressão SQL avaliada pelo Spark é `concat(moeda, cast(vlr_pgto AS string))`. Ela concatena o valor da coluna "moeda" com o valor da coluna "vlr_pgto" convertido para texto (por exemplo, `R$99.33`). Se o código for escrito como `"concat(moeda, cast("'vlr_pgto'" AS string))"`, o Python junta os três literais de string adjacentes, e o resultado é exatamente o mesmo. As aspas simples não chegam ao Spark; por isso a saída mostra os valores da coluna, e não o texto literal "vlr_pgto".

moeda: coluna existente no DataFrame (preenchida com 'R$'), cujo valor aparece primeiro na nova coluna.

cast(vlr_pgto AS string): converte o valor numérico da coluna "vlr_pgto" para o tipo string, para que ele possa ser concatenado explicitamente com a moeda.

Pedro Barrionovo (16:22): crie a coluna tipo_pagamento splitando os valores da coluna payment_type (renomeada para tipo_pgto), usando como separador o caractere underline e selecionando o índice 0 da lista splitada.

### Renomear colunas

In [None]:
# With spark.sql.repl.eagerEval.enabled, a DataFrame as the cell's last expression renders as an HTML table.
df_sellers

seller_id,seller_zip_code_prefix,seller_city,seller_state
3442f8959a84dea7e...,13023,campinas,SP
d1b65fc7debc3361e...,13844,mogi guacu,SP
ce3ad9de960102d06...,20031,rio de janeiro,RJ
c0f3eea2e14555b6f...,4195,sao paulo,SP
51a04a8a6bdcb23de...,12914,braganca paulista,SP
c240c4061717ac180...,20920,rio de janeiro,RJ
e49c26c3edfa46d22...,55325,brejao,PE
1b938a7ec6ac5061a...,16304,penapolis,SP
768a86e36ad6aae3d...,1529,sao paulo,SP
ccc4bbb5f32a6ab2b...,80310,curitiba,PR


In [None]:
# Rename the seller columns to Portuguese by aliasing each (source, target) pair in order.
df_sellers_prep = df_sellers.select([
    col(source).alias(target)
    for source, target in (
        ("seller_id", "id_vendedor"),
        ("seller_zip_code_prefix", "cep_vendedor"),
        ("seller_city", "cidade_vendedor"),
        ("seller_state", "estado_vendedor"),
    )
])

# Preview the first 10 rows
df_sellers_prep.show(10)

+--------------------+------------+-----------------+---------------+
|         id_vendedor|cep_vendedor|  cidade_vendedor|estado_vendedor|
+--------------------+------------+-----------------+---------------+
|3442f8959a84dea7e...|       13023|         campinas|             SP|
|d1b65fc7debc3361e...|       13844|       mogi guacu|             SP|
|ce3ad9de960102d06...|       20031|   rio de janeiro|             RJ|
|c0f3eea2e14555b6f...|        4195|        sao paulo|             SP|
|51a04a8a6bdcb23de...|       12914|braganca paulista|             SP|
|c240c4061717ac180...|       20920|   rio de janeiro|             RJ|
|e49c26c3edfa46d22...|       55325|           brejao|             PE|
|1b938a7ec6ac5061a...|       16304|        penapolis|             SP|
|768a86e36ad6aae3d...|        1529|        sao paulo|             SP|
|ccc4bbb5f32a6ab2b...|       80310|         curitiba|             PR|
+--------------------+------------+-----------------+---------------+
only showing top 10 

### Dropar

In [None]:
# With spark.sql.repl.eagerEval.enabled, a DataFrame as the cell's last expression renders as an HTML table.
df_geolocation

geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
1037,-23.54562128115268,-46.63929204800168,sao paulo,SP
1046,-23.54608112703553,-46.64482029837157,sao paulo,SP
1046,-23.54612896641469,-46.64295148361138,sao paulo,SP
1041,-23.5443921648681,-46.63949930627844,sao paulo,SP
1035,-23.541577961711493,-46.64160722329613,sao paulo,SP
1012,-23.547762303364262,-46.63536053788448,são paulo,SP
1047,-23.54627311241268,-46.64122516971552,sao paulo,SP
1013,-23.546923208436723,-46.6342636964915,sao paulo,SP
1029,-23.543769055769133,-46.63427784085132,sao paulo,SP
1011,-23.547639550320632,-46.63603162315495,sao paulo,SP


In [None]:
# Drop the state column from the geolocation table and preview the result.
df_geo_dropped = df_geolocation.drop("geolocation_state")
# show must be *called*: the bare attribute `df_geo_dropped.show` only echoes the bound method.
df_geo_dropped.show(5)

<bound method DataFrame.show of +---------------------------+-------------------+-------------------+----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|
+---------------------------+-------------------+-------------------+----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|
|                       1047|-23.546273112412678| -46.64122516971552|       sao paulo|
|                       1013|-23.546923208436723|  -46.6342636964915|       sao paulo|
|          

In [None]:
# Preview the geolocation table without the city and state columns (one drop per column).
df_geolocation.drop("geolocation_city").drop("geolocation_state").show(5)

+---------------------------+-------------------+------------------+
|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|
+---------------------------+-------------------+------------------+
|                       1037| -23.54562128115268|-46.63929204800168|
|                       1046|-23.546081127035535|-46.64482029837157|
|                       1046| -23.54612896641469|-46.64295148361138|
|                       1041|  -23.5443921648681|-46.63949930627844|
|                       1035|-23.541577961711493|-46.64160722329613|
+---------------------------+-------------------+------------------+
only showing top 5 rows



In [None]:
# Preview the untouched geolocation table; 20 rows is DataFrame.show's default.
df_geolocation.show(n=20)

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6