### Importando as bibliotecas

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

###Conexão com blob storage

In [0]:
# Função criada para facilitar a inserção de parâmetros durante a criação da conexão do Blob com DBFS
 
def con_to_blob(storage_account_name, access_key, container):
    if any(mount.mountPoint == f"/mnt/{container}" for mount in dbutils.fs.mounts()):
        print("Conexão existente")
    else:
        con_url = f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"
        dbutils.fs.mount(
            source = con_url,
            mount_point = f"/mnt/{container}",
            extra_configs = {
                f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net":access_key
        }
    )

In [0]:
# Paramêtros para criar a conexão entre o container "roxtest" do blob com o DBFS
 
con_to_blob("stroxtest001", "key-test","roxtest")

## Importando Tabelas

In [0]:
# Configurando File location e type das tabelas
file_location1 = "dbfs:/mnt/roxtest/Sales_SalesOrderDetail.csv"
file_location2 = "dbfs:/mnt/roxtest/Person_Person.csv"
file_location3 = "dbfs:/mnt/roxtest/Production_Product.csv"
file_location4 = "dbfs:/mnt/roxtest/Sales_Customer.csv"
file_location5 = "dbfs:/mnt/roxtest/Sales_SalesOrderHeader.csv"
file_location6 = "dbfs:/mnt/roxtest/Sales_SpecialOfferProduct.csv"
file_type = "csv"

# Configurando CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"

### Tabela 01 - Sales Order Detail
Tabela com as informações de pedido e venda.

In [0]:
# query de importação utilizando as configurações do Cmd5
df_salesorderdetail = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location1)

# visualização parcial da tabela
display(df_salesorderdetail)

SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
43659,1,4911-403C-98,1,776,1,2024994,0,2024.994,B207C96D-D9E6-402B-8470-2CC176C42283,2011-05-31 00:00:00.000
43659,2,4911-403C-98,3,777,1,2024994,0,6074.982,7ABB600D-1E77-41BE-9FE5-B9142CFC08FA,2011-05-31 00:00:00.000
43659,3,4911-403C-98,1,778,1,2024994,0,2024.994,475CF8C6-49F6-486E-B0AD-AFC6A50CDD2F,2011-05-31 00:00:00.000
43659,4,4911-403C-98,1,771,1,2039994,0,2039.994,04C4DE91-5815-45D6-8670-F462719FBCE3,2011-05-31 00:00:00.000
43659,5,4911-403C-98,1,772,1,2039994,0,2039.994,5A74C7D2-E641-438E-A7AC-37BF23280301,2011-05-31 00:00:00.000
43659,6,4911-403C-98,2,773,1,2039994,0,4079.988,CE472532-A4C0-45BA-816E-EEFD3FD848B3,2011-05-31 00:00:00.000
43659,7,4911-403C-98,1,774,1,2039994,0,2039.994,80667840-F962-4EE3-96E0-AECA108E0D4F,2011-05-31 00:00:00.000
43659,8,4911-403C-98,3,714,1,288404,0,86.5212,E9D54907-E7B7-4969-80D9-76BA69F8A836,2011-05-31 00:00:00.000
43659,9,4911-403C-98,1,716,1,288404,0,28.8404,AA542630-BDCD-4CE5-89A0-C1BF82747725,2011-05-31 00:00:00.000
43659,10,4911-403C-98,6,709,1,570,0,34.2,AC769034-3C2F-495C-A5A7-3B71CDB25D4E,2011-05-31 00:00:00.000


In [0]:
# verificando o schema da tabela
df_salesorderdetail.printSchema()

root
 |-- SalesOrderID: string (nullable = true)
 |-- SalesOrderDetailID: string (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- OrderQty: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- SpecialOfferID: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- UnitPriceDiscount: string (nullable = true)
 |-- LineTotal: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)



#### Observações da Tabela 01
  Todos os campos devido a importação de um arquivo .csv se encontram como string, como observado no Cmd8, sendo assim, necessário a conversão dos mesmos para cada tipo referente.\
  Já os campos 'UnitPrice' e 'UnitPriceDiscount' se encontram com , (vírgula) necessário a substituição para . (ponto) e depois a conversão dos mesmos para tipo 'double'.\

In [0]:
# verificando o resumo da tabela e suas colunas
df_salesorderdetail.summary().display()

summary,SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
count,121317.0,121317.0,121317,121317.0,121317.0,121317.0,121317.0,121317.0,121317.0,121317,121317
mean,57827.363782487206,60659.0,,2.2660797744751355,841.6808361565156,1.1625411113034447,,,905.449206622974,,
stddev,9009.14790159367,35021.345640908774,,2.491323174727561,86.4521237377648,1.218604380202876,,,1693.4173889709234,,
min,43659.0,1.0,000A-434D-BC,1.0,707.0,1.0,13282.0,0.0,1.374,0000C99C-2B71-4885-B976-C1CCAE896EF2,2011-05-31 00:00:00.000
25%,49884.0,30331.0,,1.0,768.0,1.0,,,24.99,,
50%,57029.0,60653.0,,1.0,863.0,1.0,,,134.982,,
75%,65485.0,90979.0,,3.0,921.0,1.0,,,1120.49,,
max,75123.0,99999.0,,9.0,999.0,9.0,9865742.0,40.0,9944.658192,FFFF365B-8BAC-4A28-B3A3-35C17BCFFA01,2014-06-30 00:00:00.000


