## Creating a table from dummy data and saving it in Delta table format

In [0]:
data = spark.range(0,5)
data.write.format('delta').save('/tmp/delta-table')

## Reading the data from the saved Delta table

In [0]:
df = spark.read.format('delta').load('/tmp/delta-table')
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



## Overwriting the existing Delta table with new values

In [0]:
data = spark.range(5, 10)
data.write.format('delta').mode('overwrite').save('/tmp/delta-table')

## Conditionally updating the data without overwriting
The Delta Lake APIs let us conditionally update, delete, and merge data in a table without rewriting it as a whole.

In [0]:
# Explicit imports avoid shadowing Python built-ins such as sum, min, and max
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  7|
|  8|
|  9|
| 12|
| 13|
| 14|
| 17|
| 18|
| 19|
|  0|
|  1|
|  5|
|  6|
| 10|
| 11|
| 15|
| 16|
+---+
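
The output above is unsorted, and it may not be obvious why the table ends up holding every value from 0 to 19. The following is a plain-Python model of the three operations, not the Delta Lake API: the list `ids` is a hypothetical stand-in for the table, assuming it held 5 through 9 after the overwrite step.

```python
# Plain-Python model of the update, delete, and merge steps.
# `ids` stands in for the Delta table; this is an illustration only.
ids = list(range(5, 10))                      # table after the overwrite: 5..9

# Update: add 100 to every even value
ids = [i + 100 if i % 2 == 0 else i for i in ids]
after_update = list(ids)

# Delete: remove every even value (the updated rows are still even)
ids = [i for i in ids if i % 2 != 0]
after_delete = list(ids)

# Merge: matched ids are rewritten with the same value,
# ids missing from the table are inserted
new_ids = list(range(0, 20))
existing = set(ids)
ids = ids + [i for i in new_ids if i not in existing]
after_merge = sorted(ids)

print(after_update)
print(after_delete)
print(after_merge)
```

The update and delete steps together remove 6 and 8, and the merge then inserts every id the table lacks. `show()` does not guarantee any row order; adding `.orderBy("id")` before `.show()` gives sorted output if that is needed.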



## Reading the older version of data using time travel
We can query previous snapshots of a Delta table by using time travel. Version 0 is the table as it was first written, before the overwrite and the merge.

In [0]:
df = spark.read.format("delta") \
  .option("versionAsOf", 0) \
  .load("/tmp/delta-table")

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
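
Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option. Below is a minimal sketch of a helper that builds the reader options for either case. The function names `time_travel_options` and `read_snapshot` are hypothetical, introduced here only for illustration, and the `spark` session and table path are assumed to be the ones used in the cells above.

```python
def time_travel_options(version=None, timestamp=None):
    """Return Delta reader options for exactly one of a version or a timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("Pass exactly one of version or timestamp")
    if version is not None:
        return {"versionAsOf": version}
    return {"timestampAsOf": timestamp}


def read_snapshot(spark, path, version=None, timestamp=None):
    """Load a past snapshot of the Delta table stored at `path`."""
    options = time_travel_options(version, timestamp)
    return spark.read.format("delta").options(**options).load(path)


# Usage, assuming the `spark` session and table path from the cells above:
# read_snapshot(spark, "/tmp/delta-table", version=1).show()
# spark.sql("DESCRIBE HISTORY delta.`/tmp/delta-table`").show()  # lists the table's versions
```

Running `DESCRIBE HISTORY` first is a convenient way to find out which versions and timestamps exist before asking for one.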



## Writing a stream of data into a table
We can also write to a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table:

In [0]:
streamingDf = spark.readStream.format("rate").load()

stream = streamingDf \
  .selectExpr("value as id") \
  .writeStream.format("delta") \
  .option("checkpointLocation", "/tmp/checkpoint") \
  .start("/tmp/delta-table")

## Stopping the streaming query

In [0]:
stream.stop()

## Reading a stream of data from a table
While a stream is writing to the Delta table, we can also read from that table as a streaming source. For example, we can start another streaming query that prints all the changes made to the Delta table. We can specify which version Structured Streaming should start from by providing the `startingVersion` or `startingTimestamp` option, which returns changes from that point onwards.

In [0]:
stream2 = spark.readStream.format("delta") \
  .load("/tmp/delta-table") \
  .writeStream.format("console") \
  .start()
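
The cell above reads the table from its current state. To illustrate the `startingVersion` and `startingTimestamp` options mentioned earlier, here is a minimal sketch; the helper name `start_console_stream` is hypothetical, and the `spark` session and path are assumed to be the ones used throughout this notebook.

```python
def start_console_stream(spark, path, starting_version=None, starting_timestamp=None):
    """Stream changes from the Delta table at `path` to the console.

    If given, `starting_version` takes precedence over `starting_timestamp`;
    with neither, the stream starts from the table's current state.
    """
    reader = spark.readStream.format("delta")
    if starting_version is not None:
        reader = reader.option("startingVersion", starting_version)
    elif starting_timestamp is not None:
        reader = reader.option("startingTimestamp", starting_timestamp)
    return reader.load(path).writeStream.format("console").start()


# Usage, assuming the `spark` session from the cells above:
# stream3 = start_console_stream(spark, "/tmp/delta-table", starting_version=1)
# stream3.stop()
```

The function returns the running query so that the caller can stop it explicitly, just like the other streams in this notebook.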

## Stopping the second streaming query

In [0]:
stream2.stop()