In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

spark = SparkSession.builder.getOrCreate()

# OneLake "Files" folder of the lakehouse holding the sales extract
# (workspace / lakehouse IDs are environment-specific).
SALES_DIR = "abfss://4818cd05-d34c-4929-abfe-b2870c622eb9@onelake.dfs.fabric.microsoft.com/2501c47d-b4c5-407d-965f-9d790532bf28/Files"

# Spark's CSV reader cannot parse an .xlsx workbook (a zipped binary format), so this must
# point at the CSV export. Convert sales.xlsx → sales.csv first if not already done.
csv_path = f"{SALES_DIR}/sales.csv"

# Load CSV in Spark
df_spark = spark.read.csv(csv_path, header=True, inferSchema=True)

# Add calculated line total. Cast explicitly so the product does not depend on what
# inferSchema guessed for each column.
df_spark = df_spark.withColumn(
    "TotalSales", col("Quantity").cast("double") * col("UnitPrice").cast("double")
)

# Aggregate per item. groupBy output order is arbitrary, so sort to make "top 5" meaningful.
result_spark = (
    df_spark.groupBy("Item")
    .agg(spark_sum("TotalSales").alias("TotalSales"))
    .orderBy(col("TotalSales").desc())
)

result_spark.show(5)


StatementMeta(, eafd6db4-1b09-4edc-af57-5fdd1a61b794, 6, Finished, Available, Finished)

+--------------------+------------------+
|                Item|        TotalSales|
+--------------------+------------------+
|Mountain-200 Blac...| 844474.3533999989|
|Touring-1000 Yell...| 176421.1800000003|
|Touring-1000 Blue...|159732.69000000026|
|Short-Sleeve Clas...|11661.839999999962|
|Women's Mountain ...|10218.539999999975|
+--------------------+------------------+
only showing top 5 rows



🔹 1. Python (Pandas) Notebook Result

Load Time: ~3.6 seconds (for reading Excel).

GroupBy Time: ~0.04 seconds.

Data Processed: All in memory, so very fast for a 1.6 MB file.

Output: Direct aggregation (groupby Item → TotalSales).

👉 Works great for small-to-medium datasets (up to a few million rows, depending on memory).

🔹 2. Spark (PySpark) Notebook Result

Execution Time: ~9.7 seconds (job distributed over Spark engine).

Stages / Tasks: Multiple Spark jobs were created (data read, transformations, aggregation).

Rows Processed: ~32K rows.

Output: The same aggregation as the Pandas version, but executed by Spark's distributed engine.

👉 Spark has fixed overhead (job scheduling, task distribution), so on small files it is slower than Pandas. For large datasets (GBs/TBs), however, Spark scales out across a cluster, whereas Pandas is limited to a single machine's memory and can run out of it.

In [3]:
from pyspark.sql import SparkSession
import warnings

# getOrCreate() returns the already-active session when one exists, so it only raises on a
# genuine startup failure. Warn, then re-raise: swallowing the error would leave `spark`
# undefined, and every later cell would fail with a confusing NameError instead.
try:
    spark = SparkSession.builder.getOrCreate()
except Exception as e:
    warnings.warn(f"Failed to create SparkSession: {e}")
    raise


StatementMeta(, 140ae661-8608-4a4a-a8f6-475b387fc2e0, 5, Finished, Available, Finished)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, lit
import time, psutil, os

# Initialize Spark session. If a session is already running (e.g. a managed notebook),
# getOrCreate() returns it and the master setting here may not take effect.
spark = SparkSession.builder.master("local").appName("DQ Validation").getOrCreate()

# NOTE(review): psutil only observes this Python driver process. Spark work runs in the JVM
# (and on executors), so the CPU / memory figures below undercount the real cost — treat as rough.
process = psutil.Process(os.getpid())
cpu_cores = psutil.cpu_count(logical=True)


def _cpu_seconds(proc):
    """Return user + system CPU seconds used so far by `proc`, read from one snapshot."""
    times = proc.cpu_times()
    return times.user + times.system


