In [None]:
# query for cell below
# change tenant_id and timestamp

"""
SELECT "tenant_id",
         from_iso8601_timestamp(timestamp) AS "event_datetime",
         "object_data-vin" as "vin",
         "object_data-is_available" as "is_available",
         "object_data-last_known_latitude" as "lat",
         "object_data-last_known_longitude" as "lng"
FROM "data_lake_us_prod"."sa_object_changed"
WHERE "name" = 'VEHICLE_UPDATE'
        AND tenant_id = 'darwin-prod'
        AND timestamp > '2019-05-01'
        AND timestamp < '2019-05-15'
order by timestamp;
"""

In [2]:
# settings
region = {
     'oakland': dict(
         x_min=-13618976.4221,
         x_max=-13605638.1607,
         y_min=4549035.0828,
         y_max=4564284.2700,
         timezone='America/Los_Angeles'),
     'madrid': dict(
         x_min=-416448.0394,
         x_max=-406912.5201,
         y_min=4921025.4356,
         y_max=4931545.0816,
         timezone='Europe/Madrid')
}

In [2]:
# generates dataframe of vehicle availability merged with aggregate dow/hour availability
import pandas as pd
import numpy as np
import calendar
from copy import deepcopy

DATAFILE = 'vehicle_data_eiffel_2019_04.csv'
REGION_TIMEZONE = region['madrid']['timezone']
DT_COLS = ['event_datetime']

# create multi-index and multi-index dataframe
mi = pd.MultiIndex.from_product([list(calendar.day_name), list(range(0, 24))], names=['dow', 'hour'])
base_series = pd.Series(index=mi).fillna(value=0)
mi_df = pd.DataFrame(columns=mi)


def convert_datetime_columns(df, columns):
    for col in columns:
        df[col] = df[col].dt.tz_localize('UTC').dt.tz_convert(REGION_TIMEZONE)

        
def collapse_is_available_events(group):
    global supply_df
    group = group.sort_values(by='event_datetime')

    # get time of change of states
    
    # get event_datetime when is_available goes from true to false (becomes unavailable)
    # previous event (is_available=True) changed state (is_available=False), indicating becoming unavailable
    left = group[(group['is_available'] == False) & (group['is_available'].shift() == True)].rename(
        columns={'event_datetime':'unavailable_at'})
        
    # get event_datetime when is_available goes from false to true (becomes available)
    # previous event (is_available=False) changed state (is_available=True), indicating becoming available
    right = group[(group['is_available'] == True) & (group['is_available'].shift() == False)].rename(
        columns={'event_datetime':'available_at'})['available_at'].to_frame()

    # can't assume symmetry for events
    # can't tell which event comes first
    merged_group = pd.merge_asof(left, right, left_on='unavailable_at', right_on='available_at')
    supply_df = supply_df.append(merged_group)
    
    
# construct large dow/hour df
# NOTE: very expensive. should save intermediates so don't have to regenerate
def extractor(x):
    global mi_df
    temp = deepcopy(base_series)
    # duration less than 1 hour, does span across slice (hour) ex: [1:30, 2:15]
    if x.size == 2 and x[0].hour != x[1].hour:  
        temp[x[0].day_name(), x[0].hour] += 60 - x[0].minute
        temp[x[1].day_name(), x[1].hour] += x[1].minute

    # duration less than 1 hour, doesn't span across slice (hour) ex: [1:30, 1:45]
    elif x.size == 2 and x[0].hour == x[1].hour:
        temp[x[0].day_name(), x[0].hour] += x[1].minute - x[0].minute
  
    # duration greater than 1 hour, does span across slice (hour) ex: [1:30, 2:30, 2:45]
    elif x.size == 3 and x[1].hour == x[2].hour:
        temp[x[0].day_name(), x[0].hour] += 60 - x[0].minute
        temp[x[2].day_name(), x[2].hour] += x[2].minute
  
    else:
        # duration greater than 2 hours, ex: [1:30, 2:30, 3:30, 3:45]
        # or spans across multiple hours
        n = 0
        min_marker = x[0].minute
        for i, j, k in zip(x.day_name(), x.hour, x.minute):
            # each datetimeindex
            if n == 0: # first element => 60 - 30 = 30
                temp[i, j] += (60 - k)
            elif n == (x.size - 1):  # last element, can't assume full hour
                if k >= min_marker:
                    temp[i, j] += (k - min_marker) # ex: 3:45 - 3:30 = 15m
                else:
                    temp[i, j] += k  # ex: 3:30 - 3:00 = 30m
            elif n == (x.size - 2):  # second to last element, can't assume full hour
                temp[i, j] += k  # ex: 3:30 - 3:00 = 30m
            else:  # middle of array
                temp[i, j] += 60 # ex: 3:30 - 2:30 = 1h
            n += 1
    mi_df = mi_df.append(temp, ignore_index=True)


