In [0]:
from pyspark.sql.functions import col

df_raw = spark.table("zomato_reviews_cluster.default.raw_zomato_reviews")
df_raw.show(5)
df_raw.printSchema()


+--------------------+------+--------------------+-------------------+-------+
|           review_id|rating|         review_text|        review_date|helpful|
+--------------------+------+--------------------+-------------------+-------+
|90749778-cd88-4c1...|     4|kindly requesting...|2025-11-27 08:15:26|      0|
|aa848bb6-d242-4a7...|     1|Hiked prices, pac...|2025-11-27 08:08:31|      0|
|4f888388-9f28-44a...|     5|       good discount|2025-11-27 04:20:28|      0|
|490a16b3-aacf-420...|     1|Zomato in its ini...|2025-11-27 03:34:38|      0|
|0090a503-13b8-474...|     5|    good application|2025-11-27 02:50:58|      0|
+--------------------+------+--------------------+-------------------+-------+
only showing top 5 rows
root
 |-- review_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- helpful: long (nullable = true)



In [0]:
from pyspark.sql.functions import to_date, row_number, lit
from pyspark.sql.window import Window

# Parse review_date
df_step = df_raw.withColumn("review_date", to_date(col("review_date")))

# Define a window for ordering rows
w = Window.orderBy(col("review_date"), col("review_id"))

# Create new numeric review_id starting from 140601
df_step = df_step.withColumn(
    "review_id_int",
    (row_number().over(w) + lit(140600)).cast("int")  # 140600 + 1 = 140601
)

# Replace old review_id (string) with new int column
df_step = (
    df_step
    .drop("review_id")
    .withColumnRenamed("review_id_int", "review_id")
)

df_step.show(5)
df_step.printSchema()




+------+--------------------+-----------+-------+---------+
|rating|         review_text|review_date|helpful|review_id|
+------+--------------------+-----------+-------+---------+
|  NULL|                NULL|       NULL|   NULL|   140601|
|     5|Worst application...|       NULL|   NULL|   140602|
|  NULL|                   0|       NULL|   NULL|   140603|
|     1|first it shows 14...| 2025-09-29|      0|   140604|
|     1|very much delay i...| 2025-09-29|      0|   140605|
+------+--------------------+-----------+-------+---------+
only showing top 5 rows
root
 |-- rating: long (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- helpful: long (nullable = true)
 |-- review_id: integer (nullable = false)





In [0]:
from pyspark.sql.functions import col

df_clean = (
    df_step
    .withColumn("rating", col("rating").cast("double"))
    .withColumn("helpful", col("helpful").cast("int"))
)

df_clean.show(5)
df_clean.printSchema()




+------+--------------------+-----------+-------+---------+
|rating|         review_text|review_date|helpful|review_id|
+------+--------------------+-----------+-------+---------+
|  NULL|                NULL|       NULL|   NULL|   140601|
|   5.0|Worst application...|       NULL|   NULL|   140602|
|  NULL|                   0|       NULL|   NULL|   140603|
|   1.0|first it shows 14...| 2025-09-29|      0|   140604|
|   1.0|very much delay i...| 2025-09-29|      0|   140605|
+------+--------------------+-----------+-------+---------+
only showing top 5 rows
root
 |-- rating: double (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- helpful: integer (nullable = true)
 |-- review_id: integer (nullable = false)





In [0]:
from pyspark.sql.functions import year, date_format, when, size, split

# Year + month
df_clean = (
    df_clean
    .withColumn("review_year", year(col("review_date")))
    .withColumn("review_month", date_format(col("review_date"), "yyyy-MM"))
)

# Rating bucket
df_clean = df_clean.withColumn(
    "rating_category",
    when(col("rating") <= 2, "Low")
    .when(col("rating") == 3, "Medium")
    .otherwise("High")  # 4 and 5
)

# Helpful group
df_clean = df_clean.withColumn(
    "helpful_group",
    when(col("helpful") == 0, "No votes")
    .when(col("helpful") <= 2, "Low (1–2)")
    .when(col("helpful") <= 5, "Medium (3–5)")
    .otherwise("High (6+)")
)

# Review length (word count)
df_clean = df_clean.withColumn(
    "review_length_words",
    size(split(col("review_text"), " "))
)

df_clean.show(5)
df_clean.printSchema()




+------+--------------------+-----------+-------+---------+-----------+------------+---------------+-------------+-------------------+
|rating|         review_text|review_date|helpful|review_id|review_year|review_month|rating_category|helpful_group|review_length_words|
+------+--------------------+-----------+-------+---------+-----------+------------+---------------+-------------+-------------------+
|  NULL|                NULL|       NULL|   NULL|   140601|       NULL|        NULL|           High|    High (6+)|               NULL|
|   5.0|Worst application...|       NULL|   NULL|   140602|       NULL|        NULL|           High|    High (6+)|                 12|
|  NULL|                   0|       NULL|   NULL|   140603|       NULL|        NULL|           High|    High (6+)|                  1|
|   1.0|first it shows 14...| 2025-09-29|      0|   140604|       2025|     2025-09|            Low|     No votes|                 27|
|   1.0|very much delay i...| 2025-09-29|      0|   140



In [0]:
df_clean.write.mode("overwrite").saveAsTable("zomato_reviews_cluster.default.clean_zomato_reviews")

