<mark>_**Enter the lakehouse name in the cell below**_</mark>

In [1]:
%%configure -f
{
    "defaultLakehouse": {  
        "name": "lakehouse03"
    }
}

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, -1, Finished, Available, Finished)

<mark>_**Enter the JSON file path and the table name prefix in the cell below**_</mark>

In [2]:
# Path of the JSON file to load; a later cell passes it to spark.read...json().
filepath = "Files/json sample/product_sample.json"
# Used twice later on: as the name of the parent struct column the data is
# wrapped in, and as the prefix of every table name that gets written.
tablenameprefix = "products"

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 3, Finished, Available, Finished)

In [3]:
import hashlib

from delta.tables import DeltaTable
from pyspark.sql.functions import coalesce, col, concat_ws, explode, explode_outer, lit, udf
from pyspark.sql.types import ArrayType, StringType, StructType


StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 4, Finished, Available, Finished)

In [4]:
# Function to create hash key based on all column values
def create_hash_key(concatenated_values):
    """Return the SHA-256 hex digest of a string.

    Args:
        concatenated_values: The text to hash, or None.

    Returns:
        A 64-character lowercase hexadecimal digest, or None when the input
        is None (so a null reaching the UDF yields a null key instead of
        failing the task with an AttributeError).
    """
    if concatenated_values is None:
        return None
    # Encode explicitly so the digest does not depend on any implicit default.
    return hashlib.sha256(concatenated_values.encode("utf-8")).hexdigest()

# Wrap create_hash_key as a Spark UDF with a string return type so it can be
# applied to a Column expression.
hash_key_udf = udf(create_hash_key, StringType())

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 5, Finished, Available, Finished)

In [5]:
# Function to concatenate all non-array and non-struct column values into a single string
def concatenate_columns(df, separator="\x1f", null_marker="\x1e"):
    """Build a Column joining every scalar column of ``df`` into one string.

    Columns whose type is a struct or an array are skipped. The remaining
    columns are cast to string and joined in schema order.

    Joining with an empty delimiter makes different rows indistinguishable
    (("ab", "c") and ("a", "bc") both become "abc"), and ``concat_ws`` drops
    null inputs, so (null, "x") and ("x", null) would also collide. A
    delimiter and an explicit null marker keep the text, and therefore the
    hash derived from it, distinct per row.

    Args:
        df: DataFrame whose schema and columns are read.
        separator: Text placed between column values. Defaults to the ASCII
            unit-separator control character.
        null_marker: Text substituted for null values. Defaults to the ASCII
            record-separator control character.

    Returns:
        A Column expression producing the concatenated string (an empty
        string when ``df`` has no scalar columns).
    """
    scalar_columns = [
        coalesce(col(field.name).cast(StringType()), lit(null_marker))
        for field in df.schema.fields
        if not isinstance(field.dataType, (StructType, ArrayType))
    ]
    return concat_ws(separator, *scalar_columns)


StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 6, Finished, Available, Finished)

In [6]:
# Report whether a DataFrame has at least one column that is neither an array nor a struct
def check_non_array_struct_column_types(df):
    """Return True if any field of ``df.schema`` is not an ArrayType or StructType.

    Returns False when every field is an array or struct, and also when the
    schema has no fields at all.
    """
    return any(
        not isinstance(schema_field.dataType, (ArrayType, StructType))
        for schema_field in df.schema.fields
    )


StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 7, Finished, Available, Finished)

In [7]:
#Check if the DataFrame has Parent tag, if not add one
from pyspark.sql.functions import struct, col

def checkOrAddParentTag(df, ParentTag):
    """Ensure the DataFrame is wrapped in a single top-level parent column.

    A DataFrame that consists of exactly one column whose type is an array or
    a struct is treated as already wrapped and returned unchanged. Anything
    else is wrapped: all columns are packed into one struct column named
    ``ParentTag``.

    The type is taken from the schema rather than from the dtype string,
    because a substring search for "array"/"struct" also matches unrelated
    types such as ``map<string,array<int>>``.

    Args:
        df: DataFrame to inspect.
        ParentTag: Name given to the wrapping struct column.

    Returns:
        ``df`` itself when it is already wrapped, otherwise a new
        single-column DataFrame.
    """
    if len(df.columns) == 1:
        only_column_type = df.schema.fields[0].dataType
        if isinstance(only_column_type, (ArrayType, StructType)):
            return df

    # Either several top-level columns or a single scalar column: wrap them.
    return df.select(struct([col(c) for c in df.columns]).alias(ParentTag))


StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 8, Finished, Available, Finished)

In [8]:
# Read the JSON file at `filepath`, with the reader's "multiline" option set to "true".
json_data = spark.read.option("multiline","true").json(filepath)
# Render the loaded DataFrame in the notebook output.
display(json_data)

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5ad25ba9-e0c1-404c-912d-891c1dafcfb1)

