### Import relevant Libraries

In [1]:
from datetime import datetime
from functools import wraps
from typing import Dict, List, Union, Callable

from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import from_unixtime, col, udf, col, current_date, datediff, floor
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, LongType


StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 3, Finished, Available, Finished)

### Define the Sequence of Transformations

All the steps that are performed on the `JSON` data in the `user_folder` folder are defined here, along with a `sequence_mapper` decorator that helps track the current state of the transformations.

In [2]:
# Descriptions of the pipeline stages, listed in execution order.
# Every entry except the last ends with "\n", so print() emits an extra
# blank line after it; the last entry has no trailing newline.
_step_descriptions = [
    "Read the JSON data from Abfss filepath as Complex Dataframe\n",
    "Extracting payload\n",
    "Flatten the payload\n",
    "Creating input DataFrame\n",
    "Capitalize column names\n",
    "Convert Timestamp column to Date and/or Datetime\n",
    "Extract and Create Timezone Location and offset from GMT fields\n",
    "Create float field from GMT offset field\n",
    "Extract and Create the Ages field\n",
    "Reorder the columns and add Ingestion Date field\n",
    "Writing the cleaned dataframe to Delta Table",
]

# Lookup used by the sequence_mapper decorator: "Step 1" ... "Step 11".
sequence_map = {
    f"Step {number}": description
    for number, description in enumerate(_step_descriptions, start=1)
}

def sequence_mapper(step_val: str):
    """
    Decorator factory that announces a pipeline step before running a function.

    The returned decorator wraps a function so that, on every call, the
    description stored under `step_val` in the module-level `sequence_map`
    is printed first and the wrapped function is executed afterwards.

    The wrapper is built with `functools.wraps`, so the decorated function
    keeps its own `__name__`, `__doc__` and signature metadata. Without it,
    `help()` on any pipeline function would show the generic wrapper.

    Parameters:
    step_val (str): The key in `sequence_map` that identifies the current
                    processing step (for example "Step 1").

    Returns:
    function: A decorator that adds the step announcement to the function
              it is applied to.

    Raises:
    KeyError: Raised when the decorated function is called (not when it is
              decorated) if `step_val` is not a key of `sequence_map`.
    """

    def decorator(func):
        """
        Wrap `func` so the step description is printed before each call.

        Parameters:
        func (function): The original function to be decorated.

        Returns:
        function: A wrapper that prints the step description, calls `func`
                  with the same arguments and returns its result unchanged.
        """

        @wraps(func)
        def wrapper(*args, **kwargs):
            # The lookup happens at call time, so later edits to
            # sequence_map are reflected without re-decorating.
            print(sequence_map[step_val])
            return func(*args, **kwargs)

        return wrapper

    return decorator


StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 4, Finished, Available, Finished)

### Function Definitions
This section defines all the functions used to perform the transformations. Each function has its own docstring explaining what it does, along with the `sequence_mapper` decorator for tracking. The functions used here are:
1. **read_data**
2. **extract_payload**
3. **parse_payload**
4. **create_schema**
5. **create_dataframe_from_parsed_payload**
6. **capitalize_columns**
7. **parse_timestamp**
8. **split_timezone**
9. **timezone_hours_from_gmt**
10. **extract_age**
11. **final_cleanup**
12. **write_to_delta**

In [3]:
@sequence_mapper("Step 1")
def read_data(abfss_path: str)->DataFrame:
    """
    Load JSON data from an ABFSS location into a Spark DataFrame.

    The read goes through the notebook's global `spark` session with the
    "multiline" option enabled.

    Parameters:
    abfss_path (str): The ABFSS path of the JSON data to read.

    Returns:
    DataFrame: A Spark DataFrame holding the content of the JSON data.
    """

    # The "multiline" option is set on the reader before the JSON is loaded.
    json_reader = spark.read.option("multiline", "true")
    return json_reader.json(abfss_path)

# ABFSS location that is passed to read_data as the JSON source.
abfss_path = "abfss://fabric_cdc@onelake.dfs.fabric.microsoft.com/cdc_lakehouse.Lakehouse/Files/user_folder"
# Load the JSON into a DataFrame and preview it.
df = read_data(abfss_path)
display(df)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 5, Finished, Available, Finished)

