In [3]:
import os
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, StringType, DoubleType, DateType
import datetime
from utils.spark_session import spark

base_path = "spark_data/hello_world"
os.makedirs(base_path, exist_ok=True)

customers_path = f"{base_path}/customers"
products_path = f"{base_path}/products"
orders_path = f"{base_path}/orders"

# Configurations
num_customers = 5_000_000  
num_products = 5_000_000     
num_orders = 5_000_000       

# ---------------------------------
# Helper: Generate a range DataFrame
# ---------------------------------
def generate_range_df(n, col_name="id"):
    return spark.range(1, n + 1).withColumnRenamed("id", col_name)

# ---------------------------------
# Step 1: Generate customers
# ---------------------------------
print("Generating customers ...")

countries = ["USA", "Canada", "UK", "Germany", "France"]

customers_df = generate_range_df(num_customers, "customer_id") \
    .withColumn("name", f.concat(f.lit("Customer_"), f.col("customer_id").cast(StringType()))) \
    .withColumn("country", f.when(f.rand() < 0.2, countries[0])
                          .when(f.rand() < 0.4, countries[1])
                          .when(f.rand() < 0.6, countries[2])
                          .when(f.rand() < 0.8, countries[3])
                          .otherwise(countries[4])) \
    .withColumn("age", (f.rand() * 52 + 18).cast(IntegerType()))

customers_df.write.mode("overwrite").parquet(customers_path)
print(f"Generated {customers_path}")

# ---------------------------------
# Step 2: Generate products
# ---------------------------------
print("Generating products ...")

categories = ["Electronics", "Clothing", "Books", "Toys", "Sports"]

products_df = generate_range_df(num_products, "product_id") \
    .withColumn("product_name", f.concat(f.lit("Product_"), f.col("product_id").cast(StringType()))) \
    .withColumn("category", f.when(f.rand() < 0.2, categories[0])
                            .when(f.rand() < 0.4, categories[1])
                            .when(f.rand() < 0.6, categories[2])
                            .when(f.rand() < 0.8, categories[3])
                            .otherwise(categories[4])) \
    .withColumn("price", (f.rand() * 495 + 5).cast(DoubleType()))

products_df.write.mode("overwrite").parquet(products_path)
print(f"Generated {products_path}")

# ---------------------------------
# Step 3: Generate orders (Partitioned by year)
# ---------------------------------
print("Generating orders ...")

start_date = datetime.date(2020, 1, 1)
end_date = datetime.date(2025, 1, 1)
days_diff = (end_date - start_date).days

orders_df = generate_range_df(num_orders, "order_id") \
    .withColumn("customer_id", (f.rand() * num_customers + 1).cast(IntegerType())) \
    .withColumn("product_id", (f.rand() * num_products + 1).cast(IntegerType())) \
    .withColumn("quantity", (f.rand() * 9 + 1).cast(IntegerType())) \
    .withColumn("price", (f.rand() * 495 + 5).cast(DoubleType())) \
    .withColumn("random_days", (f.rand() * days_diff).cast(IntegerType())) \
    .withColumn("order_date", f.expr(f"date_add('{start_date}', random_days)")) \
    .withColumn("year", f.year("order_date")) \
    .drop("random_days")

# Write as Parquet, partitioned by year
orders_df.write \
    .mode("overwrite") \
    .partitionBy("year") \
    .parquet(orders_path)

print(f"Generated {orders_path}")


In [2]:
from utils.spark_session import spark
from utils.display_def import display
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

customers_path = f"hdfs:/{base_path}/customers/part-00000-b98e4cda-ea1a-4351-8341-04ce69be9f9a-c000.snappy.parquet"
orders_path = f"{base_path}/orders"
products_path = f"{base_path}/products"

customers_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("age", IntegerType(), True)
])

customers_df = spark.read.parquet(customers_path)

display(customers_df)

products_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True)
])

orders_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_date", DateType(), True),
    StructField("year", IntegerType(), True)
])


orders_df = spark.read.schema(orders_schema).parquet(orders_path)
products_df = spark.read.schema(products_schema).parquet(products_path)


# # Example Transformations
# customers_filtered = customers_df.filter(F.col("country") == "USA")
# orders_enriched = orders_df.withColumn("total_price", F.col("price") * F.col("quantity"))
# orders_products = orders_enriched.join(products_df, on="product_id", how="inner")
# full_data = orders_products.join(customers_filtered, on="customer_id", how="inner")

# customer_spending = (
#     full_data
#     .groupBy("customer_id", "name")
#     .agg(
#         F.sum("total_price").alias("total_spent"),
#         F.countDistinct("order_id").alias("total_orders")
#     )
#     .orderBy(F.desc("total_spent"))
# )

# # Action: Collect top 20 customers by total spending
# top_customers = customer_spending.limit(20)

# # Display results
# display(top_customers)

# # Additional action: Show aggregated sales per product category
# category_sales = (
#     full_data
#     .groupBy("category")
#     .agg(
#         F.sum("total_price").alias("category_sales"),
#         F.countDistinct("order_id").alias("order_count")
#     )
#     .orderBy(F.desc("category_sales"))
# )

# display(category_sales)


Py4JJavaError: An error occurred while calling o245.parquet.
: java.io.IOException: Incomplete HDFS URI, no host: hdfs:/file:/mnt/c/wsl_mount_point/spark_data/hello_world/customers/part-00000-b98e4cda-ea1a-4351-8341-04ce69be9f9a-c000.snappy.parquet
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:185)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3615)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
	at scala.collection.immutable.List.map(List.scala:247)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:290)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:286)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:286)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:249)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:280)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:280)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)
	at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$1(Dataset.scala:115)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:113)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:109)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:58)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:457)
	at org.apache.spark.sql.classic.DataFrameReader.parquet(DataFrameReader.scala:306)
	at org.apache.spark.sql.classic.DataFrameReader.parquet(DataFrameReader.scala:58)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:185)
		at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3615)
		at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
		at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
		at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
		at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
		at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
		at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
		at scala.collection.immutable.List.map(List.scala:247)
		at scala.collection.immutable.List.map(List.scala:79)
		at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
		at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
		at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
		at scala.Option.getOrElse(Option.scala:201)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
		at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
		at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
		at scala.collection.immutable.List.foldLeft(List.scala:79)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:290)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:286)
		at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:286)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:249)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:280)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:280)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 23 more
