# Unity Catalog Habilitado

In [0]:
%sql

-- Query the catalogs exposed through workspace.information_schema.
-- Per the notebook title, this serves as a check that Unity Catalog is enabled.
SELECT * FROM workspace.information_schema.catalogs;

## 1. Creación de un Schema 

In [0]:
%sql

-- Create the bronze schema only when it is missing, so the cell can be re-run safely.
-- SCHEMA and DATABASE are interchangeable keywords for this statement.
CREATE SCHEMA IF NOT EXISTS bronze;


## 2. Creación de una Tabla Delta

In [0]:
%sql

-- Bronze Delta table that receives the Yelp business file.
--   * attributes / hours hold JSON text (the load step serialises them with to_json).
--   * stars is DOUBLE: Yelp ratings come in half-star steps (e.g. 3.5), and an INTEGER
--     column would silently truncate them when the MERGE casts the source values.
--   * Text columns use STRING, consistent with `state`, instead of a maximum-length VARCHAR.
-- NOTE: CREATE OR REPLACE discards any rows already stored in the table on every run.
CREATE OR REPLACE TABLE bronze.business (
    business_id STRING,
    address STRING,
    attributes STRING,
    categories STRING,
    city STRING,
    hours STRING,
    is_open INTEGER,
    latitude DECIMAL(8,4),
    longitude DECIMAL(8,4),
    postal_code STRING,
    review_count INTEGER,
    stars DOUBLE,
    state STRING
)
USING DELTA

In [0]:
%sql

-- Show the current contents of bronze.business.
-- Expected to be empty when run immediately after the table is (re)created.
SELECT * FROM bronze.business;

## 3. Cargando una fuente a una Tabla Delta

In [0]:
from pyspark.sql.functions import lit, input_file_name
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
import datetime
import time

# Location of the Yelp business file inside a Unity Catalog volume.
jsonFileBusiness = '/Volumes/workspace/default/yelp_data/yelp_academic_dataset_business.json'

# Read the file with the JSON data source. No schema is supplied, so the reader
# infers one from the data. The former "header" / "inferSchema" options are
# CSV-reader settings that the JSON source does not use, so they were removed to
# avoid suggesting that they influence this read.
df_business_bronze = (
    spark.read
    .format("json")
    .load(jsonFileBusiness)
)

In [0]:
# Render the DataFrame read from the JSON file for a visual check of the raw data.
display(df_business_bronze)

In [0]:
from pyspark.sql.functions import to_json, col

# Add JSON-string versions of the `attributes` and `hours` columns under
# *_formatted names; the original columns are kept alongside them.
df_business_formatted = (
    df_business_bronze
    .withColumn("attributes_formatted", to_json(col("attributes")))
    .withColumn("hours_formatted", to_json(col("hours")))
)

In [0]:
from pyspark.sql.functions import to_json, col

# Project the staging columns in the same order as the bronze.business table
# definition. The JSON-string columns are renamed back to `attributes` / `hours`
# so they replace the original nested columns in the output.
df_business_formatted_f = df_business_formatted.select(
    col("business_id"),
    col("address"),
    col("attributes_formatted").alias("attributes"),
    col("categories"),
    col("city"),
    col("hours_formatted").alias("hours"),
    col("is_open"),
    col("latitude"),
    col("longitude"),
    col("postal_code"),
    col("review_count"),
    col("stars"),
    col("state"),
)

# Expose the projection to SQL cells under the name used by the MERGE statement.
df_business_formatted_f.createOrReplaceTempView("stage_business")

In [0]:
%sql 
-- Preview the temporary view registered from the formatted DataFrame.
SELECT * FROM stage_business; 

In [0]:
%sql
-- Insert-only merge keyed on business_id: staged rows with no matching row in
-- bronze.business are inserted; rows that already match are left untouched.
MERGE INTO bronze.business AS tgt
USING stage_business AS src
  ON tgt.business_id = src.business_id
WHEN NOT MATCHED THEN
  INSERT *

## 4. Validando los datos cargados

In [0]:
%sql
-- Read back bronze.business to check the rows inserted by the MERGE.
SELECT * FROM bronze.business;

Databricks visualization. Run in Databricks to view.