In [4]:
import pandas as pd
import numpy as np

# Fixed lists for consistency
products = ["Laptop", "Smartphone", "Tablet", "Monitor", "Keyboard"]
regions = ["North", "South", "East", "West", "Central"]
customer_names = ["Alice", "Bob", "Charlie", "David", "Eva", "Frank", "Grace", "Helen", "Ian", "Jack"]

# sales_data_1
sales_data_1 = pd.DataFrame({
    "sale_id": list(range(1, 101)),
    "customer_name": [customer_names[i % 10] if i % 10 != 0 else None for i in range(1, 101)],
    "product": [products[i % 5] if i % 7 != 0 else "" for i in range(1, 101)],
    "quantity": np.random.randint(1, 10, size=100),
    "price": np.round(np.random.uniform(10.0, 500.0, size=100), 2),
    "sale_date": pd.date_range(start="2023-01-01", periods=100, freq='D'),
    "region": [regions[i % 5] if i % 15 != 0 else None for i in range(1, 101)]
})

# sales_data_2
sales_data_2 = pd.DataFrame({
    "sale_id": list(range(50, 150)),
    "customer_name": [customer_names[i % 10] if i % 8 != 0 else None for i in range(50, 150)],
    "product": [products[i % 5] if i % 6 != 0 else "" for i in range(50, 150)],
    "quantity": np.random.randint(1, 15, size=100),
    "price": np.round(np.random.uniform(5.0, 1000.0, size=100), 2),
    "sale_date": pd.date_range(start="2023-02-15", periods=100, freq='D'),
    "region": [regions[i % 5] if i % 12 != 0 else None for i in range(50, 150)],
    "discount": [round(np.random.uniform(0, 50), 2) if i % 5 != 0 else None for i in range(50, 150)]
})


from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("SalesData").getOrCreate()