Read the JSON data from Abfss filepath as Complex Dataframe



SynapseWidget(Synapse.DataFrame, e6423d48-3f7f-4a44-9922-c42c3a71af0b)

In [4]:
@sequence_mapper("Step 2")
def extract_payload(df: DataFrame)-> List:
    """
    Turn the input DataFrame into a list of record dictionaries.

    The "schema" column is dropped, the remaining data is collected to the
    driver as a pandas DataFrame, and that frame is converted into one
    dictionary per row.

    Parameters:
        df (DataFrame): The input Spark DataFrame from which to extract the payload.

    Returns:
        List: One dictionary per row of the DataFrame, without the "schema"
              column.
    """

    without_schema = df.drop("schema")
    # toPandas() pulls every row onto the driver.
    as_pandas = without_schema.toPandas()
    return as_pandas.to_dict(orient = "records")

# Convert the DataFrame to a list of dictionaries and show it as the cell output.
payload_array: List = extract_payload(df)
payload_array

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 6, Finished, Available, Finished)

Extracting payload



[{'payload': {'after': {'address': '3973 دکتر لواسانی, ایلام, آذربایجان شرقی - Iran 86952',
    'cell': '0943-252-6704',
    'date_of_birth': 201271548020000,
    'email': 'yly.qsmy@example.com',
    'gender': 'male',
    'id': 1140,
    'insertion_time': 1727516957704587,
    'latitude': 15.2516,
    'longitude': -101.2421,
    'name': 'Mr ایلیا قاسمی',
    'phone': '023-98186430',
    'picture_url': 'https://randomuser.me/api/portraits/men/87.jpg',
    'registered_date': 1353356496219000,
    'timezone': 'Newfoundland (Offset -3:30)'},
   'before': None,
   'op': 'c',
   'source': {'connector': 'postgresql',
    'db': 'airflowdatabaseflexible',
    'lsn': 70934087296,
    'name': 'fabric_deb_conn',
    'schema': 'fabric',
    'sequence': '["70934086952","70934087296"]',
    'snapshot': 'false',
    'table': 'user_data',
    'ts_ms': 1727516958411,
    'txId': 293345,
    'version': '2.2.1.Final',
    'xmin': None},
   'transaction': None,
   'ts_ms': 1727516958995}},
 {'payload': {'a

In [5]:
@sequence_mapper("Step 3")
def parse_payload(payload_array: List):
    """
    Parses a list of payload dictionaries to extract main information.

    For every record the function reads `record["payload"]["after"]`.
    Records that cannot supply that value are skipped instead of being
    passed downstream:

    - the "payload" or "after" key is missing (KeyError),
    - "payload" itself is None or not subscriptable (TypeError),
    - "after" is None, so there is no row data to flatten.

    Parameters:
    payload_array (List): A list of dictionaries representing the payload data.

    Returns:
    List: A list with the "after" value of every usable record, in the
          original order. Never contains None.
    """
    parsed_payload = []

    for payload in payload_array:
        try:
            main_info: Dict = payload["payload"]["after"]
        except (KeyError, TypeError):
            # Structure is absent or null at some level; nothing to extract.
            continue

        if main_info is None:
            # A None entry would break DataFrame creation later on.
            continue

        parsed_payload.append(main_info)

    return parsed_payload

# Keep only the "after" part of every record and show the first one.
parsed_payload: List = parse_payload(payload_array)
parsed_payload[0]

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 7, Finished, Available, Finished)

Flatten the payload



{'address': '3973 دکتر لواسانی, ایلام, آذربایجان شرقی - Iran 86952',
 'cell': '0943-252-6704',
 'date_of_birth': 201271548020000,
 'email': 'yly.qsmy@example.com',
 'gender': 'male',
 'id': 1140,
 'insertion_time': 1727516957704587,
 'latitude': 15.2516,
 'longitude': -101.2421,
 'name': 'Mr ایلیا قاسمی',
 'phone': '023-98186430',
 'picture_url': 'https://randomuser.me/api/portraits/men/87.jpg',
 'registered_date': 1353356496219000,
 'timezone': 'Newfoundland (Offset -3:30)'}

In [6]:
def create_schema()-> StructType:
    """
    Build the Spark schema used for the flattened user records.

    The schema is assembled from an ordered list of (column name, Spark type)
    pairs; every field is declared nullable.

    Returns:
    StructType: The schema with the following fields, in this order:
                - name (StringType)
                - gender (StringType)
                - id (StringType)
                - address (StringType)
                - timezone (StringType)
                - email (StringType)
                - phone (StringType)
                - cell (StringType)
                - date_of_birth (LongType)
                - registered_date (LongType)
                - picture_url (StringType)
                - insertion_time (LongType)
                - latitude (DoubleType)
                - longitude (DoubleType)
    """

    # (column name, type class) in the order the fields appear in the schema.
    field_specs = [
        ("name", StringType),
        ("gender", StringType),
        ("id", StringType),
        ("address", StringType),
        ("timezone", StringType),
        ("email", StringType),
        ("phone", StringType),
        ("cell", StringType),
        ("date_of_birth", LongType),
        ("registered_date", LongType),
        ("picture_url", StringType),
        ("insertion_time", LongType),
        ("latitude", DoubleType),
        ("longitude", DoubleType),
    ]

    return StructType([
        StructField(field_name, field_type(), True)
        for field_name, field_type in field_specs
    ])


@sequence_mapper("Step 4")
def create_dataframe_from_parsed_payload(
    parse_payload: List,
    schema_maker: Union[StructType, Callable[[], StructType]] = create_schema,
)-> DataFrame:
    """
    Creates a Spark DataFrame from a parsed payload using a specified schema.

    The DataFrame is built from the `parse_payload` argument itself (not from
    any notebook-level variable), so the function works with whatever list
    the caller passes in.

    Parameters:
    parse_payload (List): A list of dictionaries containing the parsed payload data.
    schema_maker (StructType | Callable[[], StructType]): Either a ready-made
        schema or a zero-argument callable that returns one. A callable is
        invoked on each call of this function. Defaults to `create_schema`.

    Returns:
    DataFrame: A Spark DataFrame created from the parsed payload and schema.
    """
    # Accept both a schema object and a schema factory so existing callers
    # that pass `create_schema()` keep working.
    schema = schema_maker() if callable(schema_maker) else schema_maker

    parsed_df = spark.createDataFrame(parse_payload, schema)
    return parsed_df

# Build the typed DataFrame from the flattened records and preview it.
parsed_df = create_dataframe_from_parsed_payload(parsed_payload, create_schema())
display(parsed_df)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 8, Finished, Available, Finished)

Creating input DataFrame



SynapseWidget(Synapse.DataFrame, 55000faa-a037-4855-9e84-2d951f067850)

In [7]:
@sequence_mapper("Step 5")
def capitalize_columns(df: DataFrame)-> DataFrame:
    """
    Capitalizes the names of all columns in a Spark DataFrame.

    Each column name is passed through `str.capitalize()`, which upper-cases
    the first character and lower-cases the rest (e.g. "date_of_birth"
    becomes "Date_of_birth"). All columns are renamed in a single `toDF`
    call.

    Parameters:
    df (DataFrame): The input Spark DataFrame whose columns are to be capitalized.

    Returns:
    DataFrame: A new Spark DataFrame with capitalized column names.
    """

    # One rename covers every column; repeating it per column would only
    # stack identical projections onto the query plan.
    capitalized_names = [column_name.capitalize() for column_name in df.columns]
    return df.toDF(*capitalized_names)

# Rename the columns and preview the result.
transform_df: DataFrame = capitalize_columns(parsed_df)
display(transform_df)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 9, Finished, Available, Finished)

