In [0]:
# Import necessary libraries for your ETL pipeline
from pyspark.sql import functions as F
import time

In [0]:

# Logging function to capture the start time of pipeline execution
def on_execution_start(pipeline_name):
    start_time = time.time()  # Capture the start time (timestamp)
    print(f"Pipeline {pipeline_name} started at {start_time}")
    return start_time

# Logging function to capture the end time of pipeline execution
def on_execution_end(start_time, pipeline_name):
    end_time = time.time()  # Capture the end time (timestamp)
    execution_duration = end_time - start_time  # Calculate the duration
    print(f"Pipeline {pipeline_name} completed in {execution_duration} seconds")

In [0]:
import uuid
def set_uuid():
    global unique_id
    unique_id = uuid.uuid4()
    print(str(unique_id))

In [0]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

def combine_files():
    # Define the schema for `w.details`
    details_schema = StructType([
        StructField("path", StringType(), True)  # Assuming `details` contains a JSON with `path`
    ])

    # Run SQL Query to join all logs
    df = spark.sql("""
        SELECT 
            s.uuid AS `uuid`,
            s.source_file AS `source`,
            s.file_format,
            s.additional_info AS `read_parameters`,
            s.timestamp AS `read_timestamp`,
            t.transformation_name,
            t.details AS `modification_details`,
            t.timestamp AS `modification_timestamp`,
            w.action_name AS `write_action`,
            w.details AS `write_details`,
            w.timestamp AS `write_timestamp`     
        FROM delta.`/mnt/datalake/Pipeline1/source1/source_logs/` AS s 
        INNER JOIN delta.`/mnt/datalake/Pipeline1/transformation1/trans_logs/` AS t 
        ON s.uuid = t.uuid 
        INNER JOIN delta.`/mnt/datalake/Pipeline1/destination1/write_logs/` AS w 
        ON s.uuid = w.uuid
    """)

    # Convert `w.details` (JSON string) into a struct and extract `path`
    df = df.withColumn("destination", from_json(col("write_details"), details_schema).getField("path"))

    # Write the transformed data to Delta Lake
    df.write.mode("append").option("mergeSchema", "True").format("delta").save("/mnt/datalake/Pipeline1/lineage/lineage_logs/")

In [0]:
def combine_sql_file():
    from delta.tables import DeltaTable
    from pyspark.sql.functions import col

    # Read new data from SQL logs
    new_df = spark.read.format("delta").load("/mnt/datalake/Pipeline1/sql/sql_logs/")

    # Define the target path
    target_path = "/mnt/datalake/Pipeline1/lineage/lineage_logs/"

    # Check if the Delta table already exists
    if DeltaTable.isDeltaTable(spark, target_path):
        # Load the existing Delta table
        target_table = DeltaTable.forPath(spark, target_path)

        target_table.alias("target").merge(
            new_df.alias("source"),
            "target.uuid = source.uuid" 
        ).whenNotMatchedInsert(values={
            "source": col("source.source_tables"),
            "modification_details": col("source.query"),
            "modification_timestamp": col("source.timestamp"),
            "destination": col("source.destination_tables") 
        }).execute()

    else:
        # If the table doesn't exist, create it
        new_df.write.format("delta").mode("overwrite").save(target_path)


In [0]:
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
import os
from datetime import datetime
import uuid

# Global variable to store log directory path
logs_dir = "/mnt/datalake/source_logs/"  # Default value

# Function to set logs directory path dynamically
def set_source_logs_dir(path):
    global logs_dir
    logs_dir = path
    print(f"Logs directory set to: {logs_dir}")

# Wrapper class around the DataFrameReader to intercept file read operations
class CustomDataFrameReader(DataFrameReader):
    def __init__(self, spark: SparkSession):
        super().__init__(spark)
        self.spark = spark

    def _log_source_details(self, file_path, file_format, additional_info=None):
        global logs_dir

        # Ensure the logs directory exists (create if not present)
        try:
            # Check if the directory exists, create if not
            if not any(dbutils.fs.ls(logs_dir)):
                dbutils.fs.mkdirs(logs_dir)  # Create the directory if it doesn't exist
            print(f"Log directory {logs_dir} is ready.")
        except Exception as e:
            print(f"Error while creating directory {logs_dir}: {e}")
        
        
        # Metadata to log
        metadata = {
            'uuid': str(unique_id),
            'source_file': file_path,
            'file_format': file_format,
            'additional_info': str(additional_info) if additional_info else '',
            'timestamp': str(datetime.now())
        }
        
        # Create a DataFrame for the source log and write it to Delta
        lineage_df = self.spark.createDataFrame([metadata])
        
        try:
            lineage_df.write.format("delta").mode("append").save(logs_dir)
            print(f"Source log successfully written for {file_path} to {logs_dir}")
        except Exception as e:
            print(f"Error while writing to {logs_dir}: {e}")

    def csv(self, path, **options):
        """Overriding the CSV read method to include logging"""
        # Log source details
        self._log_source_details(path, 'csv', options)
        
        # Call the original CSV read method
        return super().csv(path, **options)

    def parquet(self, path, **options):
        """Overriding the Parquet read method to include logging"""
        # Log source details
        self._log_source_details(path, 'parquet', options)
        
        # Call the original Parquet read method
        return super().parquet(path, **options)

    def format(self, source_format):
        """Overriding format method to capture format type"""
        self._current_format = source_format
        return super().format(source_format)
    
    def load(self, path=None, **options):
        """Overriding load method to log details before reading"""
        file_format = getattr(self, "_current_format", "unknown")
        if path:
            self._log_source_details(path, file_format, options)
        return super().load(path, **options)

