##  Data Science Academy

## Modelagem de Bancos de Dados Relacionais, Não Relacionais e Data Stores

## Modelagem de Bancos de Dados Não Relacionais

## Processo ETL

In [1]:
# Instala o cassandra driver
# https://pypi.org/project/cassandra-driver/
!pip install -q cassandra-driver

In [2]:
# Imports
from cassandra.cluster import Cluster

In [3]:
# Define o cluster
cluster = Cluster(['localhost']) 

In [4]:
# Define a session
session = cluster.connect('projeto2')

In [5]:
# Testa o acesso
linhas = session.execute('select * from TB_CONSULTA1 limit 5;')
for linha in linhas:
    print (linha)

In [6]:
# Variáveis de controle
cassandra_keyspace = "projeto2"

## Carregando os Dados

In [7]:
# Dataset de clientes
df_clientes = spark.read.csv("/home/dmpm/Projeto2/dados/Clientes.csv", header = True, inferSchema = True)

In [8]:
df_clientes.show(5)

+----------+-------------------+--------------------+---------------+-----------+--------------------+--------------------+-----------------+------------+--------------------+--------+------+-----+
|ID_Cliente|               Nome|            Endereco|Data_Nascimento|     Cidade|       Data_Cadastro|               Email|         Latitude|   Longitude|               Senha|   Fonte|Estado|  Zip|
+----------+-------------------+--------------------+---------------+-----------+--------------------+--------------------+-----------------+------------+--------------------+--------+------+-----+
|         1|       Hudson Borer|9611-9809 West Ro...|     1986-12-12| Wood River|2017-10-06 13:04:...|borer-hudson@test...|40.71314890000001| -98.5259864|ccca881f-3e4b-4e5...| Twitter|    SP|68883|
|         2|Domenica Williamson|      101 4th Street|     1967-06-10|  Searsboro|2018-04-08 23:40:...|williamson-domeni...|       41.5813224| -92.6991321|eafc45bf-cf8e-4c9...|Afiliado|    SC|50242|
|         

In [9]:
# Dataset de pedidos
df_pedidos = spark.read.csv("/home/dmpm/Projeto2/dados/Pedidos.csv", header = True, inferSchema = True)

In [10]:
df_pedidos.show(5)

+---------+--------------------+-----------------+----------+----------+------------------+-------+------------------+----------+
|ID_Pedido|         Data_Pedido|         Desconto|ID_Produto|Quantidade|          Subtotal|Imposto|             Total|ID_Cliente|
+---------+--------------------+-----------------+----------+----------+------------------+-------+------------------+----------+
|        1|2019-02-11 08:10:...|             null|        14|         2|37.648145389078365|   2.07|39.718145389078366|         1|
|        2|2018-05-14 19:34:...|             null|       123|         3|110.93145648834248|    6.1| 117.0376564084763|         1|
|        3|2019-12-06 08:52:...|6.416679208849759|       105|         2|52.723521442619514|    2.9| 55.62208681964182|         1|
|        4|2019-08-22 04:00:...|             null|        94|         6|109.21864156655383|   6.01| 115.2207354961295|         1|
|        5|2018-10-09 15:04:...|             null|       132|         5|127.88197029833711

## Junção das Tabelas

In [11]:
# Converte os dados
for i in df_clientes.columns:
    df_clientes = df_clientes.withColumnRenamed(i, i.lower())

In [12]:
# Converte os dados
for i in df_pedidos.columns:
    df_pedidos = df_pedidos.withColumnRenamed(i, i.lower())

In [13]:
# Fazendo o Join das tabelas
df_join = df_pedidos.join(df_clientes, df_pedidos.id_cliente == df_clientes.id_cliente, how = 'inner')

In [14]:
# Schema
df_join.printSchema()