mem_before = process.memory_info().rss / (1024 ** 2)
cpu_start = _cpu_seconds(process)
start_time = time.perf_counter()  # monotonic clock, unaffected by system clock changes

# Sample DataFrame (replace with your actual Spark DataFrame)
data = [
    ("col1", "A", 1),
    ("col2", None, 2),
    ("col3", "C", 3),
    ("col4", "D", None),
    ("col5", None, 5)
]
columns = ["Current Name", "Type", "Value"]

df_rename_config = spark.createDataFrame(data, columns)

# Add 'New Column' = Value * 2 (the Pandas counterpart is df['Value'] * 2); null Value stays null.
df_rename_config = df_rename_config.withColumn("New Column", col("Value") * 2)

# Fill nulls in 'Current Name' with a placeholder
df_rename_config = df_rename_config.withColumn(
    "Current Name", when(col("Current Name").isNull(), lit("Unknown")).otherwise(col("Current Name"))
)

# Null values per column: count() skips nulls, so count the rows where `when` emits a marker.
null_counts = df_rename_config.select([
    count(when(col(c).isNull(), lit(1))).alias(c) for c in df_rename_config.columns
]).collect()[0].asDict()

# Duplicate rows = total rows - distinct rows. Count once and reuse it for the size report,
# avoiding an extra Spark job.
total_rows = df_rename_config.count()
duplicate_rows = total_rows - df_rename_config.dropDuplicates().count()

# Data Quality Report
dq_report = {
    "Null Values": null_counts,
    "Duplicate Rows": duplicate_rows,
}

end_time = time.perf_counter()
cpu_end = _cpu_seconds(process)
mem_after = process.memory_info().rss / (1024 ** 2)

elapsed = end_time - start_time
cpu_used = cpu_end - cpu_start
# NOTE(review): heuristic only (driver CPU-seconds × logical cores / 3600), not Fabric's
# capacity-unit metering — confirm before comparing with billing figures.
cus_consumed = (cpu_used * cpu_cores) / 3600

print("Data After Manipulation (Spark):")
df_rename_config.show()

print("DQ Report (Spark):", dq_report)
print(f"Time Taken: {elapsed:.4f} sec")
print(f"CUs Consumed: {cus_consumed:.6f} approx")
print(f"CPU Time Used: {cpu_used:.4f} sec")
print(f"Memory Used: {mem_after - mem_before:.2f} MB")
print(f"Data Size: {total_rows} rows × {len(df_rename_config.columns)} cols")

# Stops the session; any later cell must obtain a new one via getOrCreate().
spark.stop()


StatementMeta(, eafd6db4-1b09-4edc-af57-5fdd1a61b794, 3, Finished, Available, Finished)

Data After Manipulation (Spark):
+------------+----+-----+----------+
|Current Name|Type|Value|New Column|
+------------+----+-----+----------+
|        col1|   A|    1|         2|
|        col2|NULL|    2|         4|
|        col3|   C|    3|         6|
|        col4|   D| NULL|      NULL|
|        col5|NULL|    5|        10|
+------------+----+-----+----------+

DQ Report (Spark): {'Null Values': {'Current Name': 0, 'Type': 2, 'Value': 1, 'New Column': 1}, 'Duplicate Rows': 0}
Time Taken: 4.5512 sec
CUs Consumed: 0.000556 approx
CPU Time Used: 0.2500 sec
Memory Used: 4.13 MB
Data Size: 5 rows × 4 cols


| Metric                    | Python (Pandas)      | Spark (PySpark)      |
| ------------------------- | -------------------- | -------------------- |
| **Load Time**             | 2.86 sec             | 3.64 sec             |
| **Aggregation Time**      | 0.0035 sec           | 0.054 sec            |
| **Total Rows**            | 32,718               | 32,718               |
| **Memory Usage (approx)** | 331.29 MB (0.323 GB) | 250.09 MB (0.244 GB) |
| **CPU Usage (approx)**    | 3%                   | 38.9%                |
| **CUs Consumed**          | **983.1315**         | **9740.63**          |
| **Estimated Cost**        | **\$0.0492/hour**    | **\$0.4870/hour**    |