# Override the default DataFrameReader with our custom one
def wrap_spark_reader(spark: SparkSession):
    spark._read = CustomDataFrameReader(spark)
    return spark

# Initialize the SparkSession and apply the wrapper
spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()
wrap_spark_reader(spark)

In [0]:
from pyspark.sql import DataFrame
from datetime import datetime
import pyspark.sql.functions as F

# Global variable to store log directory path
transformation_logs_dir = "/mnt/datalake/trans_logs/"  # Default value

# Function to set logs directory path dynamically
def set_trans_logs_dir(path):
    global transformation_logs_dir
    transformation_logs_dir = path
    print(f"Logs directory set to: {transformation_logs_dir}")

# Function to log transformations
# Global variable to track if the directory has been created
log_directory_created = False  

def log_transformation(transformation_name, details):
    global transformation_logs_dir, log_directory_created

    if not log_directory_created:
        try:
            dbutils.fs.ls(transformation_logs_dir)  # Check if directory exists
        except Exception:
            dbutils.fs.mkdirs(transformation_logs_dir)  # Create if not exists
            print(f"Log directory {transformation_logs_dir} is ready.")
        log_directory_created = True  # Mark as created to avoid repeated checks

    metadata = {
        'uuid': str(unique_id),
        'transformation_name': transformation_name,
        'details': str(details),
        'timestamp': str(datetime.now())
    }
    lineage_df = spark.createDataFrame([metadata])
    lineage_df.write.format("delta").mode("append").save(transformation_logs_dir)


# Class to log and wrap DataFrame transformations
class CapturingDataFrame:
    def __init__(self, spark_df, pipeline_name):
        self._df = spark_df  # Store the original DataFrame
        self.pipeline_name = pipeline_name  # Store the pipeline name

    def _log_and_apply(self, transformation_name, *args, **kwargs):
        """Helper function to log and apply transformations."""
        log_transformation(transformation_name, {"args": args, "kwargs": kwargs})
        return getattr(self._df, transformation_name)(*args, **kwargs)

    # Overriding DataFrame transformations
    def filter(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("filter", *args, **kwargs), self.pipeline_name)

    def groupBy(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("groupBy", *args, **kwargs), self.pipeline_name)

    def agg(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("agg", *args, **kwargs), self.pipeline_name)

    def withColumnRenamed(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("withColumnRenamed", *args, **kwargs), self.pipeline_name)

    def select(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("select", *args, **kwargs), self.pipeline_name)

    def join(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("join", *args, **kwargs), self.pipeline_name)

    def withColumn(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("withColumn", *args, **kwargs), self.pipeline_name)

    def drop(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("drop", *args, **kwargs), self.pipeline_name)

    def dropDuplicates(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("dropDuplicates", *args, **kwargs), self.pipeline_name)

    def distinct(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("distinct", *args, **kwargs), self.pipeline_name)

    def orderBy(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("orderBy", *args, **kwargs), self.pipeline_name)

    def limit(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("limit", *args, **kwargs), self.pipeline_name)

    def cache(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("cache", *args, **kwargs), self.pipeline_name)

    def persist(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("persist", *args, **kwargs), self.pipeline_name)

    def repartition(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("repartition", *args, **kwargs), self.pipeline_name)

    def coalesce(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("coalesce", *args, **kwargs), self.pipeline_name)

    def fillna(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("fillna", *args, **kwargs), self.pipeline_name)

    def dropna(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("dropna", *args, **kwargs), self.pipeline_name)

    def transform(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("transform", *args, **kwargs), self.pipeline_name)

    def alias(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("alias", *args, **kwargs), self.pipeline_name)

    def crossJoin(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("crossJoin", *args, **kwargs), self.pipeline_name)

    def union(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("union", *args, **kwargs), self.pipeline_name)

    def unionByName(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("unionByName", *args, **kwargs), self.pipeline_name)

    def intersect(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("intersect", *args, **kwargs), self.pipeline_name)

    def exceptAll(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("exceptAll", *args, **kwargs), self.pipeline_name)

    def intersectAll(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("intersectAll", *args, **kwargs), self.pipeline_name)

    def repartitionByRange(self, *args, **kwargs):
        return CapturingDataFrame(self._log_and_apply("repartitionByRange", *args, **kwargs), self.pipeline_name)

    # Proxy methods for other DataFrame operations can be added as needed
    def __getattr__(self, attr):
        """For any other DataFrame methods that are not explicitly wrapped, 
        use the original DataFrame methods."""
        return getattr(self._df, attr)

# Wrapper function to capture the initial DataFrame and pipeline name
def wrap_dataframe(df, pipeline_name):
    return CapturingDataFrame(df, pipeline_name)

In [0]:
from pyspark.sql import DataFrame
from datetime import datetime
import pyspark.sql.functions as F

