Notebook que realiza as queries solicitadas e salva arquivos em _Gold_

In [1]:
# bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# sparksession
spark = SparkSession.builder\
    .master('local')\
        .appName('queries')\
            .config('spark.executer.memory','1gb')\
                .getOrCreate()

22/08/08 15:42:08 WARN Utils: Your hostname, rbsmotta resolves to a loopback address: 127.0.1.1; using 192.168.1.100 instead (on interface enp1s0)
22/08/08 15:42:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/08 15:42:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# path para arquivos parquet 
work_path = '/home/robson/repositorios/bike-manufacturing-company/bucket/work'

# carregando arquivos parquet
person_df             = spark.read.load(work_path + '/person/person.parquet')
product_df            = spark.read.load(work_path + '/production/product.parquet')
customer_df           = spark.read.load(work_path + '/sales/customer.parquet')
sales_order_detail_df = spark.read.load(work_path + '/sales/sales_order_detail.parquet')
sales_order_header_df = spark.read.load(work_path + '/sales/sales_order_header.parquet')
special_offer_df      = spark.read.load(work_path + '/sales/special_offer.parquet')

# criando tabelas a partir dos dataframes
person_df.write.saveAsTable('person')
product_df.write.saveAsTable('product')
customer_df.write.saveAsTable('customer')
sales_order_detail_df.write.saveAsTable('sales_order_detail')
sales_order_header_df.write.saveAsTable('sales_order_header')
special_offer_df.write.saveAsTable('special_offer')

                                                                                

## Queries

1- Retornar a quantidade de linhas na tabela "Sales.SalesOrderDetail" pelo campo SalesOrderID, desde que tenham pelo menos três linhas de detalhes.

In [6]:
query_1 = spark.sql(
    """
        SELECT SalesOrderID
        FROM sales_order_detail
        GROUP BY SalesOrderID
        HAVING count(SalesOrderDetailID) >= 3 
        
    """
)

query_1.count()

12757

2- Ligar as tabelas "Sales.SalesOrderDetail", "Sales.SpecialOfferProduct" e "Production.Product" e retornar o nome dos 3 produtos mais vendidos, pela soma "OrderQty", agrupados pelo número de dias para manufatura ("DaysToManufacture") 

In [7]:
query_2 = spark.sql(
    """ 
        SELECT prod.Name, sum(sod.OrderQty) as Qty, prod.DaysToManufacture
        FROM product as prod
        INNER JOIN sales_order_detail as sod on sod.ProductID = prod.ProductID
        INNER JOIN special_offer as so on so.ProductID = prod.ProductID
        GROUP BY prod.Name, prod.DaysToManufacture
        ORDER BY Qty DESC
        
    """
)

query_2.show(3)

+--------------------+-----+-----------------+
|                Name|  Qty|DaysToManufacture|
+--------------------+-----+-----------------+
|Sport-100 Helmet,...|33715|                0|
|        AWC Logo Cap|33244|                0|
|Sport-100 Helmet,...|32660|                0|
+--------------------+-----+-----------------+
only showing top 3 rows



3- Obter uma lista de nomes de clientes e uma contagem de pedidos efetuados utilizando as tabelas "Person.Person", "Sales.Customer" e "Sales.SalesOrderHeader".

In [8]:
query_3 = spark.sql(
    """ 
        SELECT pers.LastName, pers.FirstName, COUNT(soh.SalesOrderID) as CountOrderQty
        FROM sales_order_header as soh
        INNER JOIN customer AS c ON c.CustomerID = soh.CustomerID
        INNER JOIN person AS pers ON pers.BusinessEntityID = c.CustomerID
        GROUP BY pers.FirstName, pers.LastName 
        ORDER BY CountOrderQty DESC

    """
)

query_3.show()

+--------+---------+-------------+
|LastName|FirstName|CountOrderQty|
+--------+---------+-------------+
|  Miller|   Morgan|           28|
|  Taylor| Jennifer|           28|
|    Shen| Marshall|           27|
| Jackson|   Morgan|           27|
| Carlson|    Ruben|           27|
|  Wilson|  Natalie|           27|
|   Lewis|    Grace|           27|
|   Moore| Isabella|           27|
|   Jones|  Brianna|           27|
| Vazquez|    Ruben|           27|
|   Lewis|   Morgan|           27|
|     Lee|    Grace|           27|
|  Martin|  Natalie|           27|
|     Zhu|  Barbara|           25|
|   Brown| Isabella|           17|
|  Walker|   Morgan|           17|
|     Cai| Marshall|           17|
|    Sanz|      Joe|           17|
|  Prasad|    Ruben|           17|
|Thompson|  Kaitlyn|           17|
+--------+---------+-------------+
only showing top 20 rows



