### Create a Table in Databricks - Example

In [None]:
 -- Managed Delta table holding employee records.
CREATE TABLE employee_delta (
    empno       INT,
    ename       STRING,
    designation STRING,
    manager     INT,
    hire_date   DATE,
    sal         BIGINT,
    deptno      INT,
    location    STRING
)
USING DELTA;

### Create Table from Path - Example



In [None]:
 -- Delta table whose data files live at an explicit storage path.
CREATE TABLE employee_delta (
    empno       INT,
    ename       STRING,
    designation STRING,
    manager     INT,
    hire_date   DATE,
    sal         BIGINT,
    deptno      INT,
    location    STRING
)
USING DELTA
LOCATION '/mnt/bdpdatalake/blob-storage/';

### Create Table with Partition - Example

In [None]:
-- Delta table at an explicit storage path, partitioned by designation.
-- The partition column is declared in the column list and referenced by name
-- in PARTITIONED BY, which must follow the USING clause.
CREATE TABLE employee_delta (
    empno       INT,
    ename       STRING,
    designation STRING,
    manager     INT,
    hire_date   DATE,
    sal         BIGINT,
    deptno      INT,
    location    STRING
)
USING DELTA
PARTITIONED BY (designation)
LOCATION '/mnt/bdpdatalake/blob-storage/';

### Create Delta Table from Dataframe



In [None]:
 (df.write
    .format("delta")
    .saveAsTable("testdb.testdeltatable"))

In [None]:
%sql
-- Verify: show the DDL of the table written from the DataFrame.
-- The %sql magic has to be the first line of the cell to take effect.
SHOW CREATE TABLE testdb.testdeltatable;

### Create Delta Table From Dataframe Without Schema At External Location

In [None]:
 %scala
// Write the DataFrame as Delta files to the external location first, so the
// table created below can pick up its schema from the data already there.
df.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/blob-storage/testDeltaTable2")

In [None]:
%sql
-- Register a table over the Delta files written above; no column list is
-- given, so the schema comes from the files at LOCATION.
CREATE TABLE testdb.testDeltaTable2
USING DELTA
LOCATION '/mnt/blob-storage/testDeltaTable2'

In [None]:
%sql
-- Verify: show the DDL of the table registered over the external location.
-- The %sql magic has to be the first line of the cell to take effect.
SHOW CREATE TABLE testdb.testdeltatable2;

### Create Parquet Table from CSV File in Databricks

In [None]:
%python
# File location and type
file_location = "/FileStore/tables/emp_data1-3.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)


# Create a view or table

temp_table_name = "emp_data13_csv"

df.createOrReplaceTempView(temp_table_name)


%sql

/* Query the created temp table in a SQL cell */

select * from `emp_data13_csv`


permanent_table_name = "emp_data13_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

### Create Delta Table from CSV File in Databricks

In [None]:
%python
# File location and type
file_location = "/FileStore/tables/emp_data1-3.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)


# Create a view or table

temp_table_name = "emp_data13_csv"

df.createOrReplaceTempView(temp_table_name)


%sql

/* Query the created temp table in a SQL cell */

select * from `emp_data13_csv`


permanent_table_name = "emp_data13_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

### Create Delta Table with Partition from CSV File in Databricks


In [None]:
%scala
// Load the employee CSV (header row, inferred column types) into a DataFrame.
// The %scala magic has to be the first line of the cell, and Scala comments
// start with `//`, not `#`.
val file_location = "/FileStore/tables/emp_data1-3.csv"

val df = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("sep", ",")
  .load(file_location)

display(df)

In [None]:
 // Write the DataFrame as a Delta table partitioned by `location`.
// saveAsTable takes the table name as a string, so it must be quoted.
df.write
  .format("delta")
  .partitionBy("location")
  .saveAsTable("testdb.emp_partition_tbl")

In [None]:
-- List the partitions of the partitioned employee table.
SHOW PARTITIONS testdb.emp_partition_tbl

### Create Delta Table from JSON File in Databricks

