# NYC Green Taxi
A simple program that downloads a sample from the "green taxi trip records" dataset from `azureml.opendatasets`, performs some transformations on its records and sends them as events to two Azure Event Hub instances.

**Requires the setup and configuration of two Azure Event Hubs to function.**


#### Links
1. [Microsoft documentation: NYC Taxi & Limousine Commission - green taxi trip records](https://docs.microsoft.com/en-us/azure/open-datasets/dataset-taxi-green?tabs=azureml-opendatasets)
2. [NYC Taxi & Limousine Commission: TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
3. [Microsoft documentation: Azure Event Hubs: A big data streaming platform and event ingestion service](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-about)

## Install and import libraries
Note: You may need to restart your runtime after installation. Check the output of the install cell.

In [None]:
!pip install azureml.opendatasets
!pip install azure-eventhub

In [1]:
from azureml.opendatasets import NycTlcGreen
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError
from datetime import datetime
from dateutil.relativedelta import relativedelta
import pandas as pd
from threading import Thread

## Load green taxi dataset and convert it to Pandas dataframe
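Set the `start`, `end`, `number_of_months` and `sample_size` variables to fit your scenario. The loop loads one month at a time, shifting `start` and `end` forward by one month per iteration, and keeps a random sample of `sample_size` records from each month. The month selected below (January 2016) contains at least one million records to sample from.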

In [3]:
green_df_raw = pd.DataFrame([])
start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

number_of_months = 1
sample_size = 1000

for sample_month in range(number_of_months):
    temp_df_green = NycTlcGreen(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)).to_pandas_dataframe()
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent
    green_df_raw = pd.concat([green_df_raw, temp_df_green.sample(sample_size)])

[Info] read from /tmp/tmpb8a7j1ku/https%3A/%2Fazureopendatastorage.azurefd.net/nyctlc/green/puYear=2016/puMonth=1/part-00119-tid-4753095944193949832-fee7e113-666d-4114-9fcb-bcd3046479f3-2689-1.c000.snappy.parquet
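
The month-by-month loop above can be sketched without the dataset, just to show which date windows it is meant to produce. This is a minimal illustration that uses the standard library instead of `relativedelta` and assumes each window should cover one full calendar month; `month_windows` is a hypothetical helper, not part of `azureml.opendatasets`.

```python
import calendar
from datetime import datetime

def month_windows(first_month_start, number_of_months):
    """Yield (start, end) datetimes for consecutive calendar months."""
    year, month = first_month_start.year, first_month_start.month
    for _ in range(number_of_months):
        # monthrange returns (weekday of first day, number of days in month)
        last_day = calendar.monthrange(year, month)[1]
        yield datetime(year, month, 1), datetime(year, month, last_day)
        # advance to the next month, rolling over into the next year if needed
        month += 1
        if month > 12:
            year, month = year + 1, 1

windows = list(month_windows(datetime(2016, 1, 1), 3))
for start, end in windows:
    print(start.date(), end.date())
```

With `number_of_months = 3` this yields January, February (ending on the 29th, since 2016 is a leap year) and March 2016. Each pair could be passed to `NycTlcGreen` in the same way as in the cell above.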


In [4]:
# print a sample of the extracted data
green_df_raw.head(10)

Unnamed: 0,vendorID,lpepPickupDatetime,lpepDropoffDatetime,passengerCount,tripDistance,puLocationId,doLocationId,pickupLongitude,pickupLatitude,dropoffLongitude,dropoffLatitude,rateCodeID,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,ehailFee,totalAmount,tripType
256061,2,2016-01-20 18:44:28,2016-01-20 19:03:25,1,3.53,,,-73.986351,40.703751,-73.961197,40.718967,1,N,1,15.0,1.0,0.5,0.3,3.36,0.0,,20.16,1.0
1190321,2,2016-01-13 13:30:48,2016-01-13 13:33:21,1,0.52,,,-73.958008,40.681644,-73.966988,40.684036,1,N,2,4.0,0.0,0.5,0.3,0.0,0.0,,4.8,1.0
705096,2,2016-01-18 22:49:08,2016-01-18 23:04:43,1,4.27,,,-73.976616,40.683056,-74.012108,40.650417,1,N,2,15.0,0.5,0.5,0.3,0.0,0.0,,16.3,1.0
1288178,1,2016-01-02 21:04:59,2016-01-02 21:18:40,1,9.7,,,-73.928024,40.811672,-73.82505,40.867828,1,N,2,27.0,0.5,0.5,0.3,0.0,0.0,,28.3,1.0
610385,2,2016-01-17 10:31:47,2016-01-17 10:35:02,5,1.45,,,-73.942245,40.839844,-73.926857,40.844608,1,N,2,6.0,0.0,0.5,0.3,0.0,0.0,,6.8,1.0
322829,2,2016-01-28 12:13:26,2016-01-28 12:19:21,1,0.84,,,-73.991791,40.69091,-73.999596,40.683697,1,N,1,5.5,0.0,0.5,0.3,2.0,0.0,,8.3,1.0
1438152,2,2016-01-18 09:05:53,2016-01-18 09:09:54,1,0.86,,,-73.954704,40.733776,-73.952713,40.74263,1,N,1,5.5,0.0,0.5,0.3,1.26,0.0,,7.56,1.0
298684,2,2016-01-27 10:13:12,2016-01-27 10:17:22,1,0.45,,,-73.954796,40.701157,-73.952568,40.695034,1,N,1,4.5,0.0,0.5,0.3,0.0,0.0,,5.3,1.0
606368,2,2016-01-17 05:43:52,2016-01-17 06:09:48,1,17.59,,,-73.948021,40.804104,-73.78299,40.643871,2,N,2,52.0,0.0,0.5,0.3,0.0,5.54,,58.34,1.0
315643,2,2016-01-27 18:58:08,2016-01-27 19:00:47,1,0.43,,,-73.966393,40.804398,-73.961655,40.811031,1,N,2,4.0,1.0,0.5,0.3,0.0,0.0,,5.8,1.0


## Preprocess dataset
* The dataframe's index is used to create the `recordId` column.
* We create two dataframes: `dfpayment`, which holds the payment-related columns, and `dffare`, which holds the fare-related columns.
* For each dataframe we create the `event` column, which serializes the other columns of the row as JSON so that they can be published to the Event Hub.
* For each dataframe we create the `pKey` column, a concatenation of the `vendorID` and `recordId` columns, which will be used as the partition key when publishing to the Event Hub.
* Finally, we sort both dataframes by the `recordId` column.

In [5]:
green_df_raw['recordId'] = green_df_raw.index
cols = ['vendorID','recordId']
dfpayment = green_df_raw.filter(['recordId','vendorID', 'rateCodeID','paymentType', 'fareAmount', 'extra', 'mtaTax','improvementSurcharge','tipAmount','tollsAmount','totalAmount'])
dfpayment['event'] = dfpayment.apply(lambda x: x.to_json(), axis=1)
dfpayment['pKey'] = dfpayment[cols].apply(lambda row: '-'.join(row.values.astype(str)), axis=1)
dfpayment = dfpayment.sort_values(by=['recordId'])
dffare = green_df_raw.filter(['recordId','vendorID','tripType','lpepPickupDatetime', 'lpepDropoffDatetime','passengerCount','tripDistance','puLocationId','doLocationId','pickupLongitude','pickupLatitude','dropoffLongitude','dropoffLatitude'])
dffare['event'] = dffare.apply(lambda x: x.to_json(), axis=1)
dffare['pKey'] = dffare[cols].apply(lambda row: '-'.join(row.values.astype(str)), axis=1)
dffare = dffare.sort_values(by=['recordId'])
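
To make the shape of the `event` and `pKey` values concrete, here is a minimal sketch of the same transformation on a single record, using plain dictionaries and the standard `json` module instead of pandas. The helper `to_event` is hypothetical and only illustrates the idea; the values are taken from record 294 in the sample output.

```python
import json

def to_event(record, columns):
    """Keep only the requested columns and return (pKey, event JSON)."""
    subset = {name: record[name] for name in columns if name in record}
    # pKey joins vendorID and recordId with a dash, e.g. "2-294"
    p_key = f"{record['vendorID']}-{record['recordId']}"
    return p_key, json.dumps(subset, separators=(",", ":"))

record = {
    "recordId": 294, "vendorID": 2, "rateCodeID": 1,
    "paymentType": 1, "fareAmount": 7.0, "tripDistance": 1.04,
}
payment_columns = ["recordId", "vendorID", "rateCodeID", "paymentType", "fareAmount"]

p_key, event = to_event(record, payment_columns)
print(p_key)
print(event)
```

As in the notebook cell, the JSON payload contains only the selected columns, and the partition key is kept alongside it rather than inside it.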

In [6]:
# print a sample of the dffare dataset
dffare.head(5)

Unnamed: 0,recordId,vendorID,tripType,lpepPickupDatetime,lpepDropoffDatetime,passengerCount,tripDistance,puLocationId,doLocationId,pickupLongitude,pickupLatitude,dropoffLongitude,dropoffLatitude,event,pKey
294,294,2,1.0,2016-01-01 19:03:39,2016-01-01 19:11:51,1,1.04,,,-73.891754,40.748749,-73.889114,40.760555,"{""recordId"":294,""vendorID"":2,""tripType"":1.0,""l...",2-294
1909,1909,2,1.0,2016-01-01 19:36:58,2016-01-01 20:02:55,1,4.09,,,-73.952087,40.803623,-73.979523,40.754642,"{""recordId"":1909,""vendorID"":2,""tripType"":1.0,""...",2-1909
2003,2003,2,1.0,2016-01-01 19:48:15,2016-01-01 20:06:19,1,4.62,,,-73.942459,40.618969,-73.911072,40.674255,"{""recordId"":2003,""vendorID"":2,""tripType"":1.0,""...",2-2003
3505,3505,2,1.0,2016-01-01 20:28:47,2016-01-01 20:36:20,1,1.12,,,-73.897095,40.829681,-73.910408,40.8246,"{""recordId"":3505,""vendorID"":2,""tripType"":1.0,""...",2-3505
6811,6811,2,1.0,2016-01-01 21:58:14,2016-01-01 22:05:18,1,1.54,,,-73.965508,40.710602,-73.945572,40.725483,"{""recordId"":6811,""vendorID"":2,""tripType"":1.0,""...",2-6811


In [7]:
# print a sample of the dfpayment dataset
dfpayment.head(5)

Unnamed: 0,recordId,vendorID,rateCodeID,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,totalAmount,event,pKey
294,294,2,1,1,7.0,0.0,0.5,0.3,1.56,0.0,9.36,"{""recordId"":294,""vendorID"":2,""rateCodeID"":1,""p...",2-294
1909,1909,2,1,1,20.0,0.0,0.5,0.3,4.16,0.0,24.96,"{""recordId"":1909,""vendorID"":2,""rateCodeID"":1,""...",2-1909
2003,2003,2,1,1,17.5,0.0,0.5,0.3,3.66,0.0,21.96,"{""recordId"":2003,""vendorID"":2,""rateCodeID"":1,""...",2-2003
3505,3505,2,1,2,6.5,0.5,0.5,0.3,0.0,0.0,7.8,"{""recordId"":3505,""vendorID"":2,""rateCodeID"":1,""...",2-3505
6811,6811,2,1,1,7.0,0.5,0.5,0.3,1.66,0.0,9.96,"{""recordId"":6811,""vendorID"":2,""rateCodeID"":1,""...",2-6811


## Configure connection strings to Azure Event Hub
For each Event Hub instance you need its connection string (the "Connection string–primary key" value) and its event hub name. Replace the placeholder values below with your own.

In [8]:
evh_fare_connection_string = "Endpoint=sb://evh-namespace-alpha.servicebus.windows.net/;SharedAccessKeyName=xxx;SharedAccessKey=dfdddddddsddffsdfsetggtr=;EntityPath=evh-taxi-fare-data"
evh_fare_event_hub_name = "evh-taxi-fare-data"
evh_payment_connection_string = "Endpoint=sb://evh-namespace-alpha.servicebus.windows.net/;SharedAccessKeyName=xxx;SharedAccessKey=dfdddddddsddffsdfsetggtr=;EntityPath=evh-taxi-payment-data"
evh_payment_event_hub_name = "evh-taxi-payment-data"
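
Because the connection strings above carry an `EntityPath` and the event hub name is also set separately, a quick consistency check can catch copy-paste mistakes before any events are sent. The sketch below is a hypothetical helper written with the standard library only; it is not part of the `azure-eventhub` package, and the key value is a placeholder.

```python
def parse_connection_string(connection_string):
    """Split 'Key=Value;Key=Value' segments into a dictionary."""
    parts = {}
    for segment in connection_string.split(";"):
        if not segment:
            continue
        # split on the first '=' only, because key values may end with '='
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

def entity_path_matches(connection_string, event_hub_name):
    """Return True when EntityPath is absent or equal to the event hub name."""
    entity_path = parse_connection_string(connection_string).get("EntityPath")
    return entity_path is None or entity_path == event_hub_name

example = (
    "Endpoint=sb://evh-namespace-alpha.servicebus.windows.net/;"
    "SharedAccessKeyName=xxx;SharedAccessKey=placeholder=;"
    "EntityPath=evh-taxi-fare-data"
)
print(entity_path_matches(example, "evh-taxi-fare-data"))
print(entity_path_matches(example, "evh-taxi-payment-data"))
```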

## Send taxi records to event hub
We have two scenarios:
* send the dataframe records one-by-one to the Event Hub, or
* send the dataframe records in batches of 1000 events (configurable with the `max_batch_size` variable below)

Note: In both cases we use the `pKey` value as the partition key. You can omit the `partition_key` argument of `create_batch` altogether to send events in round-robin fashion.

On Event Hub partitioning, see also: [Partitioning in Event Hubs and Kafka](https://docs.microsoft.com/en-us/azure/architecture/reference-architectures/event-hubs/partitioning-in-event-hubs-and-kafka)
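
The practical effect of a partition key is that events sharing the same key end up in the same partition. The sketch below illustrates that idea only: the hash Event Hubs applies is internal to the service, so CRC32 is used here purely as a stand-in, and `assign_partition` and the partition count of 4 are assumptions made for the example.

```python
import zlib

def assign_partition(partition_key, partition_count):
    """Map a key to a partition index with a deterministic hash (illustrative only)."""
    return zlib.crc32(partition_key.encode("utf-8")) % partition_count

partition_count = 4  # assumed value for the illustration
keys = ["2-294", "2-1909", "2-2003", "2-3505", "2-6811"]
for key in keys:
    print(key, "-> partition", assign_partition(key, partition_count))
```

Since every `pKey` in this notebook is unique (it includes the `recordId`), keying by `pKey` spreads individual records across partitions rather than grouping related records together.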

### Publish records one-by-one

In [9]:
class Producer(Thread):
    def publishSingleRecord(self, name, df,connection_str,evh_name):
        producer = EventHubProducerClient.from_connection_string(conn_str=connection_str,eventhub_name=evh_name)
        for index, row in df.iterrows():
            event_data_batch = producer.create_batch(partition_key = row['pKey'])
            event_data_batch.add(EventData(row['event']))
            producer.send_batch(event_data_batch)
        producer.close()

producer = Producer()

t1 = Thread(target=producer.publishSingleRecord, args=("fare data", dffare,evh_fare_connection_string,evh_fare_event_hub_name))
t1.start()

t2 = Thread(target=producer.publishSingleRecord, args=("payment data", dfpayment,evh_payment_connection_string,evh_payment_event_hub_name))
t2.start()

### Publish records in batches

In [None]:
# maximum number of events per batch
max_batch_size = 1000

class Producer(Thread):
    def publishBatch(self, name, df,connection_str,evh_name):
        producer = EventHubProducerClient.from_connection_string(conn_str=connection_str,eventhub_name=evh_name)
        event_data_batch = None
        for index, row in df.iterrows():
            if event_data_batch is None:
                # every event in a batch shares the partition key of the batch's first record
                event_data_batch = producer.create_batch(partition_key = row['pKey'])
            event_data = EventData(row['event'])
            event_data_batch.add(event_data)
            if len(event_data_batch) == max_batch_size:
                print('reached max batch size')
                producer.send_batch(event_data_batch)
                event_data_batch = None
        # send any remaining events that did not fill a complete batch
        if event_data_batch is not None:
            producer.send_batch(event_data_batch)
        producer.close()

producer = Producer()

t1 = Thread(target=producer.publishBatch, args=("fare data", dffare,evh_fare_connection_string,evh_fare_event_hub_name))
t1.start()

t2 = Thread(target=producer.publishBatch, args=("payment data", dfpayment,evh_payment_connection_string,evh_payment_event_hub_name))
t2.start()
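
When the number of records is not a multiple of `max_batch_size`, the last events form a partial batch that still has to be sent. The following sketch shows that chunking logic in isolation, with plain lists instead of Event Hub batches; `chunked` is a hypothetical helper introduced only for this illustration.

```python
def chunked(records, max_batch_size):
    """Yield lists of at most max_batch_size records, including the final partial one."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == max_batch_size:
            yield batch
            batch = []
    # flush whatever is left after the loop ends
    if batch:
        yield batch

sizes = [len(batch) for batch in chunked(range(2500), 1000)]
print(sizes)  # [1000, 1000, 500]
```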