# Reference article: https://medium.com/@thomaspt748/how-to-flatten-json-files-dynamically-using-apache-pyspark-c6b1b5fd4777

In [0]:
from typing import Final, Dict, Tuple

from pyspark.sql.session import SparkSession
from pyspark.sql import DataFrame as SDF
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, ArrayType

In [0]:
def rename_dataframe_cols(df: SDF, col_names: Dict[str, str]) -> SDF:
    """
    Rename columns of a dataframe according to a mapping.

    Columns whose current name is a key of ``col_names`` get the mapped name;
    every other column keeps its name. Column order is preserved.

    The rename is positional (``DataFrame.toDF``), so column names are never
    parsed as column expressions. A name that contains a dot (a legal JSON
    key) is therefore not mistaken for a struct-field reference, and a
    dataframe with duplicate column names does not raise an ambiguity error.

    :param df: dataframe whose columns are renamed.
    :param col_names: mapping of current column name -> new column name.
    :return: a new dataframe with the renamed columns.
    """
    return df.toDF(*[col_names.get(col_name, col_name) for col_name in df.columns])

def update_column_names(df: SDF, index: int) -> SDF:
    """
    Append a ``*<index>`` level suffix to every column name.

    For example, with ``index=1`` a column ``id`` becomes ``id*1``.

    :param df: dataframe whose columns are renamed.
    :param index: level number appended to each name.
    :return: a new dataframe with suffixed column names.
    """
    suffixed = {column: f"{column}*{index}" for column in df.columns}
    return rename_dataframe_cols(df, suffixed)


In [0]:
def flatten_json(df_arg: SDF, index: int = 1) -> SDF:
    """
    Flatten the nested JSON (struct and array) columns of a Spark dataframe.

    Column names are tagged with their nesting level. Top-level columns get
    a ``*1`` suffix. Each child field of a struct column ``<parent>`` whose
    name ends in ``*N`` becomes a column named ``<parent>-><child>*<N + 1>``.
    Array columns are exploded in place with ``explode_outer``, keeping
    their name; exploding several array columns multiplies the rows.
    Struct and array columns are handled one at a time, first one in schema
    order first, until none remain.

    :param df_arg: dataframe to flatten.
    :param index: starting level. Only the default ``1`` applies the initial
        ``*1`` renaming; any other value assumes every column of ``df_arg``
        already ends in ``*<level>``.
    :return: a dataframe containing no struct or array columns.
    :raises ValueError: if a struct column's name lacks a numeric
        ``*<level>`` suffix (cannot happen with the default ``index=1``).
    """
    df = update_column_names(df_arg, index) if index == 1 else df_arg

    # Loop rather than recurse once per nested column: recursion needed one
    # Python stack frame per struct/array expansion and could exceed the
    # interpreter's recursion limit on wide or deeply nested schemas.
    while True:
        # isinstance on the schema types instead of matching the text of
        # str(dataType), whose formatting is not a stable API.
        nested = next(
            (
                field
                for field in df.schema.fields
                if isinstance(field.dataType, (ArrayType, StructType))
            ),
            None,
        )
        if nested is None:
            return df
        if isinstance(nested.dataType, ArrayType):
            # One output row per array element; the column keeps its name.
            df = df.withColumn(
                nested.name, explode_outer(col(_quote_column(nested.name)))
            )
        else:
            df = _expand_struct(df, nested)


def _quote_column(name: str) -> str:
    """Backtick-quote a column name so ``col`` takes it literally (dots included)."""
    return "`" + name.replace("`", "``") + "`"