root
 |-- id_pedido: integer (nullable = true)
 |-- data_pedido: timestamp (nullable = true)
 |-- desconto: double (nullable = true)
 |-- id_produto: integer (nullable = true)
 |-- quantidade: integer (nullable = true)
 |-- subtotal: double (nullable = true)
 |-- imposto: double (nullable = true)
 |-- total: double (nullable = true)
 |-- id_cliente: integer (nullable = true)
 |-- id_cliente: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- endereco: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- data_cadastro: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- senha: string (nullable = true)
 |-- fonte: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- zip: integer (nullable = true)



## Agregações

In [15]:
from pyspark.sql.functions import col

In [16]:
# Agregação da consulta 1
df_consulta1 = df_join.groupBy('fonte') \
.agg({'total':'sum'}).select('fonte', col('sum(total)').alias('total_vendas'))

In [17]:
df_consulta1.show(5)

+--------+-----------------+
|   fonte|     total_vendas|
+--------+-----------------+
| Twitter|319448.7269814239|
|Afiliado|297605.2749875903|
|  Google|325183.5607881129|
|Organico|319637.0497447002|
|Facebook|333453.5126582293|
+--------+-----------------+



In [18]:
# Agregação da consulta 2
df_consulta2 = df_join.groupBy('estado') \
.agg({'total':'sum'}).select('estado', col('sum(total)').alias('total_vendas'))

In [19]:
df_consulta2.show(5)

+------+------------------+
|estado|      total_vendas|
+------+------------------+
|    SC|  78672.3796432496|
|    PI|60885.920881314254|
|    AM| 32415.70034927819|
|    NJ| 6768.687499559493|
|    RR| 35598.05678620004|
+------+------------------+
only showing top 5 rows



In [20]:
# Agregação da consulta 3
df_consulta3 = df_join.groupBy('estado', 'data_pedido') \
.agg({'desconto':'avg'}).select('estado', 'data_pedido', col('avg(desconto)').alias('media_desconto'))

In [21]:
df_consulta3.show(15)

+------+--------------------+-----------------+
|estado|         data_pedido|   media_desconto|
+------+--------------------+-----------------+
|    AL|2018-04-28 06:18:...|6.012432951799443|
|    PE|2019-12-27 06:51:...|             null|
|    MT|2019-07-11 16:32:...|             null|
|    WV|2020-03-24 07:56:...|             null|
|    DF|2020-03-07 23:58:...|             null|
|    PR|2018-09-29 02:19:...|             null|
|    SC|2019-07-23 05:19:...|             null|
|    BA|2018-02-07 00:52:...|             null|
|    IL|2018-11-11 04:21:...|             null|
|    IL|2020-03-22 02:11:...|             null|
|    MS|2018-09-14 15:39:...|             null|
|    SC|2018-05-06 02:05:...|             null|
|    AP|2020-02-24 05:33:...|             null|
|    MT|2019-11-12 20:24:...|             null|
|    TN|2018-05-01 21:01:...|             null|
+------+--------------------+-----------------+
only showing top 15 rows



## Limpeza dos Dados

In [22]:
from pyspark.sql.functions import to_date

In [23]:
# Ajusta a coluna de data
df_join = df_join.withColumn('data_pedido', to_date(df_join.data_pedido, 'yyyy-MM-dd'))

In [24]:
df_join.show(5)

+---------+-----------+-----------------+----------+----------+------------------+-------+------------------+----------+----------+------------+--------------------+---------------+----------+--------------------+--------------------+-----------------+-----------+--------------------+-------+------+-----+
|id_pedido|data_pedido|         desconto|id_produto|quantidade|          subtotal|imposto|             total|id_cliente|id_cliente|        nome|            endereco|data_nascimento|    cidade|       data_cadastro|               email|         latitude|  longitude|               senha|  fonte|estado|  zip|
+---------+-----------+-----------------+----------+----------+------------------+-------+------------------+----------+----------+------------+--------------------+---------------+----------+--------------------+--------------------+-----------------+-----------+--------------------+-------+------+-----+
|        1| 2019-02-11|             null|        14|         2|37.6481453890783

In [25]:
# Agregação da consulta 3
df_consulta3 = df_join.groupBy('estado', 'data_pedido') \
.agg({'desconto':'avg'}).select('estado', 'data_pedido', col('avg(desconto)').alias('media_desconto'))