Capitalize column names



SynapseWidget(Synapse.DataFrame, 994161d0-bd7a-43f2-952e-c744978157c1)

In [8]:
@sequence_mapper("Step 6")
def parse_timestamp(df: DataFrame, column_name: str, date_only: bool = False, time_scale: int = 1_000_000_000)-> DataFrame:
    """
    Replace a numeric epoch column with its formatted date/time value.

    The column value is divided by `time_scale` and the quotient is handed
    to `from_unixtime`. When `date_only` is True the result is additionally
    cast to a date. The column keeps its original name.

    Parameters:
    df (DataFrame): The input Spark DataFrame containing the timestamp column.
    column_name (str): The name of the column to be parsed.
    date_only (bool, optional): If True, the column will be cast to date type.
                                 Defaults to False.
    time_scale (int, optional): Divisor applied to the raw value before it is
                                 passed to `from_unixtime`.
                                 Defaults to 1,000,000,000.

    Returns:
    DataFrame: A Spark DataFrame with the updated timestamp column.
    """
    print(f"Timestamp column: {column_name}")

    # Scale the raw epoch value, then format it.
    parsed_value = from_unixtime(col(column_name) / time_scale)

    # Optionally narrow the formatted value down to a date.
    if date_only:
        parsed_value = parsed_value.cast("date")

    return df.withColumn(column_name, parsed_value)


