####Getting secret values stored in Azure Key Vault.

In [0]:
#Getting data from scope, the rootpath,volumePath are stored in Azure key Vault and secrets is a scope name on the workplace.
rootPath = dbutils.secrets.get('secrets', 'rootPath')
volumePath = dbutils.secrets.get('secrets', 'volumePath')

In [0]:
dbutils.fs.ls(volumePath)

####Creating the bronze delta table.

######The raw data file is in CSV format, we cannot directly create a delta table from the file formats that do not have well-defined schema. So, we can create a temp view and use that temp view to create coil_raw using CTAS. Now the table generated will be the delta table.

In [0]:
#Creating a temp view for the raw data

rawFilePath = f"{volumePath}/data.csv"

temp_view_query = f"""
CREATE OR REPLACE TEMP VIEW raw_data_temp
  (
    coil_id INT,
    weight FLOAT,
    thickness FLOAT,
    material_type STRING,
    production_date DATE,
    defects INT,
    line_speed FLOAT
  )
USING CSV
OPTIONS (
  path = '{rawFilePath}',
  header = "true",
  delimiter = ","
);
"""

spark.sql(temp_view_query)

In [0]:
%sql
SELECT 
    * 
FROM 
    raw_data_temp;


In [0]:
#Creating the bronze delta table named coil_raw from temp view raw_data_temp using CTAS

bronzePath = f"{rootPath}/table/coil_raw"

sql_ext_bronze_table_creation_query = f"""
CREATE TABLE IF NOT EXISTS coil_manufacturing_catalog.manufacturing.coil_raw
    COMMENT "The raw delta table. This is the bronze table of medallion architecture and the lowest layer among the data layers."
    LOCATION '{bronzePath}'
AS
SELECT 
    *
FROM raw_data_temp
"""

spark.sql(sql_ext_bronze_table_creation_query)


In [0]:
%sql
SELECT 
  * 
FROM 
  coil_manufacturing_catalog.manufacturing.coil_raw

######The CTAS will automatically infer schema from the source. The schema has been defined on the temp view, so the coil_raw should adopt that schema. 
######Ensuring the schema of coil_raw table.
* coil_id --> int
* weight --> float
* thickness --> float
* material_type --> string
* production_date --> date
* defects --> int
* line_speed --> float


In [0]:
%sql
DESCRIBE EXTENDED coil_manufacturing_catalog.manufacturing.coil_raw


####Creating the silver table.

######Creating the silver table by cleansing and transforming the coil_raw table.
######Removing all records that do not satisfy the below condition:
* defects should be less than or equal to 5.
* weight should be greater or equal to 1000 kg
* thickness should be greater or equal to 1 mm.

In [0]:
silverPath = f"{rootPath}/table/coil_curated"

sql_ext_silver_table_creation_query = f"""
CREATE TABLE IF NOT EXISTS coil_manufacturing_catalog.manufacturing.coil_curated
    
    COMMENT "The curated delta table. This is the silver table of medallion architecture and the middle layer among the data layers. The invalid data are filtered out including, for instance, thicknesses less than 1mm, records containing more than 5 defects."
    
    LOCATION '{silverPath}'
AS
SELECT 
    * 
FROM 
    coil_manufacturing_catalog.manufacturing.coil_raw
WHERE 
    weight >= 1000 OR thickness >= 1 OR defects <= 5
"""

spark.sql(sql_ext_silver_table_creation_query) 

In [0]:
%sql
SELECT 
    * 
FROM 
    coil_manufacturing_catalog.manufacturing.coil_curated

###Creating the gold table coil_summary from the aggregated summary data.


In [0]:
goldPath = f"{rootPath}/table/coil_summary"

sql_ext_gold_table_creation_query = f"""
CREATE TABLE IF NOT EXISTS coil_manufacturing_catalog.manufacturing.coil_summary

COMMENT "The aggregated delta table. This is the gold table of medallion architecture and the top layer among the data layers. The records are aggregated
based on material_type and production_date columns."

LOCATION '{goldPath}'

SELECT 
    material_type,
    production_date,
    CAST(COUNT(coil_id) AS INT) AS total_coils,
    CAST(AVG(weight) AS FLOAT) AS avg_weight,
    CAST(AVG(thickness) AS FLOAT) AS avg_thickness,
    CAST(MAX(line_speed) AS FLOAT) AS max_line_speed,
    CAST(SUM(defects) AS INT) AS total_defects,
    CAST((total_defects/total_coils)*100 AS FLOAT) AS defect_rate
FROM 
    coil_manufacturing_catalog.manufacturing.coil_curated
GROUP BY 
    material_type, production_date
"""

spark.sql(sql_ext_gold_table_creation_query) 

######Adding the column descriptions.


In [0]:
%sql
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.material_type IS 'Type of material (e.g, Steel and Aluminum).';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.production_date IS 'The date the coil was produced.';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.total_coils IS 'Total number of coils.';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.avg_weight IS 'Average weight of coils (kG).';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.avg_thickness IS 'Average thickness of coils (MM).';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.max_line_speed IS 'Maximum production line speed.';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.total_defects IS 'Total number of defects in all coils.';
COMMENT ON COLUMN coil_manufacturing_catalog.manufacturing.coil_summary.defect_rate IS 'Ratio of total defects to total coils, as a percentage.';


######Ensuring the schema of coil_summary table.
* material_type --> string
* production_date --> date
* total_coils --> int
* avg_weight --> float
* avg_thickness --> float
* max_line_speed --> float
* total_defects --> int
* defect_rate --> float

In [0]:
%sql
DESCRIBE EXTENDED coil_manufacturing_catalog.manufacturing.coil_summary;

######The Final output of coil_summary table.

In [0]:
%sql
SELECT 
    * 
FROM  
    coil_manufacturing_catalog.manufacturing.coil_summary
ORDER BY 
    material_type DESC;