# get df and clean up
vehicle_df = pd.read_csv(
    DATAFILE,
    parse_dates=['event_datetime'],
    infer_datetime_format=True
).dropna()

convert_datetime_columns(vehicle_df, DT_COLS)

# group by vin
vehicle_df = vehicle_df.groupby(['vin'])
supply_df = pd.DataFrame()

vehicle_df.apply(collapse_is_available_events)
supply_df = supply_df.dropna()

# supply_df['unavailable_at'] = supply_df['event_datetime']
# supply_df.drop(['event_datetime'], axis=1)

supply_df.reset_index(inplace=True)
supply_df['idle_duration'] = supply_df['unavailable_at'] - supply_df['available_at']  # duration for analysis
supply_df['idle_duration_minutes'] = supply_df['idle_duration'].dt.total_seconds()/60.0

# create datetimeindex of periods with the end datetime appended
df = supply_df.apply(
    lambda x: (pd.date_range(x['available_at'], x['unavailable_at'], freq='H', closed='left')).append(
        pd.to_datetime([x['unavailable_at']])), axis=1)

df.apply(extractor)

# merge the big dow/hour mask back with vehicle_update data
supply_df = supply_df.merge(mi_df, left_index=True, right_index=True)
supply_df.to_csv(f'{DATAFILE.split(".")[0]}_with_dow_hour_mask.csv')
supply_df

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3267, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-b0a14141cf17>", line 113, in <module>
    df.apply(extractor)
  File "/anaconda3/lib/python3.7/site-packages/pandas/core/series.py", line 3194, in apply
    mapped = lib.map_infer(values, f, convert=convert_dtype)
  File "pandas/_libs/src/inference.pyx", line 1472, in pandas._libs.lib.map_infer
  File "<ipython-input-2-b0a14141cf17>", line 82, in extractor
    mi_df = mi_df.append(temp, ignore_index=True)
  File "/anaconda3/lib/python3.7/site-packages/pandas/core/frame.py", line 6211, in append
    sort=sort)
  File "/anaconda3/lib/python3.7/site-packages/pandas/core/reshape/concat.py", line 226, in concat
    return op.get_result()
  File "/anaconda3/lib/python3.7/site-packages/pandas/core/reshape/concat.py", line 423, in get_result
    copy=self.copy)
  File "/anaconda

KeyboardInterrupt: 

In [9]:
# spot checks (dependent on above)

print(supply_df.iloc[0]['vin'])
print(supply_df.iloc[0]['available_at'])
print(supply_df.iloc[0]['unavailable_at'])

print(supply_df.iloc[1]['vin'])
print(supply_df.iloc[1]['available_at'])
print(supply_df.iloc[1]['unavailable_at'])

print(supply_df.iloc[50961]['vin'])
print(supply_df.iloc[50961]['available_at'])
print(supply_df.iloc[50961]['unavailable_at'])

print(supply_df.iloc[50988]['vin'])
print(supply_df.iloc[50988]['available_at'])
print(supply_df.iloc[50988]['unavailable_at'])

1G1FX6S08J4138281
2019-04-03 13:57:20.267000-07:00
2019-04-03 14:55:38.815000-07:00
1G1FX6S08J4138281
2019-04-03 15:12:08.695000-07:00
2019-04-04 09:08:48.304000-07:00
JTDKDTB3XJ1606732
2019-04-15 22:52:15.782000-07:00
2019-04-15 22:52:37.183000-07:00
JTDKDTB3XJ1606732
2019-04-26 15:09:59.559000-07:00
2019-04-29 06:13:42.903000-07:00


