# Delta table 
- Update 
- Delete

In [1]:
import os
import logging
import configparser

# Read from config

In [2]:
# The project root is the parent of the notebook's working directory; the
# settings file is expected at <root>/configs/config.ini.
root_path = os.path.abspath(os.path.join(os.getcwd(), ".."))
config_ini_path = os.path.join(root_path, "configs", "config.ini")

# ConfigParser.read returns the list of files it managed to parse; leaving it
# as the last expression shows that list as the cell output.
config = configparser.ConfigParser()
config.read(config_ini_path)

['/opt/spark/work-dir/configs/config.ini']

# Logging setup
- Write INFO-level (and above) messages to a log file under the configured log directory.

In [3]:
# Configure the "EMP_DELTA_CRUD" logger to write INFO-and-above messages to
# <log_path>/employee_delta_crud.log, where log_path is read from the [LOGS]
# section of the loaded config.
logger = logging.getLogger("EMP_DELTA_CRUD")

# Set the level of the logger
logger.setLevel(logging.INFO)

# Resolve the log file location and make sure its directory exists
log_file_path = config['LOGS']['log_path']
file_path = os.path.join(log_file_path,"employee_delta_crud.log")
os.makedirs(os.path.dirname(file_path), exist_ok=True)

# Set the format for the log messages: timestamp - logger name - level - message
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# logging.getLogger returns the same logger object for the same name, so
# re-running this cell would otherwise attach one more FileHandler each time
# and every message would be written to the file repeatedly. Reuse the handler
# that already targets this file when there is one.
target_log_file = os.path.abspath(file_path)
file_handler = next(
    (
        handler
        for handler in logger.handlers
        if isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target_log_file
    ),
    None,
)
if file_handler is None:
    file_handler = logging.FileHandler(file_path)
    logger.addHandler(file_handler)
file_handler.setFormatter(formatter)

# Read from delta table

In [4]:
# Load the employee Delta table into `empdf`, logging start/end markers.
# On failure the error is logged with its traceback and re-raised: swallowing
# it would leave `empdf` undefined and make the following cells fail with an
# unrelated NameError.
try:
    logger.info("start reading the delta table")
    emp_delta_table = os.path.join(config['DELTA']['delta_table_path'],"datq", "employee")
    empdf = (spark
             .read
             .format("delta")
             .load(emp_delta_table)
            )
    logger.info("end reading the delta")
except Exception:
    # logger.exception logs at ERROR level and appends the active traceback.
    logger.exception("cannot read delta table")
    raise

In [6]:
# Preview a single row of empdf in vertical layout (one field per line).
empdf.show(1,vertical=True)

-RECORD 0-----------------------------------
 Age                      | 41              
 Attrition                | 1               
 BusinessTravel           | Travel_Rarely   
 DailyRate                | 1102            
 Department               | Sales           
 DistanceFromHome         | 1               
 Education                | 2               
 EducationField           | Life Sciences   
 EmployeeCount            | 1               
 EmployeeNumber           | 1               
 EnvironmentSatisfaction  | 2               
 Gender                   | Female          
 HourlyRate               | 94              
 JobInvolvement           | 3               
 JobLevel                 | 2               
 JobRole                  | Sales Executive 
 JobSatisfaction          | 4               
 MaritalStatus            | Single          
 MonthlyIncome            | 5993            
 MonthlyRate              | 19479           
 NumCompaniesWorked       | 8               
 Over18   

In [15]:
# Count the rows of empdf whose MonthlyIncome is above 10000.
# The predicate is given as a SQL expression string so this cell does not
# depend on `col`, which is only imported further down the notebook and would
# raise NameError here on a fresh top-to-bottom run.
empdf.filter("MonthlyIncome > 10000").count()

219

# ACID properties

In [11]:
from delta.tables import DeltaTable
from pyspark.sql.functions import expr, col

# Update the delta file