In [26]:
df_consulta3.show(15)

+------+-----------+-----------------+
|estado|data_pedido|   media_desconto|
+------+-----------+-----------------+
|    SC| 2018-05-28|             null|
|    DF| 2019-11-15|             null|
|    SP| 2018-01-18|             null|
|    MA| 2018-07-06|             null|
|    MA| 2020-01-31|             null|
|    KY| 2018-08-17|             null|
|    BA| 2018-03-25|3.115455689329357|
|    IN| 2018-03-19|             null|
|    AM| 2019-03-13|             null|
|    ME| 2018-01-23|             null|
|    MT| 2018-04-11|             null|
|    IN| 2019-02-28|             null|
|    DF| 2018-11-02|             null|
|    MT| 2019-02-05|             null|
|    FL| 2017-12-03|6.274049858216723|
+------+-----------+-----------------+
only showing top 15 rows



## Carga no Banco de Dados

### Carregando a Consulta 1

In [27]:
type(df_consulta1)

pyspark.sql.dataframe.DataFrame

In [28]:
df_consulta1.show(5)

+--------+-----------------+
|   fonte|     total_vendas|
+--------+-----------------+
| Twitter|319448.7269814239|
|Afiliado|297605.2749875903|
|  Google|325183.5607881129|
|Organico|319637.0497447002|
|Facebook|333453.5126582293|
+--------+-----------------+



In [29]:
# Gerar valores exclusivos como id
import uuid

In [30]:
# Inserindo um registro
session.execute(
    """
    INSERT INTO TB_CONSULTA1 (ID, FONTE, TOTAL_VENDAS)
    VALUES (%s, %s, %s)
    """,
    (uuid.uuid1(), "Twitter", 319448.72)
)

<cassandra.cluster.ResultSet at 0x7f99beda7550>

In [31]:
# Consultando a tabela
linhas = session.execute('select * from TB_CONSULTA1;')
for linha in linhas:
    print (linha)

Row(id=UUID('1d0af9e4-9bf8-11eb-8ae0-1bf33eb9f15e'), fonte='Twitter', total_vendas=319448.72)


In [32]:
# Coletando todas as linhas do dataframe
rows = df_consulta1.collect()

# Loop pelas linhas
for i in rows:
    print(i.fonte)
    print(i.total_vendas)

Twitter
319448.7269814239
Afiliado
297605.2749875903
Google
325183.5607881129
Organico
319637.0497447002
Facebook
333453.5126582293


In [33]:
# Insere todos os registros na tabela
for i in rows:
    session.execute(
    """
    INSERT INTO TB_CONSULTA1 (ID, FONTE, TOTAL_VENDAS)
    VALUES (%s, %s, %s)
    """,
    (uuid.uuid1(), i.fonte, i.total_vendas)
)

In [34]:
# Consultando a tabela
linhas = session.execute('select * from TB_CONSULTA1;')
for linha in linhas:
    print (linha)

Row(id=UUID('25cae2b0-9bf8-11eb-8ae0-1bf33eb9f15e'), fonte='Twitter', total_vendas=319448.7269814239)
Row(id=UUID('25ccd584-9bf8-11eb-8ae0-1bf33eb9f15e'), fonte='Facebook', total_vendas=333453.5126582293)
Row(id=UUID('25cbf380-9bf8-11eb-8ae0-1bf33eb9f15e'), fonte='Google', total_vendas=325183.5607881129)
Row(id=UUID('25cc5b22-9bf8-11eb-8ae0-1bf33eb9f15e'), fonte='Organico', total_vendas=319637.0497447002)
Row(id=UUID('25cb5d08-9bf8-11eb-8ae0-1bf33eb9f15e'), fonte='Afiliado', total_vendas=297605.2749875903)


### Carregando a Consulta 2

In [35]:
df_consulta2.show(5)

+------+------------------+
|estado|      total_vendas|
+------+------------------+
|    SC|  78672.3796432496|
|    PI|60885.920881314254|
|    AM| 32415.70034927819|
|    NJ| 6768.687499559493|
|    RR| 35598.05678620004|
+------+------------------+
only showing top 5 rows