In [None]:
# query for rental time, rental location, and total_to_charge information
# used in cell below
# change the tenant_id and timestamp
'''
SELECT "object_data-rental_id" as "rental_id",
          "object_data-customer_id" as "customer_id",
          "object_data-rental_reserved_at" as "reserved_at",
          "object_data-rental_booked_at" as "booked_at",
          "object_data-rental_ended_at" as "ended_at",
          "object_data-start_location_latitude" as "start_location_lat",
          "object_data-start_location_longitude" as "start_location_lng",
          "object_data-end_location_latitude" as "end_location_lat",
          "object_data-end_location_longitude" as "end_location_lng",
          payments.total_to_charge
 FROM "data_lake_us_prod"."sa_object_changed"
 JOIN (
   select "object_data-rental_id" as "rental_id",
          "object_data-total_to_charge" as "total_to_charge"
   from "data_lake_us_prod"."sa_object_changed"
   where tenant_id = 'darwin-prod'
   and name = 'PAYMENT_LIFECYCLE'
   and timestamp > '2019-05-01'
   and timestamp < '2019-05-15'
   group by "object_data-rental_id", "object_data-total_to_charge"
   having "object_data-total_to_charge" is not null) as payments on payments.rental_id = "object_data-rental_id"
 WHERE tenant_id = 'darwin-prod'
         AND name = 'RENTAL_LIFECYCLE'
         AND timestamp > '2019-05-01'
         AND timestamp < '2019-05-15'
 GROUP BY
     "object_data-rental_id",
     "object_data-customer_id",
     "object_data-start_location_latitude",
     "object_data-start_location_longitude",
     "object_data-end_location_latitude",
     "object_data-end_location_longitude",
     "object_data-rental_reserved_at",
     "object_data-rental_booked_at",
     "object_data-rental_ended_at",
     payments.total_to_charge
 HAVING "object_data-start_location_latitude" is NOT null
         AND "object_data-start_location_longitude" is NOT null
         AND "object_data-end_location_latitude" is NOT null
         AND "object_data-end_location_longitude" is NOT null
         AND length(split(cast("object_data-end_location_latitude" as varchar), '.')[2]) > 4
         AND length(split(cast("object_data-end_location_longitude" as varchar), '.')[2]) > 4
         AND length(split(cast("object_data-start_location_latitude" as varchar), '.')[2]) > 4
         AND length(split(cast("object_data-start_location_longitude" as varchar), '.')[2]) > 4
'''


'''
SELECT "object_data-rental_id" as "rental_id",
          "object_data-customer_id" as "customer_id",
          "object_data-rental_reserved_at" as "reserved_at",
          "object_data-rental_booked_at" as "booked_at",
          "object_data-rental_ended_at" as "ended_at",
          "object_data-start_location_latitude" as "start_location_lat",
          "object_data-start_location_longitude" as "start_location_lng",
          "object_data-end_location_latitude" as "end_location_lat",
          "object_data-end_location_longitude" as "end_location_lng",
          payments.total_to_charge,
          payments.credit_amount_used
 FROM (select distinct "object_data-rental_id" as "rental_id"
       from "data_lake_us_prod"."sa_object_changed"
       where "object_data-rental_reserved_at" > '2019-05-01'
       and "object_data-rental_reserved_at" < '2019-05-02'
      ) as rentals_list
 JOIN "data_lake_us_prod"."sa_object_changed"
 ON "object_data-rental_id" = rentals_list.rental_id
 JOIN (select "object_data-rental_id" as "rental_id",
          "object_data-total_to_charge" as "total_to_charge",
          "object_data-credit_amount_used" as "credit_amount_used"
       from "data_lake_us_prod"."sa_object_changed"
       where tenant_id = 'darwin-prod'
       and name = 'PAYMENT_LIFECYCLE'
       group by "object_data-rental_id", "object_data-total_to_charge", "object_data-credit_amount_used"
       having "object_data-total_to_charge" is not null
      ) as payments 
 ON payments.rental_id = rentals_list.rental_id
 WHERE tenant_id = 'darwin-prod'
         AND name = 'RENTAL_LIFECYCLE'
 GROUP BY
     "object_data-rental_id",
     "object_data-customer_id",
     "object_data-start_location_latitude",
     "object_data-start_location_longitude",
     "object_data-end_location_latitude",
     "object_data-end_location_longitude",
     "object_data-rental_reserved_at",
     "object_data-rental_booked_at",
     "object_data-rental_ended_at",
     payments.total_to_charge,
     payments.credit_amount_used
 HAVING "object_data-start_location_latitude" is NOT null
         AND "object_data-start_location_longitude" is NOT null
         AND "object_data-end_location_latitude" is NOT null
         AND "object_data-end_location_longitude" is NOT null
         AND length(split(cast("object_data-end_location_latitude" as varchar), '.')[2]) > 4
         AND length(split(cast("object_data-end_location_longitude" as varchar), '.')[2]) > 4
         AND length(split(cast("object_data-start_location_latitude" as varchar), '.')[2]) > 4
         AND length(split(cast("object_data-start_location_longitude" as varchar), '.')[2]) > 4
'''

