In [1]:
# First, cleanly stop your current session
%stop_session

# Then configure the new session. Glue interactive sessions have no %start_session magic;
# each setting has its own line magic and takes effect when the next session is created.
%glue_version 3.0
%idle_timeout 2880
%worker_type G.1X
%number_of_workers 5
%additional_python_modules geopy==2.4.1

Stopping session: c68c1ef4-b277-4de9-b200-93f65f2d3f5f
Stopped session.


Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 3e13ad92-2a48-40b7-9f05-93c2db1eede4
Applying the following default arguments:
--glue_kernel_version 1.0.9
--enable-glue-datacatalog true
--additional-python-modules geopy==2.4.1
Waiting for session 3e13ad92-2a48-40b7-9f05-93c2db1eede4 to get into ready status...
Session 3e13ad92-2a48-40b7-9f05-93c2db1eede4 has been created.


In [30]:
!pip install gpxpy 
%pip install geopy

Note: you may need to restart the kernel to use updated packages.


In [3]:
import sys
print(sys.executable)

print(sys.path)


/usr/bin/python3
['/tmp', '/tmp/spark-f03ee63e-31b2-48af-ade2-53b6dcd76dd1/userFiles-25bbc46f-9001-4762-a8cc-1e143ad1b8cd', '/opt/amazon/spark/python/lib/pyspark.zip', '/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip', '/opt/amazon/lib/python3.6/site-packages', '/usr/lib64/python37.zip', '/usr/lib64/python3.7', '/usr/lib64/python3.7/lib-dynload', '/home/spark/.local/lib/python3.7/site-packages', '/usr/lib64/python3.7/site-packages', '/usr/lib/python3.7/site-packages']


In [4]:
# Imports for data handling, distance calculation and geocoding
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from geopy.geocoders import Nominatim
import time
geolocator = Nominatim(user_agent="OLABikes")




In [5]:
import boto3
import pandas as pd
import io

# Initialize boto3 client if not already done
s3 = boto3.client('s3')

bucket_name = 'demand-prediction-ola-rides'        # Replace with your bucket
file_key = 'processed/preprocessed_11.csv.gz'  # Your gzip file key

# Get the object from S3
response = s3.get_object(Bucket=bucket_name, Key=file_key)

# Read the compressed data from the response body
compressed_body = response['Body'].read()

# Convert bytes data to a BytesIO buffer
buffer = io.BytesIO(compressed_body)

# Read CSV with gzip compression
df = pd.read_csv(buffer, compression='gzip')

# Preview
print(df.head())


                    ts  number   pick_lat  ...  month  year  dayofweek
0  2020-03-26 07:07:17   14626  12.313621  ...      3  2020          3
1  2020-03-26 07:32:27   85490  12.943947  ...      3  2020          3
2  2020-03-26 07:36:44    5408  12.899603  ...      3  2020          3
3  2020-03-26 07:38:00   58940  12.918229  ...      3  2020          3
4  2020-03-26 07:39:29    5408  12.899490  ...      3  2020          3

[5 rows x 12 columns]


#### Reading Data from previous checkpoint

## Data Cleaning with Business Understanding

### A user may request a ride and have the booking request logged in our database, then re-book because of a long wait, because the driver refused the booking, or because they entered the wrong pickup or drop location by mistake.

<hr>

### `Handle Case1 Rebooking Again to Same Location`: Keep only one request per user to the same pickup latitude-longitude within 1 hour of the first ride request.

<hr>

### `Handle Case2 Location entry mistake`: Keep only the last request of a user within 8 minutes of the first booking request.
#### A typical booking covers about 8 minutes of bike-ride time, so repeat requests inside that window are most likely corrections.
#### Also, calculate the distance between pickup and drop, and remove bad entries based on this distance and on the time difference between requests.

#### `Handle Case2.1`: Pickup-to-drop lat-long distance under 50 meters (0.05 km); no user would book a 50-meter trip.

<hr>

### `Handle Case3`: Booking location outside the OLA Bikes operating zone
#### Check pickup and drop lat-long against bounding-box coordinates

In [6]:
df['ts'] = pd.to_datetime(df['ts'])
df.sort_values(by = ['number','ts'], inplace = True)
df.reset_index(inplace = True)




In [7]:
# Cast the datetime64[ns] values to int64 (nanoseconds since the Unix epoch),
# then integer-divide by 10**9 to get seconds

df['booking_timestamp'] = df.ts.values.astype(np.int64) // 10 ** 9
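The cast above assumes the timestamps are stored at nanosecond resolution. The following is a minimal sketch of a resolution-independent alternative. It subtracts the Unix epoch and floor-divides by one second. The helper name `to_epoch_seconds` is hypothetical, and the timestamps are treated as naive, with no time-zone handling.

```python
import pandas as pd

def to_epoch_seconds(ts: pd.Series) -> pd.Series:
    """Convert a naive datetime64 Series to integer Unix seconds."""
    # Subtracting the epoch gives a timedelta Series; floor-dividing by one
    # second yields int64 seconds whatever the datetime resolution is.
    return (ts - pd.Timestamp("1970-01-01")) // pd.Timedelta(seconds=1)

ts = pd.to_datetime(pd.Series(["2020-10-10 07:34:16", "2020-03-26 07:07:17"]))
print(to_epoch_seconds(ts).tolist())  # [1602315256, 1585206437]
```

The first value matches the `booking_timestamp` shown for 2020-10-10 07:34:16 in the output below.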




In [8]:
df.head(50)

      index                  ts  number  ...  year  dayofweek  booking_timestamp
0   2374378 2020-10-10 07:34:16      -1  ...  2020          5         1602315256
1   2405894 2020-10-11 08:23:42      -1  ...  2020          6         1602404622
2   2405895 2020-10-11 08:23:50      -1  ...  2020          6         1602404630
3   2405896 2020-10-11 08:23:51      -1  ...  2020          6         1602404631
4   2405897 2020-10-11 08:23:54      -1  ...  2020          6         1602404634
5   2405898 2020-10-11 08:23:56      -1  ...  2020          6         1602404636
6   2406076 2020-10-11 11:57:17      -1  ...  2020          6         1602417437
7   2406077 2020-10-11 11:57:31      -1  ...  2020          6         1602417451
8   2500477 2020-10-16 17:51:07      -1  ...  2020          4         1602870667
9   2500478 2020-10-16 17:51:25      -1  ...  2020          4         1602870685
10  2694503 2020-10-30 09:00:44      -1  ...  2020          4         1604048444
11  3358326 2020-11-27 20:16

In [9]:
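df['shift_booking_ts'] = df.groupby('number')['booking_timestamp'].shift(1)
# Assign the result back: fillna(inplace=True) on a selected column may not update df in newer pandas
df['shift_booking_ts'] = df['shift_booking_ts'].fillna(0)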




In [10]:
df['shift_booking_ts'] = df['shift_booking_ts'].astype('int64')




In [11]:
# Floor division already yields whole hours / minutes since the user's previous booking
df['booking_time_diff_hr'] = (df['booking_timestamp'] - df['shift_booking_ts']) // 3600
df['booking_time_diff_min'] = (df['booking_timestamp'] - df['shift_booking_ts']) // 60
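Because `shift_booking_ts` is filled with 0, a user's first booking gets a "gap" equal to its full epoch timestamp. That is why the outputs show values such as 26705254 minutes. The following is a minimal sketch of the same per-user gap computed with `groupby().diff()`. It leaves the first booking as NaN, so comparisons like `< 8` are automatically False for it. The helper name `per_user_gap_minutes` is hypothetical, and the sketch assumes the `number` and `booking_timestamp` columns used above.

```python
import pandas as pd

def per_user_gap_minutes(df: pd.DataFrame) -> pd.Series:
    """Whole minutes since the same user's previous booking; NaN for a user's first booking."""
    ordered = df.sort_values(["number", "booking_timestamp"])
    # diff() within each user group equals booking_timestamp - shift(1)
    gap_seconds = ordered.groupby("number")["booking_timestamp"].diff()
    # Floor to whole minutes and restore the caller's row order
    return (gap_seconds // 60).reindex(df.index)

rides = pd.DataFrame({
    "number": [7, 7, 7, 9],
    "booking_timestamp": [1000, 1300, 5000, 2000],
})
print(per_user_gap_minutes(rides).tolist())  # [nan, 5.0, 61.0, nan]
```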




In [34]:
## Booking time difference in minutes
#df['booking_time_diff_min'].value_counts().to_dict()

In [35]:
## Booking time difference in hours
#df['booking_time_diff_hr'].value_counts().to_dict()

### We observe that 8315382 - 4335828 = 3979554 bookings are repeat requests by the same user to the same pickup location within 1 hour

In [13]:
len(df)

8315382


In [14]:
### Handling Case 1: same user re-booking to the same pickup location within 1 hour

df = df[~((df.duplicated(subset=['number','pick_lat','pick_lng'],keep=False)) & (df.booking_time_diff_hr<=1))]




In [15]:
## Row count after removing Case 1 re-bookings
len(df)

4335828


### Removed 3979554 rows in `Case1`; 4335828 rows remain
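The Case 1 cell works from two signals. `duplicated(..., keep=False)` flags every copy of a repeated (user, pickup) pair, including the first. `booking_time_diff_hr` is the gap to the user's previous booking at any location, floored to whole hours, so `<= 1` admits gaps shorter than 2 hours. The following is a minimal sketch of a stricter reading of the rule. A request is dropped only if the same user asked for the same pickup point less than an hour after their previous request there. The helper name `drop_repeat_pickups` is hypothetical, and the window is chained from request to request rather than anchored at the very first request.

```python
import pandas as pd

def drop_repeat_pickups(df: pd.DataFrame, window_s: int = 3600) -> pd.DataFrame:
    """Drop a request if the same user requested the same pickup point
    less than `window_s` seconds after their previous request there."""
    ordered = df.sort_values(["number", "booking_timestamp"])
    keys = ["number", "pick_lat", "pick_lng"]
    # Gap to the previous request by the same user at the same pickup point
    gap = ordered.groupby(keys)["booking_timestamp"].diff()
    # First requests at a pickup point have a NaN gap, so they are always kept
    keep = gap.isna() | (gap >= window_s)
    return ordered[keep].sort_index()

rides = pd.DataFrame({
    "number": [1, 1, 1, 1, 2],
    "pick_lat": [12.9, 12.9, 12.9, 13.0, 12.9],
    "pick_lng": [77.6, 77.6, 77.6, 77.7, 77.6],
    "booking_timestamp": [0, 600, 4500, 700, 100],
})
print(drop_repeat_pickups(rides).index.tolist())  # [0, 2, 3, 4]
```

Row 1 is dropped because it repeats row 0's pickup after 10 minutes. Row 2 comes 65 minutes after row 1 and is kept.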

In [None]:
import boto3
import io

# Define your S3 bucket and object key
bucket_name = 'demand-prediction-ola-rides'
s3_key = 'processed/preprocessed_2.csv.gz'

# Convert DataFrame to GZIP-compressed CSV in-memory
buffer = io.BytesIO()
df.to_csv(buffer, index=False, compression='gzip')
buffer.seek(0)  # Important: reset buffer position

# Upload to S3
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket_name, Key=s3_key, Body=buffer.getvalue())

print("GZIP-compressed CSV uploaded to S3 successfully!")

GZIP-compressed CSV uploaded to S3 successfully!
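The same gzip-CSV read and write boilerplate appears in several cells. The following is a minimal sketch that moves the in-memory serialization into two helpers, so the S3 calls shrink to one line each. The helper names are hypothetical. The boto3 lines are shown as comments only, and they reuse `s3`, `bucket_name`, `s3_key` and `file_key` from the cells above.

```python
import io
import pandas as pd

def df_to_gzip_csv_bytes(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame to gzip-compressed CSV bytes (usable as a put_object Body)."""
    buffer = io.BytesIO()
    df.to_csv(buffer, index=False, compression="gzip")
    return buffer.getvalue()

def gzip_csv_bytes_to_df(raw: bytes) -> pd.DataFrame:
    """Parse gzip-compressed CSV bytes, e.g. get_object(...)['Body'].read()."""
    return pd.read_csv(io.BytesIO(raw), compression="gzip")

# Usage with boto3:
# s3.put_object(Bucket=bucket_name, Key=s3_key, Body=df_to_gzip_csv_bytes(df))
# df = gzip_csv_bytes_to_df(s3.get_object(Bucket=bucket_name, Key=file_key)["Body"].read())

sample = pd.DataFrame({"number": [5408, 14626], "booking_timestamp": [1585208204, 1585206437]})
restored = gzip_csv_bytes_to_df(df_to_gzip_csv_bytes(sample))
print(restored.equals(sample))  # True
```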


### Handling Case2: One user books rides to different lat-longs within 8 minutes (ride time + driver arrival time)
#### Fraud User
#### Human error booking

In [23]:
import boto3
import pandas as pd
import io

# Initialize boto3 client if not already done
s3 = boto3.client('s3')

bucket_name = 'demand-prediction-ola-rides'        # Replace with your bucket
file_key = 'processed/preprocessed_2.csv.gz'  # Your gzip file key

# Get the object from S3
response = s3.get_object(Bucket=bucket_name, Key=file_key)

# Read the compressed data from the response body
compressed_body = response['Body'].read()

# Convert bytes data to a BytesIO buffer
buffer = io.BytesIO(compressed_body)

# Read CSV with gzip compression
df = pd.read_csv(buffer, compression='gzip')

# Preview
print(df.head())


     index                   ts  ...  booking_time_diff_hr  booking_time_diff_min
0  2374378  2020-10-10 07:34:16  ...                445087               26705254
1  2405894  2020-10-11 08:23:42  ...                    24                   1489
2  2406076  2020-10-11 11:57:17  ...                     3                    213
3  2500477  2020-10-16 17:51:07  ...                   125                   7553
4  2694503  2020-10-30 09:00:44  ...                   327                  19629

[5 rows x 17 columns]


In [24]:
print("Number of rides booked by same customer within 8mins time: {}".format(len(df[(df.booking_time_diff_min<8)])))
df = df[(df.booking_time_diff_min>=8)]

Number of rides booked by same customer within 8mins time: 0
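The Case 2 description asks to keep the *last* request of a user within 8 minutes. The cell above instead keeps rows whose gap to the user's *previous* booking is at least 8 minutes, so the first request of a burst survives. The following is a minimal sketch of the keep-last variant. It looks at the gap to the user's *next* booking. The helper name `keep_last_in_burst` is hypothetical, and the sketch assumes the `number` and `booking_timestamp` columns used above.

```python
import pandas as pd

def keep_last_in_burst(df: pd.DataFrame, window_s: int = 8 * 60) -> pd.DataFrame:
    """Keep a request only if the same user made no further request within `window_s` seconds."""
    ordered = df.sort_values(["number", "booking_timestamp"])
    # Timestamp of the same user's next booking (NaN for the user's last booking)
    next_ts = ordered.groupby("number")["booking_timestamp"].shift(-1)
    gap_to_next = next_ts - ordered["booking_timestamp"]
    keep = gap_to_next.isna() | (gap_to_next >= window_s)
    return ordered[keep].sort_index()

rides = pd.DataFrame({
    "number": [1, 1, 1, 2],
    "booking_timestamp": [0, 120, 2000, 50],
})
print(keep_last_in_burst(rides).index.tolist())  # [1, 2, 3]
```

Row 0 is replaced by row 1 two minutes later, so only the corrected request (row 1) is kept.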


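### Calculating the distance between pickup and drop latitude-longitude

Treating the earth as an ellipsoid, the geodesic distance is the shortest distance on the surface of an ellipsoidal model of the earth. geopy's default algorithm for it is the method given by [Karney (2013)](https://link.springer.com/article/10.1007/s00190-012-0578-z) (geodesic); it is accurate to round-off and always converges. To stay fast on millions of rows, the cell below uses a vectorized Haversine formula instead. That formula treats the earth as a sphere of radius 6371 km and can differ from the ellipsoidal distance by up to about 0.5%. The result is still stored in the `geodesic_distance` column, and the geopy version is kept, commented out, in the cell after it.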
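For reference, `haversine_np` below computes the following, with latitudes $\varphi$ and longitudes $\lambda$ converted to radians and $R = 6371$ km:

$$
a = \sin^2\!\left(\frac{\varphi_2-\varphi_1}{2}\right) + \cos\varphi_1\cos\varphi_2\,\sin^2\!\left(\frac{\lambda_2-\lambda_1}{2}\right),
\qquad
d = 2R\arcsin\!\left(\sqrt{a}\right)
$$

As a sanity check, one degree of latitude ($\Delta\varphi = \pi/180$, $\Delta\lambda = 0$) gives $d = R\pi/180 \approx 111.19$ km. The 50 m threshold of Case 2.1 therefore corresponds to roughly $0.00045^\circ$ of latitude.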


In [37]:
import numpy as np

# Define fast vectorized Haversine function
def haversine_np(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in km
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    lat2 = np.radians(lat2)
    lon2 = np.radians(lon2)

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2
    c = 2 * np.arcsin(np.sqrt(a))

    return np.round(R * c, 2)

# Apply it to your DataFrame
df['geodesic_distance'] = haversine_np(
    df['pick_lat'].values,
    df['pick_lng'].values,
    df['drop_lat'].values,
    df['drop_lng'].values
)





In [39]:
# %time
# from geopy.distance import geodesic

# def geodesic_distance_km(pick_lat, pick_lng, drop_lat, drop_lng):
#     # Ellipsoidal (WGS-84) distance in km, rounded to 2 decimals
#     return round(geodesic((pick_lat, pick_lng), (drop_lat, drop_lng)).km, 2)

# df['geodesic_distance'] = np.vectorize(geodesic_distance_km)(df['pick_lat'], df['pick_lng'], df['drop_lat'], df['drop_lng'])

##### Number of rides booked by the same customer within 8 minutes: 875816

In [38]:
df[df['geodesic_distance']<=0.5]['geodesic_distance'].value_counts()

0.50    1197
0.48    1083
0.47    1082
0.49    1061
0.46    1014
0.45     882
0.44     882
0.43     841
0.39     771
0.41     761
0.40     754
0.06     750
0.42     741
0.37     666
0.38     662
0.07     644
0.36     640
0.08     601
0.33     590
0.34     579
0.35     576
0.32     552
0.09     546
0.31     516
0.10     503
0.11     489
0.30     488
0.29     483
0.12     455
0.28     442
0.27     438
0.26     433
0.05     431
0.14     413
0.17     404
0.20     400
0.21     398
0.22     396
0.24     385
0.13     382
0.18     367
0.23     362
0.15     362
0.19     355
0.16     344
0.25     344
Name: geodesic_distance, dtype: int64


### Handle Case 2.1: Removing ride requests with pickup-to-drop distance of 0.05 km (50 meters) or less

In [39]:
print("Number of Rides Requests less than 50meters: {}".format(len(df[df.geodesic_distance<=0.05])))

Number of Rides Requests less than 50meters: 431


In [40]:
df = df[df.geodesic_distance>0.05]
df

           index                   ts  ...  booking_time_diff_min  geodesic_distance
0        2374378  2020-10-10 07:34:16  ...               26705254              17.40
1        2405894  2020-10-11 08:23:42  ...                   1489               4.08
2        2406076  2020-10-11 11:57:17  ...                    213               4.05
3        2500477  2020-10-16 17:51:07  ...                   7553               3.62
4        2694503  2020-10-30 09:00:44  ...                  19629               3.10
...          ...                  ...  ...                    ...                ...
3723932  5768115  2021-02-12 19:37:11  ...                   1498               3.79
3723933  6102760  2021-02-19 20:43:25  ...                  10146              11.88
3723934  6137206  2021-02-20 17:34:45  ...                   1251               2.17
3723935  6555089  2021-02-27 08:26:23  ...                   9531               4.56
3723936  6888302  2021-03-04 19:13:13  ...                   7846

In [41]:
len(df)

3709482


In [42]:
import os

# Create the local output directory first; to_csv raises FileNotFoundError if it is missing
os.makedirs('./../Data', exist_ok=True)
df.to_csv('./../Data/preprocessed_3.csv', index=False, compression='gzip')


### Handle Case3: Ride requests in non-operational regions
OLA Bikes operating city: Bangalore

### Ride requests caused by a bug or crash in the app
<hr>

#### India: 'boundingbox': ['6.2325274', '35.6745457', '68.1113787', '97.395561']
#### Bangalore: 'boundingbox': ['12.8340125', '13.1436649', '77.4601025', '77.7840515']
#### Karnataka: 'boundingbox': ['11.5945587', '18.4767308', '74.0543908', '78.588083']
Source: OpenStreetMap (Nominatim); each bounding box is [south latitude, north latitude, west longitude, east longitude]
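The cells below repeat the same eight comparisons for India, Bangalore and Karnataka. The following is a minimal sketch that keeps the three bounding boxes in one place and applies the same `<=`/`>=` edge tests. The names `BBOX`, `outside_bbox` and `ride_outside` are hypothetical. The coordinates are copied from the OpenStreetMap values above, in [south, north, west, east] order.

```python
import pandas as pd

# (south_lat, north_lat, west_lng, east_lng), copied from the bounding boxes above
BBOX = {
    "India": (6.2325274, 35.6745457, 68.1113787, 97.395561),
    "Bangalore": (12.8340125, 13.1436649, 77.4601025, 77.7840515),
    "Karnataka": (11.5945587, 18.4767308, 74.0543908, 78.588083),
}

def outside_bbox(lat: pd.Series, lng: pd.Series, bbox) -> pd.Series:
    """True where a point lies on or beyond any edge of the box."""
    south, north, west, east = bbox
    return (lat <= south) | (lat >= north) | (lng <= west) | (lng >= east)

def ride_outside(df: pd.DataFrame, bbox) -> pd.Series:
    """True where the pickup OR the drop falls outside the box."""
    return (outside_bbox(df.pick_lat, df.pick_lng, bbox)
            | outside_bbox(df.drop_lat, df.drop_lng, bbox))

rides = pd.DataFrame({
    "pick_lat": [12.97, 12.97, 40.71],
    "pick_lng": [77.59, 77.59, -74.00],
    "drop_lat": [12.93, 15.35, 12.93],
    "drop_lng": [77.62, 75.14, 77.62],
})
print(ride_outside(rides, BBOX["Bangalore"]).tolist())  # [False, True, True]
print(ride_outside(rides, BBOX["India"]).tolist())      # [False, False, True]
```

With this helper, the India filter becomes `df = df[~ride_outside(df, BBOX["India"])].reset_index(drop=True)`.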

In [43]:
# Reload the checkpoint only when resuming from here; otherwise df is already in memory
# df = pd.read_csv('./../Data/preprocessed_3.csv', compression = 'gzip')
location = geolocator.geocode("India")
location.raw


In [44]:
df

           index                   ts  ...  booking_time_diff_min  geodesic_distance
0        2374378  2020-10-10 07:34:16  ...               26705254              17.40
1        2405894  2020-10-11 08:23:42  ...                   1489               4.08
2        2406076  2020-10-11 11:57:17  ...                    213               4.05
3        2500477  2020-10-16 17:51:07  ...                   7553               3.62
4        2694503  2020-10-30 09:00:44  ...                  19629               3.10
...          ...                  ...  ...                    ...                ...
3723932  5768115  2021-02-12 19:37:11  ...                   1498               3.79
3723933  6102760  2021-02-19 20:43:25  ...                  10146              11.88
3723934  6137206  2021-02-20 17:34:45  ...                   1251               2.17
3723935  6555089  2021-02-27 08:26:23  ...                   9531               4.56
3723936  6888302  2021-03-04 19:13:13  ...                   7846

In [45]:
## How many rides have a pickup or drop outside India?
df[(df.pick_lat<=6.2325274) | (df.pick_lat>=35.6745457) | (df.pick_lng<=68.1113787) | (df.pick_lng>=97.395561) | (df.drop_lat<=6.2325274) | (df.drop_lat>=35.6745457) | (df.drop_lng<=68.1113787) | (df.drop_lng>=97.395561)]

           index                   ts  ...  booking_time_diff_min  geodesic_distance
4279      742031  2020-06-22 07:19:06  ...                  31271            8665.98
13163    2502259  2020-10-16 21:52:28  ...                      8           12168.03
14602    4344697  2021-01-07 13:17:11  ...                   3039           12643.28
20744    8297478  2021-03-26 20:16:01  ...                   1530            4041.87
26980    1786595  2020-09-09 09:36:43  ...               26660736            8666.62
...          ...                  ...  ...                    ...                ...
3679044  1985134  2020-09-21 14:40:51  ...                   5724            8671.97
3684907   834555  2020-07-02 08:40:16  ...                  31249           14216.92
3686028  2331458  2020-10-06 18:12:35  ...                   2895            2477.85
3695919  2747830  2020-11-02 18:57:02  ...                    561            8663.80
3715826  5191270  2021-02-01 18:32:03  ...                     10

### OLA Bikes is only operational in India
### Removing all rides for which pickup or drop is outside INDIA.
#### Number of such cases: 642

In [46]:
df.reset_index(inplace = True, drop = True)
outside_India = df[(df.pick_lat<=6.2325274) | (df.pick_lat>=35.6745457) | (df.pick_lng<=68.1113787) | (df.pick_lng>=97.395561) | (df.drop_lat<=6.2325274) | (df.drop_lat>=35.6745457) | (df.drop_lng<=68.1113787) | (df.drop_lng>=97.395561)]
df = df[~df.index.isin(outside_India.index)].reset_index(drop = True)




In [47]:
df

           index                   ts  ...  booking_time_diff_min  geodesic_distance
0        2374378  2020-10-10 07:34:16  ...               26705254              17.40
1        2405894  2020-10-11 08:23:42  ...                   1489               4.08
2        2406076  2020-10-11 11:57:17  ...                    213               4.05
3        2500477  2020-10-16 17:51:07  ...                   7553               3.62
4        2694503  2020-10-30 09:00:44  ...                  19629               3.10
...          ...                  ...  ...                    ...                ...
3708835  5768115  2021-02-12 19:37:11  ...                   1498               3.79
3708836  6102760  2021-02-19 20:43:25  ...                  10146              11.88
3708837  6137206  2021-02-20 17:34:45  ...                   1251               2.17
3708838  6555089  2021-02-27 08:26:23  ...                   9531               4.56
3708839  6888302  2021-03-04 19:13:13  ...                   7846

In [48]:
print("Number of Good Ride Requests: {}".format(len(df)))

Number of Good Ride Requests: 3708840


In [49]:
## How many pickups and drops are outside Bangalore?
pck_outside_bng = df[(df.pick_lat<=12.8340125) | (df.pick_lat>=13.1436649) | (df.pick_lng<=77.4601025) | (df.pick_lng>=77.7840515)]
drp_outside_bng = df[(df.drop_lat<=12.8340125) | (df.drop_lat>=13.1436649) | (df.drop_lng<=77.4601025) | (df.drop_lng>=77.7840515)]
print("Number of Pickup Requests Outside Bangalore: ",len(pck_outside_bng))
print("Number of Customers pickup outside Bangalore: ",len(np.unique(pck_outside_bng['number'].values)))

print("Number of Drops Requests Outside Bangalore: ",len(drp_outside_bng))
print("Number of Customers Drop outside Bangalore: ",len(np.unique(drp_outside_bng['number'].values)))

Number of Pickup Requests Outside Bangalore:  155908
Number of Customers pickup outside Bangalore:  20473
Number of Drops Requests Outside Bangalore:  167338
Number of Customers Drop outside Bangalore:  26878


In [50]:
### Pickup and drop lat-long bounds for the state of Karnataka
# ['11.5945587', '18.4767308', '74.0543908', '78.588083']
pck_outside_KA = df[(df.pick_lat<=11.5945587) | (df.pick_lat>=18.4767308) | (df.pick_lng<=74.0543908) | (df.pick_lng>=78.588083)]
drp_outside_KA = df[(df.drop_lat<=11.5945587) | (df.drop_lat>=18.4767308) | (df.drop_lng<=74.0543908) | (df.drop_lng>=78.588083)]
print("Pickups Outside KA: {} \nDrop outside KA: {}".format(len(pck_outside_KA),len(drp_outside_KA)))
print("Number of Customers Drop outside KA: ",len(np.unique(drp_outside_KA['number'].values)))
print("Number of Customers pickup outside KA: ",len(np.unique(pck_outside_KA['number'].values)))

Pickups Outside KA: 38807 
Drop outside KA: 39585
Number of Customers Drop outside KA:  6917
Number of Customers pickup outside KA:  6302


In [51]:
total_ride_outside_KA = df[(df.pick_lat<=11.5945587) | (df.pick_lat>=18.4767308) | (df.pick_lng<=74.0543908) | (df.pick_lng>=78.588083) | (df.drop_lat<=11.5945587) | (df.drop_lat>=18.4767308) | (df.drop_lng<=74.0543908) | (df.drop_lng>=78.588083)]




In [52]:
print("Total Ride Outside Karnataka: {}".format(len(total_ride_outside_KA)))

Total Ride Outside Karnataka: 39632


### Total rides outside Karnataka: 39632
### OLA Bikes does not serve intercity requests, so we treat these as system-error requests

Source: https://www.olacabs.com/

In [53]:
## Rides with geodesic distance > 500 km
## whose pickup or drop is outside KA (the state with the most booking requests and the largest user base)
suspected_bad_rides = total_ride_outside_KA[total_ride_outside_KA.geodesic_distance > 500]
suspected_bad_rides

           index                   ts  ...  booking_time_diff_min  geodesic_distance
105      4765341  2021-01-21 23:18:35  ...                    852            1061.67
7368     1756248  2020-09-08 14:49:08  ...                   1327            1588.02
38148     586293  2020-06-04 18:13:54  ...                    459             986.24
40544     122197  2020-04-07 11:09:24  ...                   2585             839.86
40560     288777  2020-04-29 23:11:41  ...                    698            1369.50
...          ...                  ...  ...                    ...                ...
3681751   770076  2020-06-25 11:32:12  ...                   1137            1772.80
3681752  1086664  2020-07-29 15:29:45  ...                  49196            1779.54
3685652  4377242  2021-01-09 14:10:14  ...                    238            1726.83
3685653  4377307  2021-01-09 14:57:02  ...                     46            1726.84
3699369   723201  2020-06-20 15:09:58  ...                  50741

### There are 506 rides with a geodesic distance > 500 km whose pickup or drop is outside KA; we treat these as suspected bad rides.

In [54]:
df = df[~df.index.isin(suspected_bad_rides.index)].reset_index(drop = True)




In [55]:
print("Number of Good Ride Requests: {}".format(len(df)))

Number of Good Ride Requests: 3708333


In [56]:
dataset = df[['ts', 'number', 'pick_lat','pick_lng','drop_lat','drop_lng','geodesic_distance','hour','mins','day','month','year','dayofweek','booking_timestamp','booking_time_diff_hr', 'booking_time_diff_min']]





In [65]:
import io
import boto3

# Assumes `dataset` (selected above), the `s3` client and `bucket_name` are defined in earlier cells

buffer = io.BytesIO()
dataset.to_csv(buffer, index=False, compression='gzip')  # gzip-compressed CSV written into memory
buffer.seek(0)  # Reset pointer to start

# Upload to S3
s3.put_object(Bucket=bucket_name, Key='processed/clean_data.csv.gz', Body=buffer.getvalue())

print("In-memory gzip-compressed CSV uploaded successfully!")


In-memory gzip-compressed CSV uploaded successfully!


In [57]:
import os

os.makedirs('./../Data', exist_ok=True)  # the directory must exist before writing
dataset.to_csv('./../Data/clean_data.csv', index=False, compression='gzip')


## Some Stats: One Year of Ride Request Data at OLA Bikes
### Ride requests by the same user with the same timestamp: 113540
### Requests with a None user ID: 116
### Requests by a user to the same pickup lat-long within 1 hour: 3979554
### Rides booked by a user within 8 minutes of a previous booking to a different pickup lat-long: 611891
### Ride requests with pickup-to-drop distance under 50 meters: 14460
### Rides with pickup or drop lat-long outside India: 642

### Most of our rides are in Karnataka
#### Total rides outside Karnataka (pickup or drop): 39632

### Rides outside KA with pickup-to-drop distance > 500 km: 506

## Number of Good Ride Requests: 3708329