# Databricks Demo 
***
1. Create the source data as JSON (in practice it usually arrives through a data ingestion process, as batch or streaming data)
2. Read the source data into a Spark DataFrame
3. Transform the data and simplify its schema
4. Persist the simplified version of the data
5. Read the data back from storage

In [0]:
import pyspark.pandas as ps
import pandas as pd
import json
from pyspark.sql.functions import explode

## 1. Create test data with a complex nested JSON structure
***

In [0]:
# Factory for one employee payload; the nested "Department" list is what
# makes the resulting JSON non-flat.
def _employee_payload(emp_id, is_permanent, dept_id, dept_name):
  """Return a payload dict for one employee with a single-entry Department list."""
  return {
    "EmpId": emp_id,
    "IsPermanent": is_permanent,
    "Department": [{"DepartmentID": dept_id, "DepartmentName": dept_name}],
  }

# Four sample employees spread over two departments
payload1 = _employee_payload("A01", True, "D1", "Data Science")
payload2 = _employee_payload("A02", False, "D2", "Application")
payload3 = _employee_payload("A03", True, "D1", "Data Science")
payload4 = _employee_payload("A04", False, "D2", "Application")

# Wrap every payload in an event record; EventID counts up from 1
data = [
  {"EventID": event_id, "Payload": payload}
  for event_id, payload in enumerate((payload1, payload2, payload3, payload4), start=1)
]

## 2. Load the data into a Spark DataFrame
***

In [0]:
# Serialise the full event list into one JSON document (a single string)
jsonData = json.dumps(data)

# Wrap that string in a one-element list and distribute it as an RDD
jsonDataList = [jsonData]
jsonRDD = sc.parallelize(jsonDataList)

# Parse the JSON held in the RDD into a Spark DataFrame
df = spark.read.json(jsonRDD)

# Render the resulting rows
display(df)

EventID,Payload
1,"List(List(List(D1, Data Science)), A01, true)"
2,"List(List(List(D2, Application)), A02, false)"
3,"List(List(List(D1, Data Science)), A03, true)"
4,"List(List(List(D2, Application)), A04, false)"


### 2.1 Check original data schema 
***

In [0]:
# Print the schema of the raw DataFrame as a tree (Payload is still a nested struct here)
df.printSchema()

root
 |-- EventID: long (nullable = true)
 |-- Payload: struct (nullable = true)
 |    |-- Department: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- DepartmentID: string (nullable = true)
 |    |    |    |-- DepartmentName: string (nullable = true)
 |    |-- EmpId: string (nullable = true)
 |    |-- IsPermanent: boolean (nullable = true)



### 2.2 Explode the data to simplify the schema
***

In [0]:
# Lift the Payload fields to the top level and produce one row per element
# of the Payload.Department array.
df1 = df.select(
  "EventID",
  "Payload.EmpId",
  "Payload.IsPermanent",
  explode("Payload.Department").alias("Department"),
)
df1.printSchema()

root
 |-- EventID: long (nullable = true)
 |-- EmpId: string (nullable = true)
 |-- IsPermanent: boolean (nullable = true)
 |-- Department: struct (nullable = true)
 |    |-- DepartmentID: string (nullable = true)
 |    |-- DepartmentName: string (nullable = true)



### 2.3 Further simplify the schema
***

In [0]:
# Replace the Department struct with its two fields as top-level columns
df2 = df1.select(
  "EventID",
  "EmpId",
  "IsPermanent",
  "Department.DepartmentID",
  "Department.DepartmentName",
)
df2.printSchema()

root
 |-- EventID: long (nullable = true)
 |-- EmpId: string (nullable = true)
 |-- IsPermanent: boolean (nullable = true)
 |-- DepartmentID: string (nullable = true)
 |-- DepartmentName: string (nullable = true)



## 3. Save the transformed data to storage
***

In [0]:
# List the entries at the root of the file system to see which folders are available
display(dbutils.fs.ls("/"))

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/delta/,delta/,0,0
dbfs:/user/,user/,0,0


In [0]:
# Storage location for the Delta table. Despite the variable name this is a
# file-system path handed to .save(), not a catalog table name.
tbl_name = "/delta/employee_tbl"

# Write df2 in Delta format, replacing whatever data and schema already
# exist at that location.
(df2.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save(tbl_name))

# List the folder that holds the Delta tables. An absolute path is used so the
# listing addresses the same location style as tbl_name rather than a relative "./delta/".
display(dbutils.fs.ls("/delta/"))

path,name,size,modificationTime
dbfs:/delta/city/,city/,0,0
dbfs:/delta/diamonds/,diamonds/,0,0
dbfs:/delta/employee_tbl/,employee_tbl/,0,0


In [0]:
# display data inside the employee_tbl
# Read the saved table back and display it. The format is named explicitly so
# the read does not depend on the session's default data source setting.
display(spark.read.format("delta").load("/delta/employee_tbl"))

EmpId,IsPermanent,DepartmentID,DepartmentName
A01,True,D1,Data Science
A02,False,D2,Application
A03,True,D1,Data Science
A04,False,D2,Application


In [0]:
# List the files that make up the table (absolute path, matching the path the table was written to)
display(dbutils.fs.ls("/delta/employee_tbl/"))

path,name,size,modificationTime
dbfs:/delta/employee_tbl/_delta_log/,_delta_log/,0,0
dbfs:/delta/employee_tbl/part-00007-b9d7dc6c-8abe-457b-bead-90cb0bf0d630-c000.snappy.parquet,part-00007-b9d7dc6c-8abe-457b-bead-90cb0bf0d630-c000.snappy.parquet,1435,1679536133000


In [0]:
# List the contents of the table's _delta_log folder (absolute path, matching the write location)
display(dbutils.fs.ls("/delta/employee_tbl/_delta_log"))

path,name,size,modificationTime
dbfs:/delta/employee_tbl/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2738,1679536137000
dbfs:/delta/employee_tbl/_delta_log/00000000000000000000.json,00000000000000000000.json,1748,1679536134000


In [0]:
# Clean up: delete the table folder and everything under it.
# recurse is passed by keyword so the intent of the boolean is visible.
dbutils.fs.rm("/delta/employee_tbl/", recurse=True)

Out[54]: True