# Global variables
write_logs_dir = "/mnt/datalake/write_logs/"  # Default log directory
logs_initialized = False  # Flag to track directory initialization

# Function to set logs directory path dynamically
def set_write_logs_dir(path):
    global write_logs_dir, logs_initialized
    write_logs_dir = path
    logs_initialized = False  # Reset the flag when path changes
    print(f"Logs directory set to: {write_logs_dir}")

# Function to initialize log directory (only once)
def initialize_log_directory():
    global logs_initialized
    if logs_initialized:
        return  # Skip if already initialized

    try:
        # Check if the directory exists, create if not
        if not any(dbutils.fs.ls(write_logs_dir)):
            dbutils.fs.mkdirs(write_logs_dir)  # Create the directory if it doesn't exist
            print(f"Log directory {write_logs_dir} created.")
        else:
            print(f"Log directory {write_logs_dir} already exists.")
    except Exception as e:
        print(f"Error while creating directory {write_logs_dir}: {e}")

    logs_initialized = True  # Set flag to prevent redundant execution

# Function to log write actions
def log_write_action(action_name, details):
    initialize_log_directory()  # Ensure the log directory is initialized only once

    metadata = {
        'uuid': str(unique_id),
        'action_name': action_name,
        'details': str(details),
        'timestamp': str(datetime.now())
    }

    # Log into a Delta table (or any other logging system)
    lineage_df = spark.createDataFrame([metadata])
    lineage_df.write.format("delta").mode("append").save(write_logs_dir)

