#**Instalando a biblioteca PySpark**

Obs: Toda vez que for trabalhar com o PySpark no Colab tem que fazer a instalação do PySpark

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=70bf1b457cc00e8f7db0722cde682ec13af8d0b35c195ec19d5f1fdcb71bd547
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


#**Importando Bibliotecas**

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
from pyspark.sql.window import Window

#**Configurando a SparkSession**

In [3]:
spark = (SparkSession.builder\
        .master("local")\
        .appName("mini_projeto_final")\
        .config("spark.ui.port", "4050")\
        .getOrCreate())

spark

#**Montando a estrutura do DataFrame utilizando o StructType**

In [4]:
schema = StructType() \
        .add("ID", IntegerType(), True) \
        .add("ano_de_nascimento", IntegerType(), True) \
        .add("graduacao", StringType(), True) \
        .add("estado_civil", StringType(), True) \
        .add("renda_anual", DoubleType(), True) \
        .add("criancas_em_casa", IntegerType(), True) \
        .add("adolescentes_em_casa", IntegerType(), True) \
        .add("data_cliente", DateType(), True) \
        .add("recencia", IntegerType(), True) \
        .add("qts_vinhos", IntegerType(), True) \
        .add("qts_frutas", IntegerType(), True) \
        .add("qts_produtos_carne", IntegerType(), True) \
        .add("qts_produtos_peixe", IntegerType(), True) \
        .add("qts_produtos_doces", IntegerType(), True) \
        .add("qts_produtos_ouro", IntegerType(), True) \
        .add("num_compras_desconto", IntegerType(), True) \
        .add("num_compras_web", IntegerType(), True) \
        .add("num_compras_catalogo", IntegerType(), True) \
        .add("num_compras_loja", IntegerType(), True) \
        .add("num_visitas_web_mes", IntegerType(), True) \
        .add("campanha3", IntegerType(), True) \
        .add("campanha4", IntegerType(), True) \
        .add("campanha5", IntegerType(), True) \
        .add("campanha1", IntegerType(), True) \
        .add("campanha2", IntegerType(), True) \
        .add("reclamacoes", IntegerType(), True) \
        .add("resposta_alvo", IntegerType(), True) \

df1 = spark.read.format('csv') \
                  .option("header", True) \
                  .schema(schema) \
                  .load("/content/drive/MyDrive/dados/mini_projeto_final.csv")

#**Visualizando a como ficou a estrutura do DataFrame**

In [5]:
df1.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- ano_de_nascimento: integer (nullable = true)
 |-- graduacao: string (nullable = true)
 |-- estado_civil: string (nullable = true)
 |-- renda_anual: double (nullable = true)
 |-- criancas_em_casa: integer (nullable = true)
 |-- adolescentes_em_casa: integer (nullable = true)
 |-- data_cliente: date (nullable = true)
 |-- recencia: integer (nullable = true)
 |-- qts_vinhos: integer (nullable = true)
 |-- qts_frutas: integer (nullable = true)
 |-- qts_produtos_carne: integer (nullable = true)
 |-- qts_produtos_peixe: integer (nullable = true)
 |-- qts_produtos_doces: integer (nullable = true)
 |-- qts_produtos_ouro: integer (nullable = true)
 |-- num_compras_desconto: integer (nullable = true)
 |-- num_compras_web: integer (nullable = true)
 |-- num_compras_catalogo: integer (nullable = true)
 |-- num_compras_loja: integer (nullable = true)
 |-- num_visitas_web_mes: integer (nullable = true)
 |-- campanha3: integer (nullable = true)
 |-- campan

#**Visualizando o DataFrame estruturado**

In [6]:
df1.show()

+----+-----------------+-----------------+-------------+-----------+----------------+--------------------+------------+--------+----------+----------+------------------+------------------+------------------+-----------------+--------------------+---------------+--------------------+----------------+-------------------+---------+---------+---------+---------+---------+-----------+-------------+
|  ID|ano_de_nascimento|        graduacao| estado_civil|renda_anual|criancas_em_casa|adolescentes_em_casa|data_cliente|recencia|qts_vinhos|qts_frutas|qts_produtos_carne|qts_produtos_peixe|qts_produtos_doces|qts_produtos_ouro|num_compras_desconto|num_compras_web|num_compras_catalogo|num_compras_loja|num_visitas_web_mes|campanha3|campanha4|campanha5|campanha1|campanha2|reclamacoes|resposta_alvo|
+----+-----------------+-----------------+-------------+-----------+----------------+--------------------+------------+--------+----------+----------+------------------+------------------+------------------

#**Modificando o nome de 2 colunas**

In [7]:
df1 = df1.withColumnRenamed("ID", "id").withColumnRenamed("recencia", "qtd_dias_ultima_compra")

df1.show()

+----+-----------------+-----------------+-------------+-----------+----------------+--------------------+------------+----------------------+----------+----------+------------------+------------------+------------------+-----------------+--------------------+---------------+--------------------+----------------+-------------------+---------+---------+---------+---------+---------+-----------+-------------+
|  id|ano_de_nascimento|        graduacao| estado_civil|renda_anual|criancas_em_casa|adolescentes_em_casa|data_cliente|qtd_dias_ultima_compra|qts_vinhos|qts_frutas|qts_produtos_carne|qts_produtos_peixe|qts_produtos_doces|qts_produtos_ouro|num_compras_desconto|num_compras_web|num_compras_catalogo|num_compras_loja|num_visitas_web_mes|campanha3|campanha4|campanha5|campanha1|campanha2|reclamacoes|resposta_alvo|
+----+-----------------+-----------------+-------------+-----------+----------------+--------------------+------------+----------------------+----------+----------+--------------

