In [None]:
# Copy the employee CSV from the workspace filesystem into DBFS
# (cp leaves the original file in place).
employee_csv_source = "file:/Workspace/Shared/employeedata.csv"
employee_csv_target = "dbfs:/FileStore/employeedata.csv"
dbutils.fs.cp(employee_csv_source, employee_csv_target)

True

In [None]:
# Load the employee CSV into a DataFrame; the first row is used as the header.
df_employee = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/employeedata.csv")

# Write the DataFrame in Delta format.
# mode("overwrite") makes this cell safe to re-run: without an explicit mode the
# writer refuses to write to a path that already holds data.
df_employee.write.format("delta").mode("overwrite").save("/delta/employeedata")
print("Data written to Delta format")

Data written to Delta format


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# Explicit schema for the product JSON file, listed as (column name, column type).
product_columns = [
    ("ProductID", IntegerType()),
    ("ProductName", StringType()),
    ("Category", StringType()),
    ("Price", DoubleType()),
    ("Stock", IntegerType()),
]

# Every column is declared nullable (third StructField argument is True).
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in product_columns]
)

In [None]:
# Copy the product JSON from the workspace filesystem into DBFS
# (cp leaves the original file in place).
product_json_source = "file:/Workspace/Shared/productdata.json"
product_json_target = "dbfs:/FileStore/productdata.json"
dbutils.fs.cp(product_json_source, product_json_target)

True

In [None]:
# Read the product JSON with the explicit schema defined above, then preview the rows.
product_reader = spark.read.format("json").schema(schema)
df_product = product_reader.load("dbfs:/FileStore/productdata.json")
df_product.show()

+---------+-----------+-----------+------+-----+
|ProductID|ProductName|   Category| Price|Stock|
+---------+-----------+-----------+------+-----+
|      101|     Laptop|Electronics|1200.0|   35|
|      102| Smartphone|Electronics| 800.0|   80|
|      103| Desk Chair|  Furniture| 150.0|   60|
|      104|    Monitor|Electronics| 300.0|   45|
|      105|       Desk|  Furniture| 350.0|   25|
+---------+-----------+-----------+------+-----+



In [None]:
# Expose the product DataFrame to SQL under the name product_view.
df_product.createOrReplaceTempView("product_view")