In [None]:
%scala
// Read a JSON file whose records may span several lines into a DataFrame.
val jsonDf = spark.read
  .option("multiline", "true")
  .json("/FileStore/tables/emp_data1.json")

display(jsonDf)

In [None]:
 // Persist the JSON DataFrame as a Delta table, replacing existing contents.
jsonDf.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("testdb.jsonDataTable")

In [None]:
-- Show the DDL of the table written from the JSON DataFrame.
SHOW CREATE TABLE testdb.jsondatatable;

### Create Delta table from Excel File in Databricks

In [None]:
// Read an Excel workbook through the com.crealytics.spark.excel data source.
val xslFilePath = "/FileStore/tables/emp_data1.xls"

val excelReader = spark.read
  .format("com.crealytics.spark.excel")
  .option("header", "true")
  .option("dataAddress", "sheet1")

val xslDf = excelReader.load(xslFilePath)

display(xslDf)

In [None]:
 // Persist the Excel DataFrame as a Delta table.
xslDf.write
  .format("delta")
  .saveAsTable("excel_tableName")

### Create Delta table from TSV File in Databricks


In [None]:
%scala
// Read a tab-separated file with the CSV reader by overriding the separator.
val tsvFilePath = "/FileStore/tables/emp_data1.tsv"

val tsvDf = spark.read
  .format("csv")
  .options(Map(
    "header" -> "true",
    "sep"    -> "\t"
  ))
  .load(tsvFilePath)

display(tsvDf)

### Read Nested JSON in Spark DataFrame


In [None]:
import org.apache.spark.sql.functions._

// Flattens the nested orders JSON into one row per order-detail line.
// Columns are added in the same order as a hand-written withColumn chain;
// each fold below simply applies withColumn once per listed field.

// Step 1: load the multi-line nested JSON file
val ordersDf = spark.read
  .format("json")
  .option("inferSchema", "true")
  .option("multiLine", "true")
  .load("/FileStore/tables/orders_sample_datasets.json")

// Step 2: explode the `datasets` array into an `orders` column
var parseOrdersDf = ordersDf.withColumn("orders", explode(col("datasets")))

// Step 3: lift the order-level attributes out of the exploded `orders` column
parseOrdersDf = Seq("customerId", "orderId", "orderDate", "orderDetails", "shipmentDetails")
  .foldLeft(parseOrdersDf) { (acc, field) =>
    acc.withColumn(field, col("orders").getItem(field))
  }

// Step 4: explode `orderDetails` so every detail entry gets its own row
parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode(col("orderDetails")))

// Step 5a: lift the per-detail attributes
parseOrdersDf = Seq("productId", "quantity", "sequence", "totalPrice")
  .foldLeft(parseOrdersDf) { (acc, field) =>
    acc.withColumn(field, col("orderDetails").getItem(field))
  }

// Step 5b: lift the shipment attributes (output column name -> source key;
// only postalcode/postalCode differ)
parseOrdersDf = Seq(
    "city"       -> "city",
    "country"    -> "country",
    "postalcode" -> "postalCode",
    "street"     -> "street",
    "state"      -> "state"
  ).foldLeft(parseOrdersDf) { case (acc, (outName, key)) =>
    acc.withColumn(outName, col("shipmentDetails").getItem(key))
  }

// Step 6: lift gross, net and tax out of the total-price column
parseOrdersDf = Seq("gross", "net", "tax")
  .foldLeft(parseOrdersDf) { (acc, field) =>
    acc.withColumn(field, col("totalprice").getItem(field))
  }

// Step 7: keep only the flattened columns, in presentation order
val jsonParseOrdersDf = parseOrdersDf.select(
  Seq(
    "orderId", "customerId", "orderDate",
    "productId", "quantity", "sequence",
    "gross", "net", "tax",
    "street", "city", "state", "postalcode", "country"
  ).map(col): _*
)

display(jsonParseOrdersDf)

### Write DataFrame to Delta Table in Databricks with Append Mode

In [None]:
// Load the employee CSV (header row, inferred column types) into a DataFrame.
val file_location = "/FileStore/tables/emp_data1-3.csv"