# The source values are epoch microseconds (e.g. insertion_time =
# 1727516957704587), so the divisor must be 1_000_000. With parse_timestamp's
# default of 1_000_000_000 every value would land in January 1970.
transform_df: DataFrame = parse_timestamp(transform_df, "Insertion_time", date_only=False, time_scale=1_000_000)
transform_df: DataFrame = parse_timestamp(transform_df, "Registered_date", date_only=True, time_scale=1_000_000)
transform_df: DataFrame = parse_timestamp(transform_df, "Date_of_birth", date_only=True, time_scale=1_000_000)

display(transform_df)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 10, Finished, Available, Finished)

Convert Timestamp column to Date and/or Datetime

Timestamp column: Insertion_time
Convert Timestamp column to Date and/or Datetime

Timestamp column: Registered_date
Convert Timestamp column to Date and/or Datetime

Timestamp column: Date_of_birth


SynapseWidget(Synapse.DataFrame, fddb611e-d73d-443d-90f0-4b9e5e2d28af)

In [9]:
@sequence_mapper("Step 7")
def split_timezone(df: DataFrame)-> DataFrame:
    """
    Splits the timezone column into separate location and offset columns.

    The "Timezone" value is split on the literal text "(Offset". The part
    before it becomes "Timezone_location"; the part after it, with every ")"
    removed, becomes "Timezone_offset". The original "Timezone" column is
    dropped afterwards.

    Whitespace around the two parts is kept as-is: for a value such as
    "Newfoundland (Offset -3:30)" the location is "Newfoundland " and the
    offset is " -3:30".

    Parameters:
    df (DataFrame): The input Spark DataFrame containing the timezone column.

    Returns:
    DataFrame: A Spark DataFrame with the timezone split into separate
                "Timezone_location" and "Timezone_offset" columns.
    """
    # Raw string: "\(" is a regex escape, not a Python string escape.
    timezone_parts = F.split(df["Timezone"], r"\(Offset")

    # Strip the closing bracket left over from "(Offset ...)".
    offset_without_bracket = F.regexp_replace(timezone_parts.getItem(1), r"\)", "")

    df = (
        df.withColumn("Timezone_location", timezone_parts.getItem(0))
          .withColumn("Timezone_offset", offset_without_bracket)
    )

    return df.drop("Timezone")

# Split the Timezone column and preview the result.
transform_df: DataFrame = split_timezone(transform_df)
display(transform_df)


StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 11, Finished, Available, Finished)

Extract and Create Timezone Location and offset from GMT fields



SynapseWidget(Synapse.DataFrame, 110bba1e-a0da-41ff-8f00-f1e95b555bce)

In [10]:
@sequence_mapper("Step 8")
def timezone_hours_from_gmt(df: DataFrame, offset_column: str, output_col: str)->DataFrame:
    """
    Converts a GMT offset string to total hours and adds it as a new column.

    A Python UDF parses offsets formatted as "+hh:mm", "-hh:mm" or "hh:mm"
    (surrounding whitespace is ignored) and returns the signed number of
    hours as a float, e.g. "-3:30" -> -3.5 and "+5:30" -> 5.5. An offset
    without an explicit sign is treated as positive. Null or blank offsets
    produce a null in the output column.

    The new column is added to the `df` argument.

    Parameters:
    df (DataFrame): The input Spark DataFrame containing the GMT offset column.
    offset_column (str): The name of the column with GMT offset strings.
    output_col (str): The name of the new column to store the total hours.

    Returns:
    DataFrame: A Spark DataFrame with an additional column containing the
                total hours derived from the GMT offset.
    """

    def calculate_offset_in_hours(offset: str) -> Union[float, None]:
        # Nothing to parse for missing or blank values.
        if offset is None or not offset.strip():
            return None

        text = offset.strip()
        # Only an explicit leading "-" makes the offset negative.
        sign = -1.0 if text.startswith("-") else 1.0
        # Drop the sign character(s) before splitting into hours and minutes.
        hours, minutes = map(int, text.lstrip("+-").split(":"))
        return sign * (hours + minutes / 60.0)

    # Wrap the parser as a Spark UDF returning a double.
    calculate_offset_udf = udf(calculate_offset_in_hours, DoubleType())

    # Apply the UDF to create a new column with the total hours offset.
    df = df.withColumn(output_col, calculate_offset_udf(col(offset_column)))

    return df