# Create a Delta table from the view.
# CREATE OR REPLACE keeps the cell re-runnable: a plain CREATE TABLE fails when a
# table with this name is already registered.
# NOTE(review): the recorded run of the plain CREATE TABLE raised an
# AnalysisException (traceback truncated) - presumably the table already
# existed; confirm against the full error message.
spark.sql("""
    CREATE OR REPLACE TABLE delta_product_table
    USING DELTA
    AS SELECT * FROM product_view
""")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1074758204289971>, line 5[0m
[1;32m      2[0m df_product[38;5;241m.[39mcreateOrReplaceTempView([38;5;124m"[39m[38;5;124mproduct_view[39m[38;5;124m"[39m)
[1;32m      4[0m [38;5;66;03m#Create a Delta table from the view[39;00m
[0;32m----> 5[0m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      6[0m [38;5;124m    CREATE TABLE delta_product_table[39m
[1;32m      7[0m [38;5;124m    USING DELTA[39m
[1;32m      8[0m [38;5;124m    AS SELECT * FROM product_view[39m
[1;32m      9[0m [38;5;124m"""[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;

In [None]:
# Copy the employee-updates CSV from the workspace filesystem into DBFS
# (cp leaves the original file in place).
employee_updates_source = "file:/Workspace/Shared/employee_updates.csv"
employee_updates_target = "dbfs:/FileStore/employee_updates.csv"
dbutils.fs.cp(employee_updates_source, employee_updates_target)

True

In [None]:
# Convert the employee CSV to Delta format.
# The file is read from dbfs:/FileStore/employeedata.csv, which is the name this
# notebook copies into DBFS in its first cell; the previous "employee_data.csv"
# spelling is not produced anywhere in the notebook.
df_employee = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/employeedata.csv")
df_employee.write.format("delta").mode("overwrite").save("/delta/employeedata")
print("Data written to Delta")

# Convert the employee-updates CSV to Delta format.
# Overwrite mode on both writes keeps this cell re-runnable.
df_employee_updates = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/employee_updates.csv")
df_employee_updates.write.format("delta").mode("overwrite").save("/delta/employee_updates")
print("Data written to Delta")

Data written to Delta
Data written to Delta


In [None]:
# Reload each Delta dataset from storage and register it as a temporary view so
# the following cells can address it from SQL.

# Employee data -> temp view "delta_employee"
df_employee = spark.read.format("delta").load("/delta/employeedata")
df_employee.createOrReplaceTempView("delta_employee")

# Employee updates -> temp view "employee_updates"
df_employee_updates = spark.read.format("delta").load("/delta/employee_updates")
df_employee_updates.createOrReplaceTempView("employee_updates")

In [None]:
# Upsert employee_updates into delta_employee, matching rows on EmployeeID:
#   - matched rows get the Salary and Department from the update,
#   - unmatched update rows are inserted with all five columns.
merge_employee_sql = """
    MERGE INTO delta_employee AS target 
    USING employee_updates AS source 
    ON target.EmployeeID = source.EmployeeID 
    WHEN MATCHED THEN UPDATE SET target.Salary = source.Salary, target.Department = source.Department
    WHEN NOT MATCHED THEN INSERT (EmployeeID, Name, Department, JoiningDate, Salary) 
    VALUES (Source.EmployeeID, source.Name, source.Department, source.JoiningDate, source.Salary)
    """
spark.sql(merge_employee_sql)

# Display delta_employee after the merge to check the updated and inserted rows.
merged_employees = spark.sql("SELECT * FROM delta_employee")
merged_employees.show()

+----------+-------------+----------+-----------+------+
|EmployeeID|         Name|Department|JoiningDate|Salary|
+----------+-------------+----------+-----------+------+
|      1002|   Jane Smith|        IT| 2020-03-10| 62000|
|      1003|Emily Johnson|   Finance| 2019-07-01| 70000|
|      1004|Michael Brown|        HR| 2018-12-22| 54000|
|      1005| David Wilson|        IT| 2021-06-25| 58000|
|      1006|  Linda Davis|   Finance| 2020-11-15| 67000|
|      1007| James Miller|        IT| 2019-08-14| 65000|
|      1008|Barbara Moore|        HR| 2021-03-29| 53000|
|      1001|     John Doe|        HR| 2021-01-15| 58000|
|      1009|  Sarah Adams| Marketing| 2021-09-01| 60000|
|      1010|  Robert King|        IT| 2022-01-10| 62000|
+----------+-------------+----------+-----------+------+



In [None]:
# Write the employee DataFrame in Delta format to /delta/employeedata,
# replacing the data currently stored at that path.
employee_writer = df_employee.write.format("delta").mode("overwrite")
employee_writer.save("/delta/employeedata")

In [None]:
# Register the Delta data at /delta/employeedata under the table name
# delta_employee; the statement does nothing if that table already exists.
register_employee_table_sql = "create table if not exists delta_employee using delta location '/delta/employeedata'"
spark.sql(register_employee_table_sql)

DataFrame[]

In [None]:
# Optimize the Delta table, addressed by its storage path rather than by name.
# The name delta_employee is also used by a temporary view created earlier in
# this notebook, and the name-based statement (OPTIMIZE delta_employee) raised an
# AnalysisException in the recorded run.
# NOTE(review): presumably the name resolved to the temp view instead of the
# table - confirm against the full error message.
spark.sql("OPTIMIZE delta.`/delta/employeedata`")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2439523593528616>, line 2[0m
[1;32m      1[0m [38;5;66;03m#optimize the delta table[39;00m
[0;32m----> 2[0m spark[38;5;241m.[39msql([38;5;124m"[39m[38;5;124mOPTIMIZE delta_employee[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m func([38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m     48[0m     logger[38;5;241m.[39mlog_success(
[1;32m     49[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m-[39m start, signature
[1;

In [None]:
# Ask the catalog whether the name delta_employee is registered and display the answer.
employee_table_registered = spark.catalog.tableExists("delta_employee")
employee_table_registered

True

In [None]:
# Register the Delta data at /delta/employeedata under the name delta_employee
# (does nothing when a table with that name already exists).
spark.sql("CREATE TABLE IF NOT EXISTS delta_employee USING DELTA LOCATION '/delta/employeedata'")

# Check if the table exists
table_exists = spark.catalog.tableExists("delta_employee")
print(f"Table exists: {table_exists}")

if table_exists:
    # Create a temporary view that selects every row of delta_employee.
    # This builds a view on top of the table; nothing is converted into a table,
    # so the progress message says what actually happened.
    spark.sql("CREATE OR REPLACE TEMPORARY VIEW delta_employee_view AS SELECT * FROM delta_employee")
    print("Temporary view delta_employee_view created.")

    # Optimize the Delta table, addressed by storage path rather than by name.
    spark.sql("OPTIMIZE delta.`/delta/employeedata`")
    print("Optimization completed.")
else:
    print("Table delta_employee does not exist.")

Table exists: True
View converted to table.
Optimization completed.


In [None]:
# Show the full, untruncated history of the Delta table stored at /delta/employeedata.
# The table is addressed by path; the name delta_employee is also used by a
# temporary view created earlier in this notebook.
employee_history = spark.sql("DESCRIBE HISTORY delta.`/delta/employeedata`")
employee_history.show(truncate=False)

+-------+-------------------+----------------+----------------------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Optimize the Delta table at /delta/employeedata, Z-ordering its data by Department.
zorder_employee_sql = """
    OPTIMIZE delta.`/delta/employeedata` ZORDER BY Department
"""
spark.sql(zorder_employee_sql)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [None]:
# Vacuum the Delta table at /delta/employeedata with a retention window of
# 168 hours (7 days).
vacuum_employee_sql = """
    VACUUM delta.`/delta/employeedata` RETAIN 168 HOURS 
"""
spark.sql(vacuum_employee_sql)

DataFrame[path: string]

In [None]:
%sql
-- Table declared without a LOCATION clause.
-- IF NOT EXISTS lets this cell be re-run after the table has been created.
CREATE TABLE IF NOT EXISTS managed_table(
  id INT,
  name STRING
);

In [None]:
%sql
-- External table whose data is kept at the explicit LOCATION given below.
-- IF NOT EXISTS lets this cell be re-run after the table has been created.
CREATE EXTERNAL TABLE IF NOT EXISTS unmanaged_table(
  id INT,
  name STRING
)
LOCATION '/user/data/external_data/';