# Class to wrap DataFrame and capture write actions
class CapturingWriteDataFrame:
    def __init__(self, spark_df, pipeline_name):
        self._df = spark_df
        self.pipeline_name = pipeline_name  # Store the pipeline name
        self._write_mode = None  # This will hold the write mode
        self._write_path = None  # This will hold the write path

    def _log_write_and_apply(self, action_name, *args, **kwargs):
        """Helper function to log write actions."""
        log_write_action(action_name, {"args": args, "kwargs": kwargs, "path": self._write_path})
        # Perform the actual write operation
        return getattr(self._df, action_name)(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Override the save method to log the write operation."""
        return self._log_write_and_apply("save", *args, **kwargs)

    def insertInto(self, *args, **kwargs):
        """Override insertInto method for tables to log the write operation."""
        return self._log_write_and_apply("insertInto", *args, **kwargs)

    # Handle 'mode' method separately to capture the write mode
    def mode(self, mode):
        """Override mode method to capture the write mode."""
        self._write_mode = mode
        return self  # Return the current instance to allow chaining

    # Capture the path (location) where the data is being written
    def path(self, path):
        """Method to set the write path and log it."""
        self._write_path = path
        return self  # Return self to allow method chaining

    # Capture path for the specific write functions (parquet, csv, etc.)
    def _capture_path(self, *args, **kwargs):
        """Capture path from any arguments or kwargs passed."""
        if args:
            self._write_path = args[0]  # Capture from args if it's the first argument
        elif 'path' in kwargs:
            self._write_path = kwargs['path']

    # Intercept and capture paths for all specific write functions like parquet, csv, etc.
    def parquet(self, *args, **kwargs):
        """Override parquet write method to capture the path."""
        self._capture_path(*args, **kwargs)
        return self._log_write_and_apply("parquet", *args, **kwargs)

    def csv(self, *args, **kwargs):
        """Override csv write method to capture the path."""
        self._capture_path(*args, **kwargs)
        return self._log_write_and_apply("csv", *args, **kwargs)

    def json(self, *args, **kwargs):
        """Override json write method to capture the path."""
        self._capture_path(*args, **kwargs)
        return self._log_write_and_apply("json", *args, **kwargs)

    # New `line_write` method to accept and log path
    def line_write(self, path=None, *args, **kwargs):
        """Custom write method to handle DataFrame write operations with mode."""
        # If path is provided explicitly, use it
        if path:
            self._write_path = path
        
        # Log the write action before actually writing the data
        log_write_action("line_write", {
            "mode": self._write_mode, 
            "df_schema": str(self._df.schema),
            "path": self._write_path
        })
        
        writer = self._df.write
        if self._write_mode:
            writer = writer.mode(self._write_mode)

        # Check if the path is provided or set previously
        if self._write_path:
            # Log the actual write operation with path
            return writer.parquet(self._write_path, *args, **kwargs)  # Assuming parquet, but can be adjusted for other formats
        else:
            return writer.parquet(*args, **kwargs)  # If no path, still write without a path

    # Proxy methods for other DataFrame operations (excluding transformations)
    def __getattr__(self, attr):
        """For any other DataFrame methods that are not explicitly wrapped, 
        use the original DataFrame methods."""
        return getattr(self._df, attr)

# Wrapper function to capture the initial DataFrame and pipeline name
def wrap_write_dataframe(df, pipeline_name):
    return CapturingWriteDataFrame(df, pipeline_name)

In [0]:
import re
import uuid
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Define log directory for combined source and destination logs
log_dir = "/mnt/datalake/sql_activity_logs/"  # Log path for both source and destination

# Initialize the Spark session
spark = SparkSession.builder.appName("SQLLogger").getOrCreate()

def set_sql_logs_dir(path):
    global log_dir
    log_dir = path
    print(f'Sql logs dir set to: {log_dir}')

def initialize_log_directory():
    """
    Initializes log directory for both source and destination tables if it doesn't exist.
    """
    try:
        # Check if the directory exists by listing the parent directory
        dbutils.fs.mkdirs(log_dir)  # This creates the directory if it doesn't exist
        print(f"Directory created or already exists: {log_dir}")
    except Exception as e:
        print(f"Error while creating directory {log_dir}: {str(e)}")

def log_sql_activity(query, source_tables, destination_tables):
    """
    Log SQL activity, including both source and destination tables, and the query that was executed.
    """
    # Generate a unique UUID for the current query
    uuid_for_query = str(uuid.uuid4())
    timestamp = str(datetime.now())
    
    # Combine source and destination tables
    log_metadata = [{
        'uuid': uuid_for_query,
        'query': query,
        'source_tables': source_tables,
        'destination_tables': destination_tables,
        'timestamp': timestamp
    }]
    
    # Define schema explicitly
    schema = StructType([
        StructField("uuid", StringType(), True),
        StructField("query", StringType(), True),
        StructField("source_tables", StringType(), True),
        StructField("destination_tables", StringType(), True),
        StructField("timestamp", StringType(), True)
    ])
    
    # Create DataFrame with explicit schema
    log_df = spark.createDataFrame(log_metadata, schema)
    
    # Write log information to a Delta table (append mode)
    log_df.write.format("delta").mode("append").save(log_dir)


def execute(query):
    initialize_log_directory()
    return SQLExecutor(spark).execute_sql(query)

class SQLExecutor:
    def __init__(self, spark: SparkSession):
        self.spark = spark

    def execute_sql(self, query):
        """
        Execute the SQL query, track source and destination tables, and log the activity.
        """
        # Extract source and destination tables from the query
        source_tables, destination_tables = self._extract_tables_from_query(query)
        
        # If no destination tables found (like in UPDATE queries), log as "unknown"
        if not destination_tables:
            destination_tables = ["unknown"]
        
        # Log the source and destination tables along with the query
        log_sql_activity(query, source_tables, destination_tables)
        
        # Execute the SQL query and return the result
        return self.spark.sql(query)

    def _extract_tables_from_query(self, query):
        """
        Extract source and destination tables from the SQL query using regular expressions.
        """
        # Regex to find tables mentioned in 'FROM' or 'JOIN'
        source_tables = re.findall(r'FROM\s+([a-zA-Z0-9_]+)|JOIN\s+([a-zA-Z0-9_]+)', query)
        source_tables = [table for sublist in source_tables for table in sublist if table]

        # Regex to find tables mentioned in 'INSERT INTO' or 'CREATE TABLE AS' or 'CREATE TABLE IF NOT EXISTS'
        destination_tables = re.findall(r'(?:INSERT\s+INTO|CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?)\s+([a-zA-Z0-9_]+)|CREATE\s+TABLE\s+AS\s+([a-zA-Z0-9_]+)', query)
        destination_tables = [table for sublist in destination_tables for table in sublist if table]

        # For UPDATE or other queries where destination is not specified, mark as 'unknown'
        if 'UPDATE' in query:
            destination_tables = []

        return source_tables, destination_tables


In [0]:
#set the path as required by user
def set_log_path(source = "/mnt/datalake/source_logs/",
                 trans = "/mnt/datalake/trans_logs/",
                 dest = "/mnt/datalake/write_logs/",
                 sql =  "/mnt/datalake/sql_activity_logs/"):
    set_source_logs_dir(source)
    set_trans_logs_dir(trans)
    set_write_logs_dir(dest)
    set_sql_logs_dir(sql)

In [0]:
#Read the object and return the wrapped dataframe
class DefObject:
    def __init__(self, otype, pipeline_name):
        self.otype = otype
        self.pipeline_name = pipeline_name
        set_uuid()

    def read_object(self, query = None, file_path = None, file_type = None, **options):
        if self.otype.lower() == "file":
            if file_type.lower() == "csv":
                odf = spark._read.csv(file_path, **options)  # Use _read to access the custom reader
                df = wrap_dataframe(odf, self.pipeline_name) # wrap dataframe
                return df

            elif file_type.lower() == "parquet":
                odf = spark._read.parquet(file_path, **options)  # Use _read to access the custom reader
                df = wrap_dataframe(odf, self.pipeline_name) # wrap dataframe
                return df

            elif file_type.lower() == "avro":
                odf = spark._read.format("avro").load(file_path, **options)  # Use _read to access the custom reader
                df = wrap_dataframe(odf, self.pipeline_name) # wrap dataframe
                return df

        elif self.otype.lower() == "table":
            execute(query)

In [0]:
class WriteObject:
    def __init__(self, save_type, pipeline_name):
        self.pipeline_name = pipeline_name
        self.save_type = save_type

    def write_object(self, df, file_path, mode):
        if self.save_type.lower() == "file":
            # Write to destination (this will automatically log the write action)
            df_write = wrap_write_dataframe(df, self.pipeline_name)
            df_write.mode(mode).line_write(file_path)

            print(f"File written successfully at path: {file_path} with {mode} mode")
            combine_files()

        elif self.save_type.lower() == "table":
            pass



In [0]:
def etl_sql_pipeline():
    # Log the start of the pipeline execution
    start_time = on_execution_start("First_Pipeline")

    set_log_path(sql = "/mnt/datalake/Pipeline1/sql/sql_logs/")

    sql_query =  """
                CREATE TABLE IF NOT EXISTS employee_table5 (
                    id INT,
                    name STRING,
                    department STRING,
                    salary INT
                )
                USING DELTA;
                """
    
    insert_data_sql = """
                INSERT INTO employee_table5 VALUES
                (1, 'John Doe', 'Engineering', 1000),
                (2, 'Jane Smith', 'Marketing', 1200),
                (3, 'Alice Johnson', 'Finance', 1400),
                (4, 'Bob Brown', 'Engineering', 1300),
                (5, 'Charlie White', 'Sales', 1100);
                """

    select_query = """
                    SELECT * FROM employee_table5;
                   """

    DefObject("table", "First Pipeline").read_object(sql_query)
    DefObject("table", "First Pipeline").read_object(insert_data_sql)
    DefObject("table", "First Pipeline").read_object(select_query)

    # Log the end of pipeline execution
    on_execution_end(start_time, "First_Pipeline")

etl_sql_pipeline()

Pipeline First_Pipeline started at 1743498807.9519837
Logs directory set to: /mnt/datalake/source_logs/
Logs directory set to: /mnt/datalake/trans_logs/
Logs directory set to: /mnt/datalake/write_logs/
Sql logs dir set to: /mnt/datalake/Pipeline1/sql/sql_logs/
e253a561-602b-4bdc-b5f4-2b613096c789
Directory created or already exists: /mnt/datalake/Pipeline1/sql/sql_logs/


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1711996001619652>:37[0m
[1;32m     34[0m     [38;5;66;03m# Log the end of pipeline execution[39;00m
[1;32m     35[0m     on_execution_end(start_time, [38;5;124m"[39m[38;5;124mFirst_Pipeline[39m[38;5;124m"[39m)
[0;32m---> 37[0m etl_sql_pipeline()

File [0;32m<command-1711996001619652>:30[0m, in [0;36metl_sql_pipeline[0;34m()[0m
[1;32m     17[0m insert_data_sql [38;5;241m=[39m [38;5;124m"""[39m
[1;32m     18[0m [38;5;124m            INSERT INTO employee_table5 VALUES[39m
[1;32m     19[0m [38;5;124m            (1, [39m[38;5;124m'[39m[38;5;124mJohn Doe[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mEngineering[39m[38;5;124m'[39m[38;5;124m, 1000),[39m
[0;32m   (...)[0m
[1;32m     23[0m [38;5;124m            (5, [39m[38;5;124m'[39m

In [0]:
def run_etl_pipeline():
    # Log the start of the pipeline execution
    start_time = on_execution_start("First_Pipeline")

    set_log_path(source = "/mnt/datalake/Pipeline1/source1/source_logs/", 
                 trans = "/mnt/datalake/Pipeline1/transformation1/trans_logs/", 
                 dest = "/mnt/datalake/Pipeline1/destination1/write_logs/") #if they do not call this function default log files will be used

    df = DefObject("file", "First_Pipeline").read_object(file_path = 'dbfs:/FileStore/files/source_data.csv', 
                                                          file_type = "csv", header = True)

    # Apply transformations
    df = df.filter("amount > 100")  # Filter step
    df = df.groupBy("region").agg(F.sum("amount").alias("total_Profit"))  # Group and aggregate
    df = df.withColumnRenamed("total_profit", "sum_profit")  # Rename column

    # Additional transformations
    df = df.orderBy("sum_profit", ascending=False)  # Order by sum_amount
    df = df.distinct()  # Remove duplicate rows
    df = df.fillna({"sum_profit": 0})  # Fill missing values in the sum_amount column
    df = df.withColumn("new_column", F.lit("static_value"))  # Add a new column with a static value

    # Join with another DataFrame (create a sample DataFrame for the join)
    df_other = spark.createDataFrame([("North", 50), ("South", 30)], ["region", "new_data"])

    # Perform the join before dropping the region column
    df = df.join(df_other, on="region", how="left")  # Perform a left join on the region column

    # Drop the region column after the join
    df = df.drop("region")  # Drop the region column

    WriteObject("file", "First_Pipeline").write_object(df, file_path = "/mnt/datalake/output/answer.parquet", mode = "overwrite")
    
    # Log the end of pipeline execution
    on_execution_end(start_time, "First_Pipeline")

# Run the pipeline
run_etl_pipeline()

Pipeline First_Pipeline started at 1743498829.173399
Logs directory set to: /mnt/datalake/Pipeline1/source1/source_logs/
Logs directory set to: /mnt/datalake/Pipeline1/transformation1/trans_logs/
Logs directory set to: /mnt/datalake/Pipeline1/destination1/write_logs/
Sql logs dir set to: /mnt/datalake/sql_activity_logs/
82d9db3c-acea-4b91-a14a-da8c33e1930d
Log directory /mnt/datalake/Pipeline1/source1/source_logs/ is ready.
Source log successfully written for dbfs:/FileStore/files/source_data.csv to /mnt/datalake/Pipeline1/source1/source_logs/
Directory created or already exists: /mnt/datalake/sql_activity_logs/
File written successfully at path: /mnt/datalake/output/answer.parquet with overwrite mode
Pipeline First_Pipeline completed in 80.50483441352844 seconds


In [0]:
def run_etl_pipeline2():
    # Log the start of the pipeline execution
    start_time = on_execution_start("First_Pipeline")

    set_log_path(source = "/mnt/datalake/Pipeline1/source1/source_logs/", 
                 trans = "/mnt/datalake/Pipeline1/transformation1/trans_logs/", 
                 dest = "/mnt/datalake/Pipeline1/destination1/write_logs/") #if they do not call this function default log files will be used

    df = DefObject("file", "First_Pipeline").read_object(file_path = 'dbfs:/FileStore/files/mtcars.parquet', 
                                                          file_type = "parquet")

    # Apply transformations
    df = df.filter("cyl > 5")  # Filter step

    df = df.distinct()  # Remove duplicate rows

    # Drop the region column after the join
    df = df.drop("vs")  # Drop the region column

    WriteObject("file", "First_Pipeline").write_object(df, file_path = "/mnt/datalake/output/Parquetanswer.parquet", mode = "overwrite")

    df2 = DefObject("file", "First_Pipeline").read_object(file_path = 'dbfs:/tmp/sample_data.avro', 
                                                        file_type = "avro", header = True)

    # Apply transformations
    df2 = df2.filter("Age> 5")  # Filter step

    df2 = df2.distinct()  # Remove duplicate rows

    # Drop the region column after the join
    df2 = df2.drop("JoiningDate")  # Drop the region column

    # Write to destination (e.g., parquet file)
    # df.write.mode("overwrite").parquet("/mnt/datalake/output.parquet")

    WriteObject("file", "First_Pipeline").write_object(df2, file_path = "/mnt/datalake/output/Avroanswer.parquet", mode = "overwrite")

    
    # Log the end of pipeline execution
    on_execution_end(start_time, "First_Pipeline")

# Run the pipeline
run_etl_pipeline2()

Pipeline First_Pipeline started at 1743062814.3082209
Logs directory set to: /mnt/datalake/Pipeline1/source1/source_logs/
Logs directory set to: /mnt/datalake/Pipeline1/transformation1/trans_logs/
Logs directory set to: /mnt/datalake/Pipeline1/destination1/write_logs/
Sql logs dir set to: /mnt/datalake/sql_activity_logs/
2706e19d-44f7-414c-aecc-6ff1a614351d
Log directory /mnt/datalake/Pipeline1/source1/source_logs/ is ready.
Source log successfully written for dbfs:/FileStore/files/mtcars.parquet to /mnt/datalake/Pipeline1/source1/source_logs/
Directory created or already exists: /mnt/datalake/sql_activity_logs/
File written successfully at path: /mnt/datalake/output/Parquetanswer.parquet with overwrite mode
0679e0e6-b0b7-45d5-82e3-0d22f8d9686f
Log directory /mnt/datalake/Pipeline1/source1/source_logs/ is ready.
Source log successfully written for dbfs:/tmp/sample_data.avro to /mnt/datalake/Pipeline1/source1/source_logs/
Directory created or already exists: /mnt/datalake/sql_activity_l

In [0]:
#Source

# Read lineage logs as a Delta table
lineage_df = spark.read.format("delta").load("/mnt/datalake/Pipeline1/source1/source_logs/")

# Show all rows without truncation
lineage_df.sort(F.col("timestamp").desc()).display(truncate=False, n=lineage_df.count())

additional_info,file_format,source_file,timestamp,uuid
{'header': True},avro,dbfs:/tmp/sample_data.avro,2025-03-27 08:07:38.735051,0679e0e6-b0b7-45d5-82e3-0d22f8d9686f
,parquet,dbfs:/FileStore/files/mtcars.parquet,2025-03-27 08:06:54.407705,2706e19d-44f7-414c-aecc-6ff1a614351d
{'header': True},csv,dbfs:/FileStore/files/source_data.csv,2025-03-27 08:05:19.426137,f4694785-cdcd-4087-b193-0ba735363acb
{'header': True},csv,dbfs:/FileStore/files/source_data.csv,2025-03-26 10:06:30.070534,e97c3ac8-38be-44a2-aa98-9a545a6935ac
{'header': True},csv,dbfs:/FileStore/files/source_data.csv,2025-03-26 10:05:52.917329,a0306fc0-74c7-4274-8ce8-dd19f09359ad
{'header': True},avro,dbfs:/tmp/sample_data.avro,2025-03-26 09:22:09.736725,20bdfccc-fa44-4218-a391-1019371af7d9
,parquet,dbfs:/FileStore/files/mtcars.parquet,2025-03-26 09:21:42.563067,596b60d8-ffe2-410d-a830-176df59d10c7
{'header': True},csv,dbfs:/FileStore/files/source_data.csv,2025-03-26 09:20:54.757777,8cebedad-0008-4f4c-9ea3-b0c936237063


In [0]:
#Transformation

# Read lineage logs as a Delta table
lineage_df = spark.read.format("delta").load("/mnt/datalake/Pipeline1/transformation1/trans_logs/")

# Show all rows without truncation
lineage_df.sort(F.col("timestamp").desc()).display(truncate=False, n=lineage_df.count())

details,timestamp,transformation_name,uuid
"{'args': ('JoiningDate',), 'kwargs': {}}",2025-03-27 08:07:52.069528,drop,0679e0e6-b0b7-45d5-82e3-0d22f8d9686f
"{'args': (), 'kwargs': {}}",2025-03-27 08:07:48.294226,distinct,0679e0e6-b0b7-45d5-82e3-0d22f8d9686f
"{'args': ('Age> 5',), 'kwargs': {}}",2025-03-27 08:07:43.698757,filter,0679e0e6-b0b7-45d5-82e3-0d22f8d9686f
"{'args': ('vs',), 'kwargs': {}}",2025-03-27 08:07:08.096207,drop,2706e19d-44f7-414c-aecc-6ff1a614351d
"{'args': (), 'kwargs': {}}",2025-03-27 08:07:03.627471,distinct,2706e19d-44f7-414c-aecc-6ff1a614351d
"{'args': ('cyl > 5',), 'kwargs': {}}",2025-03-27 08:06:59.482955,filter,2706e19d-44f7-414c-aecc-6ff1a614351d
"{'args': ('region',), 'kwargs': {}}",2025-03-27 08:06:18.256485,drop,f4694785-cdcd-4087-b193-0ba735363acb
"{'args': (DataFrame[region: string, new_data: bigint],), 'kwargs': {'on': 'region', 'how': 'left'}}",2025-03-27 08:06:14.416994,join,f4694785-cdcd-4087-b193-0ba735363acb
"{'args': ('new_column', Column<'static_value'>), 'kwargs': {}}",2025-03-27 08:06:09.872729,withColumn,f4694785-cdcd-4087-b193-0ba735363acb
"{'args': ({'sum_profit': 0},), 'kwargs': {}}",2025-03-27 08:06:03.098294,fillna,f4694785-cdcd-4087-b193-0ba735363acb


In [0]:
#Destination

# Read lineage logs as a Delta table
lineage_df = spark.read.format("delta").load("/mnt/datalake/Pipeline1/destination1/write_logs/")

# Show all rows without truncation
lineage_df.sort(F.col("timestamp").desc()).display(truncate=False, n=lineage_df.count())

action_name,details,timestamp,uuid
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('ID', IntegerType(), True), StructField('Name', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Salary', FloatType(), True)])"", 'path': '/mnt/datalake/output/Avroanswer.parquet'}",2025-03-27 08:07:57.835065,0679e0e6-b0b7-45d5-82e3-0d22f8d9686f
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('model', StringType(), True), StructField('mpg', DoubleType(), True), StructField('cyl', IntegerType(), True), StructField('disp', DoubleType(), True), StructField('hp', IntegerType(), True), StructField('drat', DoubleType(), True), StructField('wt', DoubleType(), True), StructField('qsec', DoubleType(), True), StructField('am', IntegerType(), True), StructField('gear', IntegerType(), True), StructField('carb', IntegerType(), True)])"", 'path': '/mnt/datalake/output/Parquetanswer.parquet'}",2025-03-27 08:07:13.247953,2706e19d-44f7-414c-aecc-6ff1a614351d
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,f4694785-cdcd-4087-b193-0ba735363acb
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-26 10:07:04.346735,e97c3ac8-38be-44a2-aa98-9a545a6935ac
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('ID', IntegerType(), True), StructField('Name', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Salary', FloatType(), True)])"", 'path': '/mnt/datalake/output/Avroanswer.parquet'}",2025-03-26 09:22:22.331375,20bdfccc-fa44-4218-a391-1019371af7d9
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('model', StringType(), True), StructField('mpg', DoubleType(), True), StructField('cyl', IntegerType(), True), StructField('disp', DoubleType(), True), StructField('hp', IntegerType(), True), StructField('drat', DoubleType(), True), StructField('wt', DoubleType(), True), StructField('qsec', DoubleType(), True), StructField('am', IntegerType(), True), StructField('gear', IntegerType(), True), StructField('carb', IntegerType(), True)])"", 'path': '/mnt/datalake/output/Parquetanswer.parquet'}",2025-03-26 09:21:56.889266,596b60d8-ffe2-410d-a830-176df59d10c7
line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-26 09:21:28.914155,8cebedad-0008-4f4c-9ea3-b0c936237063


In [0]:
#sql
# Read lineage logs as a Delta table
lineage_df = spark.read.format("delta").load("/mnt/datalake/Pipeline1/sql/sql_logs/")

# Show all rows without truncation
lineage_df.sort(F.col("timestamp").desc()).display(truncate=False, n=lineage_df.count())

uuid,query,source_tables,destination_tables,timestamp
c5ba42d2-0001-4ffc-8ce6-f6c5fe965202,"CREATE TABLE IF NOT EXISTS employee_table5 (  id INT,  name STRING,  department STRING,  salary INT  )  USING DELTA;",[],[employee_table5],2025-04-01 09:13:28.039220
4596d267-d623-4fb6-bd7b-d9ebda2943b1,SELECT * FROM employee_table5;,[employee_table5],[unknown],2025-03-27 08:05:12.680028
6ea268cd-1fda-41e4-9897-772509bcb2cc,"INSERT INTO employee_table5 VALUES  (1, 'John Doe', 'Engineering', 1000),  (2, 'Jane Smith', 'Marketing', 1200),  (3, 'Alice Johnson', 'Finance', 1400),  (4, 'Bob Brown', 'Engineering', 1300),  (5, 'Charlie White', 'Sales', 1100);",[],[employee_table5],2025-03-27 08:05:01.238966
35893c67-bf60-4bb9-bc5f-885b858a4f65,"CREATE TABLE IF NOT EXISTS employee_table5 (  id INT,  name STRING,  department STRING,  salary INT  )  USING DELTA;",[],[employee_table5],2025-03-27 08:04:49.613385
1cba944a-0967-40d8-9bb8-1838dec3ffc5,"CREATE TABLE IF NOT EXISTS employee_table4 (  id INT,  name STRING,  department STRING,  salary INT  )  USING DELTA;",[],[employee_table4],2025-03-27 08:03:19.809104
66659ad8-d13f-4799-abfd-37b76be491f7,SELECT * FROM employee_table4;,[employee_table4],[unknown],2025-03-26 10:09:36.290130
31549cd9-c816-4f9f-93da-929500f82fe2,"INSERT INTO employee_table4 VALUES  (1, 'John Doe', 'Engineering', 1000),  (2, 'Jane Smith', 'Marketing', 1200),  (3, 'Alice Johnson', 'Finance', 1400),  (4, 'Bob Brown', 'Engineering', 1300),  (5, 'Charlie White', 'Sales', 1100);",[],[employee_table4],2025-03-26 10:09:31.829305
3ac6c0ce-9837-4308-980c-2e8bed25004b,"CREATE TABLE IF NOT EXISTS employee_table4 (  id INT,  name STRING,  department STRING,  salary INT  )  USING DELTA;",[],[employee_table4],2025-03-26 10:09:26.634296
ee6b5b2a-af26-43a3-b38f-edbd1476c377,SELECT * FROM employee_table4;,[employee_table4],[unknown],2025-03-26 09:20:49.919760
9ace40ec-a6b0-4ff2-bddf-1ad9949098ce,"INSERT INTO employee_table4 VALUES  (1, 'John Doe', 'Engineering', 1000),  (2, 'Jane Smith', 'Marketing', 1200),  (3, 'Alice Johnson', 'Finance', 1400),  (4, 'Bob Brown', 'Engineering', 1300),  (5, 'Charlie White', 'Sales', 1100);",[],[employee_table4],2025-03-26 09:20:44.278997


In [0]:
#Combined File
combine_sql_file()
lineage_logs = spark.read.format("delta").load("/mnt/datalake/Pipeline1/lineage/lineage_logs/")

lineage_logs.display()

uuid,source,file_format,read_parameters,read_timestamp,transformation_name,modification_details,modification_timestamp,write_action,write_details,write_timestamp,destination
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,agg,"{'args': (Column<'sum(amount) AS total_Profit'>,), 'kwargs': {}}",2025-03-27 08:05:43.319715,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,withColumnRenamed,"{'args': ('total_profit', 'sum_profit'), 'kwargs': {}}",2025-03-27 08:05:47.810618,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,orderBy,"{'args': ('sum_profit',), 'kwargs': {'ascending': False}}",2025-03-27 08:05:51.607075,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,distinct,"{'args': (), 'kwargs': {}}",2025-03-27 08:05:57.024610,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,fillna,"{'args': ({'sum_profit': 0},), 'kwargs': {}}",2025-03-27 08:06:03.098294,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,withColumn,"{'args': ('new_column', Column<'static_value'>), 'kwargs': {}}",2025-03-27 08:06:09.872729,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
f4694785-cdcd-4087-b193-0ba735363acb,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-03-27 08:05:19.426137,join,"{'args': (DataFrame[region: string, new_data: bigint],), 'kwargs': {'on': 'region', 'how': 'left'}}",2025-03-27 08:06:14.416994,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-03-27 08:06:21.502141,/mnt/datalake/output/answer.parquet
82d9db3c-acea-4b91-a14a-da8c33e1930d,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-04-01 09:13:49.349385,filter,"{'args': ('amount > 100',), 'kwargs': {}}",2025-04-01 09:13:57.446831,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-04-01 09:14:40.491686,/mnt/datalake/output/answer.parquet
82d9db3c-acea-4b91-a14a-da8c33e1930d,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-04-01 09:13:49.349385,groupBy,"{'args': ('region',), 'kwargs': {}}",2025-04-01 09:14:03.988241,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-04-01 09:14:40.491686,/mnt/datalake/output/answer.parquet
82d9db3c-acea-4b91-a14a-da8c33e1930d,dbfs:/FileStore/files/source_data.csv,csv,{'header': True},2025-04-01 09:13:49.349385,agg,"{'args': (Column<'sum(amount) AS total_Profit'>,), 'kwargs': {}}",2025-04-01 09:14:07.235702,line_write,"{'mode': 'overwrite', 'df_schema': ""StructType([StructField('sum_profit', DoubleType(), False), StructField('new_column', StringType(), False), StructField('new_data', LongType(), True)])"", 'path': '/mnt/datalake/output/answer.parquet'}",2025-04-01 09:14:40.491686,/mnt/datalake/output/answer.parquet