In [8]:
# Build the employee table location from the configured Delta root and bind a
# DeltaTable handle to it for the update/delete/history calls below.
delta_root = config['DELTA']['delta_table_path']
emp_delta_data = os.path.join(delta_root, "datq", "employee")
delta_table = DeltaTable.forPath(spark, emp_delta_data)

In [16]:
# Raise JobSatisfaction by one for every row whose MonthlyIncome is at least 12000.
# Note: this rewrites the table in place, so re-running the cell applies the
# increment again.
high_income = expr("MonthlyIncome >= 12000")
satisfaction_bump = {"JobSatisfaction": expr("JobSatisfaction + 1")}
delta_table.update(condition=high_income, set=satisfaction_bump)

In [19]:
# Read the table back through the DeltaTable handle and display the one row
# that sorts first by JobSatisfaction (ascending), in vertical layout.
current_emp_df = delta_table.toDF()
current_emp_df.orderBy("JobSatisfaction").show(1, vertical=True)

-RECORD 0----------------------------------------
 Age                      | 29                   
 Attrition                | 0                    
 BusinessTravel           | Travel_Rarely        
 DailyRate                | 1389                 
 Department               | Research & Develo... 
 DistanceFromHome         | 21                   
 Education                | 4                    
 EducationField           | Life Sciences        
 EmployeeCount            | 1                    
 EmployeeNumber           | 20                   
 EnvironmentSatisfaction  | 2                    
 Gender                   | Female               
 HourlyRate               | 51                   
 JobInvolvement           | 4                    
 JobLevel                 | 3                    
 JobRole                  | Manufacturing Dir... 
 JobSatisfaction          | 1                    
 MaritalStatus            | Divorced             
 MonthlyIncome            | 9980                 


# Check history

In [20]:
# Fetch the commit history of the employee Delta table as a DataFrame.
history_handle = DeltaTable.forPath(spark, emp_delta_data)
delta_table_history = history_handle.history()

In [23]:
# Display the table history in vertical layout (one field per line).
delta_table_history.show(vertical=True)

