#** Proof of Concept (POC) - Automatic JSON Flattening from S3 Bucket in Databricks**

### Mounting S3 Bucket

In [0]:
access_key = 'AKIAU72LGJGOY754AEY2'
secret_key = 'PGkCsn5EL/q+X1Qnd+v5GDVENgbAy0hlYZD2qdsS'
encoded_secret_key = secret_key.replace('/', '%2F')
aws_bucket_name = "abcna"
mount_name = "test"

dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", f"/mnt/{mount_name}")

display(dbutils.fs.ls("/mnt/test"))
# dbfs:/mnt/test/

path,name,size,modificationTime
dbfs:/mnt/test/Files/,Files/,0,1745144715367
dbfs:/mnt/test/Instagram_data/,Instagram_data/,0,1745144715367
dbfs:/mnt/test/Nested_Json/,Nested_Json/,0,1745144715367
dbfs:/mnt/test/hudi/,hudi/,0,1745144715367
dbfs:/mnt/test/output/,output/,0,1745144715367
dbfs:/mnt/test/pipedrive/,pipedrive/,0,1745144715367


### Function to Flatten Nested JSON Schema Automatically:

In [0]:
from pyspark.sql.functions import col, explode, flatten
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, ArrayType

# Function to flatten the nested schema
def flatten_df(df):
    # Iterate through all the fields in the schema
    while True:
        # Identify all fields that are arrays or structs
        nested_fields = [(field.name, field.dataType) for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))]
        if not nested_fields:
            break
        
        for field_name, field_type in nested_fields:
            if isinstance(field_type, StructType):
                # Flatten struct columns by selecting individual fields
                expanded = [col(f"{field_name}.{nested_field.name}").alias(f"{field_name}_{nested_field.name}") for nested_field in field_type.fields]
                df = df.select("*", *expanded).drop(field_name)
            elif isinstance(field_type, ArrayType) and isinstance(field_type.elementType, StructType):
                # Explode and flatten array of struct columns
                df = df.withColumn(field_name, explode(field_name))
    
    return df

In [0]:
display(dbutils.fs.ls("/mnt/test/Nested_Json/"))

path,name,size,modificationTime
dbfs:/mnt/test/Nested_Json/sample.json,sample.json,674,1741013890000
dbfs:/mnt/test/Nested_Json/sample2.json,sample2.json,677,1741013890000
dbfs:/mnt/test/Nested_Json/sample3.json,sample3.json,2795,1741013889000


### Fetching JSON Files and Creating DataFrame:

In [0]:
student_raw = spark.read.option("multiLine", "true").json("/mnt/test/Nested_Json/sample.json")
student_raw.display()
student_raw.printSchema()

class,info,school_name,students
Year 1,"List(Scranton, Pennsylvania, List(admin@e.com, 123456789), Michael Scott)",Dunder Miflin,"List(List(61, A1, 60, Jim, 66), List(51, A2, 89, Dwight, 76), List(78, A3, 79, Kevin, 90))"