In [0]:
# verificando a existência de campos 'Nan' ou 'Null'
df_salesorderdetail.select([count(when(isnan(c) | col(c).isNull() | (col(c) == 'NULL'), c)).alias(c) for c in df_salesorderdetail.columns]).display()

SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
0,0,60398,0,0,0,0,0,0,0,0


Podemos observar que não consta nenhum campo Nan ou Null na tabela.

In [0]:
# realizando a conversão de types de cada coluna e aplicando regexp_replace nas colunas necessárias
df_salesorderdetail_v2 = (df_salesorderdetail\
                                  .withColumn("SalesOrderID", col("SalesOrderID").cast("int"))
                                  .withColumn("SalesOrderDetailID", col("SalesOrderDetailID").cast("int"))
                                  .withColumn("OrderQty", col("OrderQty").cast("int"))
                                  .withColumn("ProductID", col("ProductID").cast("int"))
                                  .withColumn("SpecialOfferID", col("SpecialOfferID").cast("int"))
                                  .withColumn("UnitPrice", regexp_replace(col("UnitPrice"), ",",".").cast("double"))
                                  .withColumn("UnitPriceDiscount", regexp_replace(col("UnitPriceDiscount"), ",",".").cast("double"))
                                  .withColumn("ModifiedDate", to_date(col("ModifiedDate"))))

In [0]:
# verificando alteração realizada no cmd13
df_salesorderdetail_v2.printSchema()

root
 |-- SalesOrderID: integer (nullable = true)
 |-- SalesOrderDetailID: integer (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- OrderQty: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- SpecialOfferID: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitPriceDiscount: double (nullable = true)
 |-- LineTotal: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: date (nullable = true)



### Tabela 02 - Person
Tabela com as informações dos clientes.

In [0]:
# query de importação utilizando as configurações do Cmd5
df_person = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location2)

# 
display(df_person)

BusinessEntityID,PersonType,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,EmailPromotion,AdditionalContactInfo,Demographics,rowguid,ModifiedDate
1,EM,0,,Ken,J,Sánchez,,0,,"""0""",92C4279F-1207-48A3-8448-4636514EB7E2,2009-01-07 00:00:00.000
2,EM,0,,Terri,Lee,Duffy,,1,,"""0""",D8763459-8AA8-47CC-AFF7-C9079AF79033,2008-01-24 00:00:00.000
3,EM,0,,Roberto,,Tamburello,,0,,"""0""",E1A2555E-0828-434B-A33B-6F38136A37DE,2007-11-04 00:00:00.000
4,EM,0,,Rob,,Walters,,0,,"""0""",F2D7CE06-38B3-4357-805B-F4B6B71C01FF,2007-11-28 00:00:00.000
5,EM,0,Ms.,Gail,A,Erickson,,0,,"""0""",F3A3F6B4-AE3B-430C-A754-9F2231BA6FEF,2007-12-30 00:00:00.000
6,EM,0,Mr.,Jossef,H,Goldberg,,0,,"""0""",0DEA28FD-EFFE-482A-AFD3-B7E8F199D56F,2013-12-16 00:00:00.000
7,EM,0,,Dylan,A,Miller,,2,,"""0""",C45E8AB8-01BE-4B76-B215-820C8368181A,2009-02-01 00:00:00.000
8,EM,0,,Diane,L,Margheim,,0,,"""0""",A948E590-4A56-45A9-BC9A-160A1CC9D990,2008-12-22 00:00:00.000
9,EM,0,,Gigi,N,Matthew,,0,,"""0""",5FC28C0E-6D36-4252-9846-05CAA0B1F6C5,2009-01-09 00:00:00.000
10,EM,0,,Michael,,Raheem,,2,,"""0""",CA2C740E-75B2-420C-9D4B-E3CBC6609604,2009-04-26 00:00:00.000


In [0]:
# verificando a existência de campos 'Nan' ou 'Nulls'
df_person.select([count(when(isnan(c) | col(c).isNull() | (col(c) == 'NULL'), c)).alias(c) for c in df_person.columns]).display()

BusinessEntityID,PersonType,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,EmailPromotion,AdditionalContactInfo,Demographics,rowguid,ModifiedDate
0,0,0,18963,0,8499,0,19919,0,19962,0,0,0


In [0]:
# verificando o Schema
df_person.printSchema()

root
 |-- BusinessEntityID: string (nullable = true)
 |-- PersonType: string (nullable = true)
 |-- NameStyle: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- EmailPromotion: string (nullable = true)
 |-- AdditionalContactInfo: string (nullable = true)
 |-- Demographics: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)