4- Obter a soma total de produtos (OrderQty) por ProductID e OrderDate das tabelas "Sales.SalesOrderHeader", "Sales.SalesOrderDetail" e "Production.Product".

In [9]:
query_4 = spark.sql(
    """ 
        SELECT prod.ProductID, soh.OrderDate, SUM(sod.OrderQty) as Qty
        FROM sales_order_header as soh
        INNER JOIN sales_order_detail as sod ON sod.SalesOrderID = soh.SalesOrderID
        INNER JOIN product as prod ON prod.ProductID = sod.ProductID
        GROUP BY prod.ProductID, soh.OrderDate
        ORDER BY Qty DESC
        
    """
)

query_4.show()

+---------+-------------------+---+
|ProductID|          OrderDate|Qty|
+---------+-------------------+---+
|      864|2013-06-29 21:00:00|498|
|      864|2013-07-30 21:00:00|465|
|      884|2013-06-29 21:00:00|444|
|      867|2013-06-29 21:00:00|427|
|      864|2014-03-30 21:00:00|424|
|      884|2013-07-30 21:00:00|420|
|      712|2013-06-29 21:00:00|415|
|      863|2012-06-29 21:00:00|409|
|      715|2013-06-29 21:00:00|406|
|      876|2013-07-30 21:00:00|397|
|      864|2014-04-30 21:00:00|383|
|      864|2013-09-29 21:00:00|383|
|      864|2013-10-29 22:00:00|380|
|      869|2013-07-30 21:00:00|374|
|      876|2013-06-29 21:00:00|363|
|      712|2013-07-30 21:00:00|363|
|      863|2013-03-29 21:00:00|358|
|      863|2012-05-29 21:00:00|357|
|      867|2013-07-30 21:00:00|356|
|      715|2013-07-30 21:00:00|354|
+---------+-------------------+---+
only showing top 20 rows



5- Obter apenas as linhas onde a ordem tenha sido feita durante o mês de setembro/2011 e o total devido esteja acima de 1000, ordenando pelo total devido descrescente, utilizando os campos SalesOrderID, OrderDate e TotalDue da tabela Sales.SalesOrderHeader.

In [10]:
query_5 = spark.sql(
    """ 
        SELECT SalesOrderID, OrderDate, TotalDue
        FROM sales_order_header
        WHERE MONTH(OrderDate) = 09 AND YEAR(OrderDate) = 2011
            AND TotalDue > 1000
        ORDER BY TotalDue DESC
    
    """
)

query_5.show()

+------------+-------------------+-----------+
|SalesOrderID|          OrderDate|   TotalDue|
+------------+-------------------+-----------+
|       44518|2011-09-30 21:00:00|142312.2199|
|       44528|2011-09-30 21:00:00|122500.6617|
|       44530|2011-09-30 21:00:00|118206.3408|
|       44552|2011-09-30 21:00:00| 110276.574|
|       44534|2011-09-30 21:00:00| 96937.2022|
|       44523|2011-09-30 21:00:00| 83772.7786|
|       44547|2011-09-30 21:00:00| 77692.7415|
|       44492|2011-09-30 21:00:00|  75971.892|
|       44570|2011-09-30 21:00:00| 65872.3967|
|       44538|2011-09-30 21:00:00| 65105.6812|
|       44520|2011-09-30 21:00:00| 64981.7034|
|       44541|2011-09-30 21:00:00| 57231.0827|
|       44549|2011-09-30 21:00:00| 55212.4098|
|       44563|2011-09-30 21:00:00| 53733.7469|
|       44567|2011-09-30 21:00:00| 52872.0292|
|       44509|2011-09-30 21:00:00| 49116.1883|
|       44513|2011-09-30 21:00:00| 49012.5587|
|       44514|2011-09-30 21:00:00| 47939.4352|
|       44561

## Salvando resultados na pasta Gold

In [None]:
gold_path = '/home/robson/repositorios/bike-manufacturing-company/bucket/gold'

query_1.write.mode('overwrite').save(gold_path + '/query_1')
query_2.write.mode('overwrite').save(gold_path + '/query_2')
query_3.write.mode('overwrite').save(gold_path + '/query_3')
query_4.write.mode('overwrite').save(gold_path + '/query_4')
query_5.write.mode('overwrite').save(gold_path + '/query_5')