def _expand_struct(df: SDF, field) -> SDF:
    """Replace struct column ``field`` (a StructField of ``df``) by one column per child."""
    name = field.name
    # The current level is the number after the last '*' in the column name.
    try:
        next_level = int(name[name.rfind("*") + 1:]) + 1
    except ValueError as err:
        raise ValueError(
            f"column {name!r} has no '*<level>' suffix; "
            "call flatten_json with index=1 on an un-renamed dataframe"
        ) from err

    struct_col = col(_quote_column(name))
    # Alias each child immediately so no intermediate column can collide
    # with an existing column name.
    children = [
        struct_col.getField(child.name).alias(f"{name}->{child.name}*{next_level}")
        for child in field.dataType.fields
    ]
    # Keep the other columns in order, append the children, drop the struct.
    return df.select("*", *children).drop(name)

In [0]:
if __name__ == "__main__":
    # Get (or create) the SparkSession used by the cells below, named
    # "FlatJson" and configured with a local master using all cores.
    spark = (
        SparkSession.builder
        .appName("FlatJson")
        .master("local[*]")
        .getOrCreate()
    )

In [0]:
# Sample 1: a JSON array of three donut records. Each record has a nested
# object `batters` wrapping an array of objects `batter`, and an array of
# objects `topping`; the "___tr" key appears only in the first record.
sample_json1 = """
[
	{
		"id": "0001",
		"type": "donut",
		"name": "Cake",
		"___tr": true,
		"ppu": 0.55,
		"batters":
			{
				"batter":
					[
						{ "id": "1001", "type": "Regular" },
						{ "id": "1002", "type": "Chocolate" },
						{ "id": "1003", "type": "Blueberry" },
						{ "id": "1004", "type": "Devil's Food" }
					]
			},
		"topping":
			[
				{ "id": "5001", "type": "None" },
				{ "id": "5002", "type": "Glazed" },
				{ "id": "5005", "type": "Sugar" },
				{ "id": "5007", "type": "Powdered Sugar" },
				{ "id": "5006", "type": "Chocolate with Sprinkles" },
				{ "id": "5003", "type": "Chocolate" },
				{ "id": "5004", "type": "Maple" }
			]
	},
	{
		"id": "0002",
		"type": "donut",
		"name": "Raised",
		"ppu": 0.55,
		"batters":
			{
				"batter":
					[
						{ "id": "1001", "type": "Regular" }
					]
			},
		"topping":
			[
				{ "id": "5001", "type": "None" },
				{ "id": "5002", "type": "Glazed" },
				{ "id": "5005", "type": "Sugar" },
				{ "id": "5003", "type": "Chocolate" },
				{ "id": "5004", "type": "Maple" }
			]
	},
	{
		"id": "0003",
		"type": "donut",
		"name": "Old Fashioned",
		"ppu": 0.55,
		"batters":
			{
				"batter":
					[
						{ "id": "1001", "type": "Regular" },
						{ "id": "1002", "type": "Chocolate" }
					]
			},
		"topping":
			[
				{ "id": "5001", "type": "None" },
				{ "id": "5002", "type": "Glazed" },
				{ "id": "5003", "type": "Chocolate" },
				{ "id": "5004", "type": "Maple" }
			]
	}
]
"""

# Sample 2: a single object with a nested object `customer` (holding an
# integer array `number` and a nested object `address`) and an array of
# objects `orders`. Not referenced by the cells shown here; pass it to
# sc.parallelize instead of sample_json1 to try it.
sample_json = """

{
    "customer": {
      "id": 123,
      "name": "John Doe",
      "number": [1, 2, 3, 4, 5],
      "address": {
        "city": "New York",
        "country": "USA"
      }
    },
    "orders": [
      {"id": 101, "amount": 50},
      {"id": 102, "amount": 100}
    ]
  }
  """

In [0]:
# Wrap the sample JSON text in a one-element RDD and let Spark infer its schema.
sc = spark.sparkContext
tsRDD = sc.parallelize([sample_json1])

# NOTE(review): multiLine presumably only affects file-based sources; each RDD
# element is already parsed as one whole JSON document. Confirm before removing.
df2 = spark.read.option("multiline", "true").json(tsRDD)