val df = spark.read
  .format("csv")
  .options(Map(
    "inferSchema" -> "true",
    "header"      -> "true",
    "sep"         -> ","
  ))
  .load(file_location)

display(df)

### Write DataFrame to Delta Table in Databricks with Overwrite Mode

In [None]:
// Load the employee CSV (header row, inferred column types) into a DataFrame.
val file_location = "/FileStore/tables/emp_data1-3.csv"

val df = spark.read
  .format("csv")
  .options(Map(
    "inferSchema" -> "true",
    "header"      -> "true",
    "sep"         -> ","
  ))
  .load(file_location)

display(df)

In [None]:
 // Overwrite the Delta table with the DataFrame's contents.
// permanent_table_name is only assigned in the earlier %python cells, and
// Python variables are not visible from Scala, so it is defined here with
// the same value.
val permanent_table_name = "emp_data13_csv"

df.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable(permanent_table_name)

### Query Delta Data in Databricks

In [None]:
%sql
-- Query Delta files directly by storage path (exploratory, hence SELECT *).
SELECT *
FROM delta.`/mnt/blob-storage/testDeltaTable2/`

### Merge into Delta Table using Spark SQL

In [None]:
%scala
// Build a small in-memory batch of daily rows and expose it to SQL as the
// temp view `dailyTable`.
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

dailyDf.createOrReplaceTempView("dailyTable")

In [None]:
-- Upsert the daily rows into the target table, matching on id:
-- matched rows are updated, unmatched rows are inserted.
MERGE INTO testdb.testdeltatable AS target
USING dailyTable AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *

### Merge into Delta Table using Spark Scala

In [None]:
%scala
import io.delta.tables._

// Daily batch to upsert into the target table.
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

val target_table = DeltaTable.forName("testdb.testDeltaTable")

// Match on id: update every column of matched rows, insert unmatched rows.
target_table
  .as("target")
  .merge(dailyDf.as("source"), "source.id = target.id")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

### Temporary View in Databricks

In [None]:
 %scala
// Load the employee CSV (header row, inferred column types) into a DataFrame.
val file_location = "/FileStore/tables/emp_data1-3.csv"

val df = spark.read
  .format("csv")
  .options(Map(
    "inferSchema" -> "true",
    "header"      -> "true",
    "sep"         -> ","
  ))
  .load(file_location)

display(df)

### Global View in Databricks

In [None]:
 %scala
// Load the employee CSV (header row, inferred column types) into a DataFrame.
val file_location = "/FileStore/tables/emp_data1-3.csv"

val df = spark.read
  .format("csv")
  .options(Map(
    "inferSchema" -> "true",
    "header"      -> "true",
    "sep"         -> ","
  ))
  .load(file_location)

display(df)

In [None]:
 df.createOrReplaceGlobalTempView(
  "df_globalview"
)


In [None]:
%sql
-- NOTE(review): df_globalview was registered as a *global* temp view, and
-- such views are normally resolved only through the global_temp database,
-- so this unqualified lookup is expected to fail. Compare with the qualified
-- query in the next cell.
SELECT *
FROM df_globalview

In [None]:
%sql
-- Query the global temp view through its global_temp qualifier.
SELECT *
FROM global_temp.df_globalview

### Vacuum Delta Table


In [None]:
-- Template: replace <table_name>. 168 hours is 7 days of retained history.
VACUUM <table_name> RETAIN 168 HOURS

### Auto Optimize Delta Table in Databricks

In [None]:
%sql
-- Employee table with auto-optimize enabled at creation time.
-- The delta.autoOptimize.* properties are Delta table properties, so the
-- table format is stated explicitly, like the other DDL in this notebook,
-- instead of relying on the workspace default.
CREATE TABLE employee (
    empno       BIGINT,
    ename       STRING,
    designation STRING,
    manager     STRING,
    hire_date   DATE,
    sal         DOUBLE,
    deptno      BIGINT,
    location    STRING
)
USING DELTA
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
)

