<a href="https://colab.research.google.com/github/viniciusfjacinto/data-engineering-test/blob/main/teste_eng_jr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Inserting new Data into S3

In [99]:
!pip install awswrangler
!pip install boto3
!pip install dotenv
!pip install pyathena

import boto3
import awswrangler as wr
import pandas as pd
from pyathena import connect
import os
import dotenv
dotenv.load_dotenv()

# Creating a connection between Python and AWS S3
session = boto3.Session(
    aws_access_key_id=os.environ['aws_access_key'],
    aws_secret_access_key=os.environ['aws_secret_key'],
    region_name=os.environ['aws_region']
)

s3_client = session.client('s3')

In [88]:
#Reading csv tables that will structure our schema
person_person = pd.read_csv('Person.Person.csv', sep = ';', decimal=",")
production_product = pd.read_csv('Production.Product.csv', sep = ';', decimal=",")
sales_customer = pd.read_csv('Sales.Customer.csv', sep = ';', decimal=",")
sales_orderdetail = pd.read_csv('Sales.SalesOrderDetail.csv', sep = ';')
sales_orderheader = pd.read_csv('Sales.SalesOrderHeader.csv', sep = ';', decimal=",")
sales_specialofferproduct = pd.read_csv('Sales.SpecialOfferProduct.csv', sep = ';', decimal=",")

#Creating a list of dataframes and their names for easy and fast insertion
dataframe_names = ["person_person","production_product", "sales_customer", "sales_orderdetail", "sales_orderheader", "sales_specialofferproduct"]
dataframes = [person_person, production_product, sales_customer, sales_orderdetail, sales_orderheader, sales_specialofferproduct]

In [89]:
#Inserting data into S3 Bucket in Parquet while creating Amazon Athena's tables

for i,j in zip(dataframes,dataframe_names):

    wr.s3.to_parquet(
        df=i,
        path=f"s3://{os.environ['raw']}/{j}",
        dataset=True,
        database=os.environ['raw'], #this will be our raw/landing zone
        mode = "overwrite", #we will replace table/bucket file if exists
        table=f"{j}",
        boto3_session = session #here we use the connection created initially

    )

    print(f"Insert of table {j} sucessful")

Insert of table person_person sucessful
Insert of table production_product sucessful
Insert of table sales_customer sucessful
Insert of table sales_orderdetail sucessful
Insert of table sales_orderheader sucessful
Insert of table sales_specialofferproduct sucessful


In [100]:
#Create a new AWS connection for querying in Athena

def connect_aws():

  conn = connect(aws_access_key_id=os.environ['aws_access_key'],
                aws_secret_access_key=os.environ['aws_secret_key'],
                s3_staging_dir=os.environ['s3_staging_dir'],
                region_name=os.environ['aws_region'])
  return conn

#Data Analysis / Answering Questions with SQL

In [57]:
# 1.	Escreva uma query que retorna a quantidade de linhas na tabela Sales.SalesOrderDetail
# pelo campo SalesOrderID, desde que tenham pelo menos três linhas de detalhes.


details_by_sales = pd.read_sql(
"""
SELECT SalesOrderID, COUNT(*) AS NumberOfDetails
FROM "raw".sales_orderdetail
GROUP BY SalesOrderID
HAVING COUNT(*) >= 3
""",
connect_aws())

print(f"Total de Linhas: {details_by_sales['NumberOfDetails'].sum()}\n")
print(f"Total de Linhas por SalesOrderID:\n {details_by_sales.head()}")

Total de Linhas: 94157

Total de Linhas por SalesOrderID:
    SalesOrderID  NumberOfDetails
0         43661               15
1         43671               11
2         43676                5
3         43678               19
4         43683               13


In [58]:
# 2. Escreva uma query que ligue as tabelas Sales.SalesOrderDetail, Sales.SpecialOfferProduct e Production.Product
# e retorne os 3 produtos (Name) mais vendidos (pela soma de OrderQty),
# agrupados pelo número de dias para manufatura (DaysToManufacture).

top3_products_by_orderqty = pd.read_sql(
"""
SELECT
    pp.Name AS ProductName,
    pp.DaysToManufacture,
    SUM(sod.OrderQty) AS TotalOrderQuantity
FROM
"raw".sales_orderdetail sod
INNER JOIN "raw".sales_specialofferproduct ssop
  ON sod.specialofferid = ssop.specialofferid and sod.productid = ssop.productid
INNER JOIN "raw".production_product pp
  ON pp.productid = ssop.productid
GROUP BY
    pp.Name, pp.DaysToManufacture
ORDER BY
    SUM(sod.OrderQty) DESC
LIMIT 3
""",
connect_aws()
)