# DataFrame.display() is not part of the PySpark API; it is presumably provided
# by the notebook environment (e.g. Databricks). In plain PySpark use:
# df2.show(10, False)
df2.display()
# Flatten all nested struct/array columns of the inferred dataframe.
df3 = flatten_json(df2)

df3.display()

In [0]:
# Install the third-party package that provides flattenDF (imported in a later cell).
# This is an IPython shell escape: valid only inside a notebook, not a plain .py file.
!pip install pysparketl

In [0]:
# Sample 3: a single drug record with nested objects (`drug`, `drug.company`,
# and `ingredient` inside each element) and an array of objects
# (`drug.drug_ingredients`).
data = """ 
  {
    "id": 75200,
    "drug": {
      "current_status": "MARKETED",
      "id": 40,
      "drug_code": 10044,
      "updated_date": "2021-02-08T05:00:09-05:00",
      "company": {
        "id": 6,
        "created_date": "2017-03-09T13:22:33-05:00",
        "updated_date": "2017-03-09T13:22:33-05:00",
        "company_code": 3699,
        "mfr_code": "BAX08",
        "name": "BAXTER CORPORATION",
        "company_type": "DIN OWNER",
        "address_mailing_flag": true,
        "address_billing_flag": true,
        "address_notification_flag": true,
        "address_other": false,
        "address_report_display": true,
        "suite_number": "",
        "street_name": "7125 MISSISSAUGA ROAD",
        "province": "ONTARIO",
        "country": "CANADA",
        "city": "MISSISSAUGA",
        "postal_code": "L5N 0C2",
        "post_office_box": ""
      },
      "drug_ingredients": [
        {
          "id": 6959346,
          "ingredient": {
            "id": 44,
            "ingredient_code": 53,
            "en_name": "POTASSIUM CHLORIDE",
            "fr_name": "Chlorure de potassium"
          },
          "ingredient_supplied_ind": "I",
          "strength": "150",
          "strength_unit": "MG",
          "strength_type": "",
          "dosage_value": "100",
          "dosage_unit": "ML",
          "base": false
        },
        {
          "id": 6959345,
          "ingredient": {
            "id": 16,
            "ingredient_code": 55,
            "en_name": "SODIUM CHLORIDE",
            "fr_name": "Chlorure de sodium"
          },
          "ingredient_supplied_ind": "I",
          "strength": "450",
          "strength_unit": "MG",
          "strength_type": "",
          "dosage_value": "100",
          "dosage_unit": "ML",
          "base": false
        },
        {
          "id": 6959344,
          "ingredient": {
            "id": 10,
            "ingredient_code": 42,
            "en_name": "DEXTROSE",
            "fr_name": "Dextrose"
          },
          "ingredient_supplied_ind": "I",
          "strength": "5",
          "strength_unit": "G",
          "strength_type": "",
          "dosage_value": "100",
          "dosage_unit": "ML",
          "base": false
        }
      ],
      "din": "00437999",
      "brand_name": "(20MMOL/L) POTASSIUM CHLORIDE IN 5% DEXTROSE AND 0.45% SODIUM CHLORIDE INJECTION USP",
      "brand_name_fr": "",
      "descriptor": "",
      "pediatric_flag": false,
      "accession_number": "69690",
      "number_of_ais": "3",
      "ai_group_no": "0300097005"
    }
  }
"""

In [0]:
# Alternative: flatten with flattenDF from the third-party pysparketl package.
from pysparketl.dataframes import flattenDF
sc = spark.sparkContext
# Swap in the line below to flatten the donut sample instead of `data`.
# tsRDD = sc.parallelize([sample_json1])
tsRDD = sc.parallelize([data])
# Build a dataframe from the JSON text; Spark infers the nested schema.
rawJsonDF = spark.read.json(tsRDD)
# Flatten the nested dataframe with pysparketl's flattenDF.
flattenedDF = flattenDF(rawJsonDF)

# display() is presumably provided by the notebook environment (e.g. Databricks);
# in plain PySpark use flattenedDF.show() instead.
display(flattenedDF)