<a href="https://colab.research.google.com/github/victorog17/soulcode_projeto_pandas_spark/blob/main/002_Mini_Projeto_Soul_Epata_PySpark_COLAB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**BIBLIOTECAS**

In [None]:
pip install pyspark

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.window import Window

In [None]:
spark = (SparkSession.builder\
        .master("local")\
        .appName("dataframe_withcolumn")\
        .config("spark.ui.port", "4050")\
        .getOrCreate())
spark

**CRIANDO O DATAFRAME**

In [None]:
# File location and type
file_location = "/content/drive/MyDrive/Soul_Code_Academy/repositorio_mini_projeto/marketing_campaign_tratado_pandas.csv"
#file_location = "https://storage.googleapis.com/bucket-mini-projeto-final-soulcode-victor/arquivo_tratado_pandas/marketing_campaign_tratado_pandas.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

customSchema = StructType([
    StructField("id", StringType(), True),
    StructField("ano_nasc", IntegerType(), True),
    StructField("educacao", StringType(), True),
    StructField("status_civil", StringType(), True),
    StructField("renda_anual", DoubleType(), True),
    StructField("qtd_crianca", IntegerType(), True),
    StructField("qtd_adolescente", IntegerType(), True),
    StructField("data_primeira_compra", DateType(), True),
    StructField("dias_ultima_compra", IntegerType(), True),
    StructField("gasto_em_vinhos", IntegerType(), True),
    StructField("gasto_em_frutas", IntegerType(), True),
    StructField("gasto_em_carnes", IntegerType(), True),
    StructField("gasto_em_peixes", IntegerType(), True),
    StructField("gasto_em_doces", IntegerType(), True),
    StructField("gasto_em_ouro", IntegerType(), True),
    StructField("num_compras_desconto", IntegerType(), True),
    StructField("num_compras_online", IntegerType(), True),
    StructField("num_compras_catalogo", IntegerType(), True),
    StructField("num_compras_loja", IntegerType(), True),
    StructField("num_visitas_online_mensal", IntegerType(), True),
    StructField("oferta_aceito3", StringType(), True),
    StructField("oferta_aceito4", StringType(), True),
    StructField("oferta_aceito5", StringType(), True),
    StructField("oferta_aceito1", StringType(), True),
    StructField("oferta_aceito2", StringType(), True),
    StructField("reclamacoes", StringType(), True),
    StructField("resposta", StringType(), True),
])

df1 = spark.read.load(file_location, format="csv", header="true", sep=',', schema=customSchema)

df1.show(5)
df1.printSchema()

+----+--------+---------+-------------+-----------+-----------+---------------+--------------------+------------------+---------------+---------------+---------------+---------------+--------------+-------------+--------------------+------------------+--------------------+----------------+-------------------------+--------------+--------------+--------------+--------------+--------------+-----------+--------+
|  id|ano_nasc| educacao| status_civil|renda_anual|qtd_crianca|qtd_adolescente|data_primeira_compra|dias_ultima_compra|gasto_em_vinhos|gasto_em_frutas|gasto_em_carnes|gasto_em_peixes|gasto_em_doces|gasto_em_ouro|num_compras_desconto|num_compras_online|num_compras_catalogo|num_compras_loja|num_visitas_online_mensal|oferta_aceito3|oferta_aceito4|oferta_aceito5|oferta_aceito1|oferta_aceito2|reclamacoes|resposta|
+----+--------+---------+-------------+-----------+-----------+---------------+--------------------+------------------+---------------+---------------+---------------+-------

**RENOMEANDO COLUNAS**

In [None]:
df1 = df1.withColumnRenamed("id", "identificacao").withColumnRenamed("renda_anual", "salario_anual")
df1.printSchema()

