<a href="https://colab.research.google.com/github/selvarajruban/colab-notebooks/blob/main/json_flattening.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
import json
import pandas as pd

In [8]:
data = '''
{
"technologies":
         [
         { "Courses": "Spark", "Fee": 22000,"Duration":"40Days"},
         { "Courses": "PySpark","Fee": 25000,"Duration":"60Days"},
         { "Courses": "Hadoop", "Fee": 23000,"Duration":"50Days"}
         ],
"status": ["ok"]
}
'''

json_df = json.loads(data)

print(type(json_df))

df = pd.DataFrame(json_df)

<class 'dict'>


ValueError: ignored

In [None]:

def execute_autoflatten(df):
    '''
    Description:
    This function executes the core autoflattening operation
    :param df: [type: pyspark.sql.dataframe.DataFrame] dataframe to be used for flattening
    :param json_column_name: [type: string] name of the column with json string
    :return df: DataFrame containing flattened records
    '''
    # gets all fields of StructType or ArrayType in the nested_fields dictionary
    nested_fields = dict([
        (field.name, field.dataType)
        for field in df.schema.fields
        if isinstance(field.dataType, ArrayType) or isinstance(field.dataType, StructType)
    ])

    # repeat until all nested_fields i.e. belonging to StructType or ArrayType are covered
    while nested_fields:
        # if there are any elements in the nested_fields dictionary
        if nested_fields:
            # get a column
            column_name = list(nested_fields.keys())[0]
            # if field belongs to a StructType, all child fields inside it are accessed
            # and are aliased with complete path to every child field
            if isinstance(nested_fields[column_name], StructType):
                unnested = [col(column_name + '.' + child).alias(column_name + '_' + child) for child in [ n.name for n in  nested_fields[column_name]]]
                df = df.select("*", *unnested).drop(column_name)
            # else, if the field belongs to an ArrayType, an explode_outer is done
            elif isinstance(nested_fields[column_name], ArrayType):
                df = df.withColumn(column_name, explode_outer(column_name))

        # Now that df is updated, gets all fields of StructType and ArrayType in a fresh nested_fields dictionary
        nested_fields = dict([
            (field.name, field.dataType)
            for field in df.schema.fields
            if isinstance(field.dataType, ArrayType) or isinstance(field.dataType, StructType)
        ])

    # renaming all fields extracted with json> to retain complete path to the field
    #for df_col_name in df.columns:
    #    df = df.withColumnRenamed(df_col_name, df_col_name.replace("transformedJSON", json_column_name))
    return df