-RECORD 0-----------------------------------
 version             | 1                    
 timestamp           | 2023-11-25 16:23:... 
 userId              | null                 
 userName            | null                 
 operation           | UPDATE               
 operationParameters | {predicate -> (Mo... 
 job                 | null                 
 notebook            | null                 
 clusterId           | null                 
 readVersion         | 0                    
 isolationLevel      | Serializable         
 isBlindAppend       | false                
 operationMetrics    | {numRemovedFiles ... 
 userMetadata        | null                 
 engineInfo          | Apache-Spark/3.3.... 
-RECORD 1-----------------------------------
 version             | 0                    
 timestamp           | 2023-11-25 09:48:... 
 userId              | null                 
 userName            | null                 
 operation           | WRITE                
 operation

In [24]:
# Narrow the history to the columns of interest and print it as a table.
history_columns = [
    "version",
    "timestamp",
    "operation",
    "operationParameters",
    "operationMetrics",
    "engineInfo",
]
delta_table_history.select(*history_columns).show()

+-------+--------------------+---------+--------------------+--------------------+--------------------+
|version|           timestamp|operation| operationParameters|    operationMetrics|          engineInfo|
+-------+--------------------+---------+--------------------+--------------------+--------------------+
|      1|2023-11-25 16:23:...|   UPDATE|{predicate -> (Mo...|{numRemovedFiles ...|Apache-Spark/3.3....|
|      0|2023-11-25 09:48:...|    WRITE|{mode -> Overwrit...|{numFiles -> 1, n...|Apache-Spark/3.3....|
+-------+--------------------+---------+--------------------+--------------------+--------------------+



# Delete the data 

In [32]:
# Count the rows of empdf with Age of 18 or below (the rows targeted by the
# delete that follows).
age_18_or_below = col("Age") <= 18
empdf.where(age_18_or_below).count()

6

In [33]:
# Remove every row with Age of 18 or below from the Delta table.
age_predicate = expr("Age <= 18")
delta_table.delete(condition=age_predicate)

In [34]:
# Re-fetch the commit history so it includes the delete performed above.
history_handle = DeltaTable.forPath(spark, emp_delta_data)
delta_table_history = history_handle.history()

In [36]:
# Print the selected history columns in vertical layout without truncating
# long values such as operationParameters / operationMetrics.
history_columns = (
    "version",
    "timestamp",
    "operation",
    "operationParameters",
    "operationMetrics",
    "engineInfo",
)
delta_table_history.select(*history_columns).show(truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 2                                                                                                                                                                                                                                     
 timestamp           | 2023-11-25 16:35:18.513                                                                                                                                                                                                               
 operation           | DELETE                                                                                                                                                                                                                 

# Read the latest version of the data

In [37]:
# Re-load the employee Delta table into `empdf` (no version pinned, so this
# reads the current table state), logging start/end markers.
# On failure the error is logged with its traceback and re-raised: swallowing
# it would silently leave `empdf` bound to the earlier DataFrame.
try:
    logger.info("start reading the delta table")
    emp_delta_table = os.path.join(config['DELTA']['delta_table_path'],"datq", "employee")
    empdf = (spark
             .read
             .format("delta")
             .load(emp_delta_table)
            )
    logger.info("end reading the delta")
except Exception:
    # logger.exception logs at ERROR level and appends the active traceback.
    logger.exception("cannot read delta table")
    raise

In [38]:
# Row count of empdf as re-loaded from the Delta table in the preceding cell.
empdf.count()

1052

# Time travel in Delta table

In [39]:
# Time travel: load the employee table as it was at version 0, ordered by
# EmployeeNumber.
emp_delta_table = os.path.join(config['DELTA']['delta_table_path'], "datq", "employee")
versioned_reader = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)  # pin the read to the requested table version
)
df = versioned_reader.load(emp_delta_table).orderBy("EmployeeNumber")

# Preview the first three rows, untruncated, one field per line.
df.show(3, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 Age                      | 41                     
 Attrition                | 1                      
 BusinessTravel           | Travel_Rarely          
 DailyRate                | 1102                   
 Department               | Sales                  
 DistanceFromHome         | 1                      
 Education                | 2                      
 EducationField           | Life Sciences          
 EmployeeCount            | 1                      
 EmployeeNumber           | 1                      
 EnvironmentSatisfaction  | 2                      
 Gender                   | Female                 
 HourlyRate               | 94                     
 JobInvolvement           | 3                      
 JobLevel                 | 2                      
 JobRole                  | Sales Executive        
 JobSatisfaction          | 4                      
 MaritalStatus            | Single                 
 MonthlyInco

In [40]:
# Row count of df, the version-0 snapshot loaded in the preceding cell.
df.count()

1058

In [41]:
# Time travel: load the employee table as it was at version 2, ordered by
# EmployeeNumber.
emp_delta_table = os.path.join(config['DELTA']['delta_table_path'], "datq", "employee")
versioned_reader = (
    spark.read
    .format("delta")
    .option("versionAsOf", 2)  # pin the read to the requested table version
)
df = versioned_reader.load(emp_delta_table).orderBy("EmployeeNumber")

# Preview the first three rows, untruncated, one field per line.
df.show(3, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 Age                      | 41                     
 Attrition                | 1                      
 BusinessTravel           | Travel_Rarely          
 DailyRate                | 1102                   
 Department               | Sales                  
 DistanceFromHome         | 1                      
 Education                | 2                      
 EducationField           | Life Sciences          
 EmployeeCount            | 1                      
 EmployeeNumber           | 1                      
 EnvironmentSatisfaction  | 2                      
 Gender                   | Female                 
 HourlyRate               | 94                     
 JobInvolvement           | 3                      
 JobLevel                 | 2                      
 JobRole                  | Sales Executive        
 JobSatisfaction          | 4                      
 MaritalStatus            | Single                 
 MonthlyInco

In [42]:
# Row count of df, the version-2 snapshot loaded in the preceding cell.
df.count()

1052