root
 |-- identificacao: string (nullable = true)
 |-- ano_nasc: integer (nullable = true)
 |-- educacao: string (nullable = true)
 |-- status_civil: string (nullable = true)
 |-- salario_anual: double (nullable = true)
 |-- qtd_crianca: integer (nullable = true)
 |-- qtd_adolescente: integer (nullable = true)
 |-- data_primeira_compra: date (nullable = true)
 |-- dias_ultima_compra: integer (nullable = true)
 |-- gasto_em_vinhos: integer (nullable = true)
 |-- gasto_em_frutas: integer (nullable = true)
 |-- gasto_em_carnes: integer (nullable = true)
 |-- gasto_em_peixes: integer (nullable = true)
 |-- gasto_em_doces: integer (nullable = true)
 |-- gasto_em_ouro: integer (nullable = true)
 |-- num_compras_desconto: integer (nullable = true)
 |-- num_compras_online: integer (nullable = true)
 |-- num_compras_catalogo: integer (nullable = true)
 |-- num_compras_loja: integer (nullable = true)
 |-- num_visitas_online_mensal: integer (nullable = true)
 |-- oferta_aceito3: string (nullable 

**CRIANDO NOVAS VARIAVEIS**

In [None]:
df1 = df1.withColumn("total_de_filhos", F.col("qtd_crianca") + F.col("qtd_adolescente"))
df1 = df1.withColumn("total_gasto_2anos", 
                          F.col("gasto_em_vinhos") + F.col("gasto_em_frutas") + F.col("gasto_em_carnes") + F.col("gasto_em_peixes") +
                          F.col("gasto_em_doces") + F.col("gasto_em_ouro"))
df1.show(5)

+-------------+--------+---------+-------------+-------------+-----------+---------------+--------------------+------------------+---------------+---------------+---------------+---------------+--------------+-------------+--------------------+------------------+--------------------+----------------+-------------------------+--------------+--------------+--------------+--------------+--------------+-----------+--------+---------------+-----------------+
|identificacao|ano_nasc| educacao| status_civil|salario_anual|qtd_crianca|qtd_adolescente|data_primeira_compra|dias_ultima_compra|gasto_em_vinhos|gasto_em_frutas|gasto_em_carnes|gasto_em_peixes|gasto_em_doces|gasto_em_ouro|num_compras_desconto|num_compras_online|num_compras_catalogo|num_compras_loja|num_visitas_online_mensal|oferta_aceito3|oferta_aceito4|oferta_aceito5|oferta_aceito1|oferta_aceito2|reclamacoes|resposta|total_de_filhos|total_gasto_2anos|
+-------------+--------+---------+-------------+-------------+-----------+----------

**FILTROS E INSIGHTS**

In [None]:
# a ideia aqui é mostrar como estão as faixas salariais e o número de filhos de acordo com o grau de estudos
(df1.groupBy(F.col("educacao")).agg(
            F.round(F.mean("salario_anual"), 2).alias("media_salario"),
            F.round(F.min("salario_anual"), 2).alias("menor_salario"),
            F.round(F.max("salario_anual"), 2).alias("maior_salario"),
            F.round(F.mean("total_de_filhos"), 2).alias("media_de_filhos"),
            F.max("total_de_filhos").alias("maior_numero_filhos")
            ).orderBy("media_salario").show())

+------------------+-------------+-------------+-------------+---------------+-------------------+
|          educacao|media_salario|menor_salario|maior_salario|media_de_filhos|maior_numero_filhos|
+------------------+-------------+-------------+-------------+---------------+-------------------+
|ensino_fundamental|     20306.26|       7500.0|      34445.0|           0.72|                  2|
|      ensino_medio|     47625.33|       7500.0|      96547.0|           0.88|                  3|
|         graduacao|     52713.21|       1730.0|     666666.0|           0.93|                  3|
|          mestrado|     52859.73|       6560.0|     157733.0|           0.99|                  3|
|         doutorado|     56162.77|       4023.0|     162397.0|            1.0|                  3|
+------------------+-------------+-------------+-------------+---------------+-------------------+



In [None]:
# a ideia aqui é verificar quantos doutores ganham acima de 100.000,00 ao ano
df1.filter((F.col("educacao") == 'doutorado') & (F.col("salario_anual") > 100000)).show()

+-------------+--------+---------+-------------+-------------+-----------+---------------+--------------------+------------------+---------------+---------------+---------------+---------------+--------------+-------------+--------------------+------------------+--------------------+----------------+-------------------------+--------------+--------------+--------------+--------------+--------------+-----------+--------+---------------+-----------------+
|identificacao|ano_nasc| educacao| status_civil|salario_anual|qtd_crianca|qtd_adolescente|data_primeira_compra|dias_ultima_compra|gasto_em_vinhos|gasto_em_frutas|gasto_em_carnes|gasto_em_peixes|gasto_em_doces|gasto_em_ouro|num_compras_desconto|num_compras_online|num_compras_catalogo|num_compras_loja|num_visitas_online_mensal|oferta_aceito3|oferta_aceito4|oferta_aceito5|oferta_aceito1|oferta_aceito2|reclamacoes|resposta|total_de_filhos|total_gasto_2anos|
+-------------+--------+---------+-------------+-------------+-----------+----------

In [None]:
# a ideia aqui é mostrar como estão as faixas salariais e o número de filhos de acordo com o status civil
(df1.groupBy(F.col("status_civil")).agg(
            F.round(F.mean("salario_anual"), 2).alias("media_salario"),
            F.round(F.min("salario_anual"), 2).alias("menor_salario"),
            F.round(F.max("salario_anual"), 2).alias("maior_salario"),
            F.round(F.mean("total_de_filhos"), 2).alias("media_de_filhos"),
            F.max("total_de_filhos").alias("maior_numero_filhos")
            ).orderBy("media_salario").show())

+-------------+-------------+-------------+-------------+---------------+-------------------+
| status_civil|media_salario|menor_salario|maior_salario|media_de_filhos|maior_numero_filhos|
+-------------+-------------+-------------+-------------+---------------+-------------------+
|     solteiro|      50975.8|       3502.0|     113734.0|           0.86|                  3|
|       casado|     51724.98|       2447.0|     160803.0|           0.96|                  3|
|   divorciado|     52904.33|       1730.0|     153924.0|            1.0|                  3|
|uniao_estavel|     53192.59|       5648.0|     666666.0|           0.98|                  3|
|        viuvo|     56481.55|      22123.0|      85620.0|           0.88|                  2|
+-------------+-------------+-------------+-------------+---------------+-------------------+



In [None]:
# aqui a ideia é ver a média salarial daquels que não tem filhos de acordo com o status civil
(df1.groupBy(F.col("educacao"), F.col("total_de_filhos")).mean("salario_anual").where(F.col("total_de_filhos") == 0).show())

+------------------+---------------+------------------+
|          educacao|total_de_filhos|avg(salario_anual)|
+------------------+---------------+------------------+
|      ensino_medio|              0| 57842.15873015873|
|         graduacao|              0| 66367.69278996866|
|         doutorado|              0| 72354.37121212122|
|          mestrado|              0| 66971.70707070707|
|ensino_fundamental|              0|21590.235294117647|
+------------------+---------------+------------------+



**WINDOW FUNCTIONS**

In [None]:
w0 = Window.orderBy(F.col("ano_nasc"))
df2 = df1.withColumn("rank", F.row_number().over(w0))
df2.select(F.col("identificacao"), F.col("ano_nasc"), F.col("status_civil"), F.col("total_de_filhos") , F.col("rank")).show(10)

+-------------+--------+------------+---------------+----+
|identificacao|ano_nasc|status_civil|total_de_filhos|rank|
+-------------+--------+------------+---------------+----+
|         6663|    1940|    solteiro|              0|   1|
|         6932|    1941|      casado|              0|   2|
|         6142|    1943|      casado|              0|   3|
|         7106|    1943|      casado|              0|   4|
|         2968|    1943|  divorciado|              0|   5|
|         8800|    1943|  divorciado|              0|   6|
|         1453|    1943|       viuvo|              0|   7|
|         4994|    1943|    solteiro|              0|   8|
|         4587|    1944|       viuvo|              0|   9|
|         6605|    1944|  divorciado|              0|  10|
+-------------+--------+------------+---------------+----+
only showing top 10 rows



In [None]:
w1 = Window.partitionBy(F.col("status_civil")).orderBy(F.col("total_gasto_2anos").desc())
df3 = df1.withColumn("dense_rank", F.dense_rank().over(w1))
df3.select(F.col("educacao"), F.col("salario_anual"), F.col("resposta"), F.col("total_gasto_2anos"), F.col("dense_rank")).show(10)

+---------+-------------+--------+-----------------+----------+
| educacao|salario_anual|resposta|total_gasto_2anos|dense_rank|
+---------+-------------+--------+-----------------+----------+
|graduacao|      75759.0|     sim|             2486|         1|
|doutorado|      69098.0|     nao|             2440|         2|
| mestrado|      90226.0|     nao|             2352|         3|
| mestrado|      83151.0|     sim|             2346|         4|
|doutorado|      80360.0|     nao|             2231|         5|
|graduacao|      94642.0|     nao|             2211|         6|
|graduacao|      83512.0|     sim|             2157|         7|
|graduacao|      78618.0|     nao|             2153|         8|
|doutorado|      67546.0|     nao|             2126|         9|
|doutorado|      67546.0|     nao|             2126|         9|
+---------+-------------+--------+-----------------+----------+
only showing top 10 rows



**SALVANDO ARQUIVO NO DRIVE**

In [None]:
#df1.write.format("com.databricks.spark.csv").option("header", "true")\
#                                            .option("inferschema", "true")\
#                                            .option("delimiter", ",")\
#                                            .save("/content/drive/MyDrive/Soul_Code_Academy/repositorio_mini_projeto/marketing_campaign_tratado_pyspark.csv")