In [3]:
# formats rental datafile for data visualization
# rental datafile must have 

# create df from selected start and end hours
# bin the rental start positions
import pandas as pd

# get the rentals file
DATAFILE = 'rental_data_darwin_2019_05_01_2019_05_15.csv'
REGION_TIMEZONE = region['oakland']['timezone']
DT_COLS = ['reserved_at', 'booked_at', 'ended_at']

def convert_datetime_columns(df, columns):
    for col in columns:
        df[col] = df[col].dt.tz_localize('UTC').dt.tz_convert(REGION_TIMEZONE)

# get df and clean up
rental_df = pd.read_csv(
    DATAFILE,
    parse_dates=DT_COLS,
    infer_datetime_format=True
).dropna()

convert_datetime_columns(rental_df, DT_COLS)

# extract the rental start dow/hour
rental_df['reserved_at_hour'] = rental_df['reserved_at'].dt.hour
rental_df['reserved_at_dow'] = rental_df['reserved_at'].dt.day_name()

rental_df.to_csv(f'{DATAFILE.split(".")[0]}_with_dow_hour_mask.csv')

In [None]:
# query for getting appopens data
"""
select distinct 
"data-customer-id",
"session_id",
from_iso8601_timestamp("metadata-recorder_received_at") as "event_datetime",
cast(from_iso8601_timestamp("metadata-recorder_received_at") as date) as "date",
extract(hour from(cast(from_iso8601_timestamp("metadata-recorder_received_at") as time))) as "hour",
extract(minute from(cast(from_iso8601_timestamp("metadata-recorder_received_at") as time))) as "minute",
"data-number_vehicles_within_quarter_mile",
"data-number_vehicles_within_mile",
"data-user_location-lat",
"data-user_location-lng" 
 
from data_lake_eu_prod.ma_user_activity

where "metadata-recorder_received_at" > '2019-05-10 00:00:00.000 UTC'
and "name" not like '%NETWORK%'
and "name" not like '%UNKNOWN%'
and "name" in ('VEHICLE_AVAILABILITY')
and ("data-number_vehicles_within_quarter_mile" is null or "data-number_vehicles_within_quarter_mile" > 0)
order by "session_id", event_datetime
"""

In [None]:
# nearest vehicle app data
"""
select
from_iso8601_timestamp(timestamp) AS "event_datetime",
"data-distance",
"data-user_location-lat",
"data-user_location-lng",
"data-vehicle_latitude",
"data-vehicle_longitude"
from data_lake_us_prod.ma_user_activity
where tenant_id = 'darwin-prod'
and name = 'DISTANCE_NEAREST_VEHICLE'
and timestamp > '2019-05-01'
and timestamp < '2019-05-15';
"""

In [11]:
import pandas as pd

# constants
miles_per_meter = 0.000621371
rental_distance_threshold = 0.25

def convert_datetime_columns(df, columns):
    for col in columns:
        df[col] = df[col].dt.tz_localize('UTC').dt.tz_convert(REGION_TIMEZONE)

# load the file
DT_COLS = ['event_datetime']
datafile = 'nearest_vehicle_app_data_darwin_2019_05_01_2019_05_15.csv'
df = pd.read_csv(
        datafile,
        parse_dates=DT_COLS,
        infer_datetime_format=True
    ).dropna()

convert_datetime_columns(df, DT_COLS)

# create new column to test if within threshold
df['vehicle_nearby'] = (df['data-distance'] * miles_per_meter) < rental_distance_threshold

df['event_hour'] = df['event_datetime'].dt.hour
df['event_dow'] = df['event_datetime'].dt.day_name()

df.to_csv(f'{datafile.split(".")[0]}_with_threshold_mask.csv')