# Databricks Delta
#### Load a CSV file

In [2]:
PATH = '/mnt/rawdata/UsedCars.csv'

used_cars = spark.read.option("header", "true").csv(PATH)

In [3]:
display(used_cars)

Price,Age,KM,FuelType,HP,MetColor,Automatic,CC,Doors,Weight
7450.0,65.0,82000.0,Petrol,86,1,0,1300,3,1015
7250.0,74.0,130025.0,Petrol,110,1,0,1600,3,1050
8950.0,80.0,64000.0,Petrol,110,0,0,1600,3,1055
11450.0,54.0,62987.0,Petrol,110,0,0,1600,5,1080
,42.0,38932.0,Petrol,110,1,0,1600,3,1040
6950.0,80.0,62581.0,Petrol,110,0,0,1600,5,1075
8250.0,70.0,59017.0,petrol,107,1,1,1600,3,1080
12950.0,44.0,41499.0,CNG,110,1,0,1600,5,1103
9950.0,65.0,65513.0,Petrol,110,1,1,1600,4,1070
7900.0,75.0,125400.0,Petrol,110,0,0,1600,3,1050


## Persist in Delta format

In [5]:
used_cars.write.mode("overwrite").format("delta").save("/delta/{}".format(file_name), )

In [6]:
display(dbutils.fs.ls("dbfs:/delta/{}".format(file_name)))

path,name,size
dbfs:/delta/UsedCars/_delta_log/,_delta_log/,0
dbfs:/delta/UsedCars/part-00000-914d65a2-a930-4807-8220-e3b789b4db11-c000.snappy.parquet,part-00000-914d65a2-a930-4807-8220-e3b789b4db11-c000.snappy.parquet,19348


## Create table using Delta Lake

In [8]:
spark.sql("""
  DROP TABLE IF EXISTS used_cars
""")

spark.sql("""
  CREATE TABLE used_cars 
  USING DELTA 
  LOCATION '/delta/{}' 
""".format(file_name))

In [9]:
%sql
SELECT * FROM used_cars

Price,Age,KM,FuelType,HP,MetColor,Automatic,CC,Doors,Weight
7450.0,65.0,82000.0,Petrol,86,1,0,1300,3,1015
7250.0,74.0,130025.0,Petrol,110,1,0,1600,3,1050
8950.0,80.0,64000.0,Petrol,110,0,0,1600,3,1055
11450.0,54.0,62987.0,Petrol,110,0,0,1600,5,1080
,42.0,38932.0,Petrol,110,1,0,1600,3,1040
6950.0,80.0,62581.0,Petrol,110,0,0,1600,5,1075
8250.0,70.0,59017.0,petrol,107,1,1,1600,3,1080
12950.0,44.0,41499.0,CNG,110,1,0,1600,5,1103
9950.0,65.0,65513.0,Petrol,110,1,1,1600,4,1070
7900.0,75.0,125400.0,Petrol,110,0,0,1600,3,1050


###Review Spark Catalog

In [11]:
spark.catalog.listTables()

## Split by partition
Use Fuel Type to split in partitions (performance)

In [13]:
file_name_by_partition = "UsedCars_by_fuel_type"

used_cars.write.mode("overwrite").partitionBy("FuelType").format("delta").save("/delta/{}".format(file_name_by_partition))
spark.sql("CREATE TABLE used_cars_by_fuel_type USING DELTA LOCATION '/delta/{}/'".format(file_name_by_partition))

In [14]:
display(dbutils.fs.ls("dbfs:/delta"))

path,name,size
dbfs:/delta/UsedCars/,UsedCars/,0
dbfs:/delta/UsedCars_by_fuel_type/,UsedCars_by_fuel_type/,0


In [15]:
%sql
SELECT FuelType, COUNT(*) FROM used_cars_by_fuel_type GROUP BY FuelType

FuelType,count(1)
Diesel,147
diesel,10
petrol,199
CNG,9
methane,4
Petrol,1073
CompressedNaturalGas,4


In [16]:
%sql
DESCRIBE DETAIL used_cars_by_fuel_type

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,597dbe3f-fe82-4f16-9889-0d95939c4087,default.used_cars_by_fuel_type,,dbfs:/delta/UsedCars_by_fuel_type,2020-06-18T19:48:44.844+0000,2020-06-18T19:48:47.000+0000,List(FuelType),7,32908,Map(),1,2


## Time Travel
Because Delta Lake is version controlled, you have the option to query past versions of the data. Let's look at the history of our current Delta table.

In [18]:
%sql
DESCRIBE HISTORY used_cars

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
2,2020-06-18T19:58:33.000+0000,2671230428599712,lubraz@microsoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3418276086377984),0618-190813-trues609,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 2301, numOutputRows -> 1)"
1,2020-06-18T19:48:06.000+0000,2671230428599712,lubraz@microsoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3418276086377984),0618-190813-trues609,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 19347, numOutputRows -> 1446)"
0,2020-06-18T19:32:09.000+0000,2671230428599712,lubraz@microsoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3418276086377984),0618-190813-trues609,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 19347, numOutputRows -> 1446)"


### Insert a new row

In [20]:
%sql
INSERT INTO used_cars
SELECT 5500, 44, 34000, 'Petrol', 110, 1, 1, 1300, 3, 1005

Querying an older version is as easy as adding `VERSION AS OF desired_version`. Let's verify that our table from one version back still exists.

In [22]:
%sql
SELECT count(*)
FROM used_cars
VERSION AS OF 2

count(1)
1447


### Delete Delta

In [24]:
dbutils.fs.rm("/delta/", True)