## DeltaTable Creation

In [0]:
from delta.tables import DeltaTable

DeltaTable.create(spark) \
    .tableName("employee_insert") \
    .addColumn("emp_id", "INT") \
    .addColumn("emp_name", "STRING") \
    .addColumn("gender", "STRING") \
    .addColumn("salary", "INT") \
    .addColumn("Dept", "STRING") \
    .property("Description", "Table for employee") \
    .location("/FileStore/tables/delta/Insertable") \
    .execute()

Out[11]: <delta.tables.DeltaTable at 0x7fe524676640>

### SQL-Style Insert

In [0]:
%sql  
select * from employee_insert

emp_id,emp_name,gender,salary,Dept


In [0]:
%sql
INSERT INTO employee_insert VALUES(100,"Stephen","M",10000,"IT");
INSERT INTO employee_insert VALUES(200,"Sare","F",13000,"IT");

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql 
select * from employee_insert

emp_id,emp_name,gender,salary,Dept
100,Stephen,M,10000,IT
200,Sare,F,13000,IT


## DataFrame Insert

In [0]:
# StructType and StructField are needed to build the schema below
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

employee_data = [(333, "Gayel", "M", 8000000, "Owner")]

employee_schema = StructType([
    StructField("emp_id",   IntegerType(), False),
    StructField("emp_name", StringType(),  True),
    StructField("gender",   StringType(),  True),
    StructField("salary",   IntegerType(), True),
    StructField("dept",     StringType(),  True)])

df = spark.createDataFrame(data=employee_data, schema=employee_schema)
display(df)

emp_id,emp_name,gender,salary,dept
333,Gayel,M,8000000,Owner


In [0]:
df.write.format("delta").mode("append").saveAsTable("employee_insert")

In [0]:
%sql
select * from employee_insert

emp_id,emp_name,gender,salary,Dept
333,Gayel,M,8000000,Owner
100,Stephen,M,10000,IT
200,Sare,F,13000,IT


In [0]:
display(spark.sql("select * from employee_insert"))

emp_id,emp_name,gender,salary,Dept
333,Gayel,M,8000000,Owner
100,Stephen,M,10000,IT
200,Sare,F,13000,IT


### DataFrame Insert Into Method

In [0]:
# StructType and StructField are needed to build the schema below
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

employee_data = [(111, "Pandya", "M", 5000000, "Sub-Owner")]

employee_schema = StructType([
    StructField("emp_id",   IntegerType(), False),
    StructField("emp_name", StringType(),  True),
    StructField("gender",   StringType(),  True),
    StructField("salary",   IntegerType(), True),
    StructField("dept",     StringType(),  True)])

df = spark.createDataFrame(data=employee_data, schema=employee_schema)
display(df)

emp_id,emp_name,gender,salary,dept
111,Pandya,M,5000000,Sub-Owner


In [0]:
df.write.insertInto("employee_insert",overwrite=False)

In [0]:
display(spark.sql("select * from employee_insert"))

emp_id,emp_name,gender,salary,Dept
111,Pandya,M,5000000,Sub-Owner
333,Gayel,M,8000000,Owner
100,Stephen,M,10000,IT
200,Sare,F,13000,IT
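Spark's documentation for `DataFrameWriter.insertInto` notes that, unlike `saveAsTable`, it ignores column names and resolves columns by position, while `saveAsTable` in append mode matches columns by name. The DataFrames above list their columns in the table's order, so both methods give the same result here. The pure-Python sketch below is only a model of that difference (`resolve_by_position` and `resolve_by_name` are hypothetical helpers, not Spark APIs), using a DataFrame whose column order does not match the table.

```python
# Model of column resolution on plain Python values; this is NOT Spark code.
TABLE_COLUMNS = ["emp_id", "emp_name", "gender", "salary", "Dept"]


def resolve_by_position(df_columns, row):
    """insertInto-style: the i-th value goes into the i-th table column; names are ignored."""
    if len(df_columns) != len(TABLE_COLUMNS):
        raise ValueError("column count does not match the table")
    return dict(zip(TABLE_COLUMNS, row))


def resolve_by_name(df_columns, row):
    """saveAsTable(append)-style: values are matched to table columns by name (case-insensitive here)."""
    by_name = {col.lower(): value for col, value in zip(df_columns, row)}
    return {col: by_name[col.lower()] for col in TABLE_COLUMNS}


# Hypothetical DataFrame whose salary and dept columns are swapped relative to the table.
df_columns = ["emp_id", "emp_name", "gender", "dept", "salary"]
row = (111, "Pandya", "M", "Sub-Owner", 5000000)

print(resolve_by_position(df_columns, row))  # salary <- 'Sub-Owner', Dept <- 5000000
print(resolve_by_name(df_columns, row))      # salary <- 5000000, Dept <- 'Sub-Owner'
```

In real Spark, a positional mismatch like this typically surfaces either as an error or as values landing in the wrong columns, depending on the column types involved.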


### Insert Using Temp View

In [0]:
# StructType and StructField are needed to build the schema below
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

employee_data = [(112, "Partha", "M", 5000000, "CTO")]

employee_schema = StructType([
    StructField("emp_id",   IntegerType(), False),
    StructField("emp_name", StringType(),  True),
    StructField("gender",   StringType(),  True),
    StructField("salary",   IntegerType(), True),
    StructField("dept",     StringType(),  True)])

df1 = spark.createDataFrame(data=employee_data, schema=employee_schema)
display(df1)

emp_id,emp_name,gender,salary,dept
112,Partha,M,5000000,CTO


In [0]:
df1.createOrReplaceTempView("temp_emp_table")

In [0]:
%sql
select * from temp_emp_table

emp_id,emp_name,gender,salary,dept
112,Partha,M,5000000,CTO


In [0]:
%sql
INSERT INTO employee_insert
SELECT * from temp_emp_table

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
select * from employee_insert

emp_id,emp_name,gender,salary,Dept
111,Pandya,M,5000000,Sub-Owner
333,Gayel,M,8000000,Owner
100,Stephen,M,10000,IT
112,Partha,M,5000000,CTO
200,Sare,F,13000,IT


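### Spark SQL Insert

The same `INSERT INTO ... SELECT` statement can be run from Python with `spark.sql`. Because `temp_emp_table` still holds the row for emp_id 112, running the insert a second time appends that row again, which is why Partha appears twice in the output below.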

In [0]:
spark.sql("INSERT INTO employee_insert SELECT * from temp_emp_table")

Out[33]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
select * from employee_insert

emp_id,emp_name,gender,salary,Dept
111,Pandya,M,5000000,Sub-Owner
333,Gayel,M,8000000,Owner
100,Stephen,M,10000,IT
112,Partha,M,5000000,CTO
112,Partha,M,5000000,CTO
200,Sare,F,13000,IT


## Write Modes

### Append
To atomically add new data to an existing Delta table, use append mode:

In [0]:
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

### Overwrite
To atomically replace all the data in a table, use overwrite mode:

In [0]:
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

#### Selective overwrite

You can selectively overwrite only the data that matches an arbitrary expression.

In [0]:
# The following command atomically replaces events in January in the target table, which is partitioned by start_date, with the data in replace_data:

replace_data.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
  .save("/tmp/delta/events")

This sample code writes out the data in `replace_data`, validates that all of it matches the predicate, and performs an atomic replacement. If you want to write out data that does not all match the predicate while still replacing the matching rows in the target table, you can disable the constraint check by setting `spark.databricks.delta.replaceWhere.constraintCheck.enabled` to `false`:

In [0]:
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
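To make the difference between append, full overwrite, and `replaceWhere` concrete, here is a pure-Python model of the row-level outcome. It is only a sketch: a "table" is a list of dicts, and the functions `append`, `overwrite`, and `january_2017` are hypothetical names, not Delta Lake APIs. The `constraint_check` flag mimics `spark.databricks.delta.replaceWhere.constraintCheck.enabled`; Delta's real implementation works on files and transaction-log commits, not in-memory rows.

```python
from typing import Callable, Dict, List, Optional

# A row is a plain dict and a "table" is a list of rows (illustrative model only).
Row = Dict[str, object]
Predicate = Callable[[Row], bool]


def append(table: List[Row], new_rows: List[Row]) -> List[Row]:
    """mode("append"): keep every existing row and add the new ones."""
    return table + new_rows


def overwrite(
    table: List[Row],
    new_rows: List[Row],
    replace_where: Optional[Predicate] = None,
    constraint_check: bool = True,
) -> List[Row]:
    """mode("overwrite"), optionally restricted by a replaceWhere-style predicate."""
    if replace_where is None:
        # Plain overwrite: the new rows replace the whole table.
        return list(new_rows)
    if constraint_check:
        # Reject the whole write if any new row falls outside the predicate.
        outside = [r for r in new_rows if not replace_where(r)]
        if outside:
            raise ValueError(f"{len(outside)} row(s) do not match the replaceWhere predicate")
    # Remove only the target rows that match the predicate, then add the new rows.
    kept = [r for r in table if not replace_where(r)]
    return kept + list(new_rows)


def january_2017(row: Row) -> bool:
    # Same shape as the predicate above; ISO-8601 date strings compare correctly as text.
    return row["start_date"] >= "2017-01-01" and row["end_date"] <= "2017-01-31"


events = [
    {"id": 1, "start_date": "2017-01-05", "end_date": "2017-01-06"},
    {"id": 2, "start_date": "2017-02-10", "end_date": "2017-02-11"},
]
replace_data = [{"id": 3, "start_date": "2017-01-20", "end_date": "2017-01-21"}]

print([r["id"] for r in overwrite(events, replace_data, january_2017)])  # [2, 3]
```

With the check enabled, adding a February row to `replace_data` makes `overwrite` raise; with `constraint_check=False` the January target row is still removed and every new row, including the February one, is written.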

In Delta Lake 1.0.0 and below, `replaceWhere` overwrites data matching a predicate over partition columns only. The following command atomically replaces the month of January in the target table, which is partitioned by `birthDate`, with the data in `df`:

In [0]:
df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
  .save("/tmp/delta/people10m")

In Delta Lake 1.1.0 and above, if you want to fall back to the old behavior, you can disable the spark.databricks.delta.replaceWhere.dataColumns.enabled flag:

In [0]:
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

### Limit rows written in a file

You can also use the DataFrameWriter option maxRecordsPerFile when using the DataFrame APIs to write to a Delta Lake table. When maxRecordsPerFile is specified, the value of the SQL session configuration spark.sql.files.maxRecordsPerFile is ignored.

In [0]:
df.write.format("delta") \
  .mode("append") \
  .option("maxRecordsPerFile", "10000") \
  .save("/tmp/delta/people10m")
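Spark applies this limit inside each write task: once a task has written `maxRecordsPerFile` rows to a file, it rolls over to a new file, and a value of zero or less means no limit. The pure-Python sketch below models that rollover for a single task; `split_into_files` is a hypothetical helper, not a Spark API, and the 25,000-row input is made up for illustration. For a real write, the total number of files also depends on how many tasks and table partitions receive data.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_files(rows: Sequence[T], max_records_per_file: int) -> List[List[T]]:
    """Model of one write task starting a new file every max_records_per_file rows."""
    if not rows:
        return []
    if max_records_per_file <= 0:
        # Zero or a negative value means "no limit": the task writes a single file.
        return [list(rows)]
    return [
        list(rows[start:start + max_records_per_file])
        for start in range(0, len(rows), max_records_per_file)
    ]


files = split_into_files(list(range(25_000)), 10_000)
print([len(f) for f in files])  # [10000, 10000, 5000]
```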

### Idempotent writes

Sometimes a job that writes data to a Delta table is restarted, for example after the job encounters a failure. The failed job may or may not have written its data to the Delta table before terminating. If it did, the restarted job writes the same data to the Delta table again, which results in duplicate data.

To address this, Delta tables support the following DataFrameWriter options to make the writes idempotent:

txnAppId: A unique string that you can pass on each DataFrame write. For example, this can be the name of the job.

txnVersion: A monotonically increasing number that acts as the transaction version. This number needs to be unique for the data being written to the Delta table(s). For example, this can be the epoch seconds of the instant when the query is first attempted. Any subsequent restarts of the same job need to use the same value for txnVersion.

This combination of options needs to be unique for each new batch of data ingested into the Delta table, and txnVersion needs to be higher than the txnVersion of the last data ingested into the table. For example:

- The last successfully written data has the option values dailyETL:23423 (txnAppId:txnVersion).
- The next write should use txnAppId = dailyETL and a txnVersion of at least 23424 (one more than the txnVersion of the last written data).
- Any attempt to write data with txnAppId = dailyETL and a txnVersion of 23423 or less is ignored, because that txnVersion is not greater than the last txnVersion recorded in the table.
- An attempt to write data with txnAppId:txnVersion as anotherETL:23424 succeeds, because its txnAppId differs from that of the last ingested data.

You can also configure idempotent writes by setting the Spark session configurations spark.databricks.delta.write.txnAppId and spark.databricks.delta.write.txnVersion. In addition, you can set spark.databricks.delta.write.txnVersion.autoReset.enabled to true to automatically reset spark.databricks.delta.write.txnVersion after every write. When both the writer options and the session configuration are set, the writer option values take precedence.

In [0]:
app_id = ... # A unique string that is used as an application ID.
version = ... # A monotonically increasing number that acts as transaction version.

dataFrame.write.format(...).option("txnVersion", version).option("txnAppId", app_id).save(...)
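The skip rule from the dailyETL/anotherETL walkthrough can be modeled in a few lines of plain Python. `IdempotentTable` is a hypothetical class, not part of Delta Lake: it only mimics the bookkeeping, remembering the highest txnVersion committed per txnAppId and ignoring any write whose txnVersion is not greater than that value. Real Delta records these versions in the transaction log as part of the same commit as the data.

```python
from typing import Dict, List


class IdempotentTable:
    """Hypothetical in-memory model of the txnAppId/txnVersion check."""

    def __init__(self) -> None:
        self.rows: List[dict] = []
        self.last_version: Dict[str, int] = {}  # highest committed txnVersion per txnAppId

    def write(self, rows: List[dict], txn_app_id: str, txn_version: int) -> bool:
        last = self.last_version.get(txn_app_id)
        if last is not None and txn_version <= last:
            return False  # already ingested (or older): skip to avoid duplicates
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True


table = IdempotentTable()
batch = [{"emp_id": 112, "emp_name": "Partha"}]

print(table.write(batch, "dailyETL", 23423))    # True: first write
print(table.write(batch, "dailyETL", 23423))    # False: restarted job, same txnVersion
print(table.write(batch, "dailyETL", 23422))    # False: older txnVersion
print(table.write(batch, "dailyETL", 23424))    # True: next batch
print(table.write(batch, "anotherETL", 23424))  # True: different txnAppId
print(len(table.rows))                          # 3
```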

### Set user-defined commit metadata

You can specify user-defined strings as metadata in the commits made by these write operations, either with the DataFrameWriter option userMetadata or with the SparkSession configuration spark.databricks.delta.commitInfo.userMetadata. If both are specified, the option takes precedence.

In [0]:
df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/tmp/delta/people10m")