schema1 = StructType([
    StructField("sale_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("sale_date", DateType(), True),
    StructField("region", StringType(), True)
])

schema2 = StructType(schema1.fields + [StructField("discount", DoubleType(), True)])

sales_df1 = spark.createDataFrame(sales_data_1, schema=schema1)
sales_df2 = spark.createDataFrame(sales_data_2, schema=schema2)

# spark.stop()


25/05/31 20:26:55 WARN Utils: Your hostname, Vikashs-Laptop.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
25/05/31 20:26:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/31 20:26:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [39]:
# select column
# column_filter=sales_df1.select('product','customer_name',sales_df1.quantity,(sales_df1.quantity+10).alias("new_quantity_of_product")
#                               ,lower(sales_df1.region).alias("lower_region"),sales_df1.region,(sales_df1.price)).show()

# group by aggregation
# groupby_df = sales_df1.groupBy("product").agg(
#     round(sum("price"),2).alias("total_sales"),
#     sum("quantity").alias("total_quantity"),
#     round(avg("price"),2).alias("avg_price")
# ).show()

# withColumn(colName, col)
# sales_df1.withColumn('order_month',substring(sales_df1.sale_date,1,10)).show()
# df = sales_df1.withColumn("lower_region", lower(col("region"))).show()

# withColumnRenamed(existingCol, newCol)
# sales_df1.withColumnRenamed('sale_id','order_id').show()

# drop(*cols)To Drop a specific column.
# drop=sales_df1.drop('column_name1','column2').show()

# dropDuplicates(subset=None)
# sales_df1.dropDuplicates().show()
# sales_df1.dropDuplicates(("name1","score1","score2")).show()

# filter(condition): (Its alias ‘where’)
# Filter rows using a given condition.
# use '&' for 'and‘. '|' for 'or‘. (boolean expressions)
# Use column function isin() for multiple search.
# Or use IN Operator for SQL Style syntax.

# sales_df1.where((sales_df1.sale_id > 12) & (sales_df1.sale_id < 20)).show()
# sales_df1.where(sales_df1.region.isin('West','South',null)).show()

# sales_df1.where(
#     (col("region").isin("West", "South")) | (col("region").isNull())
# ).show()


sales_df1.where(
    (col("region").isin("West", "South")) & (col("region").isNotNull())
).show()

+-------+-------------+----------+--------+------+----------+------+
|sale_id|customer_name|   product|quantity| price| sale_date|region|
+-------+-------------+----------+--------+------+----------+------+
|      1|          Bob|Smartphone|       4|356.08|2023-01-01| South|
|      3|        David|   Monitor|       1|304.38|2023-01-03|  West|
|      6|        Grace|Smartphone|       5|257.49|2023-01-06| South|
|      8|          Ian|   Monitor|       7|316.01|2023-01-08|  West|
|     11|          Bob|Smartphone|       6| 16.65|2023-01-11| South|
|     13|        David|   Monitor|       5|397.58|2023-01-13|  West|
|     16|        Grace|Smartphone|       9|458.07|2023-01-16| South|
|     18|          Ian|   Monitor|       2|307.82|2023-01-18|  West|
|     21|          Bob|          |       3|315.19|2023-01-21| South|
|     23|        David|   Monitor|       6|100.85|2023-01-23|  West|
|     26|        Grace|Smartphone|       1|382.07|2023-01-26| South|
|     28|          Ian|          |

In [17]:
pivot_df = sales_df1.groupBy("product").pivot("region").agg(round(avg("price"),2))
pivot_df = pivot_df.fillna(0)
pivot_df.show()

+----------+------+-------+------+------+------+------+
|   product|  null|Central|  East| North| South|  West|
+----------+------+-------+------+------+------+------+
|    Laptop|189.66|    0.0|   0.0|200.55|   0.0|   0.0|
|    Tablet|   0.0|    0.0|228.28|   0.0|   0.0|   0.0|
|          |   0.0| 424.14|197.19|268.67|308.83|330.22|
|  Keyboard|   0.0| 271.26|   0.0|   0.0|   0.0|   0.0|
|Smartphone|   0.0|    0.0|   0.0|   0.0|280.97|   0.0|
|   Monitor|   0.0|    0.0|   0.0|   0.0|   0.0|271.34|
+----------+------+-------+------+------+------+------+



In [35]:
# unpivot
# .withColumn("region", lit("East"))
# This adds a new column called "region" with a constant value "East" for all rows.

# lit("East") means a literal (static) string.

from pyspark.sql.functions import lit

unpivot_df = (
    pivot_df.select("product", pivot_df["East"].alias("avg_price")).withColumn("region", lit("East"))
    .union(
        pivot_df.select("product", pivot_df["North"].alias("avg_price")).withColumn("region", lit("North"))
    )
    .union(
        pivot_df.select("product", pivot_df["South"].alias("avg_price")).withColumn("region", lit("South"))
    )
)

unpivot_df.select("product", "region", "avg_price").show()


+----------+------+---------+
|   product|region|avg_price|
+----------+------+---------+
|    Laptop|  East|      0.0|
|    Tablet|  East|   228.28|
|          |  East|   197.19|
|  Keyboard|  East|      0.0|
|Smartphone|  East|      0.0|
|   Monitor|  East|      0.0|
|    Laptop| North|   200.55|
|    Tablet| North|      0.0|
|          | North|   268.67|
|  Keyboard| North|      0.0|
|Smartphone| North|      0.0|
|   Monitor| North|      0.0|
|    Laptop| South|      0.0|
|    Tablet| South|      0.0|
|          | South|   308.83|
|  Keyboard| South|      0.0|
|Smartphone| South|   280.97|
|   Monitor| South|      0.0|
+----------+------+---------+



In [19]:
from pyspark.sql.functions import when, avg

result_df = sales_df1.groupBy("region").agg(
    avg(when(col("product") == "Smartphone", col("price"))).alias("avg_smartphone_price"),
    avg(when(col("product") == "Monitor", col("price"))).alias("avg_monitor_price"),
    avg(when(col("product") == "Keyboard", col("price"))).alias("avg_keyboard_price"),
    avg(when(col("product") == "Tablet", col("price"))).alias("avg_tablet_price"),
    avg(when(col("product") == "Laptop", col("price"))).alias("avg_laptop_price")
)

result_df.show()


+-------+--------------------+-----------------+------------------+------------------+------------------+
| region|avg_smartphone_price|avg_monitor_price|avg_keyboard_price|  avg_tablet_price|  avg_laptop_price|
+-------+--------------------+-----------------+------------------+------------------+------------------+
|  South|  280.96941176470585|             NULL|              NULL|              NULL|              NULL|
|Central|                NULL|             NULL| 271.2564705882353|              NULL|              NULL|
|   East|                NULL|             NULL|              NULL|228.28352941176468|              NULL|
|   West|                NULL|271.3370588235294|              NULL|              NULL|              NULL|
|  North|                NULL|             NULL|              NULL|              NULL|200.54666666666665|
|   NULL|                NULL|             NULL|              NULL|              NULL|189.65666666666667|
+-------+--------------------+----------------

In [33]:
from pyspark.sql.functions import when, avg

result_df = sales_df1.groupBy("region").agg(
    round(avg(when(sales_df1.product == "Smartphone", sales_df1.price).otherwise(0)), 2).alias("avg_smartphone_price"),
    avg(when(sales_df1.product == "Monitor", sales_df1.price)).alias("avg_monitor_price"),
    avg(when(sales_df1.product == "Keyboard", sales_df1.price)).alias("avg_keyboard_price"),
    avg(when(sales_df1.product == "Tablet", sales_df1.price)).alias("avg_tablet_price"),
    avg(when(sales_df1.product == "Laptop", sales_df1.price)).alias("avg_laptop_price")
)

result_df.show()


+-------+--------------------+-----------------+------------------+------------------+------------------+
| region|avg_smartphone_price|avg_monitor_price|avg_keyboard_price|  avg_tablet_price|  avg_laptop_price|
+-------+--------------------+-----------------+------------------+------------------+------------------+
|  South|              238.82|             NULL|              NULL|              NULL|              NULL|
|Central|                 0.0|             NULL| 271.2564705882353|              NULL|              NULL|
|   East|                 0.0|             NULL|              NULL|228.28352941176468|              NULL|
|   West|                 0.0|271.3370588235294|              NULL|              NULL|              NULL|
|  North|                 0.0|             NULL|              NULL|              NULL|200.54666666666665|
|   NULL|                 0.0|             NULL|              NULL|              NULL|189.65666666666667|
+-------+--------------------+----------------

In [43]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col, row_number

# Window spec for running sum ordered by sale_date
window_sum = Window.partitionBy("region").orderBy("sale_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Window spec for ranking sales by price (descending) per region
window_rank = Window.partitionBy("region").orderBy(col("price").desc())

# Calculate running sum of price
running_sum_df = sales_df1.withColumn("running_sum_price", sum(col("price")).over(window_sum))

# Add row number to rank prices per region descending
ranked_df = running_sum_df.withColumn("price_rank", row_number().over(window_rank))

# Filter or get the 3rd highest sale price per region
third_highest_df = ranked_df.filter(col("price_rank") == 3)

# Show the 3rd highest sale in each region
third_highest_df.select("sale_id", "region", "sale_date", "price", "price_rank", "running_sum_price").show()



+-------+-------+----------+------+----------+------------------+
|sale_id| region| sale_date| price|price_rank| running_sum_price|
+-------+-------+----------+------+----------+------------------+
|     90|   NULL|2023-03-31|214.98|         3|           1137.94|
|     39|Central|2023-02-08| 457.2|         3|           2416.79|
|     37|   East|2023-02-06|420.56|         3|1901.4700000000003|
|     50|  North|2023-02-19|342.24|         3|1617.0900000000001|
|     86|  South|2023-03-27| 397.9|         3| 5229.909999999999|
|     58|   West|2023-02-27|490.57|         3|           3358.85|
+-------+-------+----------+------+----------+------------------+



In [45]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, row_number

# Window for running sum ordered by sale_date
window_sum = Window.partitionBy("region").orderBy("sale_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Window for ranking sales by price descending per region
window_rank = Window.partitionBy("region").orderBy(sales_df1.price.desc())

# Calculate running sum
running_sum_df = sales_df1.withColumn("running_sum_price", sum(sales_df1.price).over(window_sum))

# Add row number rank by price descending
ranked_df = running_sum_df.withColumn("price_rank", row_number().over(window_rank))

# Filter 3rd highest sales per region
third_highest_df = ranked_df.filter(ranked_df.price_rank == 3)

third_highest_df.select("sale_id", "region", "sale_date", "price", "price_rank", "running_sum_price").show()


+-------+-------+----------+------+----------+------------------+
|sale_id| region| sale_date| price|price_rank| running_sum_price|
+-------+-------+----------+------+----------+------------------+
|     90|   NULL|2023-03-31|214.98|         3|           1137.94|
|     39|Central|2023-02-08| 457.2|         3|           2416.79|
|     37|   East|2023-02-06|420.56|         3|1901.4700000000003|
|     50|  North|2023-02-19|342.24|         3|1617.0900000000001|
|     86|  South|2023-03-27| 397.9|         3| 5229.909999999999|
|     58|   West|2023-02-27|490.57|         3|           3358.85|
+-------+-------+----------+------+----------+------------------+



In [47]:
from pyspark.sql.functions import avg
# we find the customer name whose product price higher then the average price of product for that region 

avg_price_df = sales_df1.groupBy("product").agg(avg("price").alias("avg_price"))

result_df = sales_df1.join(avg_price_df, on="product").filter(sales_df1.price > avg_price_df.avg_price)

result_df.select("sale_id", "customer_name", "product", "price", "avg_price", "region").show()


+-------+-------------+----------+------+------------------+-------+
|sale_id|customer_name|   product| price|         avg_price| region|
+-------+-------------+----------+------+------------------+-------+
|      5|        Frank|    Laptop|234.34|196.91666666666663|  North|
|     10|         NULL|    Laptop|372.74|196.91666666666663|  North|
|      4|          Eva|  Keyboard|423.94| 271.2564705882353|Central|
|      1|          Bob|Smartphone|356.08|280.96941176470585|  South|
|      3|        David|   Monitor|304.38| 271.3370588235294|   West|
|      8|          Ian|   Monitor|316.01| 271.3370588235294|   West|
|     15|        Frank|    Laptop|379.67|196.91666666666663|   NULL|
|     22|      Charlie|    Tablet|256.35|228.28352941176468|   East|
|     14|          Eva|          |453.99| 308.4628571428571|Central|
|     21|          Bob|          |315.19| 308.4628571428571|  South|
|     24|          Eva|  Keyboard|389.38| 271.2564705882353|Central|
|     16|        Grace|Smartphone|

In [53]:
from pyspark.sql.functions import first, last
window_spec = Window.partitionBy("region").orderBy("sale_date")
df = sales_df1.withColumn("first_price", first("price").over(window_spec))
df = df.withColumn("last_price", last("price").over(window_spec)).show()


+-------+-------------+--------+--------+------+----------+-------+-----------+----------+
|sale_id|customer_name| product|quantity| price| sale_date| region|first_price|last_price|
+-------+-------------+--------+--------+------+----------+-------+-----------+----------+
|     15|        Frank|  Laptop|       8|379.67|2023-01-15|   NULL|     379.67|    379.67|
|     30|         NULL|  Laptop|       4|270.16|2023-01-30|   NULL|     379.67|    270.16|
|     45|        Frank|  Laptop|       6|130.48|2023-02-14|   NULL|     379.67|    130.48|
|     60|         NULL|  Laptop|       5|  70.9|2023-03-01|   NULL|     379.67|      70.9|
|     75|        Frank|  Laptop|       6| 71.75|2023-03-16|   NULL|     379.67|     71.75|
|     90|         NULL|  Laptop|       6|214.98|2023-03-31|   NULL|     379.67|    214.98|
|      4|          Eva|Keyboard|       3|423.94|2023-01-04|Central|     423.94|    423.94|
|      9|         Jack|Keyboard|       2| 95.92|2023-01-09|Central|     423.94|     95.92|