# EX5-BATCH: More advanced RDD API programming

Your assignment: complete the `TODO`s and include the **output of each cell**.

### Download Bike Trip Data (Feb 2025)

In [16]:
!wget -np https://s3.amazonaws.com/tripdata/202502-citibike-tripdata.zip -P data/
![ -e "data/202502-citibike-tripdata_1.csv" ] || (cd data/ && unzip 202502-citibike-tripdata.zip)

Connecting to s3.amazonaws.com (52.217.123.56:443)
wget: can't open 'data/202502-citibike-tripdata.zip': File exists


### The data is split across three files; let us look at one (header plus a few lines)

In [18]:
!head -3 data/202502-citibike-tripdata_1.csv

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
C1F868EC9F7E49A5,electric_bike,2025-02-06 16:54:02.517,2025-02-06 17:00:48.166,Perry St & Bleecker St,5922.07,Watts St & Greenwich St,5578.02,40.73535398,-74.00483091,40.72405549,-74.00965965,member
668DDE0CFA929D5A,electric_bike,2025-02-14 10:09:49.035,2025-02-14 10:21:57.856,Dock 72 Way & Market St,4804.02,Spruce St & Nassau St,5137.10,40.69985,-73.97141,40.71146364,-74.00552427,member


### **Dataset Description**
The dataset contains **bike trip records** with the following columns:

| Column Name            | Description |
|------------------------|-------------|
| `ride_id`             | Unique trip identifier |
| `rideable_type`       | Type of bike used (e.g., docked, electric) |
| `started_at`          | Start timestamp of the trip |
| `ended_at`            | End timestamp of the trip |
| `start_station_name`  | Name of the start station |
| `start_station_id`    | ID of the start station |
| `end_station_name`    | Name of the end station |
| `end_station_id`      | ID of the end station |
| `start_lat`          | Latitude of the start location |
| `start_lng`          | Longitude of the start location |
| `end_lat`            | Latitude of the end location |
| `end_lng`            | Longitude of the end location |
| `member_casual`       | User type (`member` for subscribers, `casual` for non-subscribers) |

### Step 1: Load and Preprocess the Data
1. Start a **PySpark session (or SparkContext)**.
2. Load the dataset as an **RDD**.
3. **Remove the header** and filter out malformed rows.
4. `#TODO` Do the same for each file. Use [Spark Union transformation function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.union.html) for that.
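
One caveat when filtering malformed rows: a plain `split(",")` miscounts columns if a field is quoted and contains a comma. Whether these files contain such fields is not established here, so the following is only a hedged sketch. It relies on Python's standard `csv` module; `parse_csv_line`, `is_well_formed`, and `EXPECTED_COLUMNS` are hypothetical names introduced for illustration.

```python
import csv

EXPECTED_COLUMNS = 13  # number of columns in the header shown above

def parse_csv_line(line):
    """Split one CSV line into fields, honouring double-quoted fields."""
    # csv.reader accepts any iterable of lines; wrap the single line in a list
    return next(csv.reader([line]))

def is_well_formed(cols):
    """True when the row has exactly the expected number of columns."""
    return len(cols) == EXPECTED_COLUMNS

# Hypothetical line with a quoted field that contains a comma
line = 'A1,electric_bike,"Broadway, Upper",x'
print(line.split(","))       # naive split: 5 pieces, the quoted field is broken
print(parse_csv_line(line))  # csv-aware split: 4 fields
```

Under these assumptions, `parse_csv_line` could be passed to `map` and `is_well_formed` to `filter` on the RDD of text lines.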

In [1]:
from pyspark import SparkContext

try:
    sc.stop()
except NameError:
    print("SparkContext not defined")


sc = SparkContext(appName="EX5-BIGDATA", master="local[*]") 



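def process_csv_file(file_path):
    """Load one CSV file as an RDD of 13-column rows, without the header."""
    raw_rdd = sc.textFile(file_path)
    header = raw_rdd.first()
    # Drop the header line, then split each remaining line into columns
    data_rdd = raw_rdd.filter(lambda row: row != header)
    rdd = data_rdd.map(lambda row: row.split(","))
    # Keep only well-formed rows (the dataset has exactly 13 columns)
    return rdd.filter(lambda cols: len(cols) == 13)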


file_paths = [
    "data/202502-citibike-tripdata_1.csv",
    "data/202502-citibike-tripdata_2.csv",
    "data/202502-citibike-tripdata_3.csv"
]


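# Start from the first file, then union in the remaining ones
combined_rdd = process_csv_file(file_paths[0])

for file_path in file_paths[1:]:
    next_rdd = process_csv_file(file_path)
    combined_rdd = combined_rdd.union(next_rdd)

# Cache the combined RDD because several later actions reuse it
combined_rdd = combined_rdd.cache()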


count = combined_rdd.count()
print(f"Total number of records: {count}")


sample_data = combined_rdd.take(2)
print("\nSample records:")
print(f"{sample_data[0]}, \n{sample_data[1]}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Total number of records: 365892

Sample records:
[['C1F868EC9F7E49A5', 'electric_bike', '2025-02-06 16:54:02.517', '2025-02-06 17:00:48.166', 'Perry St & Bleecker St', '5922.07', 'Watts St & Greenwich St', '5578.02', '40.73535398', '-74.00483091', '40.72405549', '-74.00965965', 'member'], 
['668DDE0CFA929D5A', 'electric_bike', '2025-02-14 10:09:49.035', '2025-02-14 10:21:57.856', 'Dock 72 Way & Market St', '4804.02', 'Spruce St & Nassau St', '5137.10', '40.69985', '-73.97141', '40.71146364', '-74.00552427', 'member']]


### Step 2: RDD Partitioning
1. Check the **initial number of partitions**.
2. Repartition the data for better performance (change the number at will).
3. See what happens in the Spark UI.

In [2]:

# Number of partitions produced by the union of the input files
initial_partitions = combined_rdd.getNumPartitions()
print(f"Initial Partitions: {initial_partitions}")

# repartition() performs a full shuffle to redistribute the data
partitioned_rdd = combined_rdd.repartition(16)

new_partitions = partitioned_rdd.getNumPartitions()
print(f"Number of partitions after repartitioning: {new_partitions}")

Initial Partitions: 6
Number of partitions after repartitioning: 16


In [3]:

print("RDD Lineage (DAG):")
print(partitioned_rdd.toDebugString().decode("utf-8"))

RDD Lineage (DAG):
(16) MapPartitionsRDD[11] at coalesce at NativeMethodAccessorImpl.java:0 []
 |   CoalescedRDD[10] at coalesce at NativeMethodAccessorImpl.java:0 []
 |   ShuffledRDD[9] at coalesce at NativeMethodAccessorImpl.java:0 []
 +-(6) MapPartitionsRDD[8] at coalesce at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[7] at RDD at PythonRDD.scala:53 []
    |  UnionRDD[6] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[5] at RDD at PythonRDD.scala:53 []
    |  data/202502-citibike-tripdata_3.csv MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/202502-citibike-tripdata_3.csv HadoopRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[2] at RDD at PythonRDD.scala:53 []
    |  data/202502-citibike-tripdata_1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/202502-citibike-tripdata_1.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


### Step 3: Get the Top-3 Most Popular Starting Stations
1. Compute this information and collect it to the driver. Tip: the [PySpark RDD sortBy](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortBy.html) function works, but the [Reduce Action](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduce.html) can be more efficient (not to be confused with the [ReduceByKey Transformation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html)).
2. Broadcast this information.
3. Use the broadcast to append a new value to each RDD item: `starting_station_top3`, with values `yes` or `no`.
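
The tip in item 1 can be sketched without Spark. Sorting every station just to keep three of them does more work than needed; a reduce only needs a function that merges two partial top-3 lists. The sketch below is plain Python in which `functools.reduce` stands in for the RDD action; `merge_top3` and the sample counts are hypothetical.

```python
import heapq
from functools import reduce

def merge_top3(a, b):
    """Merge two lists of (station, count) pairs, keeping the 3 largest counts."""
    return heapq.nlargest(3, a + b, key=lambda kv: kv[1])

# Hypothetical per-station counts, shaped like the output of reduceByKey
sample_counts = [("A", 5), ("B", 9), ("C", 2), ("D", 7), ("E", 1)]

# Wrap each pair in a one-element list so that the inputs and the output of
# merge_top3 have the same shape, which reduce requires
top3 = reduce(merge_top3, [[kv] for kv in sample_counts])
print(top3)  # [('B', 9), ('D', 7), ('A', 5)]
```

With an RDD of `(station, count)` pairs, the same function could presumably be applied as `counts.map(lambda kv: [kv]).reduce(merge_top3)`. When counts are tied, which station is kept may depend on the merge order.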

In [4]:



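# Emit (start_station_name, 1) for every trip
station_pairs = partitioned_rdd.map(lambda cols: (cols[4], 1))

# Sum the counts per station
station_counts = station_pairs.reduceByKey(lambda a, b: a + b)

# Sort by count in descending order and bring the top 3 to the driver
top_stations = station_counts.sortBy(lambda x: -x[1]).take(3)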


print("Top 3 most popular starting stations:")
for i, (station, count) in enumerate(top_stations, 1):
    print(f"{i}. {station}: {count} trips")


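# Broadcast only the station names; a set gives constant-time membership tests
top_stations_names = {station for station, _ in top_stations}
top_stations_broadcast = sc.broadcast(top_stations_names)


def append_top3_flag(cols):
    """Append the `starting_station_top3` value ("yes" or "no") to a row."""
    start_station = cols[4]
    top3_flag = "yes" if start_station in top_stations_broadcast.value else "no"
    return cols + [top3_flag]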


rdd_with_top3_flag = partitioned_rdd.map(append_top3_flag)


print("\nSample record with top3 flag:")
print(rdd_with_top3_flag.first())

Top 3 most popular starting stations:
1. W 21 St & 6 Ave: 8342 trips
2. University Pl & E 14 St: 7519 trips
3. Broadway & E 14 St: 6875 trips

Sample record with top3 flag:
['C1F868EC9F7E49A5', 'electric_bike', '2025-02-06 16:54:02.517', '2025-02-06 17:00:48.166', 'Perry St & Bleecker St', '5922.07', 'Watts St & Greenwich St', '5578.02', '40.73535398', '-74.00483091', '40.72405549', '-74.00965965', 'member', 'no']


### Step 4: Use Accumulators for Data Statistics
1. Generate:
   - Total trips
   - Trips with missing data
   - Trips by casual riders vs. members
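
As a point of comparison for the statistics listed above, the same counts can also be obtained without accumulators: map each row to a tuple of 0/1 indicators and sum the tuples element-wise. The sketch below is plain Python in which `functools.reduce` stands in for the RDD action. `row_to_counts`, `add_counts`, `make_row`, and the sample rows are hypothetical; the column positions follow the dataset description above.

```python
from functools import reduce

def row_to_counts(cols):
    """Map one row to (total, missing_station, casual, member) indicators."""
    missing = int(not (cols[4] and cols[5] and cols[6] and cols[7]))
    rider_type = cols[12].lower()
    return (1, missing, int(rider_type == "casual"), int(rider_type == "member"))

def add_counts(a, b):
    """Element-wise sum of two count tuples."""
    return tuple(x + y for x, y in zip(a, b))

def make_row(start_name, end_name, rider):
    """Build a hypothetical 13-column row; only the columns used here are set."""
    row = [""] * 13
    row[4], row[5] = start_name, "1" if start_name else ""
    row[6], row[7] = end_name, "2" if end_name else ""
    row[12] = rider
    return row

rows = [
    make_row("A", "B", "member"),
    make_row("A", "", "casual"),  # missing end station
    make_row("C", "D", "member"),
]
totals = reduce(add_counts, map(row_to_counts, rows))
print(totals)  # (3, 1, 1, 2)
```

Because the counts are the result of an action rather than a side effect, recomputing the RDD would not inflate them, which is the main trade-off against accumulators.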

In [5]:

total_trips = sc.accumulator(0)
missing_station_data = sc.accumulator(0)
casual_trips = sc.accumulator(0)
member_trips = sc.accumulator(0)

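def collect_statistics(cols):
    """Update the accumulators for one trip and pass the row through unchanged."""
    total_trips.add(1)

    # A trip counts as missing data if any station name or ID is empty
    if not cols[4] or not cols[6] or not cols[5] or not cols[7]:
        missing_station_data.add(1)

    # Count the trip by rider type
    rider_type = cols[12].lower()
    if rider_type == 'casual':
        casual_trips.add(1)
    elif rider_type == 'member':
        member_trips.add(1)

    return cols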


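processed_rdd = rdd_with_top3_flag.map(collect_statistics)

# Transformations are lazy, so an action is needed to run collect_statistics.
# Caveat: accumulators updated inside a transformation are incremented again
# whenever the RDD is recomputed, so read their values right after this action.
_ = processed_rdd.count()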


print("Statistics:")
print(f"- Total trips: {total_trips.value}")
print(f"- Trips with missing station data: {missing_station_data.value}")
print(f"- Casual rider trips: {casual_trips.value}")
print(f"- Member trips: {member_trips.value}")


missing_data_pct = (missing_station_data.value / total_trips.value) * 100
casual_pct = (casual_trips.value / total_trips.value) * 100
member_pct = (member_trips.value / total_trips.value) * 100

print("\nPercentages:")
print(f"- Missing data: {missing_data_pct:.2f}%")
print(f"- Casual riders: {casual_pct:.2f}%")
print(f"- Members: {member_pct:.2f}%")

Statistics:
- Total trips: 365892
- Trips with missing station data: 23405
- Casual rider trips: 95130
- Member trips: 270762

Percentages:
- Missing data: 6.40%
- Casual riders: 26.00%
- Members: 74.00%


### Step 5: Other Insights
1. Average trip duration for members vs. casual riders.
2. Peak riding hours, i.e., the hours of the day in which the most trips start.

Tip: use `datetime` to parse the timestamp strings and compute durations, among other date manipulations. An example:

```
from datetime import datetime

start_str = '2025-02-06 16:54:02.517'
end_str = '2025-02-06 17:00:48.166'
# %f is required because the timestamps include fractional seconds
start_time = datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S.%f")
end_time = datetime.strptime(end_str, "%Y-%m-%d %H:%M:%S.%f")
duration = (end_time - start_time).total_seconds() / 60  # Convert to minutes
```
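
The average per rider type can be built from (sum, count) pairs: each trip contributes `(duration, 1)`, pairs are added element-wise per rider type, and the average is the sum divided by the count. The sketch below shows that pattern in plain Python, with a dictionary standing in for a per-key reduction. `duration_minutes`, `combine`, and the sample trips are hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # %f parses the fractional seconds

def duration_minutes(start_str, end_str):
    """Trip duration in minutes between two timestamp strings."""
    delta = datetime.strptime(end_str, FMT) - datetime.strptime(start_str, FMT)
    return delta.total_seconds() / 60

def combine(a, b):
    """Merge two (duration_sum, trip_count) pairs."""
    return (a[0] + b[0], a[1] + b[1])

# Hypothetical (rider_type, started_at, ended_at) records
trips = [
    ("member", "2025-02-06 16:00:00.000", "2025-02-06 16:10:00.000"),
    ("member", "2025-02-06 17:00:00.000", "2025-02-06 17:20:00.000"),
    ("casual", "2025-02-06 18:00:00.000", "2025-02-06 18:30:00.000"),
]

# Accumulate one (sum, count) pair per rider type
sums = {}
for rider, start, end in trips:
    pair = (duration_minutes(start, end), 1)
    sums[rider] = combine(sums[rider], pair) if rider in sums else pair

averages = {rider: total / n for rider, (total, n) in sums.items()}
print(averages)  # {'member': 15.0, 'casual': 30.0}
```

A function shaped like `combine` is what a per-key reduction expects, since it is associative and commutative.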

In [6]:
from datetime import datetime

def parse_datetime(dt_str):
    """Parse a timestamp string, with or without fractional seconds."""
    if '.' in dt_str:
        return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f")
    return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")

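def calculate_trip_metrics(cols):
    """Return ((rider_type, duration_minutes), (start_hour, 1)) for one trip."""
    try:
        start_time = parse_datetime(cols[2])
        end_time = parse_datetime(cols[3])

        # Trip duration in minutes
        duration = (end_time - start_time).total_seconds() / 60

        # Hour of day (0-23) in which the trip started
        hour = start_time.hour

        rider_type = cols[12].lower()

        return (
            (rider_type, duration),  # used for the average duration per rider type
            (hour, 1)                # used for the trips-per-hour count
        )
    except (ValueError, IndexError):
        # Sentinel values that the filters below discard
        return (("invalid", 0), (-1, 0))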


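trip_metrics = processed_rdd.map(calculate_trip_metrics)

# --- Average duration per rider type ---
duration_by_type = trip_metrics.map(lambda x: x[0])

# Keep valid rider types and drop non-positive or extreme (>= 300 min) durations
valid_durations = duration_by_type.filter(lambda x: x[0] in ["member", "casual"] and 0 < x[1] < 300)

# (rider_type, (duration, 1)) so that sums and counts can be reduced together
duration_pairs = valid_durations.map(lambda x: (x[0], (x[1], 1)))
duration_sums = duration_pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Average = total duration / number of trips
avg_durations = duration_sums.mapValues(lambda x: x[0] / x[1])
avg_duration_results = avg_durations.collectAsMap()

# --- Trips per starting hour ---
hourly_counts = trip_metrics.map(lambda x: x[1])

# Discard the sentinel hour (-1) emitted for unparseable rows
valid_hours = hourly_counts.filter(lambda x: 0 <= x[0] < 24)
hour_totals = valid_hours.reduceByKey(lambda a, b: a + b)

# Five busiest hours, sorted by trip count in descending order
peak_hours = hour_totals.sortBy(lambda x: -x[1]).take(5)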


print("Average Trip Duration:")
for rider_type, avg_duration in avg_duration_results.items():
    if rider_type == "member":
        print(f"- Members: {avg_duration:.2f} minutes")
    else:
        print(f"- Casual riders: {avg_duration:.2f} minutes")

print("\nPeak Riding Hours:")
for hour, count in peak_hours:
    # Convert to a 12-hour clock; hours 0 and 12 are both displayed as 12
    print(f"Hour {hour} ({hour % 12 or 12} {'AM' if hour < 12 else 'PM'}): {count} trips")

Average Trip Duration:
- Members: 12.45 minutes
- Casual riders: 24.87 minutes

Peak Riding Hours:
Hour 17 (5 PM): 48263 trips
Hour 18 (6 PM): 45879 trips
Hour 16 (4 PM): 37231 trips
Hour 15 (3 PM): 31566 trips
Hour 19 (7 PM): 29783 trips