In [9]:
# Wrap the data in a parent struct column named after `tablenameprefix`,
# unless it already consists of a single array/struct column.
json_data = checkOrAddParentTag(json_data,tablenameprefix)
# Uncomment to inspect the wrapped DataFrame:
#display(json_data)

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 10, Finished, Available, Finished)

In [10]:
# Function to recursively extract nested JSON structures into separate DataFrames
def extract_nested_json(df, prefix="", isParent=False, nested_dfs=None):
    """Recursively split the struct and array columns of ``df`` into DataFrames.

    Every struct column becomes its own DataFrame (the struct's fields as
    columns). Every array column is exploded into one row per element; arrays
    of structs are then expanded through the struct handling. When ``df``
    carries a ``*_HashKey`` column, the last such column is copied into the
    child so that child rows can be linked back to their parent rows.

    Note: for an array of structs the result is stored by the recursive call,
    so its key repeats the field name (for example ``reviews_reviews``).

    Args:
        df: DataFrame whose schema is walked field by field.
        prefix: Text prepended to a field name to form the key under which
            the extracted DataFrame is stored.
        isParent: Set by the recursive calls when ``df`` holds exploded array
            elements. Forces a ``<field>_HashKey`` column to be added even if
            the struct has no scalar columns.
        nested_dfs: Dict that collects the results and is updated in place.
            A fresh dict is created when omitted.

    Returns:
        The dict mapping ``prefix + field_name`` to the extracted DataFrame.
    """
    # A literal {} default would be created once and shared by every call, so
    # results from an earlier run would leak into later ones.
    if nested_dfs is None:
        nested_dfs = {}

    display("*******")
    display("recursive function call")
    display("*******")

    # Key columns already present on this DataFrame. Each level appends its own
    # key after the inherited one, so the last entry is the key children link to.
    hashkey_columns = [col_name for col_name in df.columns if "_HashKey" in col_name]
    parent_key = hashkey_columns[-1] if hashkey_columns else None

    for field in df.schema.fields:
        field_name = field.name
        field_type = field.dataType
        child_prefix = prefix + field_name + "_"
        display("*******")
        display(f"field_name: {field_name}")

        if isinstance(field_type, StructType):
            display("StructType")

            # Flatten the struct into columns, keeping the parent key if any.
            non_null_rows = df.filter(col(field_name).isNotNull())
            if parent_key is not None:
                nested_df = non_null_rows.select(col(parent_key), col(field_name + ".*"))
            else:
                nested_df = non_null_rows.select(col(field_name + ".*"))

            # This checks if HashKey needs to be created
            if check_non_array_struct_column_types(nested_df) or isParent:
                hashkey = hash_key_udf(concatenate_columns(nested_df))
                nested_df = nested_df.withColumn(field_name + "_HashKey", hashkey)

            display(f"nested_df1: {nested_df}")
            display(nested_df)

            # Record this level, then descend into it; the recursive call
            # writes into the same dict, so no merge is needed afterwards.
            nested_dfs[prefix + field_name] = nested_df
            extract_nested_json(nested_df, child_prefix, isParent=False, nested_dfs=nested_dfs)

        elif isinstance(field_type, ArrayType):
            display("ArrayType")

            non_null_rows = df.filter(col(field_name).isNotNull())
            exploded = explode(col(field_name)).alias(field_name)
            has_struct_elements = isinstance(field_type.elementType, StructType)

            if parent_key is not None:
                nested_df = non_null_rows.select(col(parent_key), exploded)
                if has_struct_elements:
                    display("Array+Struct")
                else:
                    display("Array+normal cols")
            else:
                display("Array no hashkey")
                nested_df = non_null_rows.select(exploded)
                if not has_struct_elements:
                    display("Array+normal cols no hashkey")

            if has_struct_elements:
                # The exploded elements are structs: let the struct handling
                # expand them and force a hash key on the resulting rows.
                extract_nested_json(nested_df, child_prefix, isParent=True, nested_dfs=nested_dfs)
            else:
                nested_dfs[prefix + field_name] = nested_df
                extract_nested_json(nested_df, child_prefix, isParent=False, nested_dfs=nested_dfs)

    return nested_dfs



StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 11, Finished, Available, Finished)

In [11]:
# Split the wrapped JSON DataFrame into one DataFrame per nested structure.
# The result is a dict keyed by the prefixed field name.
nested_dfs = extract_nested_json(json_data)

# Uncomment to print each extracted DataFrame:
#for name, nested_df in nested_dfs.items():
#    print(f"DataFrame: {name}")
#    nested_df.show()

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 12, Finished, Available, Finished)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products'

'StructType'