In [36]:
# Coletando todas as linhas do dataframe
rows2 = df_consulta2.collect()

In [37]:
# Insere todos os registros na tabela
for i in rows2:
    session.execute(
    """
    INSERT INTO TB_CONSULTA2 (ID, ESTADO, TOTAL_VENDAS)
    VALUES (%s, %s, %s)
    """,
    (uuid.uuid1(), i.estado, i.total_vendas)
)

In [38]:
# Consultando a tabela
linhas = session.execute('select * from TB_CONSULTA2;')
for linha in linhas:
    print (linha)

Row(id=UUID('2be03f6a-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='IN', total_vendas=39618.912714715996)
Row(id=UUID('2bde4386-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='NH', total_vendas=2775.888499371694)
Row(id=UUID('2bdf04e2-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='MD', total_vendas=7031.354914905968)
Row(id=UUID('2bdc8168-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='SC', total_vendas=78672.3796432496)
Row(id=UUID('2be199dc-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='WV', total_vendas=18074.826617148246)
Row(id=UUID('2be09e24-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='NM', total_vendas=12484.900805231957)
Row(id=UUID('2be238c4-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='PR', total_vendas=59732.90421633871)
Row(id=UUID('2be0e2c6-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='BA', total_vendas=39054.741385205074)
Row(id=UUID('2be0bbac-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='PA', total_vendas=35051.89459979438)
Row(id=UUID('2be20dfe-9bf8-11eb-8ae0-1bf33eb9f15e'), estado='AK', total_vendas=36984.23330876673)
Row(id=UUID('2bdd

### Carregando a Consulta 3

In [39]:
df_consulta3.show(5)

+------+-----------+--------------+
|estado|data_pedido|media_desconto|
+------+-----------+--------------+
|    SC| 2018-05-28|          null|
|    DF| 2019-11-15|          null|
|    SP| 2018-01-18|          null|
|    MA| 2018-07-06|          null|
|    MA| 2020-01-31|          null|
+------+-----------+--------------+
only showing top 5 rows



In [40]:
# Coletando todas as linhas do dataframe
rows3 = df_consulta3.collect()

In [41]:
# Insere todos os registros na tabela
for i in rows3:
    
    if i.media_desconto == None:
        session.execute(
            """
            INSERT INTO TB_CONSULTA3 (ID, ESTADO, DATA_PEDIDO, MEDIA_DESCONTO)
            VALUES (%s, %s, %s, %s)
            """,
            (uuid.uuid1(), i.estado, i.data_pedido, 0))
    else:
            session.execute(
                """
                INSERT INTO TB_CONSULTA3 (ID, ESTADO, DATA_PEDIDO, MEDIA_DESCONTO)
                VALUES (%s, %s, %s, %s)
                """,
                (uuid.uuid1(), i.estado, i.data_pedido, i.media_desconto))

In [42]:
# Consultando a tabela
linhas = session.execute('select * from TB_CONSULTA3 limit 25;')
for linha in linhas:
    print (linha)

Row(id=UUID('2d2b6e52-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(17288), estado='MT', media_desconto=0.0)
Row(id=UUID('3123fb14-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(17677), estado='FL', media_desconto=0.0)
Row(id=UUID('3195e788-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(17266), estado='MT', media_desconto=0.0)
Row(id=UUID('34160218-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(18024), estado='MT', media_desconto=0.0)
Row(id=UUID('2d221848-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(17835), estado='SC', media_desconto=5.660386613495339)
Row(id=UUID('2f928d38-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(18025), estado='NM', media_desconto=0.0)
Row(id=UUID('3051881e-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(18331), estado='ND', media_desconto=0.0)
Row(id=UUID('313f911c-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(17588), estado='CE', media_desconto=0.0)
Row(id=UUID('32e96362-9bf9-11eb-8ae0-1bf33eb9f15e'), data_pedido=Date(17215), estado='KY', media_d

# Fim