In [None]:
%sql
-- Enable auto-optimize on an already existing table.
-- The %sql magic has to be the first line of the cell, and SQL comments
-- start with `--`, not `#`.
ALTER TABLE employee SET TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
)


In [None]:
%sql
-- Enable auto-optimize defaults for the current Spark session.
-- The %sql magic has to be the first line of the cell, and SQL comments
-- start with `--`, not `#`.
SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
SET spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

In [None]:
%sql
-- Run OPTIMIZE on the employee table on demand.
OPTIMIZE employee;

### DELTA TABLE Utility Commands

In [None]:
# Delta Lake utility command templates, issued through spark.sql().
# Replace every <placeholder> (and the …… column list) before running.
# Path-based tables are written as delta.`<path>` / parquet.`<path>` with
# backticks; single quotes there are a syntax error.

# CREATE Managed Table
spark.sql("""
CREATE TABLE delta_table_name (
……
) USING delta
""")

# CREATE External Table
spark.sql("""
CREATE TABLE delta_table_name (
……
) USING delta
LOCATION '<path>'
""")

# SELECT FROM Delta File
spark.sql("""
SELECT * FROM delta.`<path>`
""")

# DESCRIBE HISTORY
spark.sql("""
DESCRIBE HISTORY delta.`<path>`
""")

# UPDATE
spark.sql("""
UPDATE <table_name> SET <column_name> = <value> WHERE <CONDITION>
""")

# MERGE (the update-all form is UPDATE SET *)
spark.sql("""
MERGE INTO <target_delta_table>
USING <source_table>
ON <merge_condition>
WHEN MATCHED THEN UPDATE SET *
""")

# DELETE (always scope it; without WHERE every row is removed)
spark.sql("""
DELETE FROM delta.`<path>` WHERE <CONDITION>
""")

# CONVERT Non-Partition Table
spark.sql("""
CONVERT TO DELTA parquet.`<parquet_table_path>`
""")

# CONVERT Partition Table
spark.sql("""
CONVERT TO DELTA parquet.`<parquet_table_path>` PARTITIONED BY (<column name> <column datatype>)
""")

# VACUUM
spark.sql("""
VACUUM delta.`<path>`
""")

# OPTIMIZE
spark.sql("""
OPTIMIZE <delta_table_name>
""")

# CLONE Deep Clone
# OR REPLACE and IF NOT EXISTS cannot be combined; OR REPLACE is kept here.
spark.sql("""
CREATE OR REPLACE TABLE delta.`<target_path>` CLONE delta.`<source_path>`
""")

# CLONE Shallow Clone
spark.sql("""
CREATE OR REPLACE TABLE delta.`<target_path>` SHALLOW CLONE delta.`<source_path>`
""")

# RESTORE to Version
spark.sql("""
RESTORE TABLE delta.`<path>` TO VERSION AS OF <version>
""")

# RESTORE to Timestamp
spark.sql("""
RESTORE TABLE delta.`<path>` TO TIMESTAMP AS OF <timestamp>
""")

# DESCRIBE DETAIL
spark.sql("""
DESCRIBE DETAIL delta.`<path>`
""")

# SHOW CREATE TABLE
spark.sql("""
SHOW CREATE TABLE <delta_table_name>
""")

### Create a Bloom Filter Index

In [None]:
-- External Delta table used for the bloom filter index example.
CREATE TABLE testdb.bloomtabletest (
    empno       BIGINT,
    ename       STRING,
    designation STRING,
    manager     STRING,
    hire_date   DATE,
    sal         DOUBLE,
    depno       BIGINT,
    location    STRING
)
USING DELTA
LOCATION '/mnt/bdp/bloomtabletest';

In [None]:
-- Bloom filter index on ename: fpp is the target false-positive probability,
-- numItems the expected number of distinct values.
-- The table is schema-qualified to match the CREATE TABLE above, and the
-- FOR COLUMNS list is closed before the statement terminator.
CREATE BLOOMFILTER INDEX
ON TABLE testdb.bloomtabletest
FOR COLUMNS (ename OPTIONS (fpp = 0.1, numItems = 1000000));