In [1]:
from pyspark.sql import SparkSession

# Start (or reuse, via getOrCreate) a local Spark session backed by two
# worker threads; the bare `spark` on the last line displays its summary.
spark = (
    SparkSession.builder
    .appName("Fix Headers")
    .master("local[2]")
    .getOrCreate()
)

spark

23/09/24 19:58:31 WARN Utils: Your hostname, Amandeeps-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.114 instead (on interface en0)
23/09/24 19:58:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/24 19:58:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
from pyspark.sql import DataFrame

import os

# Location of the input JSON. Defaults to the author's local copy; set the
# DATA_JSON_PATH environment variable to run this notebook on another machine.
DATA_JSON_PATH = os.environ.get(
    "DATA_JSON_PATH",
    "/Users/amandeepsinghjohar/Code/JupterLab/data/data.json",
)

# "multiline" makes Spark parse each file as one JSON document that may span
# several lines, instead of the default one-JSON-object-per-line format.
raw_df = spark.read.option("multiline", True).json(DATA_JSON_PATH)


In [10]:
# Print up to 20 rows with cell contents shown in full (no truncation).
raw_df.show(n=20, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{{{Wonderland, 123 Main St, 45678}, [{email, alice@email.com}, {phone, +1234567890}]}, Alice, 1}, {{{Dreamville, 456 Elm St, 78901}, [{email, bob@email.com}]}, Bob, 2}]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [16]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

def flatten(df, sep="_", verbose=True):
    """Recursively flatten every struct and array column of a Spark DataFrame.

    Repeatedly takes the first remaining top-level column (in schema order)
    whose type is ``StructType`` or ``ArrayType`` and:

    * ``StructType``: replaces the struct with one top-level column per field,
      named ``<struct><sep><field>`` (e.g. ``data`` + ``details`` ->
      ``data_details``), then drops the struct column.
    * ``ArrayType``: replaces the array with its elements, one row per element,
      via ``explode_outer``. Rows whose array is null or empty are kept and
      get a null value.

    The loop ends when no top-level column is a struct or an array. Exploding
    several arrays that belong to the same record yields their cross product of
    rows. Other complex types (e.g. ``MapType``) are left unchanged.

    Args:
        df: the Spark DataFrame to flatten. A new DataFrame is returned.
        sep: separator placed between a struct's name and its field's name in
            the generated column names (default ``"_"``).
        verbose: when True, print one progress line per processed column.

    Returns:
        A DataFrame with no top-level struct or array columns.
    """

    def _complex_fields(frame):
        # Top-level struct/array columns still to be processed, in schema order.
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        }

    def _quote(name):
        # Backtick-quote a column/field name so characters such as "." or " "
        # are taken literally rather than parsed as nested-field access.
        return "`" + name.replace("`", "``") + "`"

    complex_fields = _complex_fields(df)
    while complex_fields:
        col_name, col_type = next(iter(complex_fields.items()))
        if verbose:
            print("Processing :" + col_name + " Type : " + str(type(col_type)))

        if isinstance(col_type, StructType):
            # Promote each struct field to its own top-level column.
            expanded = [
                col(_quote(col_name) + "." + _quote(field.name)).alias(col_name + sep + field.name)
                for field in col_type.fields
            ]
            # drop() with a string name matches the column literally.
            df = df.select("*", *expanded).drop(col_name)
        else:
            # ArrayType: one output row per element; explode_outer (unlike
            # explode) keeps rows whose array is null or empty.
            df = df.withColumn(col_name, explode_outer(col(_quote(col_name))))

        # Re-scan: expanded fields or exploded elements may themselves be
        # structs or arrays.
        complex_fields = _complex_fields(df)
    return df



# Flatten the nested JSON into one flat table; for this input that yields one
# row per contact entry (see the table printed below).
json_df = flatten(df=raw_df)

Processing :data Type : <class 'pyspark.sql.types.ArrayType'>
Processing :data Type : <class 'pyspark.sql.types.StructType'>
Processing :data_details Type : <class 'pyspark.sql.types.StructType'>
Processing :data_details_address Type : <class 'pyspark.sql.types.StructType'>
Processing :data_details_contacts Type : <class 'pyspark.sql.types.ArrayType'>
Processing :data_details_contacts Type : <class 'pyspark.sql.types.StructType'>


In [18]:
# Print up to 20 rows; string cells longer than 20 characters are truncated.
json_df.show(n=20, truncate=True)

+---------+------------+-------------------------+---------------------------+----------------------------+--------------------------+---------------------------+
|data_name|data_user_id|data_details_address_city|data_details_address_street|data_details_address_zipcode|data_details_contacts_type|data_details_contacts_value|
+---------+------------+-------------------------+---------------------------+----------------------------+--------------------------+---------------------------+
|    Alice|           1|               Wonderland|                123 Main St|                       45678|                     email|            alice@email.com|
|    Alice|           1|               Wonderland|                123 Main St|                       45678|                     phone|                +1234567890|
|      Bob|           2|               Dreamville|                 456 Elm St|                       78901|                     email|              bob@email.com|
+---------+-----------