#**Criando duas novas colunas contendo informações relevantes**

In [8]:
df1 = df1.withColumn("soma_qtd_produtos", F.col("qts_vinhos") + F.col("qts_frutas") + F.col("qts_produtos_carne") + F.col("qts_produtos_peixe") + F.col("qts_produtos_doces") + F.col("qts_produtos_ouro"))
ano_atual = 2021
df1 = df1.withColumn("idade_cliente", ano_atual - F.col("ano_de_nascimento"))

#Convertendo o tipo do dado
df1 = df1.withColumn("soma_qtd_produtos", df1.soma_qtd_produtos.cast("integer"))

df1.show()

+----+-----------------+-----------------+-------------+-----------+----------------+--------------------+------------+----------------------+----------+----------+------------------+------------------+------------------+-----------------+--------------------+---------------+--------------------+----------------+-------------------+---------+---------+---------+---------+---------+-----------+-------------+-----------------+-------------+
|  id|ano_de_nascimento|        graduacao| estado_civil|renda_anual|criancas_em_casa|adolescentes_em_casa|data_cliente|qtd_dias_ultima_compra|qts_vinhos|qts_frutas|qts_produtos_carne|qts_produtos_peixe|qts_produtos_doces|qts_produtos_ouro|num_compras_desconto|num_compras_web|num_compras_catalogo|num_compras_loja|num_visitas_web_mes|campanha3|campanha4|campanha5|campanha1|campanha2|reclamacoes|resposta_alvo|soma_qtd_produtos|idade_cliente|
+----+-----------------+-----------------+-------------+-----------+----------------+--------------------+--------

#**Filtrando, ordenando e agrupando dados relevantes para o negócio**

In [9]:
dados = (df1.withColumn("qntd_filhos",
          F.when(F.col("criancas_em_casa") > 0, F.concat(df1.criancas_em_casa, F.lit(" - Filhos")))
          .otherwise("Não tem filhos")))

In [10]:
dados.select(F.col("estado_civil"), F.col("qntd_filhos")).show()

+-------------+--------------+
| estado_civil|   qntd_filhos|
+-------------+--------------+
|  Solteiro(a)|Não tem filhos|
|  Solteiro(a)|    1 - Filhos|
|União Estável|Não tem filhos|
|União Estável|    1 - Filhos|
|    Casado(a)|    1 - Filhos|
|União Estável|Não tem filhos|
|Divorciado(a)|Não tem filhos|
|    Casado(a)|    1 - Filhos|
|União Estável|    1 - Filhos|
|União Estável|    1 - Filhos|
|    Casado(a)|Não tem filhos|
|Divorciado(a)|Não tem filhos|
|Divorciado(a)|    1 - Filhos|
|    Casado(a)|Não tem filhos|
|  Solteiro(a)|Não tem filhos|
|    Casado(a)|    1 - Filhos|
|União Estável|Não tem filhos|
|    Casado(a)|Não tem filhos|
|  Solteiro(a)|    1 - Filhos|
|    Casado(a)|Não tem filhos|
+-------------+--------------+
only showing top 20 rows



# **Utilizando Window Functions**

In [11]:
#RETORNA O NÚMERO DA LINHA DE ACORDO COM A COLUNA QUE FOI PARTICIONADA

wf1 = Window.partitionBy(F.col("estado_civil")).orderBy("renda_anual")

#withColumn cria uma nova coluna no dataframe
df1.withColumn("row_number", F.row_number().over(wf1)).show()

+-----+-----------------+-----------------+-------------------+-----------+----------------+--------------------+------------+----------------------+----------+----------+------------------+------------------+------------------+-----------------+--------------------+---------------+--------------------+----------------+-------------------+---------+---------+---------+---------+---------+-----------+-------------+-----------------+-------------+----------+
|   id|ano_de_nascimento|        graduacao|       estado_civil|renda_anual|criancas_em_casa|adolescentes_em_casa|data_cliente|qtd_dias_ultima_compra|qts_vinhos|qts_frutas|qts_produtos_carne|qts_produtos_peixe|qts_produtos_doces|qts_produtos_ouro|num_compras_desconto|num_compras_web|num_compras_catalogo|num_compras_loja|num_visitas_web_mes|campanha3|campanha4|campanha5|campanha1|campanha2|reclamacoes|resposta_alvo|soma_qtd_produtos|idade_cliente|row_number|
+-----+-----------------+-----------------+-------------------+-----------+---

In [12]:
df1.withColumn("percent_rank", F.percent_rank().over(wf1)).show()

+-----+-----------------+-----------------+-------------------+-----------+----------------+--------------------+------------+----------------------+----------+----------+------------------+------------------+------------------+-----------------+--------------------+---------------+--------------------+----------------+-------------------+---------+---------+---------+---------+---------+-----------+-------------+-----------------+-------------+--------------------+
|   id|ano_de_nascimento|        graduacao|       estado_civil|renda_anual|criancas_em_casa|adolescentes_em_casa|data_cliente|qtd_dias_ultima_compra|qts_vinhos|qts_frutas|qts_produtos_carne|qts_produtos_peixe|qts_produtos_doces|qts_produtos_ouro|num_compras_desconto|num_compras_web|num_compras_catalogo|num_compras_loja|num_visitas_web_mes|campanha3|campanha4|campanha5|campanha1|campanha2|reclamacoes|resposta_alvo|soma_qtd_produtos|idade_cliente|        percent_rank|
+-----+-----------------+-----------------+---------------

#**Salvando o DataFrame em um arquivo .csv**

In [13]:
df1.write.format("csv").option("header", "true").option("schema", "true").option("delimiter", ",").save("/content/drive/MyDrive/dados/df_pyspark_campanhaMKT.csv")