In [9]:
from pyspark.sql import SparkSession
# Get the active SparkSession if one already exists, otherwise start a new
# one whose application name is "DataFrame_Operations".
# Fix: the builder method is `appName`; the previous `appNxame` is not an
# attribute of the builder and raises AttributeError on a fresh kernel.
spark = SparkSession.builder.appName("DataFrame_Operations").getOrCreate()


24/12/10 12:15:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [10]:
# Ten sample people, one tuple per row: (name, city, age, gender).
data = [
    ("Alice", "New York", 25, "Female"),
    ("Bob", "Los Angeles", 30, "Male"),
    ("Charlie", "Chicago", 22, "Male"),
    ("Diana", "Houston", 28, "Female"),
    ("Eve", "Phoenix", 35, "Female"),
    ("Frank", "Philadelphia", 40, "Male"),
    ("Grace", "San Antonio", 29, "Female"),
    ("Hank", "San Diego", 33, "Male"),
    ("Ivy", "Dallas", 31, "Female"),
    ("Jack", "San Jose", 27, "Male"),
]

# Column names, listed in the same order as the fields of each tuple.
columns = ["name", "city", "age", "gender"]

# Build the DataFrame from the rows and column names, then print it.
df = spark.createDataFrame(data, columns)
df.show()

+-------+------------+---+------+
|   name|        city|age|gender|
+-------+------------+---+------+
|  Alice|    New York| 25|Female|
|    Bob| Los Angeles| 30|  Male|
|Charlie|     Chicago| 22|  Male|
|  Diana|     Houston| 28|Female|
|    Eve|     Phoenix| 35|Female|
|  Frank|Philadelphia| 40|  Male|
|  Grace| San Antonio| 29|Female|
|   Hank|   San Diego| 33|  Male|
|    Ivy|      Dallas| 31|Female|
|   Jack|    San Jose| 27|  Male|
+-------+------------+---+------+



In [11]:
# Keep only the rows whose age is strictly greater than 30, then print them.
filtered_df = df.where(df["age"] > 30)
filtered_df.show()


+-----+------------+---+------+
| name|        city|age|gender|
+-----+------------+---+------+
|  Eve|     Phoenix| 35|Female|
|Frank|Philadelphia| 40|  Male|
| Hank|   San Diego| 33|  Male|
|  Ivy|      Dallas| 31|Female|
+-----+------------+---+------+



In [12]:
from pyspark.sql.functions import col

# Add a "tax" column equal to 0.1 * age, then rename "age" to "years".
# The tax expression is evaluated before the rename, so it refers to "age".
# NOTE(review): `df` is rebound in place, so running this cell a second time
# will fail to resolve "age" — confirm whether a new variable name is wanted.
df = (
    df
    .withColumn("tax", col("age") * 0.1)
    .withColumnRenamed("age", "years")
)



In [13]:

# Remove the "city" and "gender" columns, then print what remains.
df = df.drop("city").drop("gender")
df.show()

+-------+-----+------------------+
|   name|years|               tax|
+-------+-----+------------------+
|  Alice|   25|               2.5|
|    Bob|   30|               3.0|
|Charlie|   22|               2.2|
|  Diana|   28|2.8000000000000003|
|    Eve|   35|               3.5|
|  Frank|   40|               4.0|
|  Grace|   29|2.9000000000000004|
|   Hank|   33|3.3000000000000003|
|    Ivy|   31|               3.1|
|   Jack|   27|               2.7|
+-------+-----+------------------+



In [19]:

# Product catalogue rows:
# (ProductID, ProductName, Category, Price, StockQuantity, Rating).
# Note: this cell rebinds `data`, `columns` and `df`, which the earlier
# people example also used.
data = [
    (101, "Laptop", "Electronics", 800, 50, 4.5),
    (102, "Headphones", "Electronics", 50, 200, 4.3),
    (103, "Smartphone", "Electronics", 500, 80, 4.6),
    (104, "Sofa", "Furniture", 300, 20, 4.1),
    (105, "Dining Table", "Furniture", 400, 15, 4.0),
    (106, "Chair", "Furniture", 50, 100, 4.2),
    (107, "Notebook", "Stationery", 5, 500, 4.8),
    (108, "Pen", "Stationery", 2, 1000, 4.9),
    (109, "Printer", "Electronics", 150, 30, 4.0),
    (110, "Bed", "Furniture", 700, 10, 4.3),
]

# Column names, in the same order as the tuple fields.
columns = ["ProductID", "ProductName", "Category", "Price", "StockQuantity", "Rating"]

