In [None]:
!pip install apache-beam[gcp]


Collecting apache-beam[gcp]
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 89.7/89.7 kB 3.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting orjson<4,>=3.9.7 (from apache-beam[gcp])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 50.4/50.4 kB 1.5 MB/s eta 0:00:00
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 152.0/152.0 kB 8.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting fastavro<2,>=0.23.

In [None]:
!pip install kaggle
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json




In [None]:
!kaggle datasets download -d nagasai524/nyc-taxi-trip-records-from-jan-2023-to-jun-2023
!unzip nyc-taxi-trip-records-from-jan-2023-to-jun-2023.zip -d data/

Dataset URL: https://www.kaggle.com/datasets/nagasai524/nyc-taxi-trip-records-from-jan-2023-to-jun-2023
License(s): U.S. Government Works
Downloading nyc-taxi-trip-records-from-jan-2023-to-jun-2023.zip to /content
 99% 404M/407M [00:06<00:00, 56.2MB/s]
100% 407M/407M [00:06<00:00, 67.9MB/s]
Archive:  nyc-taxi-trip-records-from-jan-2023-to-jun-2023.zip
  inflating: data/nyc_yellow_taxi_trip_records_from_Jan_to_Aug_2023.csv  


In [None]:
import pandas as pd

# Order matters: the Beam parser further down reads the sample CSV by field
# position (pickup time, distance, fare).
SAMPLE_COLUMNS = ['tpep_pickup_datetime', 'trip_distance', 'fare_amount']

# Relative path matches the `unzip ... -d data/` target, so the cell does not
# depend on the working directory being /content.
# `usecols` keeps the file's own column order, not the list's, so the frame is
# re-indexed with SAMPLE_COLUMNS to pin the order the parser expects.
df_sample = pd.read_csv(
    'data/nyc_yellow_taxi_trip_records_from_Jan_to_Aug_2023.csv',
    usecols=SAMPLE_COLUMNS,
    nrows=1000,
)[SAMPLE_COLUMNS]

# Small 1000-row extract used as the pipeline input.
df_sample.to_csv('nyc_taxi_sample.csv', index=False)


In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def format_taxi_data(element):
    """Turn one comma-separated text line into a trip record.

    The line is split on ','. The first field is kept as the pickup
    timestamp string; the second and third fields are converted to floats
    for distance and fare. Any further fields are ignored.

    Args:
        element: A single CSV data line (no header).

    Returns:
        dict with keys 'pickup_datetime' (str), 'trip_distance' (float)
        and 'fare_amount' (float).

    Raises:
        IndexError: If the line has fewer fields than are read.
        ValueError: If the distance or fare field is not a valid float.
    """
    parts = element.split(',')
    # Built key by key, in field order, so conversion errors surface in the
    # same order as the fields appear.
    record = {}
    record['pickup_datetime'] = parts[0]
    record['trip_distance'] = float(parts[1])
    record['fare_amount'] = float(parts[2])
    return record

class FilterShortTrips(beam.DoFn):
    """Drop trips shorter than a minimum distance; pass the rest through unchanged.

    Args:
        min_distance: Smallest 'trip_distance' value that is kept. Defaults
            to 1.0 (one mile), the threshold this filter originally
            hard-coded, so ``FilterShortTrips()`` behaves as before.
    """
    def __init__(self, min_distance=1.0):
        super().__init__()
        self.min_distance = min_distance

    def process(self, element):
        """Yield ``element`` when its 'trip_distance' is at least ``min_distance``.

        Args:
            element: A mapping with a numeric 'trip_distance' entry.

        Yields:
            The unchanged element, or nothing when the trip is too short.
        """
        if element['trip_distance'] >= self.min_distance:
            yield element
pipeline_options = PipelineOptions()

# Leaving the `with` block runs the pipeline and waits for it to finish, so
# there is no explicit p.run() afterwards: calling it would execute the whole
# pipeline a second time.
with beam.Pipeline(options=pipeline_options) as p:

    # Read the sample CSV (header skipped) and parse each line into a dict.
    taxi_data = (p
                 | 'Read CSV File' >> beam.io.ReadFromText('nyc_taxi_sample.csv', skip_header_lines=1)
                 | 'Format Taxi Data' >> beam.Map(format_taxi_data)  # element-wise: one dict per CSV line
                )

    # Keep only the trips that pass FilterShortTrips.
    filtered_trips = (taxi_data
                      | 'Filter Short Trips' >> beam.ParDo(FilterShortTrips())
                     )

    # NOTE(review): no event timestamps are assigned before this step; if all
    # elements from ReadFromText share the default timestamp they will land in
    # a single window. Confirm whether a TimestampedValue step is intended.
    windowed_trips = (filtered_trips
                      | 'Window' >> beam.WindowInto(beam.window.FixedWindows(60))  # 60-second windows for demonstration
                     )

    # Render each trip as a text line and write it under output/.
    output = (windowed_trips
              | 'Format Output' >> beam.Map(lambda x: f"Distance: {x['trip_distance']}, Fare: {x['fare_amount']}")
              | 'Write Output' >> beam.io.WriteToText('output/filtered_trips.txt')
             )






'DONE'

In [None]:
import apache_beam.transforms.window as window

# Leaving the `with` block runs the pipeline and waits for it to finish, so
# there is no explicit p.run() afterwards: calling it would execute the whole
# pipeline a second time.
with beam.Pipeline(options=pipeline_options) as p:

    # Read the sample CSV (header skipped) and parse each line into a dict.
    taxi_data = (p
                 | 'Read CSV File' >> beam.io.ReadFromText('nyc_taxi_sample.csv', skip_header_lines=1)
                 | 'Format Taxi Data' >> beam.Map(format_taxi_data)
                )

    # Fixed 60-second windows with a processing-time trigger (constructed with
    # 10); DISCARDING mode means each firing carries only the elements that
    # arrived since the previous firing.
    # NOTE(review): no event timestamps are assigned before this step; if all
    # elements from ReadFromText share the default timestamp they will land in
    # a single window. Confirm whether a TimestampedValue step is intended.
    windowed_trips = (taxi_data
                      | 'Window' >> beam.WindowInto(beam.window.FixedWindows(60),  # 60-second windows
                                                    trigger=beam.trigger.AfterProcessingTime(10),
                                                    accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
                     )

    # Render each trip as a text line and write it under output/.
    output = (windowed_trips
              | 'Format Output' >> beam.Map(lambda x: f"Distance: {x['trip_distance']}, Fare: {x['fare_amount']}")
              | 'Write Output' >> beam.io.WriteToText('output/triggered_trips.txt')
             )




'DONE'