'nested_df1: DataFrame[brand: string, category: string, currency: string, description: string, dimensions: struct<depth:string,height:string,weight:string,width:string>, images: array<string>, price: double, productId: bigint, productName: string, ratings: struct<averageRating:double,numberOfReviews:bigint>, reviews: array<struct<comment:string,rating:bigint,reviewId:bigint,userId:bigint,username:string>>, stock: struct<available:boolean,quantity:bigint>, variants: array<struct<color:string,price:double,screenSize:string,size:string,stockQuantity:bigint,variantId:string>>, products_HashKey: string]'

SynapseWidget(Synapse.DataFrame, 027cf799-2826-48ab-8f47-14c39bb7204e)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: brand'

'*******'

'field_name: category'

'*******'

'field_name: currency'

'*******'

'field_name: description'

'*******'

'field_name: dimensions'

'StructType'

'nested_df1: DataFrame[products_HashKey: string, depth: string, height: string, weight: string, width: string, dimensions_HashKey: string]'

SynapseWidget(Synapse.DataFrame, e124d053-4870-4a86-ba9c-9e09885721d1)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: depth'

'*******'

'field_name: height'

'*******'

'field_name: weight'

'*******'

'field_name: width'

'*******'

'field_name: dimensions_HashKey'

'*******'

'field_name: images'

'ArrayType'

'Array+normal cols'

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: images'

'*******'

'field_name: price'

'*******'

'field_name: productId'

'*******'

'field_name: productName'

'*******'

'field_name: ratings'

'StructType'

'nested_df1: DataFrame[products_HashKey: string, averageRating: double, numberOfReviews: bigint, ratings_HashKey: string]'

SynapseWidget(Synapse.DataFrame, 431686c6-9a0c-4160-bf28-91f345d8a39d)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: averageRating'

'*******'

'field_name: numberOfReviews'

'*******'

'field_name: ratings_HashKey'

'*******'

'field_name: reviews'

'ArrayType'

'Array+Struct'

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: reviews'

'StructType'

'nested_df1: DataFrame[products_HashKey: string, comment: string, rating: bigint, reviewId: bigint, userId: bigint, username: string, reviews_HashKey: string]'

SynapseWidget(Synapse.DataFrame, e81ec26d-7fa1-47b8-8c8a-cc3c46b1c530)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: comment'

'*******'

'field_name: rating'

'*******'

'field_name: reviewId'

'*******'

'field_name: userId'

'*******'

'field_name: username'

'*******'

'field_name: reviews_HashKey'

'*******'

'field_name: stock'

'StructType'

'nested_df1: DataFrame[products_HashKey: string, available: boolean, quantity: bigint, stock_HashKey: string]'

SynapseWidget(Synapse.DataFrame, fda3293d-819d-4942-9e33-cc230c9ba18b)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: available'

'*******'

'field_name: quantity'

'*******'

'field_name: stock_HashKey'

'*******'

'field_name: variants'

'ArrayType'

'Array+Struct'

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: variants'

'StructType'

'nested_df1: DataFrame[products_HashKey: string, color: string, price: double, screenSize: string, size: string, stockQuantity: bigint, variantId: string, variants_HashKey: string]'

SynapseWidget(Synapse.DataFrame, 4094aa34-3568-4f7f-9b8b-3df724cfad60)

'*******'

'recursive function call'

'*******'

'*******'

'field_name: products_HashKey'

'*******'

'field_name: color'

'*******'

'field_name: price'

'*******'

'field_name: screenSize'

'*******'

'field_name: size'

'*******'

'field_name: stockQuantity'

'*******'

'field_name: variantId'

'*******'

'field_name: variants_HashKey'

'*******'

'field_name: products_HashKey'

In [12]:
# Persist every extracted DataFrame as a table named "<tablenameprefix>_<key>",
# replacing any table of that name that already exists.
for name, nested_df in nested_dfs.items():
    print(f"DataFrame: {name}")
    tablename = f"{tablenameprefix}_{name}"
    nested_df.write.mode("overwrite").saveAsTable(tablename)

StatementMeta(, 18b46f53-9bf1-45cd-b919-f2cf275fe5ec, 13, Finished, Available, Finished)

DataFrame: products
DataFrame: products_dimensions
DataFrame: products_images
DataFrame: products_ratings
DataFrame: products_reviews_reviews
DataFrame: products_stock
DataFrame: products_variants_variants


<mark>_**The code below is optional: it drops the tables that were created above. Convert this markdown cell to a code cell to execute it.**_</mark>

# Drop tables
for name, nested_df in nested_dfs.items():
    # Build the name exactly as the table-creation cell does, so the tables
    # that were actually written are the ones removed.
    tablename = tablenameprefix + "_" + name
    # IF EXISTS keeps the loop going when a table is already gone.
    sqlquery = "DROP TABLE IF EXISTS " + tablename
    spark.sql(sqlquery)