In [0]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise build it with a readable app name.
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)

# Echo the session identifier as this cell's output.
spark.session_id

In [0]:
from pyspark.sql import Row
import random
import datetime

# Fixed seed so "Restart & Run All" regenerates the same synthetic dataset
# (and therefore the same tables written further down).
SEED = 42
random.seed(SEED)

categories = ['Electronics', 'Books', 'Clothing', 'Home', 'Toys']
payment_methods = ['Credit Card', 'Debit Card', 'Gift Card', 'Net Banking', 'UPI']
# cities[i] lies in states[i]: the two lists are paired by position.
cities = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix']
states = ['NY', 'CA', 'IL', 'TX', 'AZ']
customer_names = ['John Doe', 'Jane Smith', 'Alice Johnson', 'Robert Brown', 'Emily Davis']
streets = ['Main St', '2nd St', '3rd St', '4th St', '5th St']

NUM_UNIQUE_ROWS = 20         # orders ORD0001..ORD0020, each with a price
NUM_DUPLICATE_ROWS = 5       # exact copies drawn from the unique rows
NUM_MISSING_PRICE_ROWS = 5   # further orders whose price is None


def random_date(start=datetime.date(2024, 1, 1), end=datetime.date(2024, 12, 31)):
    """Return a uniformly random date between `start` and `end`, both inclusive."""
    return start + datetime.timedelta(days=random.randint(0, (end - start).days))


def random_city_state():
    """Return a random (city, state) pair taken from the same position in `cities` / `states`."""
    return random.choice(list(zip(cities, states)))


def random_address(city=None, state=None):
    """Return a street address such as "123 Main St, Chicago, IL".

    Args:
        city: city to embed; chosen at random when omitted.
        state: state code to embed; chosen at random when omitted.

    When either is omitted it is filled from a randomly chosen *matching*
    city/state pair, so the address never pairs a city with another state's code.
    """
    if city is None or state is None:
        default_city, default_state = random_city_state()
        city = default_city if city is None else city
        state = default_state if state is None else state
    return f"{random.randint(100, 9999)} {random.choice(streets)}, {city}, {state}"


def make_order_row(order_number, price):
    """Build one synthetic order Row.

    Args:
        order_number: turned into order_id as "ORD" + 4-digit zero-padded number.
        price: value for the price column; pass None to simulate a missing price.

    Returns:
        Row whose address, city and state columns all agree with each other.
    """
    city, state = random_city_state()
    return Row(
        order_id=f"ORD{order_number:04d}",
        customer_id=f"CUST{random.randint(1000, 9999)}",
        customer_name=random.choice(customer_names),
        address=random_address(city, state),
        product_id=f"PROD{random.randint(100, 999)}",
        category=random.choice(categories),
        order_date=str(random_date()),
        quantity=random.randint(1, 5),
        price=price,
        payment_method=random.choice(payment_methods),
        city=city,
        state=state,
    )


unique_rows = [
    make_order_row(i, round(random.uniform(10, 500), 2))
    for i in range(1, NUM_UNIQUE_ROWS + 1)
]

# Exact copies of existing rows, to exercise de-duplication later.
duplicate_rows = random.sample(unique_rows, NUM_DUPLICATE_ROWS)

# Additional orders with a missing price, to exercise null handling later.
first_missing_order = NUM_UNIQUE_ROWS + 1
missing_rows = [
    make_order_row(i, None)
    for i in range(first_missing_order, first_missing_order + NUM_MISSING_PRICE_ROWS)
]

all_rows = unique_rows + duplicate_rows + missing_rows

# Sanity checks on the generated data.
assert len(all_rows) == NUM_UNIQUE_ROWS + NUM_DUPLICATE_ROWS + NUM_MISSING_PRICE_ROWS
assert all(row.address.endswith(f", {row.city}, {row.state}") for row in all_rows)

df_sales = spark.createDataFrame(all_rows)
display(df_sales)

In [0]:
# Show the synthetic sales frame again and end the cell with its row count.
df_sales.display()
sales_row_count = df_sales.count()
sales_row_count

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema for the synthetic sales rows. Field names match the Row keys,
# and every column is nullable. `price` is DoubleType because the generator
# produces 64-bit Python floats rounded to cents. FloatType would narrow them to
# 32 bits (123.45 -> 123.44999694824219), and that error then shows up in sums.
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("payment_method", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True)
])

sales_raw_data = spark.createDataFrame(all_rows, schema)

display(sales_raw_data)

In [0]:
# Print the column names, types and nullability of the explicitly typed frame as a tree.
sales_raw_data.printSchema()


In [0]:
# List the catalogs available to this session.
catalogs_df = spark.sql("show catalogs")
catalogs_df.display()

In [0]:
# Create the `lakehouse` catalog only when it is missing, then list catalogs to confirm.
create_catalog_stmt = "create catalog if not exists lakehouse"
spark.sql(create_catalog_stmt)

catalogs_after_create = spark.sql("show catalogs")
catalogs_after_create.display()

In [0]:
# Show metadata for the lakehouse catalog.
lakehouse_catalog_info = spark.sql("DESCRIBE CATALOG lakehouse")
lakehouse_catalog_info.display()

In [0]:
# List the schemas currently present in the lakehouse catalog (plain-text output).
lakehouse_schemas = spark.sql("show schemas in lakehouse")
lakehouse_schemas.show()

In [0]:
# Create lakehouse.dev only when it is missing.
create_dev_schema_stmt = "create schema if not exists lakehouse.dev"
spark.sql(create_dev_schema_stmt)

# Confirm: `dev` should now appear in the listing.
schemas_after_create = spark.sql("show schemas in lakehouse")
schemas_after_create.show()


In [0]:
# Show metadata for the lakehouse.dev schema.
dev_schema_info = spark.sql("DESCRIBE SCHEMA lakehouse.dev")
dev_schema_info.display()

In [0]:
# Persist the raw sales data as a managed table (no path given), replacing any
# previous contents. overwriteSchema lets the overwrite also replace the table
# definition when the DataFrame's schema has changed since the table was first
# created (e.g. a column type edited in the schema cell). Without it, re-running
# the notebook after such a change fails with a schema-mismatch error.
(
    sales_raw_data.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("lakehouse.dev.sales_raw_data")
)


In [0]:
# Peek at the first five rows of the saved raw table; long values are truncated.
sales_raw_table = spark.sql("select * from lakehouse.dev.sales_raw_data")
sales_raw_table.show(n=5, truncate=True)

In [0]:
# Column list plus table-level metadata (provider, location, ...) of the saved raw table.
sales_raw_table_details = spark.sql("DESCRIBE EXTENDED lakehouse.dev.sales_raw_data")
sales_raw_table_details.display()

In [0]:
# Table provider demo.
# A Databricks table is a Delta table when its provider (storage format) is Delta.
# Delta provides ACID transactions, scalable metadata handling and data versioning.
# Both example tables below are declared `USING DELTA`, so they are the same kind
# of table. The names `normal_table` / `delta_table` only label the two examples,
# and the Provider row of `DESCRIBE EXTENDED` reports Delta for both of them.
# NOTE(review): a genuinely non-Delta example needs another provider (e.g.
# `USING PARQUET`). Unity Catalog may only allow that for external tables — confirm.

EXAMPLE_TABLES = ["lakehouse.dev.normal_table", "lakehouse.dev.delta_table"]

for example_table in EXAMPLE_TABLES:
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {example_table} (
      id INT,
      name STRING,
      value DOUBLE
    )
    USING DELTA
    """)

    # INSERT OVERWRITE (rather than INSERT INTO) keeps the cell idempotent:
    # re-running it leaves exactly these three rows instead of appending a copy.
    spark.sql(f"""
    INSERT OVERWRITE TABLE {example_table} VALUES
    (1, 'Alice', 10.5),
    (2, 'Bob', 20.0),
    (3, 'Charlie', 30.5)
    """)

In [0]:
# Show the contents of both example tables, one output per table.
for example_table_fqn in ("lakehouse.dev.normal_table", "lakehouse.dev.delta_table"):
    spark.sql(f"select * from {example_table_fqn}").display()

In [0]:
# Compare the extended metadata (including the Provider row) of both example tables.
for described_table in ("lakehouse.dev.normal_table", "lakehouse.dev.delta_table"):
    spark.sql(f"DESCRIBE EXTENDED {described_table}").display()


In [0]:
def is_delta_table(table_name):
    """Return True if `table_name` is backed by the Delta Lake provider.

    Runs `DESCRIBE EXTENDED <table_name>` and looks for the row whose
    `col_name` is "Provider". Its `data_type` value is compared with "delta",
    ignoring case and surrounding whitespace.

    Args:
        table_name: table identifier such as "catalog.schema.table". It is
            interpolated into the SQL text verbatim, so pass trusted names only.

    Returns:
        bool: True when the provider is Delta; False otherwise, including when
        no Provider row is present.
    """
    details = spark.sql(f"DESCRIBE EXTENDED {table_name}").collect()
    # Section-separator rows can carry empty or None values, hence the `or ""`.
    return any(
        row.col_name == "Provider"
        and (row.data_type or "").strip().lower() == "delta"
        for row in details
    )

# Example usage: report whether normal_table is Delta-backed.
table_name = "lakehouse.dev.normal_table"
table_kind = "a Delta table" if is_delta_table(table_name) else "a normal table"
print(f"{table_name} is {table_kind}.")

In [0]:
# Write the raw sales data explicitly in Delta format, replacing any previous
# contents. overwriteSchema lets the overwrite also replace the table definition
# when the DataFrame's schema has changed since the table was first created.
# Without it, re-running after such a change fails with a schema-mismatch error.
(
    sales_raw_data.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("lakehouse.dev.sales_delta")
)
spark.sql("select * from lakehouse.dev.sales_delta").display()

In [0]:
# List every table now registered in lakehouse.dev (plain-text output).
dev_tables = spark.sql("show tables in lakehouse.dev")
dev_tables.show()

In [0]:
from pyspark.sql.functions import col

# Self-join demo: sales_delta is joined with itself on order_id, once per join type.
# The table still contains the duplicated orders, so each duplicated order_id
# matches 2 x 2 = 4 times. Both sides are the same table and no order_id is NULL,
# so the left, right and outer joins return the same rows as the inner join.
sales_df = spark.table("lakehouse.dev.sales_delta")

left_side = sales_df.alias("a")
right_side = sales_df.alias("b")
same_order = col("a.order_id") == col("b.order_id")

inner_join_df = left_side.join(right_side, same_order, "inner")
display(inner_join_df)

left_join_df = left_side.join(right_side, same_order, "left")
display(left_join_df)

right_join_df = left_side.join(right_side, same_order, "right")
display(right_join_df)

full_outer_join_df = left_side.join(right_side, same_order, "outer")
display(full_outer_join_df)

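Create two tables (`sales` and `orders`) in the `practice` catalog and run operations on them: de-duplication, a join, aggregation, and a running total.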


In [0]:
# Idempotent setup of the practice catalog and its dev schema.
practice_catalog = "practice"
spark.sql(f"CREATE CATALOG IF NOT EXISTS {practice_catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {practice_catalog}.dev")

In [0]:
# List the tables currently in practice.dev.
practice_tables = spark.sql("show tables in practice.dev")
practice_tables.display()


In [0]:
# Practice tables `sales` and `orders`, linked by order_id. Each seed dataset
# deliberately contains one exact duplicate row and one row with a NULL
# order_id, for the de-duplication and join cells that follow.
# INSERT OVERWRITE (rather than INSERT INTO) keeps this cell idempotent:
# re-running it resets each table to exactly these rows instead of appending
# another copy of them.
spark.sql("""
    CREATE TABLE IF NOT EXISTS practice.dev.sales (
        sale_id INT,
        product STRING,
        amount DOUBLE,
        order_id INT
    )
""")

spark.sql("""
    INSERT OVERWRITE TABLE practice.dev.sales VALUES
    (1, 'Product A', 100.0, 1),
    (2, 'Product B', 150.0, 2),
    (3, 'Product C', 200.0, 3),
    (4, 'Product D', 250.0, NULL),
    (1, 'Product A', 100.0, 1)
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS practice.dev.orders (
        order_id INT,
        customer STRING,
        order_date DATE
    )
""")

# Typed DATE literals avoid relying on an implicit string-to-date cast on insert.
spark.sql("""
    INSERT OVERWRITE TABLE practice.dev.orders VALUES
    (1, 'Customer X', DATE'2025-06-25'),
    (2, 'Customer Y', DATE'2025-06-26'),
    (3, 'Customer Z', DATE'2025-06-27'),
    (NULL, 'Customer W', DATE'2025-06-28'),
    (1, 'Customer X', DATE'2025-06-25')
""")

In [0]:
%sql
-- Raw orders table, exactly as stored (duplicates and NULL keys included).
SELECT *
FROM practice.dev.orders;


In [0]:
%sql
-- Raw sales table, exactly as stored (duplicates and NULL keys included).
SELECT *
FROM practice.dev.sales;

In [0]:
# Load both practice tables as DataFrames.
df_sales = spark.table("practice.dev.sales")
df_orders = spark.table("practice.dev.orders")

# Remove rows that are exact copies across every column.
df_sales_dedup, df_orders_dedup = (frame.dropDuplicates() for frame in (df_sales, df_orders))

# Show the unique records, sorted ascending by each table's id column.
df_sales_dedup.orderBy("sale_id").display()
df_orders_dedup.orderBy("order_id").display()

In [0]:
# Aliased import: a bare `from ... import sum` would rebind Python's built-in
# `sum` for every later cell in the notebook.
from pyspark.sql.functions import sum as spark_sum

# Inner join of the de-duplicated frames. The rows with a NULL order_id never
# match, and the joined frame keeps both `order_id` columns.
df_joined = df_sales_dedup.join(
    df_orders_dedup,
    df_sales_dedup.order_id == df_orders_dedup.order_id,
    "inner",
)

# Reference the orders-side key through its DataFrame. The joined frame has two
# `order_id` columns, and a string qualifier such as "orders.order_id" relies on
# Spark resolving an implicit table-name alias, which is fragile.
df_joined.orderBy(df_orders_dedup.order_id.asc()).display()

# Total sales amount per product.
df_product_sales = df_joined.groupBy("product").agg(spark_sum("amount").alias("total_sales"))
display(df_product_sales)

# Product with the highest total. If several products tie, only one of them is kept.
df_highest_sales_product = df_product_sales.orderBy("total_sales", ascending=False).limit(1)
display(df_highest_sales_product)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum

# Cumulative frame ordered by sale_id: from the first row up to the current row.
# There is no partitionBy, so every row belongs to one single window.
window_spec = (
    Window
    .orderBy("sale_id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Running total of `amount` over the joined sales/orders rows.
running_amount = spark_sum("amount").over(window_spec)
df_running_total = df_joined.withColumn("running_total", running_amount)

display(df_running_total)