In [0]:
# realizando a conversão de types de cada coluna e aplicando regexp_replace nas colunas necessárias
df_person_v2 = (df_person\
                     .withColumn("BusinessEntityID", col("BusinessEntityID").cast("int"))
                     .withColumn("ModifiedDate", to_date(col("ModifiedDate"))))

### Tabela 03 - Product
Tabela com as informações dos produtos.

In [0]:
# query de importação utilizando as configurações do Cmd5
df_product = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location3)

display(df_product)

ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
1,Adjustable Race,AR-5381,0,0,,1000,750,0,0,,,,,0,,,,,,2008-04-30 00:00:00.000,,,694215B7-08F7-4C0D-ACB1-D734BA44C0C8,2014-02-08 10:01:36.827
2,Bearing Ball,BA-8327,0,0,,1000,750,0,0,,,,,0,,,,,,2008-04-30 00:00:00.000,,,58AE3C20-4F3A-4749-A7D4-D568806CC537,2014-02-08 10:01:36.827
3,BB Ball Bearing,BE-2349,1,0,,800,600,0,0,,,,,1,,,,,,2008-04-30 00:00:00.000,,,9C21AED2-5BFA-4F18-BCB8-F11638DC2E4E,2014-02-08 10:01:36.827
4,Headset Ball Bearings,BE-2908,0,0,,800,600,0,0,,,,,0,,,,,,2008-04-30 00:00:00.000,,,ECFED6CB-51FF-49B5-B06C-7D8AC834DB8B,2014-02-08 10:01:36.827
316,Blade,BL-2036,1,0,,800,600,0,0,,,,,1,,,,,,2008-04-30 00:00:00.000,,,E73E9750-603B-4131-89F5-3DD15ED5FF80,2014-02-08 10:01:36.827
317,LL Crankarm,CA-5965,0,0,Black,500,375,0,0,,,,,0,,L,,,,2008-04-30 00:00:00.000,,,3C9D10B7-A6B2-4774-9963-C19DCEE72FEA,2014-02-08 10:01:36.827
318,ML Crankarm,CA-6738,0,0,Black,500,375,0,0,,,,,0,,M,,,,2008-04-30 00:00:00.000,,,EABB9A92-FA07-4EAB-8955-F0517B4A4CA7,2014-02-08 10:01:36.827
319,HL Crankarm,CA-7457,0,0,Black,500,375,0,0,,,,,0,,,,,,2008-04-30 00:00:00.000,,,7D3FD384-4F29-484B-86FA-4206E276FE58,2014-02-08 10:01:36.827
320,Chainring Bolts,CB-2903,0,0,Silver,1000,750,0,0,,,,,0,,,,,,2008-04-30 00:00:00.000,,,7BE38E48-B7D6-4486-888E-F53C26735101,2014-02-08 10:01:36.827
321,Chainring Nut,CN-6137,0,0,Silver,1000,750,0,0,,,,,0,,,,,,2008-04-30 00:00:00.000,,,3314B1D7-EF69-4431-B6DD-DC75268BD5DF,2014-02-08 10:01:36.827


In [0]:
# verificando a existência de campos 'Nan' ou 'Nulls'
df_product.select([count(when(isnan(c) | col(c).isNull() | (col(c) == 'NULL'), c)).alias(c) for c in df_product.columns]).display()

ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,0,0,0,0,248,0,0,0,0,293,328,299,299,0,226,257,293,209,209,0,406,504,0,0


In [0]:
# realizando a conversão de types de cada coluna e aplicando regexp_replace nas colunas necessárias
df_product_v2 = (df_product\
                                  .withColumn("ProductID", col("ProductID").cast("int"))
                                  .withColumn("DaysToManufacture", col("DaysToManufacture").cast("int"))
                                  .withColumn("StandardCost", regexp_replace(col("StandardCost"), ",",".").cast("double"))
                                  .withColumn("ListPrice", regexp_replace(col("ListPrice"), ",",".").cast("double"))
                                  .withColumn("ModifiedDate", to_date(col("ModifiedDate"))))

