In [0]:
# Databricks notebook source
def path_exists(path):
    """Report whether `path` can be listed with `dbutils.fs.ls`.

    Returns:
        True when the listing succeeds; False when it fails with an error
        whose text mentions `java.io.FileNotFoundException`.

    Raises:
        Any other exception raised by the listing is propagated unchanged.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        # Only a "file not found" failure means the path is absent;
        # every other failure is re-raised for the caller to see.
        if 'java.io.FileNotFoundException' not in str(err):
            raise
        return False
    else:
        return True

# COMMAND ----------

def download_dataset(source, target):
    """Copy every entry listed under `source` into `target`, skipping existing ones.

    Each entry returned by `dbutils.fs.ls(source)` is copied recursively to
    the same name under `target`, unless `path_exists` reports that the
    destination is already there.

    Args:
        source: Directory to list and copy from. A trailing "/" is tolerated.
        target: Directory to copy into. A trailing "/" is tolerated.
    """
    # Strip trailing slashes before joining so that a source such as
    # "s3://bucket/v1/" does not yield "s3://bucket/v1//name" paths.
    source_root = source.rstrip("/")
    target_root = target.rstrip("/")

    for entry in dbutils.fs.ls(source):
        target_path = f"{target_root}/{entry.name}"
        if path_exists(target_path):
            continue
        print(f"Copying {entry.name} ...")
        # Third argument True requests a recursive copy.
        dbutils.fs.cp(f"{source_root}/{entry.name}", target_path, True)

# COMMAND ----------

# Catalog that is made current once the dataset has been downloaded.
data_catalog = "hive_metastore"

# Remote location the dataset is copied from, and the DBFS folder it is copied to.
data_source_uri = "s3://dalhussein-courses/datasets/bookstore/v1/"
dataset_bookstore = "dbfs:/mnt/demo-datasets/bookstore"

# Publish the dataset folder as a Spark conf value so SQL cells can
# reference it as ${dataset.bookstore}.
spark.conf.set("dataset.bookstore", dataset_bookstore)

# COMMAND ----------

def get_index(dir):
    """Return the next numeric file index for the directory `dir`.

    The directory is listed with `dbutils.fs.ls`. When it is empty the
    result is 1. Otherwise the greatest listing entry is taken, the text
    before the last "." of its name is parsed as an integer, and that
    value plus one is returned.

    Raises:
        ValueError: if that name prefix is not an integer.
    """
    listing = dbutils.fs.ls(dir)
    if not listing:
        return 1

    newest_name = max(listing).name
    stem = newest_name.rsplit('.', maxsplit=1)[0]
    return int(stem) + 1

# COMMAND ----------

def set_current_catalog(catalog_name):
    """Run a `USE CATALOG` statement for `catalog_name` through `spark.sql`.

    The name is interpolated into the statement text as-is.
    """
    statement = f"USE CATALOG {catalog_name}"
    spark.sql(statement)

# COMMAND ----------

# Structured Streaming
# load_file copies files from streaming_dir into raw_dir.
raw_dir = dataset_bookstore + "/orders-raw"
streaming_dir = dataset_bookstore + "/orders-streaming"

def load_file(current_index):
    """Copy one parquet file from `streaming_dir` to `raw_dir`.

    The file name is `current_index` zero-padded to two characters,
    followed by ".parquet". A progress line is printed before the copy.
    """
    file_name = str(current_index).zfill(2) + ".parquet"
    print(f"Loading {file_name} file to the bookstore dataset")

    origin = f"{streaming_dir}/{file_name}"
    destination = f"{raw_dir}/{file_name}"
    dbutils.fs.cp(origin, destination)

    
def load_new_data(all=False):
    """Load the next parquet file, or all remaining ones, into `raw_dir`.

    The starting index comes from `get_index(raw_dir)`.

    Args:
        all: When equal to True, load every file from the starting index
            through index 10. Otherwise load only the file at the
            starting index.
    """
    next_index = get_index(raw_dir)

    # NOTE(review): this guard stops at index >= 10 while the bulk loop
    # below runs through index 10, so file 10 is only copied by a bulk
    # load that starts earlier — confirm this is intended.
    if next_index >= 10:
        print("No more data to load\n")
        return

    # Explicit comparison with True is kept so non-bool truthy values
    # behave exactly as before.
    if all == True:
        for file_index in range(next_index, 11):
            load_file(file_index)
        return

    load_file(next_index)

# COMMAND ----------

# DLT
# load_json_file copies orders from streaming_orders_dir to raw_orders_dir
# and books from streaming_books_dir to raw_books_dir.
streaming_orders_dir = dataset_bookstore + "/orders-json-streaming"
raw_orders_dir = dataset_bookstore + "/orders-json-raw"

streaming_books_dir = dataset_bookstore + "/books-streaming"
raw_books_dir = dataset_bookstore + "/books-cdc"

def load_json_file(current_index):
    """Copy one orders JSON file and one books JSON file to their raw folders.

    The shared file name is `current_index` zero-padded to two characters,
    followed by ".json". The orders file is copied first
    (`streaming_orders_dir` to `raw_orders_dir`), then the books file
    (`streaming_books_dir` to `raw_books_dir`); a progress line is printed
    before each copy.
    """
    file_name = str(current_index).zfill(2) + ".json"

    print(f"Loading {file_name} orders file to the bookstore dataset")
    orders_origin = f"{streaming_orders_dir}/{file_name}"
    orders_destination = f"{raw_orders_dir}/{file_name}"
    dbutils.fs.cp(orders_origin, orders_destination)

    print(f"Loading {file_name} books file to the bookstore dataset")
    books_origin = f"{streaming_books_dir}/{file_name}"
    books_destination = f"{raw_books_dir}/{file_name}"
    dbutils.fs.cp(books_origin, books_destination)

    
def load_new_json_data(all=False):
    """Load the next JSON file pair, or all remaining ones, via `load_json_file`.

    The starting index comes from `get_index(raw_orders_dir)`.

    Args:
        all: When equal to True, load every index from the starting index
            through index 10. Otherwise load only the starting index.
    """
    next_index = get_index(raw_orders_dir)

    # NOTE(review): this guard stops at index >= 10 while the bulk loop
    # below runs through index 10, so index 10 is only loaded by a bulk
    # load that starts earlier — confirm this is intended.
    if next_index >= 10:
        print("No more data to load\n")
        return

    # Explicit comparison with True is kept so non-bool truthy values
    # behave exactly as before.
    if all == True:
        for file_index in range(next_index, 11):
            load_json_file(file_index)
        return

    load_json_file(next_index)

# COMMAND ----------

# Copy any missing dataset entries into DBFS, then switch to the configured catalog.
download_dataset(source=data_source_uri, target=dataset_bookstore)
set_current_catalog(catalog_name=data_catalog)

# METHODS OF WRITING TO A TABLE

### 1. CREATE OR REPLACE TABLE AS SELECT (CRAS)

In [0]:
%sql
CREATE OR REPLACE TABLE orders AS 
SELECT * FROM parquet.`dbfs:/mnt/demo-datasets/bookstore/orders/export_001.parquet`

num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE HISTORY orders ;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-01-02T11:01:50.000+0000,1135498371366187,intuztrainee@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1587427143463976),0102-084153-xjuzlqfb,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1000, numOutputBytes -> 22279)",,Databricks-Runtime/12.2.x-scala2.12


### 2. INSERT OVERWRITE

In [0]:
%sql
INSERT OVERWRITE orders
SELECT * FROM parquet.`dbfs:/mnt/demo-datasets/bookstore/orders/export_001.parquet`;


num_affected_rows,num_inserted_rows
1000,1000


In [0]:
%sql
DESCRIBE HISTORY orders;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-01-02T11:13:15.000+0000,1135498371366187,intuztrainee@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1587427143463976),0102-084153-xjuzlqfb,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1000, numOutputBytes -> 22279)",,Databricks-Runtime/12.2.x-scala2.12
0,2025-01-02T11:01:50.000+0000,1135498371366187,intuztrainee@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1587427143463976),0102-084153-xjuzlqfb,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1000, numOutputBytes -> 22279)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%sql
INSERT OVERWRITE orders
SELECT *,current_timestamp() FROM parquet.`dbfs:/mnt/demo-datasets/bookstore/orders/export_001.parquet`;
-- This statement fails with a schema mismatch error, which shows that INSERT OVERWRITE does not overwrite the table's original schema.

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1587427143463991>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-1587427143463991>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;

### Appending records to a table


### 1. INSERT INTO

In [0]:
%sql
SELECT COUNT(*) FROM orders;

count(1)
1000


In [0]:
%sql
INSERT INTO orders 
SELECT * FROM parquet.`dbfs:/mnt/demo-datasets/bookstore/orders-new/export_004.parquet`; 

num_affected_rows,num_inserted_rows
700,700


In [0]:
%sql
SELECT COUNT(*) FROM parquet.`dbfs:/mnt/demo-datasets/bookstore/orders-new/export_004.parquet`;

count(1)
700


In [0]:
%sql
SELECT COUNT(*) FROM orders; -- counting the records of orders to check whether the records are successfully appended or not
-- 700 new records + 1000 previous records = 1700 total records

count(1)
1700


In [0]:
%sql
-- INSERT INTO does not prevent the insertion of duplicate data
INSERT INTO orders 
SELECT * FROM parquet.`dbfs:/mnt/demo-datasets/bookstore/orders-new/export_004.parquet`; 
-- if we insert the same records again, they are accepted and stored as duplicates


num_affected_rows,num_inserted_rows
700,700


In [0]:
%sql
-- and now the total count of records in orders = 1000 + 700 + 700 i.e. 2400
SELECT COUNT(*) FROM orders ;

count(1)
2400


To avoid inserting duplicate records, we use MERGE INTO.

### MERGE INTO

In [0]:
files = dbutils.fs.ls('/mnt/demo-datasets/bookstore')
display(files)

path,name,size,modificationTime
dbfs:/mnt/demo-datasets/bookstore/books-cdc/,books-cdc/,0,0
dbfs:/mnt/demo-datasets/bookstore/books-csv/,books-csv/,0,0
dbfs:/mnt/demo-datasets/bookstore/books-csv-new/,books-csv-new/,0,0
dbfs:/mnt/demo-datasets/bookstore/books-streaming/,books-streaming/,0,0
dbfs:/mnt/demo-datasets/bookstore/customers-json/,customers-json/,0,0
dbfs:/mnt/demo-datasets/bookstore/customers-json-new/,customers-json-new/,0,0
dbfs:/mnt/demo-datasets/bookstore/orders/,orders/,0,0
dbfs:/mnt/demo-datasets/bookstore/orders-json-raw/,orders-json-raw/,0,0
dbfs:/mnt/demo-datasets/bookstore/orders-json-streaming/,orders-json-streaming/,0,0
dbfs:/mnt/demo-datasets/bookstore/orders-new/,orders-new/,0,0


In [0]:
%sql
-- Temporary view over the customers JSON files in the bookstore dataset
CREATE OR REPLACE TEMP VIEW customer_view AS 
SELECT * FROM json.`${dataset.bookstore}/customers-json`;

-- Create the customers table from customer_view.
-- CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE TABLE
-- fails once the table already exists.
CREATE OR REPLACE TABLE customers 
AS SELECT * FROM customer_view;


num_affected_rows,num_inserted_rows


In [0]:
%sql
DESCRIBE EXTENDED customers;

col_name,data_type,comment
customer_id,string,
email,string,
profile,string,
updated,string,
,,
# Detailed Table Information,,
Catalog,hive_metastore,
Database,default,
Table,customers,
Created Time,Fri Jan 03 03:39:59 UTC 2025,


In [0]:
%sql
CREATE OR REPLACE TEMP VIEW updates_view AS
SELECT * FROM json.`dbfs:/mnt/demo-datasets/bookstore/customers-json-new/`;

In [0]:
%sql
MERGE INTO customers c
USING  updates_view u
ON c.customer_id = u.customer_id
WHEN MATCHED AND c.email IS NULL AND u.email IS NOT NULL THEN
  UPDATE SET email = u.email, updated = u.updated
WHEN NOT MATCHED THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
301,100,0,201


In [0]:
%sql
DESCRIBE EXTENDED books_csv;

col_name,data_type,comment
book_id,string,
title,string,
author,string,
category,string,
price,double,
,,
# Detailed Table Information,,
Catalog,hive_metastore,
Database,default,
Table,books_csv,
