## Setup spark

## Ingestão
Leitura dos dados no banco de dados e upload para a storage account

In [0]:
from datetime import datetime

now = datetime.now()

from pyspark.sql import SparkSession

STRG_CONTAINER_KEY = 'INSERT YOUR KEY'

session = SparkSession.builder.getOrCreate()

session.conf.set("spark.sql.sourcers.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
session.conf.set("parquet.enable.summary-metadata", "false")
session.conf.set("fs.azure.account.key.trabalho06022003.dfs.core.windows.net", STRG_CONTAINER_KEY)

In [0]:
from pyspark.sql.functions import col

ingestion_strg = "abfss://ingestion@trabalho06022003.dfs.core.windows.net/carloca/" + str(now.year) + "/" + now.strftime('%m') + "/" + now.strftime('%d')

tables = [
    'loc_age_bco',
    'loc_agencia','loc_banco','loc_cargo','loc_cidade','loc_cliente','loc_cor','loc_depto','loc_estado','loc_fabricante','loc_fone_cliente','loc_funcionario','loc_grupo','loc_item_locacao','loc_modelo','loc_operadora','loc_pedido_locacao','loc_proprietario','loc_tp_automovel','loc_tp_cliente','loc_tp_combustivel','loc_veiculo'
]

print(f"TABLE COUNT: {len(tables)}")

for t in tables:
    df = spark.read.format('mysql').\
        option("host", "HOST").\
        option("port", "3306").\
        option("database", "carloca").\
        option("dbtable", t).\
        option("user", "USER").\
        option("password", "PASSWORD").\
        load()
    
    print(t)
    print(df.printSchema())

    #  lower case column names for better joins
    df = df.select([col(x).alias(x.lower()) for x in df.columns])
    
    df.show()
    df.write.parquet(ingestion_strg + "/" + t + ".parquet", mode="overwrite")

TABLE COUNT: 22
loc_age_bco
root
 |-- cd_agencia: integer (nullable = true)
 |-- Nm_Agencia: string (nullable = true)
 |-- cd_banco: integer (nullable = true)

None
+----------+--------------------+--------+
|cd_agencia|          nm_agencia|cd_banco|
+----------+--------------------+--------+
|         1|       RUA BOA VISTA|     246|
|         2|       RUA BOA VISTA|      25|
|         3|       RUA BOA VISTA|     641|
|         4|       RUA BOA VISTA|      29|
|         5|       RUA BOA VISTA|      38|
|         6|       RUA BOA VISTA|     740|
|         7|     RUA 25 DE MAR?O|     107|
|         8|     RUA 25 DE MAR?O|      31|
|         9|RUA BAR?O DE LIMEIRA|      96|
|        10|        AV. PAULISTA|     394|
|        11|  RUA 15 DE NOVEMBRO|     318|
|        12|        AV. PAULISTA|     752|
|        13|       RUA BOA VISTA|     248|
|        14|        AV. PAULISTA|      36|
|        15|        AV. PAULISTA|     237|
|        16|  RUA 15 DE NOVEMBRO|     225|
|        17|  RUA 

## Manager
Agregação dos dados ingeridos e upload para a storage account

In [0]:
ingestion_strg = "abfss://ingestion@trabalho06022003.dfs.core.windows.net/carloca/" + str(now.year) + "/" + now.strftime('%m') + "/" + now.strftime('%d')
manager_strg = "abfss://manager@trabalho06022003.dfs.core.windows.net/carloca/" + str(now.year) + "/" + now.strftime('%m') + "/" + now.strftime('%d')

manager = spark.read.parquet(ingestion_strg + '/loc_operadora.parquet')

# jtable and join with which column
joins = {
    # cliente
    'loc_fone_cliente': 'cd_operadora',
    'loc_cliente': 'cd_cliente',
    'loc_tp_cliente': 'cd_tp_cliente',
    # cliente -> pedido
    'loc_pedido_locacao': 'cd_cliente',
    # pedido -> itens
    'loc_item_locacao': 'nr_pedido',
    # pedido -> loja
    'loc_agencia': 'cd_agencia',
    'loc_cidade': 'cd_cidade',
    'loc_estado': 'cd_estado',
    # item -> veículo
    'loc_veiculo': 'nr_placa',
    'loc_proprietario': 'cd_proprietario',
    'loc_cor': 'cd_cor',
    'loc_grupo': 'cd_grupo',
    'loc_tp_automovel': 'cd_tp_automovel',
    'loc_tp_combustivel': 'cd_tp_combustivel',
    # veículo -> modelo
    'loc_modelo': 'cd_modelo',
    'loc_fabricante': 'cd_fabricante',
    # pedido -> agencia de pagamento
    'loc_age_bco': 'cd_age_bco',
    'loc_banco': 'cd_banco',
    # pedido -> funcionario
    'loc_funcionario': 'cd_func',
    'loc_cargo': 'cd_cargo',
    'loc_depto': 'cd_depto',
}

# for each table and join column
for t, c in joins.items():
    # read the table data
    df = spark.read.parquet(ingestion_strg + '/' + t + '.parquet')

    # make adjusments to the dataframe if needed
    if t == 'loc_pedido_locacao':
        df = df.withColumnRenamed('status', 'pedido_status')
    elif t == 'loc_item_locacao':
        df = df.withColumnRenamed('vl_diaria', 'item_pedido_vl_diaria')
        df = df.withColumnRenamed('vl_diaria_calc', 'item_pedido_vl_diaria_calc')
    elif t == 'loc_veiculo':
        df = df.withColumnRenamed('status', 'veiculo_status')
        df = df.withColumnRenamed('vl_diaria', 'veiculo_vl_diaria')
        df = df.withColumnRenamed('tp_automovel', 'cd_tp_automovel')
        df = df.withColumnRenamed('tp_combustivel', 'cd_tp_combustivel')
    elif t == 'loc_age_bco':
        df = df.withColumnRenamed('cd_agencia', 'cd_age_bco')
        df = df.withColumnRenamed('nm_agencia', 'nm_age_bco')
    elif t == 'loc_proprietario':
        df = df.withColumnRenamed('nr_cpf', 'nr_cpf_proprietario')
    elif t == 'loc_funcionario':
        df = df.withColumnRenamed('nr_cpf', 'nr_cpf_funcionario')
    
    # do equivalent join with other table
    manager = manager.join(df, on=c)

In [0]:
# append manager table
loc_funcionario = spark.read.parquet(ingestion_strg + '/loc_funcionario.parquet')
loc_cargo = spark.read.parquet(ingestion_strg + '/loc_cargo.parquet')
# loc_depto = spark.read.parquet(ingestion_strg + '/loc_depto.parquet')

# loc_gerente = loc_funcionario.withColumnRenamed('cd_gerente', 'cd_gerente_gerente')\
#     .withColumnRenamed('cd_func', 'cd_gerente')\
#     .withColumnRenamed('nm_func', 'nm_gerente')\
#     .withColumnRenamed('dt_inicio', 'dt_inicio_gerente')\
#     .withColumnRenamed('nr_cpf', 'nr_cpf_gerente')\
#     .withColumnRenamed('vl_salario', 'vl_salario_gerente')\
#     .withColumnRenamed('vl_perc_comissao', 'vl_perc_comissao_gerente')\
#     .join(loc_depto, on='cd_depto')\
#     .withColumnRenamed('nm_depto', 'nm_depto_gerente')\
#     .withColumnRenamed('cd_depto', 'cd_depto_gerente')\
#     .withColumnRenamed('vl_orc_depto', 'vl_orc_depto_gerente')\
#     .join(loc_cargo, on='cd_cargo')\
#     .withColumnRenamed('nm_cargo', 'nm_cargo_gerente')\
#     .withColumnRenamed('cd_cargo', 'cd_cargo_gerente')

# manager = manager.join(loc_gerente, 'cd_gerente', 'inner')

In [0]:
manager.printSchema()

manager.write.parquet(manager_strg + "/manager.parquet", mode="overwrite")

root
 |-- cd_depto: integer (nullable = true)
 |-- cd_cargo: integer (nullable = true)
 |-- cd_func: integer (nullable = true)
 |-- cd_banco: integer (nullable = true)
 |-- cd_age_bco: integer (nullable = true)
 |-- cd_fabricante: integer (nullable = true)
 |-- cd_modelo: integer (nullable = true)
 |-- cd_tp_combustivel: integer (nullable = true)
 |-- cd_tp_automovel: integer (nullable = true)
 |-- cd_grupo: integer (nullable = true)
 |-- cd_cor: integer (nullable = true)
 |-- cd_proprietario: integer (nullable = true)
 |-- nr_placa: string (nullable = true)
 |-- cd_estado: integer (nullable = true)
 |-- cd_cidade: integer (nullable = true)
 |-- cd_agencia: integer (nullable = true)
 |-- nr_pedido: integer (nullable = true)
 |-- cd_cliente: integer (nullable = true)
 |-- cd_tp_cliente: integer (nullable = true)
 |-- cd_operadora: integer (nullable = true)
 |-- nm_operadora: string (nullable = true)
 |-- cd_fone_cliente: integer (nullable = true)
 |-- nm_fone_cliente: string (nullable =

## Query
Disponiblização dos dados para consulta

In [0]:
manager_strg = "abfss://manager@trabalho06022003.dfs.core.windows.net/carloca/" + str(now.year) + "/" + now.strftime('%m') + "/" + now.strftime('%d')

manager = spark.read.parquet(manager_strg + '/manager.parquet')

In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth, dayofweek

# todas as dimensões
client_dim = ['cd_cliente','nm_cliente','cd_tp_cliente','nm_tp_cliente','nr_habilitacao','nr_estrelas','cd_fone_cliente','nm_fone_cliente','cd_operadora','nm_operadora']

produto_dim = [
    # 'nm_veiculo',
    'nm_modelo',
    'nr_placa','nr_chassis','km_atual']

loja_dim = ['cd_agencia','nm_agencia',
            # 'dt_ult_reforma',
            'cd_cidade','nm_cidade','cd_estado','nm_estado']

agencia_bco_dim = ['nm_banco', 'nm_age_bco']

data_dim = ['dt_locacao']

funcionario_dim = ['cd_func','nm_func','dt_inicio','cd_gerente','nr_cpf_funcionario','vl_salario','vl_perc_comissao','cd_depto','cd_cargo']

processo_dim = ['dt_entrega', 'dt_retirada']

proprietario_dim = ['nm_proprietario']

modelo_dim = ['nm_fabricante']

tp_veiculo_dim = ['nm_tp_automovel']

# star schema pedido
fato_pedido = ['vl_total', 'item_pedido_vl_diaria', 'item_pedido_vl_diaria_calc', 'veiculo_vl_diaria', 'km_atual']

star_schema1 = manager.select(client_dim + produto_dim + loja_dim + data_dim + funcionario_dim + processo_dim + fato_pedido)

star_schema1 = star_schema1\
    .withColumn('ano', year('dt_locacao'))\
    .withColumn('mes', month('dt_locacao'))\
    .withColumn('dia', dayofmonth('dt_locacao'))\
    .withColumn('dia_semana', dayofweek('dt_locacao'))

# star schema de produto
fato_produto = ['vl_total', 'item_pedido_vl_diaria', 'item_pedido_vl_diaria_calc', 'veiculo_vl_diaria', 'km_atual']

star_schema2 = manager.select(produto_dim + proprietario_dim + loja_dim + modelo_dim + tp_veiculo_dim + data_dim)

star_schema2 = star_schema2\
    .withColumn('ano', year('dt_locacao'))\
    .withColumn('mes', month('dt_locacao'))\
    .withColumn('dia', dayofmonth('dt_locacao'))\
    .withColumn('dia_semana', dayofweek('dt_locacao'))

# star schema de lojas
fato_produto = ['vl_total', 'item_pedido_vl_diaria', 'item_pedido_vl_diaria_calc', 'veiculo_vl_diaria', 'km_atual']

star_schema3 = manager.select(agencia_bco_dim + funcionario_dim + loja_dim + data_dim)

star_schema3 = star_schema3\
    .withColumn('ano', year('dt_locacao'))\
    .withColumn('mes', month('dt_locacao'))\
    .withColumn('dia', dayofmonth('dt_locacao'))\
    .withColumn('dia_semana', dayofweek('dt_locacao'))

print("STAR SCHEMA PEDIDO")
star_schema1.show()

print("STAR SCHEMA PRODUTO")
star_schema2.show()

print("STAR SCHEMA LOJAS")
star_schema3.show()

STAR SCHEMA PEDIDO
+----------+--------------------+-------------+-------------+--------------+-----------+---------------+---------------+------------+------------+--------------------+--------+-----------------+--------+----------+----------+---------+---------+---------+---------+----------+-------+--------------+----------+----------+------------------+----------+----------------+--------+--------+----------+-----------+--------+---------------------+--------------------------+-----------------+--------+----+---+---+----------+
|cd_cliente|          nm_cliente|cd_tp_cliente|nm_tp_cliente|nr_habilitacao|nr_estrelas|cd_fone_cliente|nm_fone_cliente|cd_operadora|nm_operadora|           nm_modelo|nr_placa|       nr_chassis|km_atual|cd_agencia|nm_agencia|cd_cidade|nm_cidade|cd_estado|nm_estado|dt_locacao|cd_func|       nm_func| dt_inicio|cd_gerente|nr_cpf_funcionario|vl_salario|vl_perc_comissao|cd_depto|cd_cargo|dt_entrega|dt_retirada|vl_total|item_pedido_vl_diaria|item_pedido_vl_diaria_