print(f"Principais produtos mais vendidos:\n {top3_products_by_orderqty}")

Principais produtos mais vendidos:
               ProductName  DaysToManufacture  TotalOrderQuantity
0            AWC Logo Cap                  0                8311
1   Water Bottle - 30 oz.                  0                6815
2  Sport-100 Helmet, Blue                  0                6743


In [61]:
# 3.	Escreva uma query ligando as tabelas Person.Person, Sales.Customer e Sales.SalesOrderHeader
# de forma a obter uma lista de nomes de clientes e uma contagem de pedidos efetuados.

orders_by_person = pd.read_sql(
"""
SELECT
    pe.businessentityid as PersonID,
    CONCAT(coalesce(pe.FirstName,''), ' ', COALESCE(pe.middlename,''), ' ', COALESCE(pe.LastName,'')) AS CustomerName,
    COUNT(soh.SalesOrderID) AS NumberOfOrders
FROM
    "raw".Person_Person pe
INNER JOIN
    "raw".sales_customer sc ON pe.BusinessEntityID = sc.PersonID
INNER JOIN
    "raw".sales_orderheader soh ON sc.CustomerID = soh.CustomerID
GROUP BY
    1,2
ORDER BY
    COUNT(soh.SalesOrderID) desc
""",
connect_aws()
)

print(f"Pedidos por cliente: \n {orders_by_person.head()}")

Pedidos por cliente: 
    PersonID       CustomerName  NumberOfOrders
0      4515      Dalton  Perez              28
1     15994    Mason D Roberts              28
2      4855  Charles P Jackson              27
3     12526  Samantha  Jenkins              27
4     17961    Ryan M Thompson              27


In [64]:
#4.	Escreva uma query usando as tabelas Sales.SalesOrderHeader, Sales.SalesOrderDetail e Production.Product,
# de forma a obter a soma total de produtos (OrderQty) por ProductID e OrderDate.

product_orders_by_day = pd.read_sql(
"""SELECT
    sod.ProductID,
    pp.name ProductName,
    soh.OrderDate,
    SUM(sod.OrderQty) AS TotalOrderQuantity
FROM
"raw".sales_orderheader soh
INNER JOIN "raw".sales_orderdetail sod
ON sod.salesorderid = soh.salesorderid
INNER JOIN "raw".production_product pp
ON pp.productid = sod.productid
GROUP BY
    sod.ProductID, pp.name, soh.OrderDate
""",
connect_aws()
)

print(f"Pedidos por Produto e Data: \n {product_orders_by_day.head()}")

Pedidos por Produto e Data: 
    ProductID              ProductName                OrderDate  \
0        771  Mountain-100 Silver, 38  2011-05-31 00:00:00.000   
1        775   Mountain-100 Black, 38  2011-05-31 00:00:00.000   
2        733  ML Road Frame - Red, 52  2011-05-31 00:00:00.000   
3        729  LL Road Frame - Red, 60  2011-05-31 00:00:00.000   
4        756         Road-450 Red, 44  2011-05-31 00:00:00.000   

   TotalOrderQuantity  
0                  10  
1                  22  
2                   4  
3                  16  
4                  14  


In [96]:
# 5.	Escreva uma query mostrando os campos SalesOrderID, OrderDate e TotalDue da tabela Sales.SalesOrderHeader.
# Obtenha apenas as linhas onde a ordem tenha sido feita durante o mês de setembro/2011
# e o total devido esteja acima de 1.000. Ordene pelo total devido decrescente.

sales_by_date_due = pd.read_sql(
"""
SELECT
    SalesOrderID,
    OrderDate,
    TotalDue
FROM
    "raw".sales_orderheader
WHERE
    YEAR(CAST(OrderDate as TIMESTAMP)) = 2011
    AND MONTH(CAST(OrderDate AS TIMESTAMP)) = 9
    AND TotalDue > 1000
ORDER BY
    TotalDue DESC;
""",
connect_aws()
)

print(f"Pedidos por Produto e Data: \n {sales_by_date_due.head()}")

Pedidos por Produto e Data: 
    SalesOrderID                OrderDate   TotalDue
0         44324  2011-09-01 00:00:00.000  3953.9884
1         44441  2011-09-22 00:00:00.000  3953.9884
2         44443  2011-09-22 00:00:00.000  3953.9884
3         44444  2011-09-24 00:00:00.000  3953.9884
4         44445  2011-09-25 00:00:00.000  3953.9884