# Build the DataFrame, casting Price to float and StockQuantity to int
# as part of the same expression.
df = (
    spark.createDataFrame(data, columns)
    .withColumn("Price", col("Price").cast("float"))
    .withColumn("StockQuantity", col("StockQuantity").cast("int"))
)

# Print the starting table under a heading.
print("Initial DataFrame:")
df.show()



Initial DataFrame:
+---------+------------+-----------+-----+-------------+------+
|ProductID| ProductName|   Category|Price|StockQuantity|Rating|
+---------+------------+-----------+-----+-------------+------+
|      101|      Laptop|Electronics|800.0|           50|   4.5|
|      102|  Headphones|Electronics| 50.0|          200|   4.3|
|      103|  Smartphone|Electronics|500.0|           80|   4.6|
|      104|        Sofa|  Furniture|300.0|           20|   4.1|
|      105|Dining Table|  Furniture|400.0|           15|   4.0|
|      106|       Chair|  Furniture| 50.0|          100|   4.2|
|      107|    Notebook| Stationery|  5.0|          500|   4.8|
|      108|         Pen| Stationery|  2.0|         1000|   4.9|
|      109|     Printer|Electronics|150.0|           30|   4.0|
|      110|         Bed|  Furniture|700.0|           10|   4.3|
+---------+------------+-----------+-----+-------------+------+



In [20]:
# Order rows by Price from highest to lowest; rows with equal Price are
# ordered by Category ascending.
sorted_df = df.sort(df["Price"].desc(), df["Category"].asc())
print("Sorted DataFrame (by Price descending, then by Category ascending):")
sorted_df.show()

Sorted DataFrame (by Price descending, then by Category ascending):
+---------+------------+-----------+-----+-------------+------+
|ProductID| ProductName|   Category|Price|StockQuantity|Rating|
+---------+------------+-----------+-----+-------------+------+
|      101|      Laptop|Electronics|800.0|           50|   4.5|
|      110|         Bed|  Furniture|700.0|           10|   4.3|
|      103|  Smartphone|Electronics|500.0|           80|   4.6|
|      105|Dining Table|  Furniture|400.0|           15|   4.0|
|      104|        Sofa|  Furniture|300.0|           20|   4.1|
|      109|     Printer|Electronics|150.0|           30|   4.0|
|      102|  Headphones|Electronics| 50.0|          200|   4.3|
|      106|       Chair|  Furniture| 50.0|          100|   4.2|
|      107|    Notebook| Stationery|  5.0|          500|   4.8|
|      108|         Pen| Stationery|  2.0|         1000|   4.9|
+---------+------------+-----------+-----+-------------+------+



In [24]:
from pyspark.sql.functions import sum

# Per-row "TotalSales": Price multiplied by StockQuantity.
sales_df = df.withColumn("TotalSales", df["Price"] * df["StockQuantity"])

# Sum TotalSales within each Category.
# NOTE(review): `sum` is the pyspark.sql.functions import at the top of this
# cell, which rebinds Python's builtin `sum` name for the rest of the notebook.
sales_by_category = (
    sales_df
    .groupBy("Category")
    .agg(sum("TotalSales").alias("TotalSalesByCategory"))
)

# Print the per-category totals.
sales_by_category.show()


+-----------+--------------------+
|   Category|TotalSalesByCategory|
+-----------+--------------------+
|Electronics|             94500.0|
|  Furniture|             24000.0|
| Stationery|              4500.0|
+-----------+--------------------+



In [25]:
from pyspark.sql.functions import sum

# Aggregate per ProductName: summed TotalSales and summed StockQuantity.
# NOTE(review): the "TotalQuantitySold" alias is computed from StockQuantity,
# not from a sold-units column — confirm the intended naming.
grouped_by_product = sales_df.groupBy("ProductName")
sales_by_product = grouped_by_product.agg(
    sum(sales_df["TotalSales"]).alias("TotalSalesAmount"),
    sum(sales_df["StockQuantity"]).alias("TotalQuantitySold"),
)

# Print the per-product totals.
sales_by_product.show()


+------------+----------------+-----------------+
| ProductName|TotalSalesAmount|TotalQuantitySold|
+------------+----------------+-----------------+
|      Laptop|         40000.0|               50|
|  Headphones|         10000.0|              200|
|  Smartphone|         40000.0|               80|
|        Sofa|          6000.0|               20|
|Dining Table|          6000.0|               15|
|       Chair|          5000.0|              100|
|    Notebook|          2500.0|              500|
|         Pen|          2000.0|             1000|
|         Bed|          7000.0|               10|
|     Printer|          4500.0|               30|
+------------+----------------+-----------------+