root
 |-- class: string (nullable = true)
 |-- info: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- contacts: struct (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- tel: string (nullable = true)
 |    |-- president: string (nullable = true)
 |-- school_name: string (nullable = true)
 |-- students: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- chemistry: long (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- math: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- physics: long (nullable = true)



In [0]:

# Flatten the DataFrame
flat_student = flatten_df(student_raw)

# Show the flattened DataFrame
flat_student.display()
flat_student.printSchema()

class,school_name,info_address,info_president,students_chemistry,students_id,students_math,students_name,students_physics,info_contacts_email,info_contacts_tel
Year 1,Dunder Miflin,"Scranton, Pennsylvania",Michael Scott,61,A1,60,Jim,66,admin@e.com,123456789
Year 1,Dunder Miflin,"Scranton, Pennsylvania",Michael Scott,51,A2,89,Dwight,76,admin@e.com,123456789
Year 1,Dunder Miflin,"Scranton, Pennsylvania",Michael Scott,78,A3,79,Kevin,90,admin@e.com,123456789


root
 |-- class: string (nullable = true)
 |-- school_name: string (nullable = true)
 |-- info_address: string (nullable = true)
 |-- info_president: string (nullable = true)
 |-- students_chemistry: long (nullable = true)
 |-- students_id: string (nullable = true)
 |-- students_math: long (nullable = true)
 |-- students_name: string (nullable = true)
 |-- students_physics: long (nullable = true)
 |-- info_contacts_email: string (nullable = true)
 |-- info_contacts_tel: string (nullable = true)



In [0]:
emp_raw = spark.read.option("multiLine", "true").json("/FileStore/tables/sample2.json")
emp_raw.display()
emp_raw.printSchema()

accounting,sales
"List(List(23, John, Doe), List(32, Mary, Smith))","List(List(27, Sally, Green), List(41, Jim, Galley))"


root
 |-- accounting: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
 |-- sales: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)



In [0]:

# Flatten the DataFrame
flat_emp = flatten_df(emp_raw)

# Show the flattened DataFrame
flat_emp.display()
flat_emp.printSchema()


accounting_age,accounting_firstName,accounting_lastName,sales_age,sales_firstName,sales_lastName
23,John,Doe,27,Sally,Green
23,John,Doe,41,Jim,Galley
32,Mary,Smith,27,Sally,Green
32,Mary,Smith,41,Jim,Galley


root
 |-- accounting_age: long (nullable = true)
 |-- accounting_firstName: string (nullable = true)
 |-- accounting_lastName: string (nullable = true)
 |-- sales_age: long (nullable = true)
 |-- sales_firstName: string (nullable = true)
 |-- sales_lastName: string (nullable = true)



In [0]:
patient_raw = spark.read.option("multiLine", "true").json("/FileStore/tables/sample3.json")
patient_raw.display()
patient_raw.printSchema()

imaging,labs,medications
"List(List(Main Hospital Radiology, Chest X-Ray, Today))","List(List(Main Hospital Lab, Arterial Blood Gas, Today), List(Primary Care Clinic, BMP, Today), List(Primary Care Clinic, BNP, 3 Weeks), List(Primary Care Clinic, BUN, 1 Year), List(Primary Care Clinic, Cardiac Enzymes, Today), List(Primary Care Clinic, CBC, 1 Year), List(Main Hospital Lab, Creatinine, 1 Year), List(Primary Care Clinic, Electrolyte Panel, 1 Year), List(Main Hospital Lab, Glucose, 1 Year), List(Primary Care Clinic, PT/INR, 3 Weeks), List(Coumadin Clinic, PTT, 3 Weeks), List(Primary Care Clinic, TSH, 1 Year))","List(List(List(List(1 tab, lisinopril, #90, Refill 3, PO, daily, 10 mg Tab)), List(List(1 tab, nitroglycerin, #30, Refill 1, SL, q15min PRN, 0.4 mg Sublingual Tab)), List(List(1 tab, warfarin sodium, #90, Refill 3, PO, daily, 3 mg Tab)), List(List(1 tab, metoprolol tartrate, #90, Refill 3, PO, daily, 25 mg Tab)), List(List(1 tab, furosemide, #90, Refill 3, PO, daily, 40 mg Tab)), List(List(1 tab, potassium chloride ER, #90, Refill 3, PO, daily, 10 mEq Tab))))"


root
 |-- imaging: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- time: string (nullable = true)
 |-- labs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- time: string (nullable = true)
 |-- medications: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aceInhibitors: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- dose: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- pillCount: string (nullable = true)
 |    |    |    |    |-- refills: string (nullable = true)
 |    |    |    |    |-- route: string (nullable = true)
 |    |    |    |    |-- sig: string (nullable = tr

In [0]:

# Flatten the DataFrame
flat_patient = flatten_df(patient_raw)

# Show the flattened DataFrame
flat_patient.display()
flat_patient.printSchema()


imaging_location,imaging_name,imaging_time,labs_location,labs_name,labs_time,medications_aceInhibitors_dose,medications_aceInhibitors_name,medications_aceInhibitors_pillCount,medications_aceInhibitors_refills,medications_aceInhibitors_route,medications_aceInhibitors_sig,medications_aceInhibitors_strength,medications_antianginal_dose,medications_antianginal_name,medications_antianginal_pillCount,medications_antianginal_refills,medications_antianginal_route,medications_antianginal_sig,medications_antianginal_strength,medications_anticoagulants_dose,medications_anticoagulants_name,medications_anticoagulants_pillCount,medications_anticoagulants_refills,medications_anticoagulants_route,medications_anticoagulants_sig,medications_anticoagulants_strength,medications_betaBlocker_dose,medications_betaBlocker_name,medications_betaBlocker_pillCount,medications_betaBlocker_refills,medications_betaBlocker_route,medications_betaBlocker_sig,medications_betaBlocker_strength,medications_diuretic_dose,medications_diuretic_name,medications_diuretic_pillCount,medications_diuretic_refills,medications_diuretic_route,medications_diuretic_sig,medications_diuretic_strength,medications_mineral_dose,medications_mineral_name,medications_mineral_pillCount,medications_mineral_refills,medications_mineral_route,medications_mineral_sig,medications_mineral_strength
Main Hospital Radiology,Chest X-Ray,Today,Main Hospital Lab,Arterial Blood Gas,Today,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,BMP,Today,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,BNP,3 Weeks,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,BUN,1 Year,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,Cardiac Enzymes,Today,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,CBC,1 Year,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Main Hospital Lab,Creatinine,1 Year,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,Electrolyte Panel,1 Year,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Main Hospital Lab,Glucose,1 Year,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab
Main Hospital Radiology,Chest X-Ray,Today,Primary Care Clinic,PT/INR,3 Weeks,1 tab,lisinopril,#90,Refill 3,PO,daily,10 mg Tab,1 tab,nitroglycerin,#30,Refill 1,SL,q15min PRN,0.4 mg Sublingual Tab,1 tab,warfarin sodium,#90,Refill 3,PO,daily,3 mg Tab,1 tab,metoprolol tartrate,#90,Refill 3,PO,daily,25 mg Tab,1 tab,furosemide,#90,Refill 3,PO,daily,40 mg Tab,1 tab,potassium chloride ER,#90,Refill 3,PO,daily,10 mEq Tab


root
 |-- imaging_location: string (nullable = true)
 |-- imaging_name: string (nullable = true)
 |-- imaging_time: string (nullable = true)
 |-- labs_location: string (nullable = true)
 |-- labs_name: string (nullable = true)
 |-- labs_time: string (nullable = true)
 |-- medications_aceInhibitors_dose: string (nullable = true)
 |-- medications_aceInhibitors_name: string (nullable = true)
 |-- medications_aceInhibitors_pillCount: string (nullable = true)
 |-- medications_aceInhibitors_refills: string (nullable = true)
 |-- medications_aceInhibitors_route: string (nullable = true)
 |-- medications_aceInhibitors_sig: string (nullable = true)
 |-- medications_aceInhibitors_strength: string (nullable = true)
 |-- medications_antianginal_dose: string (nullable = true)
 |-- medications_antianginal_name: string (nullable = true)
 |-- medications_antianginal_pillCount: string (nullable = true)
 |-- medications_antianginal_refills: string (nullable = true)
 |-- medications_antianginal_route: st