# Derive the numeric hour offset from Timezone_offset and preview the result.
transform_df: DataFrame = timezone_hours_from_gmt(transform_df, "Timezone_offset", "Hours_from_gmt")
display(transform_df)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 12, Finished, Available, Finished)

Create float field from GMT offset field



SynapseWidget(Synapse.DataFrame, a81331c9-4a58-42c3-ac78-61b85c009aa1)

In [11]:
@sequence_mapper("Step 9")
def extract_age(df: DataFrame, date_column: str)-> DataFrame:
    """
    Calculates the age based on a date column and adds it as a new column.

    The specified column is cast to a date type and an "Age" column is added
    holding the number of completed years between that date and the current
    date. The age is derived from calendar months (`months_between / 12`),
    so it increases exactly on the anniversary. A fixed 365.25-day year
    would report one year too few on some birthdays.

    Parameters:
    df (DataFrame): The input Spark DataFrame containing the date column.
    date_column (str): The name of the column to be used for age calculation.

    Returns:
    DataFrame: A Spark DataFrame with an additional "Age" column containing
                the calculated ages.
    """

    df = df.withColumn(date_column, col(date_column).cast("date"))

    # Whole calendar months lived, converted to completed years.
    months_lived = F.months_between(current_date(), col(date_column))
    df = df.withColumn("Age", floor(months_lived / 12))

    return df

# Add the Age column computed from Date_of_birth and preview the result.
transform_df: DataFrame = extract_age(transform_df, "Date_of_birth")
display(transform_df)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 13, Finished, Available, Finished)

Extract and Create the Ages field



SynapseWidget(Synapse.DataFrame, b206c616-3058-497c-a4f9-67637ed12b6b)

In [12]:
@sequence_mapper("Step 10")
def final_cleanup(df: DataFrame, cols: list)->DataFrame:
    """
    Select the output columns and stamp the rows with an ingestion time.

    Only the columns named in `cols` are kept, in the order given. A column
    "Ingestion_Date" is then appended; its value is a literal taken from
    `datetime.now()` when this function runs, so every row carries the same
    value.

    Parameters:
    df (DataFrame): The input Spark DataFrame to be cleaned.
    cols (list): A list of column names to retain in the cleaned DataFrame.

    Returns:
    DataFrame: A Spark DataFrame containing only the specified columns along
                with the "Ingestion_Date" column.
    """

    selected = df.select(*cols)

    # Captured once here, not evaluated per row.
    ingestion_stamp = F.lit(datetime.now())
    return selected.withColumn("Ingestion_Date", ingestion_stamp)


# Columns to keep, in the order they should appear in the final DataFrame.
ordered_columns = ["Name", "Gender", "Id", "Age", "Date_of_birth", "Address", "Timezone_location", "Timezone_offset", "Hours_from_gmt",
 "Longitude", "Latitude", "Cell", "Phone", "Email", "Registered_date", "Picture_url", "Insertion_time"]

# Select/reorder the columns, add Ingestion_Date, and preview the result.
transform_df: DataFrame = final_cleanup(transform_df, ordered_columns)

display(transform_df)


StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 14, Finished, Available, Finished)

Reorder the columns and add Ingestion Date field



SynapseWidget(Synapse.DataFrame, ec0068ee-c110-4ee2-9c31-0d195c0a6148)

