<a href="https://colab.research.google.com/github/suriarasai/BEAD2025/blob/main/colab/03b_DataMungingUsingFunctions_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Munging Examples
Data munging (or data wrangling) in PySpark for taxi booking services typically involves cleaning, transforming, and enriching raw data to make it suitable for analysis.
This notebook sets up Spark in Colab and then works through common data munging tasks with examples.

## Install Spark

In [None]:
# install pyspark using pip (--ignore-installed reinstalls even if a copy is already present)
!pip install --ignore-installed -q pyspark
# install findspark using pip
!pip install --ignore-installed -q findspark



## SparkSession
Start Spark Session

In [None]:
from pyspark.sql import SparkSession
# import collections
spark = SparkSession.builder.master("local").appName("My First PySpark App").getOrCreate()

## Drive
Connect to Google Drive

In [None]:
# To read data from a file, first upload it to your Google Drive, then mount the drive in Colab.
from google.colab import drive
# force_remount=True remounts the drive even if it is already mounted.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## Taxi Data Munging

### Handling Missing Values
Taxi booking datasets often contain missing values in fields like dropoff_location, fare_amount, etc.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("TaxiDataMunging").getOrCreate()

# Sample Data
data = [
    (1, "2025-02-18 10:00:00", "2025-02-18 10:30:00", None, 15.5),
    (2, "2025-02-18 11:00:00", None, "Downtown", 20.0),
    (3, None, "2025-02-18 12:00:00", "Airport", None),
]
columns = ["trip_id", "pickup_time", "dropoff_time", "dropoff_location", "fare_amount"]

df = spark.createDataFrame(data, columns)


#### Fill Missing Values
In PySpark you can handle missing (null or NaN) values in two common ways:

1. Using the DataFrame's fillna() method (an alias of na.fill())
This method replaces missing values with a specified constant. You can pass a single value, which is applied to every column of a matching type, or a dictionary that specifies a different fill value per column.
2. Using the Imputer from pyspark.ml.feature
For numerical columns, you might prefer to impute missing values with a statistic such as the mean or median of the column.

Use fillna() for simple replacements where a constant is appropriate, and the Imputer when you want a data-driven replacement. The cell below uses fillna() with a dictionary.

In [2]:
df_cleaned = df.fillna({"dropoff_location": "Unknown", "fare_amount": 0.0})

df_cleaned.show()


+-------+-------------------+-------------------+----------------+-----------+
|trip_id|        pickup_time|       dropoff_time|dropoff_location|fare_amount|
+-------+-------------------+-------------------+----------------+-----------+
|      1|2025-02-18 10:00:00|2025-02-18 10:30:00|         Unknown|       15.5|
|      2|2025-02-18 11:00:00|               NULL|        Downtown|       20.0|
|      3|               NULL|2025-02-18 12:00:00|         Airport|        0.0|
+-------+-------------------+-------------------+----------------+-----------+



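### Converting Datetime Formats
Timestamps usually arrive as strings. Converting them to Spark's timestamp type makes time calculations straightforward. The example below converts the pickup and drop-off columns from string to timestamp. It works on the original df rather than df_cleaned, so the NULL values from the sample data are still present in the output.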


In [3]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("pickup_time", to_timestamp(col("pickup_time")))
df = df.withColumn("dropoff_time", to_timestamp(col("dropoff_time")))

df.show()


+-------+-------------------+-------------------+----------------+-----------+
|trip_id|        pickup_time|       dropoff_time|dropoff_location|fare_amount|
+-------+-------------------+-------------------+----------------+-----------+
|      1|2025-02-18 10:00:00|2025-02-18 10:30:00|            NULL|       15.5|
|      2|2025-02-18 11:00:00|               NULL|        Downtown|       20.0|
|      3|               NULL|2025-02-18 12:00:00|         Airport|       NULL|
+-------+-------------------+-------------------+----------------+-----------+



### Filtering Invalid Data
Remove bookings with non-positive fares or missing crucial data. The example below keeps only trips where the fare is positive and the pickup time is present. A NULL fare also fails the fare_amount > 0 test, so trip 3 is dropped.


In [4]:
df_valid = df.filter((col("fare_amount") > 0) & col("pickup_time").isNotNull())
df_valid.show()


+-------+-------------------+-------------------+----------------+-----------+
|trip_id|        pickup_time|       dropoff_time|dropoff_location|fare_amount|
+-------+-------------------+-------------------+----------------+-----------+
|      1|2025-02-18 10:00:00|2025-02-18 10:30:00|            NULL|       15.5|
|      2|2025-02-18 11:00:00|               NULL|        Downtown|       20.0|
+-------+-------------------+-------------------+----------------+-----------+



### Feature Engineering
This example derives the trip duration in minutes from the pickup and drop-off timestamps. The result is NULL when either timestamp is missing.

In [8]:
from pyspark.sql.functions import unix_timestamp, col

df = df.withColumn("trip_duration_mins", \
                   (unix_timestamp(col("dropoff_time")) -  \
                    unix_timestamp(col("pickup_time"))) / 60)

df.show()


+-------+-------------------+-------------------+----------------+-----------+------------------+
|trip_id|        pickup_time|       dropoff_time|dropoff_location|fare_amount|trip_duration_mins|
+-------+-------------------+-------------------+----------------+-----------+------------------+
|      1|2025-02-18 10:00:00|2025-02-18 10:30:00|            NULL|       15.5|              30.0|
|      2|2025-02-18 11:00:00|               NULL|        Downtown|       20.0|              NULL|
|      3|               NULL|2025-02-18 12:00:00|         Airport|       NULL|              NULL|
+-------+-------------------+-------------------+----------------+-----------+------------------+



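### Geospatial Filtering (Drop-offs in a Specific Area)
Filter trips by region. The sample data has no pickup location or coordinates, so this example matches on the dropoff_location label and selects trips that ended in Changi. No sample row has that value, so the result is empty.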


In [10]:
df_filtered = df.filter(col("dropoff_location") == "Changi")
df_filtered.show()


+-------+-----------+------------+----------------+-----------+------------------+
|trip_id|pickup_time|dropoff_time|dropoff_location|fare_amount|trip_duration_mins|
+-------+-----------+------------+----------------+-----------+------------------+
+-------+-----------+------------+----------------+-----------+------------------+



### Aggregation
Summarize daily revenue by calculating the total fare collected per pickup date. Trips without a pickup time fall into a NULL date group.

In [11]:
# Import sum under an alias so it does not shadow Python's built-in sum()
from pyspark.sql.functions import date_format, sum as spark_sum

df_daily_fare = df.withColumn("date", date_format(col("pickup_time"), "yyyy-MM-dd")) \
                  .groupBy("date").agg(spark_sum("fare_amount").alias("total_fare"))

df_daily_fare.show()


+----------+----------+
|      date|total_fare|
+----------+----------+
|2025-02-18|      35.5|
|      NULL|      NULL|
+----------+----------+



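### Data Deduplication
Drop duplicate bookings so that each trip_id appears only once. The sample data contains no duplicates, so all three rows remain.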


In [12]:
df_deduped = df.dropDuplicates(["trip_id"])
df_deduped.show()

+-------+-------------------+-------------------+----------------+-----------+------------------+
|trip_id|        pickup_time|       dropoff_time|dropoff_location|fare_amount|trip_duration_mins|
+-------+-------------------+-------------------+----------------+-----------+------------------+
|      1|2025-02-18 10:00:00|2025-02-18 10:30:00|            NULL|       15.5|              30.0|
|      2|2025-02-18 11:00:00|               NULL|        Downtown|       20.0|              NULL|
|      3|               NULL|2025-02-18 12:00:00|         Airport|       NULL|              NULL|
+-------+-------------------+-------------------+----------------+-----------+------------------+



### Bucketing Fare Amounts
Categorize trips by fare price into Low, Medium, and High. Trips with a missing fare are labeled Unknown so that the otherwise() branch does not count them as High Fare.


In [16]:
from pyspark.sql.functions import when
# A NULL fare fails every comparison, so label it before the default branch.
df = df.withColumn("fare_category", when(col("fare_amount").isNull(), "Unknown") \
       .when(col("fare_amount") < 10, "Low Fare") \
       .when(col("fare_amount") < 30, "Medium Fare") \
       .otherwise("High Fare"))

df.show()


+-------+-------------------+-------------------+----------------+-----------+------------------+-------------+
|trip_id|        pickup_time|       dropoff_time|dropoff_location|fare_amount|trip_duration_mins|fare_category|
+-------+-------------------+-------------------+----------------+-----------+------------------+-------------+
|      1|2025-02-18 10:00:00|2025-02-18 10:30:00|            NULL|       15.5|              30.0|  Medium Fare|
|      2|2025-02-18 11:00:00|               NULL|        Downtown|       20.0|              NULL|  Medium Fare|
|      3|               NULL|2025-02-18 12:00:00|         Airport|       NULL|              NULL|      Unknown|
+-------+-------------------+-------------------+----------------+-----------+------------------+-------------+



### Join Data
Merge booking details with customer data using an inner join on customer_id. Charlie has no trips, so he does not appear in the result.

In [17]:
customer_data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
customer_df = spark.createDataFrame(customer_data, ["customer_id", "customer_name"])

trip_data = [(1, 1, 20.5), (2, 2, 15.0), (3, 1, 30.0)]
trip_df = spark.createDataFrame(trip_data, ["trip_id", "customer_id", "fare_amount"])

df_joined = trip_df.join(customer_df, "customer_id", "inner")
df_joined.show()


+-----------+-------+-----------+-------------+
|customer_id|trip_id|fare_amount|customer_name|
+-----------+-------+-----------+-------------+
|          1|      1|       20.5|        Alice|
|          1|      3|       30.0|        Alice|
|          2|      2|       15.0|          Bob|
+-----------+-------+-----------+-------------+



### Save Data
Save the cleaned dataset in Parquet format for efficient querying. Note that df_cleaned is the DataFrame produced by the fillna() step, so it does not include the columns added in later sections.

In [18]:
df_cleaned.write.mode("overwrite").parquet("output/taxi_cleaned.parquet")

## Final Thoughts
These PySpark data munging techniques help clean, filter, and transform raw taxi booking data, making it ready for analytics and reporting.
