# <font color='blue'>Uni-Facef - Pyspark - Atividade</font>

Essa atividade tem como objetivo descobrir um pouco mais sobre o cliente e traçar um perfil do mesmo. Visto que temos as informações pessoais do cliemte em um arquivo chamado "cliente_fake.csv", e os dados transacionais desse cliente no arquivo chamado "venda_fake". E idéia do exercicio é a manipulação dos dados de forma a fazer agregações, join nos dados, podendo ser necessário o uso de UDFs, gerando um Dataframe Final:

#### O Dataframe final deve estar agrupado por cliente (cliente_id), e conter as seguintes informações:

- cliente_id - O identificador do cliente
- idade - Calcular a idade do cliente (valor inteiro)
- recencia - Calcular a quantos meses faz que o cliente não compra (valor inteiro)
- qt_total_compras - Quantidade total de compras (pedidos distintos)
- vr_total_compras = Valor total de compras do cliente
- qt_total_itens - Quantidade total de itens comprados pelo cliente (produto_id distintos)
- qt_max_parcelas - Quantidade máxima de parcelas que o cliente já utilizou
- perfil_cliente - Deve mostrar o perfil de compra desse cliente, se compra somente "loja" ou somente "online", ou em ambos "multicanal";

OBS: O dataset "venda_fake" está a nível de itens. Pode haver mais de um item por pedido, ito é, o numero do pedido pode estar duplicado nodataset. 


In [1]:
!pip install pyspark

[33mYou are using pip version 18.1, however version 20.3.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
# coding: utf-8
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .appName('ConhecendoCliente') \
    .getOrCreate()

#### Dataset de cliente

In [4]:
df_cliente = spark.read \
    .option("delimiter", "|") \
    .csv('cliente_fake.csv', header=True)

df_cliente.show()

+--------------+----------+-------------+
|          nome|cliente_id|dt_nascimento|
+--------------+----------+-------------+
|Cliente 100013|    100013|   1964-06-21|
|Cliente 100014|    100014|   1964-05-24|
|Cliente 100015|    100015|   1946-10-18|
|Cliente 100016|    100016|   1972-03-24|
|Cliente 100017|    100017|   1982-01-19|
|Cliente 100018|    100018|   1983-04-26|
|Cliente 100019|    100019|   1971-09-09|
|Cliente 100020|    100020|   1989-06-18|
|Cliente 100007|    100007|   1976-11-04|
|Cliente 100008|    100008|   1980-12-01|
|Cliente 100009|    100009|   1981-03-05|
|Cliente 100010|    100010|   1984-03-20|
|Cliente 100011|    100011|   1977-11-22|
|Cliente 100012|    100012|   1974-02-04|
|Cliente 100001|    100001|   2009-04-16|
|Cliente 100002|    100002|   1974-08-29|
|Cliente 100003|    100003|   1981-12-05|
|Cliente 100004|    100004|   1968-08-08|
|Cliente 100005|    100005|   1982-11-29|
|Cliente 100006|    100006|   1948-04-01|
+--------------+----------+-------

#### Dataset de pedido

In [5]:
df_venda = spark.read \
    .option("delimiter", "|") \
    .csv('venda_fake.csv', header=True)

df_venda.show()

+--------------+----------+--------------------+----------+-----------+--------------+------------+-------+--------+
|     pedido_id|cliente_id|           dt_pedido|produto_id|qt_parcelas|   forma_pagto|vr_unit_item|qt_item|tp_canal|
+--------------+----------+--------------------+----------+-----------+--------------+------------+-------+--------+
|10000720200217|    100007|2020-02-17T00:00:...|   6253644|         20|cartao credito|         478|      1|    loja|
|10000720200217|    100007|2020-02-17T00:00:...|   5313408|         20|cartao credito|      1357.2|      1|    loja|
|10000720200217|    100007|2020-02-17T00:00:...|   6253140|         20|cartao credito|         372|      1|    loja|
|10000720200625|    100007|2020-06-25T00:00:...|   6858780|          7|cartao credito|       317.8|      1|    site|
|10000120200626|    100001|2020-06-26T00:00:...|   7560984|         16|cartao credito|     1399.04|      1|    loja|
|10000920200614|    100009|2020-06-14T00:00:...|   7571280|     

In [7]:
df_venda.createOrReplaceTempView("venda")
df_cliente.createOrReplaceTempView("cliente")

df_resultado = spark.sql(
    """
        SELECT 
            c.cliente_id,    
            FLOOR(DATEDIFF(NOW(), c.dt_nascimento) / 365) as idade, 
            COUNT(DISTINCT v.pedido_id) as qt_total_compras, 
            FLOOR(DATEDIFF(NOW(), MAX(v.dt_pedido)) / 30) as recencia, 
            ROUND(SUM(v.vr_unit_item * v.qt_item ), 2) as vr_total_compras, 
            FLOOR(SUM(v.qt_item)) as qt_total_itens, 
            MAX(v.qt_parcelas) as qt_max_parcelas, 
            IF(COUNT(DISTINCT(v.tp_canal)) > 1, 'multicanal', MAX(v.tp_canal)) as perfil_cliente
        FROM cliente c 
            INNER JOIN venda v ON (c.cliente_id = v.cliente_id) 
        GROUP BY c.cliente_id,c.dt_nascimento 
        ORDER BY c.cliente_id
    """)

df_resultado.show()

+----------+-----+----------------+--------+----------------+--------------+---------------+--------------+
|cliente_id|idade|qt_total_compras|recencia|vr_total_compras|qt_total_itens|qt_max_parcelas|perfil_cliente|
+----------+-----+----------------+--------+----------------+--------------+---------------+--------------+
|    100001|   11|               1|       5|         1399.04|             1|             16|          loja|
|    100002|   46|               1|       9|         1280.07|             1|              9|          loja|
|    100003|   39|               2|       4|         1904.95|             3|              9|          loja|
|    100004|   52|               2|      11|         6860.08|             3|             24|    multicanal|
|    100005|   38|               2|       9|         3207.28|             2|             15|          loja|
|    100006|   72|               1|       4|         3170.72|             4|              9|          loja|
|    100007|   44|          