In [13]:
@sequence_mapper("Step 11")
def write_to_delta(*, df: DataFrame, table_name: str)-> None:
    """
    Save a DataFrame as a Delta table, replacing existing data.

    The DataFrame is written in "delta" format with "overwrite" mode via
    `saveAsTable`, and a confirmation line is printed once the write call
    has returned. Both arguments are keyword-only.

    Parameters:
    df (DataFrame): The Spark DataFrame to be written to the Delta table.
    table_name (str): The name of the Delta table where the DataFrame
                      will be saved.

    Returns:
    None: Nothing is returned; a confirmation message is printed after
          the write.
    """

    # Configure the writer first, then run the save.
    delta_writer = df.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable(table_name)

    print(f"Data successfully written to {table_name}")

# Name of the target Delta table; write_to_delta overwrites its existing data.
table_name = "user_data"
write_to_delta(df = transform_df, table_name = table_name)

StatementMeta(, ce70ca6f-88e6-4362-adf8-ab7e5f4124a9, 15, Finished, Available, Finished)

Writing the cleaned dataframe to Delta Table
Data successfully written to user_data


### Run sequence
Here, we apply all the functions we defined earlier. All transformations of the `JSON` data, from **Ingestion** to **Upload** to the Delta Table, are done in this sequence.

In [22]:
# Read the JSON files from the "user_folder"
abfss_path = "abfss://fabric_cdc@onelake.dfs.fabric.microsoft.com/cdc_lakehouse.Lakehouse/Files/user_folder"
df = read_data(abfss_path)

# Extract the payload from the input dataframe df
payload_array: List = extract_payload(df)

# Flatten the payload and create a dataframe with all the required fields
parsed_payload: List = parse_payload(payload_array)
parsed_df = create_dataframe_from_parsed_payload(parsed_payload, create_schema())

# Capitalize the column names
transform_df: DataFrame = capitalize_columns(parsed_df)

# Convert Insertion_time, Registered_date, and Date_of_birth fields from Timestamp to Date and/or Datetime values.
# The source values are epoch microseconds (e.g. insertion_time = 1727516957704587), so the divisor must be
# 1_000_000; parse_timestamp's default of 1_000_000_000 would place every value in January 1970.
transform_df: DataFrame = parse_timestamp(transform_df, "Insertion_time", date_only=False, time_scale=1_000_000)
transform_df: DataFrame = parse_timestamp(transform_df, "Registered_date", date_only=True, time_scale=1_000_000)
transform_df: DataFrame = parse_timestamp(transform_df, "Date_of_birth", date_only=True, time_scale=1_000_000)

# Split the Timezone location from the Timezone offset and derive number of hours from GMT
transform_df: DataFrame = split_timezone(transform_df)
transform_df: DataFrame = timezone_hours_from_gmt(transform_df, "Timezone_offset", "Hours_from_gmt")

# Extract the Age from the Date_of_birth column
transform_df: DataFrame = extract_age(transform_df, "Date_of_birth")

# Reorder the columns and add an Ingestion_date.
ordered_columns = ["Name", "Gender", "Id", "Age", "Date_of_birth", "Address", "Timezone_location", "Timezone_offset", "Hours_from_gmt",
 "Longitude", "Latitude", "Cell", "Phone", "Email", "Registered_date", "Picture_url", "Insertion_time"]

transform_df: DataFrame = final_cleanup(transform_df, ordered_columns)

# Write the transformed DataFrame to Delta table
table_name = "user_data"
write_to_delta(df = transform_df, table_name = table_name)

StatementMeta(, e5546cb7-215e-4773-94bb-209ae0b541f6, 24, Finished, Available, Finished)

Read the JSON data from Abfss filepath as Complex Dataframe

Extracting payload

Flatten the payload

Creating input DataFrame

Capitalize column names

Convert Timestamp column to Date and/or Datetime

Timestamp column: Insertion_time
Convert Timestamp column to Date and/or Datetime

Timestamp column: Registered_date
Convert Timestamp column to Date and/or Datetime

Timestamp column: Date_of_birth
Extract and Create Timezone Location and offset from GMT fields

Create float field from GMT offset field

Extract and Create the Ages field

Reorder the columns and add Ingestion Date field

Writing the cleaned dataframe to Delta Table
Data successfully written to user_data


In [None]:
# Preview the DataFrame produced by the run sequence.
display(transform_df)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, a3e1a4a9-9c7d-4717-8cd0-e7cbcfbbae98)