In [0]:
# verificando Schema
df_product_v2.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: string (nullable = true)
 |-- FinishedGoodsFlag: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: string (nullable = true)
 |-- ReorderPoint: string (nullable = true)
 |-- StandardCost: double (nullable = true)
 |-- ListPrice: double (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- DaysToManufacture: integer (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductSubcategoryID: string (nullable = true)
 |-- ProductModelID: string (nullable = true)
 |-- SellStartDate: string (nullable = true)
 |-- SellEndDate: string (nullable = true)
 |-- DiscontinuedDate: string (nullabl

In [0]:
# resumo das informações
df_product_v2.summary().display()


summary,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid
count,504.0,504,504,504.0,504.0,504,504.0,504.0,504.0,504.0,504,504,504,504.0,504.0,504,504,504,504.0,504.0,504,504,504.0,504
mean,673.0396825396825,,,0.4742063492063492,0.5853174603174603,,535.1507936507936,401.3630952380952,258.60296130952383,438.66624999999937,48.983050847457626,,,74.06921951219512,1.1031746031746033,,,,12.294915254237289,37.44406779661017,,,,
stddev,229.37314180957824,,,0.4998303540029047,0.4931566460541862,,374.1129536209128,280.5847152156847,461.63280768892565,773.6028426304862,7.249196331844457,,,182.16658823718495,1.4926159675916597,,,,9.860134530977971,34.02544201748429,,,,
min,1.0,AWC Logo Cap,AR-5381,0.0,0.0,Black,100.0,3.0,0.0,0.0,38,CM,G,1000.0,0.0,M,H,M,1.0,1.0,2008-04-30 00:00:00.000,2012-05-29 00:00:00.000,,01A8C3FC-ED52-458E-A634-D5B6E2ACCFED
25%,447.0,,,0.0,0.0,,100.0,75.0,0.0,0.0,44.0,,,2.88,0.0,,,,2.0,11.0,,,,
50%,747.0,,,0.0,1.0,,500.0,375.0,23.3722,49.99,48.0,,,17.9,1.0,,,,12.0,26.0,,,,
75%,873.0,,,1.0,1.0,,1000.0,750.0,308.2179,564.99,54.0,,,27.35,1.0,,,,17.0,49.0,,,,
max,999.0,"Women's Tights, S",WB-H098,1.0,1.0,Yellow,800.0,750.0,2171.2942,3578.27,XL,,,,4.0,T,,W,,,2013-05-30 00:00:00.000,,,FE0678ED-AEF2-4C58-A450-8151CC24DDD8


### Tabela 04 - Customer
Tabela com as informações dos clientes.

In [0]:
# query de importação utilizando as configurações do Cmd5
df_customer = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location4)

display(df_customer)

CustomerID,PersonID,StoreID,TerritoryID,AccountNumber,rowguid,ModifiedDate
1,,934.0,1,AW00000001,3F5AE95E-B87D-4AED-95B4-C3797AFCB74F,2014-09-12 11:15:07.263
2,,1028.0,1,AW00000002,E552F657-A9AF-4A7D-A645-C429D6E02491,2014-09-12 11:15:07.263
3,,642.0,4,AW00000003,130774B1-DB21-4EF3-98C8-C104BCD6ED6D,2014-09-12 11:15:07.263
4,,932.0,4,AW00000004,FF862851-1DAA-4044-BE7C-3E85583C054D,2014-09-12 11:15:07.263
5,,1026.0,4,AW00000005,83905BDC-6F5E-4F71-B162-C98DA069F38A,2014-09-12 11:15:07.263
6,,644.0,4,AW00000006,1A92DF88-BFA2-467D-BD54-FCB9E647FDD7,2014-09-12 11:15:07.263
7,,930.0,1,AW00000007,03E9273E-B193-448E-9823-FE0C44AEED78,2014-09-12 11:15:07.263
8,,1024.0,5,AW00000008,801368B1-4323-4BFA-8BEA-5B5B1E4BD4A0,2014-09-12 11:15:07.263
9,,620.0,5,AW00000009,B900BB7F-23C3-481D-80DA-C49A5BD6F772,2014-09-12 11:15:07.263
10,,928.0,6,AW00000010,CDB6698D-2FF1-4FBA-8F22-60AD1D11DABD,2014-09-12 11:15:07.263


In [0]:
# resumo com as informações
df_customer.summary().display()

summary,CustomerID,PersonID,StoreID,TerritoryID,AccountNumber,rowguid,ModifiedDate
count,19820.0,19820.0,19820.0,19820.0,19820,19820,19820
mean,19844.2770938446,11184.19022961452,1037.6549401197606,5.82497477295661,,,
stddev,6581.7859142707575,5578.70597685964,475.9147548439723,3.0426757383909195,,,
min,1.0,10000.0,1000.0,1.0,AW00000001,0006E071-D04E-426D-81CA-D512E229F3E2,2014-09-12 11:15:07.263
25%,15252.0,6438.0,648.0,4.0,,,
50%,20207.0,11217.0,992.0,6.0,,,
75%,25161.0,15996.0,1342.0,9.0,,,
max,99.0,,,9.0,AW00030118,FFFFF252-5BFC-4823-84B0-380FFF3B0CF4,2014-09-12 11:15:07.263


In [0]:
# verificando a existência de campos 'Nan' ou 'Nulls'
df_customer.select([count(when(isnan(c) | col(c).isNull() | (col(c) == 'NULL'), c)).alias(c) for c in df_customer.columns]).display()

CustomerID,PersonID,StoreID,TerritoryID,AccountNumber,rowguid,ModifiedDate
0,701,18484,0,0,0,0


In [0]:
# realizando a conversão de types de cada coluna e aplicando regexp_replace nas colunas necessárias
df_customer_v2 = (df_customer\
                                  .withColumn("CustomerID", col("CustomerID").cast("int"))
                                  .withColumn("PersonID", col("PersonID").cast("int"))
                                  .withColumn("StoreID", col("StoreID").cast("int"))
                                  .withColumn("TerritoryID", col("TerritoryID").cast("int"))
                                  .withColumn("ModifiedDate", to_date(col("ModifiedDate"))))

In [0]:
# verificando Schema
df_customer_v2.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- PersonID: integer (nullable = true)
 |-- StoreID: integer (nullable = true)
 |-- TerritoryID: integer (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: date (nullable = true)



### Tabela 05 - Sales Order Header
Tabela com as informações dos pedidos de venda.

In [0]:
# query de importação utilizando as configurações do Cmd5
df_salesorderheader = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location5)

display(df_salesorderheader)

SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,SalesPersonID,TerritoryID,BillToAddressID,ShipToAddressID,ShipMethodID,CreditCardID,CreditCardApprovalCode,CurrencyRateID,SubTotal,TaxAmt,Freight,TotalDue,Comment,rowguid,ModifiedDate
43659,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43659,PO522145787,10-4020-000676,29825,279.0,5,985,985,5,16281.0,105041Vi84182,,205656206,19715149,6160984,231532339,,79B65321-39CA-4115-9CBA-8FE0903E12E6,2011-06-07 00:00:00.000
43660,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43660,PO18850127500,10-4020-000117,29672,279.0,5,921,921,5,5618.0,115213Vi29411,,12942529,1242483,388276,14573288,,738DC42D-D03B-48A1-9822-F95A67EA7389,2011-06-07 00:00:00.000
43661,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43661,PO18473189620,10-4020-000442,29734,282.0,6,517,517,5,1346.0,85274Vi6854,4.0,327264786,31537696,985553,368658012,,D91B9131-18A4-4A11-BC3A-90B6F53E9D74,2011-06-07 00:00:00.000
43662,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43662,PO18444174044,10-4020-000227,29994,282.0,6,482,482,5,10456.0,125295Vi53935,4.0,288325289,27751646,8672389,324749324,,4A1ECFC0-CC3A-4740-B028-1C50BB48711C,2011-06-07 00:00:00.000
43663,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43663,PO18009186470,10-4020-000510,29565,276.0,4,1073,1073,5,4322.0,45303Vi22691,,4194589,402681,125838,4723108,,9B1E7A40-6AE0-4AD3-811C-A64951857C4B,2011-06-07 00:00:00.000
43664,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43664,PO16617121983,10-4020-000397,29898,280.0,1,876,876,5,806.0,95555Vi4081,,244326088,23449921,73281,275104109,,22A8A5DA-8C22-42AD-9241-839489B6EF0D,2011-06-07 00:00:00.000
43665,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43665,PO16588191572,10-4020-000146,29580,283.0,1,849,849,5,15232.0,35568Vi78804,,143527713,13759427,4299821,161586961,,5602C304-853C-43D7-9E79-76E320D476CF,2011-06-07 00:00:00.000
43666,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43666,PO16008173883,10-4020-000511,30052,276.0,4,1074,1074,5,13349.0,105623Vi69217,,50564896,4863747,1519921,56948564,,E2A90057-1366-4487-8A7E-8085845FF770,2011-06-07 00:00:00.000
43667,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43667,PO15428132599,10-4020-000646,29974,277.0,3,629,629,5,10370.0,55680Vi53503,,6107082,5861203,1831626,68763649,,86D5237D-432D-4B21-8ABC-671942F5789D,2011-06-07 00:00:00.000
43668,8,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5,0,SO43668,PO14732180295,10-4020-000514,29614,282.0,6,529,529,5,1566.0,85817Vi8045,4.0,359441562,34617654,10818017,404877233,,281CC355-D538-494E-9B44-461B36A826C6,2011-06-07 00:00:00.000


In [0]:
# resumo do DF
df_salesorderheader.summary().display()

summary,SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,SalesPersonID,TerritoryID,BillToAddressID,ShipToAddressID,ShipMethodID,CreditCardID,CreditCardApprovalCode,CurrencyRateID,SubTotal,TaxAmt,Freight,TotalDue,Comment,rowguid,ModifiedDate
count,31465.0,31465.0,31465,31465,31465,31465.0,31465.0,31465,31465,31465,31465.0,31465.0,31465.0,31465.0,31465.0,31465.0,31465.0,31465,31465.0,31465.0,31465.0,31465.0,31465.0,31465.0,31465,31465
mean,59391.0,8.000953440330527,,,,5.0,0.8790402034006038,,,,20170.17568727157,280.6079873883342,6.090767519466073,18263.1544255522,18249.192563165423,1.4838391863975846,9684.100448341796,,9191.499570692617,,,,,,,
stddev,9083.307446446292,0.0308635959627022,,,,0.0,0.3260857304097679,,,,6261.728960399542,4.846964645718457,2.9581192269208807,8210.06915779192,8218.42926281756,1.3043429216390716,5566.299591312086,,2945.17009538216,,,,,,,
min,43659.0,8.0,2011-05-31 00:00:00.000,2011-06-12 00:00:00.000,2011-06-07 00:00:00.000,5.0,0.0,SO43659,,10-4020-000001,11000.0,274.0,1.0,1000.0,1000.0,1.0,1.0,1016006Vi91301,10000.0,1374.0,1099.0,344.0,15183.0,,0000DE87-AB3F-4920-AC46-C404834241A0,2011-06-07 00:00:00.000
25%,51523.0,8.0,,,,5.0,1.0,,,,14432.0,277.0,4.0,14080.0,14063.0,1.0,4894.0,,8510.0,,,,,,,
50%,59390.0,8.0,,,,5.0,1.0,,,,19450.0,279.0,6.0,19447.0,19436.0,1.0,9717.0,,10074.0,,,,,,,
75%,67256.0,8.0,,,,5.0,1.0,,,,25992.0,284.0,9.0,24678.0,24670.0,1.0,14508.0,,11282.0,,,,,,,
max,75123.0,9.0,2014-06-30 00:00:00.000,2014-07-12 00:00:00.000,2014-07-07 00:00:00.000,5.0,1.0,SO75123,PO9976195169,10-4030-029483,30118.0,,9.0,999.0,999.0,5.0,,,,995145024.0,99790212.0,995912.0,99952438.0,,FFFE293D-6CC6-445B-876D-955D4DBC5986,2014-07-07 00:00:00.000


In [0]:
# verificando a existência de campos 'Nan' ou 'Nulls'
df_salesorderheader.select([count(when(isnan(c) | col(c).isNull() | (col(c) == 'NULL'), c)).alias(c) for c in df_salesorderheader.columns]).display()

SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,SalesPersonID,TerritoryID,BillToAddressID,ShipToAddressID,ShipMethodID,CreditCardID,CreditCardApprovalCode,CurrencyRateID,SubTotal,TaxAmt,Freight,TotalDue,Comment,rowguid,ModifiedDate
0,0,0,0,0,0,0,0,27659,0,0,27659,0,0,0,0,1131,1131,17489,0,0,0,0,31465,0,0


In [0]:
# realizando a conversão de types de cada coluna e aplicando regexp_replace nas colunas necessárias
df_salesorderheader_v2 = (df_salesorderheader\
                                  .withColumn("SalesOrderID", col("SalesOrderID").cast("int"))
                                  .withColumn("RevisionNumber", col("RevisionNumber").cast("int"))
                                  .withColumn("Status", col("Status").cast("int"))
                                  .withColumn("OnlineOrderFlag", col("OnlineOrderFlag").cast("int"))
                                  .withColumn("CustomerID", col("CustomerID").cast("int"))
                                  .withColumn("SalesPersonID", col("SalesPersonID").cast("int"))
                                  .withColumn("TerritoryID", col("TerritoryID").cast("int"))
                                  .withColumn("BillToAddressID", col("BillToAddressID").cast("int"))
                                  .withColumn("ShipToAddressID", col("ShipToAddressID").cast("int"))
                                  .withColumn("ShipMethodID", col("ShipMethodID").cast("int"))
                                  .withColumn("CreditCardID", col("CreditCardID").cast("int"))
                                  .withColumn("CreditCardApprovalCode", col("CreditCardApprovalCode").cast("int"))
                                  .withColumn("CurrencyRateID", col("CurrencyRateID").cast("int"))
                                  .withColumn("ShipMethodID", col("ShipMethodID").cast("int"))
                                  .withColumn("SubTotal", regexp_replace(col("SubTotal"), ",",".").cast("double"))
                                  .withColumn("TaxAmt", regexp_replace(col("TaxAmt"), ",",".").cast("double"))
                                  .withColumn("Freight", regexp_replace(col("Freight"), ",",".").cast("double"))
                                  .withColumn("TotalDue", regexp_replace(col("TotalDue"), ",",".").cast("double"))
                                  .withColumn("OrderDate", to_date(col("OrderDate")))
                                  .withColumn("DueDate", to_date(col("DueDate")))
                                  .withColumn("ShipDate", to_date(col("ShipDate")))
                                  .withColumn("ModifiedDate", to_date(col("ModifiedDate"))))

In [0]:
# verificando Schema
df_salesorderheader_v2.printSchema()

root
 |-- SalesOrderID: integer (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- DueDate: date (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- Status: integer (nullable = true)
 |-- OnlineOrderFlag: integer (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- SalesPersonID: integer (nullable = true)
 |-- TerritoryID: integer (nullable = true)
 |-- BillToAddressID: integer (nullable = true)
 |-- ShipToAddressID: integer (nullable = true)
 |-- ShipMethodID: integer (nullable = true)
 |-- CreditCardID: integer (nullable = true)
 |-- CreditCardApprovalCode: integer (nullable = true)
 |-- CurrencyRateID: integer (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- Freight: double (nullable = true)
 |-- TotalDue: doubl

### Tabela 06 - Special Offer Product
Tabela com as informações dos produtos em oferta.

In [0]:
# query de importação utilizando as configurações do Cmd5
df_specialofferproduct = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location6)

display(df_specialofferproduct)

SpecialOfferID,ProductID,rowguid,ModifiedDate
1,680,BB30B868-D86C-4557-8DB2-4B2D0A83A0FB,2011-04-01 00:00:00.000
1,706,B3C9A4B1-2AE6-4CBA-B552-1F206C9F4C1F,2011-04-01 00:00:00.000
1,707,27B711FE-0B77-4EA4-AD1A-7C239956BEF4,2011-04-01 00:00:00.000
1,708,46CBB78B-246E-4D69-9BD6-E521277C1078,2011-04-01 00:00:00.000
1,709,CF102AA0-055F-4D2B-8B98-04B161758EA8,2011-04-01 00:00:00.000
1,710,63718DA1-464B-4325-9514-CDEE46CB124F,2011-04-01 00:00:00.000
1,711,457EB971-D1C9-48CA-B947-AE7E1B114377,2011-04-01 00:00:00.000
1,712,5B948448-BAE5-4F2A-A1F3-8203E892FD24,2011-04-01 00:00:00.000
1,713,07768F40-6E46-430F-AC1A-FF6A3629729C,2011-04-01 00:00:00.000
1,714,85004BCE-C74A-4D4E-8D17-3157991A1400,2011-04-01 00:00:00.000


In [0]:
# resumo do df
df_specialofferproduct.summary().display()

summary,SpecialOfferID,ProductID,rowguid,ModifiedDate
count,538.0,538.0,538,538
mean,2.7118959107806693,849.4702602230483,,
stddev,3.478142408113732,86.58989606682316,,
min,1.0,680.0,0020931C-087C-42F8-B441-EBE3D3B5F51E,2011-04-01 00:00:00.000
25%,1.0,775.0,,
50%,1.0,855.0,,
75%,2.0,928.0,,
max,9.0,999.0,FFE24AE4-9E46-4336-843F-D3C0EFBD09F8,2014-03-01 00:00:00.000


In [0]:
# verificando a existência de campos 'Nan' ou 'Nulls'
df_specialofferproduct.select([count(when(isnan(c) | col(c).isNull() | (col(c) == 'NULL'), c)).alias(c) for c in df_specialofferproduct.columns]).display()

SpecialOfferID,ProductID,rowguid,ModifiedDate
0,0,0,0


In [0]:
# conversão de types
df_specialofferproduct_v2 = (df_specialofferproduct\
                                  .withColumn("SpecialOfferID", col("SpecialOfferID").cast("int"))
                                  .withColumn("ProductID", col("ProductID").cast("int"))
                                  .withColumn("ModifiedDate", to_date(col("ModifiedDate"))))

In [0]:
df_specialofferproduct_v2.printSchema()

root
 |-- SpecialOfferID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: date (nullable = true)



## Questão 1
Escreva uma query que retorna a quantidade de linhas na tabela Sales.SalesOrderDetail pelo campo SalesOrderID, desde que tenham pelo menos três linhas de detalhes.

In [0]:
# query com paramêtros para retorno conforme questionamento
df_salesorderdetail_v2.groupBy("SalesOrderID").count().filter(col("count") >= 3).orderBy("SalesOrderID").display()

SalesOrderID,count
43659,12
43661,15
43662,22
43664,8
43665,10
43666,6
43667,4
43668,29
43670,4
43671,11


In [0]:
# demonstração de linhas com maiores e igual a 3 linhas de detalhes
(df_salesorderdetail_v2.groupBy("SalesOrderID").count().filter(col("count") >= 3)).count()

Out[92]: 12757

In [0]:
# demonstração do total de linhas
df_salesorderdetail_v2.select("SalesOrderID").distinct().count()

Out[93]: 31465

Há **12.757** linhas na tabela SalesOrderDetail referente ao campo SalesOrderID, desde que tenham pelo menos três linhas, de um total de **31.465** campos da coluna SalesOrderID distintos.

## Questão 2
Escreva uma query que ligue as tabelas Sales.SalesOrderDetail, Sales.SpecialOfferProduct e Production.Product e retorne os 3 produtos (Name) mais vendidos (pela soma de OrderQty), agrupados pelo número de dias para manufatura (DaysToManufacture).

In [0]:
# Junção das tabelas
df_join = (df_salesorderdetail_v2 \
                 .join(df_product_v2, on=["ProductID"], how="inner")
                 .join(df_specialofferproduct_v2, on=["ProductID", "ModifiedDate"], how="outer"))

In [0]:
#query com os 3 mais vendidos e dias para manufatura
display(df_join.groupBy("Name") \
        .agg((sum(col("OrderQty")).alias("SumQty")), \
             first(col("DaysToManufacture")).alias("DaysToManufacture")) \
        .orderBy(desc("SumQty")) \
        .limit(3))

Name,SumQty,DaysToManufacture
AWC Logo Cap,8311,0
Water Bottle - 30 oz.,6815,0
"Sport-100 Helmet, Blue",6743,0


## Questão 3
Escreva uma query ligando as tabelas Person.Person, Sales.Customer e Sales.SalesOrderHeader de forma a obter uma lista de nomes de clientes e uma contagem de pedidos efetuados.

In [0]:
# Junção de tabelas
df_join_2 = (df_customer_v2 \
                 .join(df_person_v2, df_customer_v2.CustomerID == df_person_v2.BusinessEntityID, "inner")
                 .join(df_salesorderheader_v2, on=["CustomerID"], how="outer"))

In [0]:
# query com a lista de nome dos clientes com sua contagem de compras efetuadas
(df_join_2.groupBy("CustomerID") \
        .agg((count(col("SalesOrderID")).alias("CountSalesOrder")), \
             first(col("FirstName")).alias("FirstName"), \
             first(col("LastName")).alias("LastName")) \
        .orderBy(desc("CountSalesOrder"))).display()

CustomerID,CountSalesOrder,FirstName,LastName
11176,28,Morgan,Miller
11091,28,Jennifer,Taylor
11200,27,Morgan,Lewis
11277,27,Ruben,Vazquez
11287,27,Ruben,Carlson
11300,27,Marshall,Shen
11223,27,Isabella,Moore
11330,27,Grace,Lewis
11331,27,Grace,Lee
11711,27,Brianna,Jones


## Questão 4
Escreva uma query usando as tabelas Sales.SalesOrderHeader, Sales.SalesOrderDetail e Production.Product, de forma a obter a soma total de produtos (OrderQty) por ProductID e OrderDate.

In [0]:
# junção de tabelas
df_join_3 = (df_salesorderdetail_v2 \
                 .join(df_product_v2, on=["ProductID"], how="inner")
                 .join(df_salesorderheader_v2, on=["SalesOrderID"], how="outer"))

In [0]:
# query com a soma de produtos especificadas por dia
(df_join_3.groupBy("ProductID", "OrderDate") \
        .agg(sum(col("OrderQty")).alias("SumQty")) \
        .orderBy(desc("SumQty"))).display()

ProductID,OrderDate,SumQty
864,2013-06-30,498
864,2013-07-31,465
884,2013-06-30,444
867,2013-06-30,427
864,2014-03-31,424
884,2013-07-31,420
712,2013-06-30,415
863,2012-06-30,409
715,2013-06-30,406
876,2013-07-31,397


## Questão 5
Escreva uma query mostrando os campos SalesOrderID, OrderDate e TotalDue da tabela Sales.SalesOrderHeader. Obtenha apenas as linhas onde a ordem tenha sido feita durante o mês de setembro/2011 e o total devido esteja acima de 1.000. Ordene pelo total devido decrescente.

In [0]:
# Query com informações de total devido separado por mês onde este seja maior que 1000 e dentro do mês de semtembro de 2011
(df_salesorderheader_v2.select("SalesOrderID", "OrderDate", "TotalDue") \
                    .filter((col("OrderDate") >= "2011-09-01") & (col("OrderDate") <= "2011-09-30")) \
                    .where(col("TotalDue") > 1000) \
                    .orderBy(desc("TotalDue"))).display()                    

SalesOrderID,OrderDate,TotalDue
44329,2011-09-02,3953.9884
44330,2011-09-02,3953.9884
44334,2011-09-04,3953.9884
44338,2011-09-04,3953.9884
44339,2011-09-04,3953.9884
44340,2011-09-04,3953.9884
44343,2011-09-05,3953.9884
44349,2011-09-07,3953.9884
44350,2011-09-07,3953.9884
44351,2011-09-07,3953.9884


## Databricks to Azure SQL

In [0]:
# Processo para conectar o databricks e o SQL do Azure, através da conexão jdbc ( encontrada nas propriedades do SQL do Azure)
 
conn_jdbc = "jdbc:sqlserver://roxtest.database.windows.net:1433;database=roxtest;user=roxtest@roxtest;password=key-sql;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

In [0]:
# persistindo df_salesorderdetail_v2 dentro do Azure SQL
(
  df_salesorderdetail_v2
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "false")
    .option("url", conn_jdbc)
    .option("dbtable", "salesorderheader")
    .save()
)

In [0]:
# persistindo df_person_v2 dentro do Azure SQL
(
  df_person_v2
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "false")
    .option("url", conn_jdbc)
    .option("dbtable", "person")
    .save()
)

In [0]:
# persistindo df_product_v2 dentro do Azure SQL
(
  df_product_v2
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "false")
    .option("url", conn_jdbc)
    .option("dbtable", "product")
    .save()
)

In [0]:
# persistindo df_customer_v2 dentro do Azure SQL
(
  df_customer_v2
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "false")
    .option("url", conn_jdbc)
    .option("dbtable", "customer")
    .save()
)

In [0]:
# persistindo df_salesorderheader_v2 dentro do Azure SQL
(
  df_salesorderheader_v2
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "false")
    .option("url", conn_jdbc)
    .option("dbtable", "salesorderheader")
    .save()
)

In [0]:
# persistindo df_specialofferproduct_v2 dentro do Azure SQL
(
  df_specialofferproduct_v2
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "false")
    .option("url", conn_jdbc)
    .option("dbtable", "specialofferproduct")
    .save()
)