# In this Notebook
- **Create a new Delta table with UniForm (Universal Format) support for Iceberg**
    - Validate Delta Metadata
    - Validate Iceberg Metadata
    - Read from Snowflake
    - Read from Google 

- **Convert an existing Delta table to UniForm**
- **Benchmark**
    - Table Writes 
    - Upserts
    - Table & Metadata size

In [0]:
# Optional: Spark Connect setup.
# Uncomment the lines below to build the `spark` session through Databricks
# Connect against an existing cluster. The rest of the notebook only needs a
# `spark` session to exist — presumably it is already provided when running
# inside a Databricks notebook (TODO confirm).

# from databricks.connect import DatabricksSession
# from databricks.sdk.core import Config

# Cluster ids, named after the runtime version each cluster runs.
# runtime_131 = '0601-182128-dcbte59m'
# runtime_132 = '0630-162806-47pefcs5'

# # Use DEFAULT configuration and pass an existing cluster_id
# # Cluster Runtime > 13.2
# config = Config(
#     profile = 'DEFAULT',
#     cluster_id = runtime_132
# )

# spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()

# Create a new Delta Table with Universal Format (Iceberg support)
[Official Documentation](https://docs.delta.io/3.0.0rc1/delta-uniform.html)

In [0]:
# Namespace (catalog + schema) that the statements in this notebook run against.
catalog, schema = 'rohitb_play_area', 'sandbox'

# Make that namespace the session default so later SQL can use bare table names.
spark.sql("use catalog " + catalog)
spark.sql("use schema " + schema)

In [0]:
# Universal Format is only supported on Unity Catalog tables.
sample_table_name = 'iceberg_test'

# Start from a clean slate before (re)creating the table.
spark.sql(f"drop table if exists {sample_table_name}")

# Delta table with the Iceberg universal format enabled through a table property.
create_uniform_table_sql = f"""
    CREATE OR REPLACE TABLE {sample_table_name}(name STRING, age INT) 
    USING DELTA 
    TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'iceberg')
"""
spark.sql(create_uniform_table_sql)

# Each row is written by its own INSERT statement.
for person, age in (('Amitabh', 73), ('Dilip', 85), ('Dev', 53)):
    spark.sql(f"insert into {sample_table_name} values ('{person}', {age})")

# Show the rows that were just written.
spark.sql(f"select * from {sample_table_name}").display()

In [0]:
# Render the DESCRIBE EXTENDED output for the sample table.
table_details = spark.sql(f"describe extended {sample_table_name}")
table_details.display()

In [0]:
# The storage path is the `data_type` value of the DESCRIBE EXTENDED row whose
# `col_name` is 'Location'.
location_rows = (
    spark.sql(f"describe extended {sample_table_name}")
    .filter("col_name='Location'")
    .collect()
)
table_location = location_rows[0]['data_type']

# Directories under the table location that are listed in the following cells.
delta_log = "{}/_delta_log".format(table_location)
iceberg_metadata = "{}/metadata".format(table_location)
# data_files = f"{table_location}/

In [0]:
# List the entries directly under the table's storage location.
table_root_listing = dbutils.fs.ls(table_location)
display(table_root_listing)

In [0]:
# List the entries in the table's `_delta_log` directory.
delta_log_listing = dbutils.fs.ls(delta_log)
display(delta_log_listing)

In [0]:
# List the entries in the table's `metadata` directory.
iceberg_metadata_listing = dbutils.fs.ls(iceberg_metadata)
display(iceberg_metadata_listing)

# Convert an existing Delta Table to support Uniform

## Download sample data from Kaggle

In [0]:
%pip install kaggle
# Installs the `kaggle` package that the download cells below import / invoke.
# NOTE(review): the version is not pinned — consider `kaggle==<version>` so that
# re-runs of this notebook resolve the same release.

In [0]:
import os

# Kaggle API credentials must never be stored in the notebook source. They are
# read from a Databricks secret scope and exported as the KAGGLE_USERNAME /
# KAGGLE_KEY environment variables for the Kaggle download steps that follow.
#
# NOTE(review): the scope and key names below are placeholders — create the
# scope and secrets (or adjust the names) before running this cell.
# The API key that used to be hard-coded here has been exposed in the notebook
# history and should be revoked and regenerated in the Kaggle account settings.
KAGGLE_SECRET_SCOPE = "kaggle"

os.environ['KAGGLE_USERNAME'] = dbutils.secrets.get(scope=KAGGLE_SECRET_SCOPE, key="username")
os.environ['KAGGLE_KEY'] = dbutils.secrets.get(scope=KAGGLE_SECRET_SCOPE, key="key")

In [0]:
import os

import kaggle

# Local directory that receives the Kaggle download.
tmp_directory = '/tmp/fifa'

kaggle.api.dataset_download_files(
    "stefanoleone992/fifa-20-complete-player-dataset",
    path=tmp_directory,
)

# Print every entry now present in the download directory, one per line.
for entry in os.listdir(tmp_directory):
    print(entry)

In [0]:
import os
import zipfile

# The original cell referenced `file_name`, which is never assigned anywhere in
# the notebook, so it failed with a NameError on a fresh kernel. The archives
# are therefore discovered from the download directory instead.
zip_file_names = sorted(
    name for name in os.listdir(tmp_directory) if name.lower().endswith('.zip')
)
if not zip_file_names:
    # Fail with a clear message rather than silently extracting nothing.
    raise FileNotFoundError(f"No .zip archive found in {tmp_directory}")

for file_name in zip_file_names:
    # Open each archive read-only; the context manager closes it afterwards.
    with zipfile.ZipFile(os.path.join(tmp_directory, file_name), 'r') as zip_obj:
        # Extract all the contents of the zip file next to the archive.
        zip_obj.extractall(path=tmp_directory)

        # List the member names contained in the archive.
        print("Files in the zip file:")
        for member_name in zip_obj.namelist():
            print(member_name)

In [0]:
import pandas as pd

# Load players_15.csv from the download directory; the resulting DataFrame is
# the cell's last expression, so the notebook renders it.
players_15_csv = f"{tmp_directory}/players_15.csv"
pd.read_csv(players_15_csv)

In [0]:
# Read every players*.csv file in the download directory through the `file:`
# (local filesystem) scheme.
# `tmp_directory` is already an absolute path ('/tmp/fifa'), so the scheme must
# be joined without an extra slash: "file:" + "/tmp/fifa" -> "file:/tmp/fifa/...".
# The previous "file:/" prefix produced "file://tmp/fifa/...", in which "tmp" is
# parsed as the URI authority instead of the first path component.
spark.read.csv(f"file:{tmp_directory}/players*.csv")

In [0]:
%sh
# Create the directory the dataset is actually downloaded into. The original
# cell created /tmp/toxicity_download, which nothing in this notebook uses.
mkdir -p /tmp/fifa
# --force re-downloads even if the archive is already present; -p sets the target directory.
kaggle datasets download -d stefanoleone992/fifa-20-complete-player-dataset --force -p /tmp/fifa

In [0]:
%sh
# Show the contents of the download directory.
cd /tmp/fifa
ls .

In [0]:
%sh
# Stop if the download directory is missing, so nothing is extracted elsewhere.
cd /tmp/fifa || exit 1
# Extract every archive present in the download directory, overwriting existing
# files (-o). The original cell named jigsaw-toxic-comment-classification-challenge.zip,
# train.csv.zip and test.csv.zip, which belong to a different dataset than the
# one this notebook downloads into /tmp/fifa.
for archive in *.zip; do
  unzip -o "$archive"
done
ls .

In [0]:
# Existing Delta table to convert. The catalog and schema come from the
# notebook's `catalog` / `schema` settings instead of being repeated as
# literals, so this cell follows the same configuration as the other SQL cells.
existing_table_name = f"{catalog}.{schema}.transactions"

# Set column mapping mode to 'name' and enable the Iceberg universal format on
# the existing table via table properties.
spark.sql(f"""
ALTER TABLE {existing_table_name} SET TBLPROPERTIES(
  'delta.columnMapping.mode' = 'name',
  'delta.universalFormat.enabledFormats' = 'iceberg')
  """)