In [1]:
import numpy as np
import pandas as pd
import googlemaps
import requests
from geopy.distance import geodesic
from tqdm.auto import tqdm
import datetime

tqdm.pandas()

In [2]:
try:
    # import the all sheet from the pickle file
    dict_sheets = pd.read_pickle('./Dataset for the project dvsa1203.ods/dvsa1203.pkl')
except FileNotFoundError:
    # import the all sheet from the ods file
    dict_sheets = pd.read_excel('./Dataset for the project dvsa1203.ods/dvsa1203.ods', engine='odf', sheet_name=None)
    # save the dict_sheets as a pickle file
    pd.to_pickle(dict_sheets, './Dataset for the project dvsa1203.ods/dvsa1203.pkl')


In [3]:
dfs = []

for sheet_name in list(dict_sheets.keys())[1:]:

    # get the dataframe
    df = dict_sheets[sheet_name].copy()

    # drop the first 6 rows
    df = df.drop(df.index[0:6])
    df = df.reset_index(drop=True)

    # 2014-15 have some total nan rows
    df = df.dropna(axis=1, how="all")

    # rename the columns
    df.columns = ["Location", "Age", "Male Conducted", "Male Passes", "Male Pass rate", "Female Conducted",
                  "Female Passes", "Female Pass rate", "Total Conducted", "Total Passes", "Total Pass rate"]

    # get the first column
    location_col = df["Location"].to_list()
    for idx in range(len(location_col)):
        if pd.isna(location_col[idx]):
            location_col[idx] = location_col[idx - 1]

    df["Location"] = location_col

    # delete the invalid rows
    df = df.loc[df["Age"].isin([str(_) for _ in range(17, 26)])]

    # add the year column
    df["Year"] = int(sheet_name.split("-")[0])

    # except the first column all the columns are numeric, if cannot convert to numeric, then set it to NaN
    for col in df.columns[1:]:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # add the gender columns
    df1 = df[["Location", "Year", "Age", "Male Conducted", "Male Passes", "Male Pass rate"]]
    df2 = df[["Location", "Year", "Age", "Female Conducted", "Female Passes", "Female Pass rate"]]
    df3 = df[["Location", "Year", "Age", "Total Conducted", "Total Passes", "Total Pass rate"]]

    df1.columns = ["Location", "Year", "Age", "Conducted", "Passes", "Pass rate"]
    df2.columns = ["Location", "Year", "Age", "Conducted", "Passes", "Pass rate"]
    df3.columns = ["Location", "Year", "Age", "Conducted", "Passes", "Pass rate"]

    df1.insert(2, "Gender", value="Male")
    df2.insert(2, "Gender", value="Female")
    df3.insert(2, "Gender", value="Total")

    # merge the two dataframes
    df = pd.concat([df1, df2, df3], ignore_index=True)

    df["Pass rate"] = df["Pass rate"] / 100

    dfs.append(df)

# concatenate all the dataframes
df = pd.concat(dfs, ignore_index=True)

df

Unnamed: 0,Location,Year,Gender,Age,Conducted,Passes,Pass rate
0,Aberdeen North,2022,Male,17.0,387.0,222.0,0.573643
1,Aberdeen North,2022,Male,18.0,220.0,106.0,0.481818
2,Aberdeen North,2022,Male,19.0,128.0,67.0,0.523438
3,Aberdeen North,2022,Male,20.0,119.0,61.0,0.512605
4,Aberdeen North,2022,Male,21.0,106.0,46.0,0.433962
...,...,...,...,...,...,...,...
62986,Z Sheffield (Parkway) Closed,2013,Total,21.0,85.0,49.0,0.576471
62987,Z Sheffield (Parkway) Closed,2013,Total,22.0,77.0,34.0,0.441558
62988,Z Sheffield (Parkway) Closed,2013,Total,23.0,68.0,36.0,0.529412
62989,Z Sheffield (Parkway) Closed,2013,Total,24.0,67.0,27.0,0.402985


In [4]:
# # save the cleaned dataframe as a pickle file
# pd.to_pickle(df, './Dataset for the project dvsa1203.ods/dvsa1203_cleaned.pkl')
# 
# # save the cleaned dataframe as a  excel file
# df.to_excel('./Dataset for the project dvsa1203.ods/dvsa1203_cleaned.xlsx',index=False)


# Test center

In [5]:
df_test_center = pd.DataFrame({"Test Center": df["Location"].unique()})
df_test_center

Unnamed: 0,Test Center
0,Aberdeen North
1,Aberdeen South (Cove)
2,Abergavenny
3,Aberystwyth (Park Avenue)
4,Airdrie
...,...
452,Sheffield(Handsworth)
453,Stoke-On-Trent(Cobridge)
454,Stoke-on-Trent(Newcastle-Under-Lyme)
455,Weston-Super-Mare


## Get the pass rate of each test center

In [6]:
df_pRate = df[["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate", "Location": "Test Center"})

df_pRate22 = df.loc[df["Year"] == 2022][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate22", "Location": "Test Center"})
df_pRate21 = df.loc[df["Year"] == 2021][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate21", "Location": "Test Center"})
df_pRate20 = df.loc[df["Year"] == 2020][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate20", "Location": "Test Center"})
df_pRate19 = df.loc[df["Year"] == 2019][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate19", "Location": "Test Center"})
df_pRate18 = df.loc[df["Year"] == 2018][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate18", "Location": "Test Center"})
df_pRate17 = df.loc[df["Year"] == 2017][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate17", "Location": "Test Center"})
df_pRate16 = df.loc[df["Year"] == 2016][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate16", "Location": "Test Center"})
df_pRate15 = df.loc[df["Year"] == 2015][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate15", "Location": "Test Center"})
df_pRate14 = df.loc[df["Year"] == 2014][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate14", "Location": "Test Center"})
df_pRate13 = df.loc[df["Year"] == 2013][["Location", "Pass rate"]].groupby("Location").mean().rename(
    columns={"Pass rate": "pRate13", "Location": "Test Center"})

# join the dataframes
df_test_center = df_test_center.join(df_pRate, on="Test Center")
df_test_center = df_test_center.join(df_pRate22, on="Test Center")
df_test_center = df_test_center.join(df_pRate21, on="Test Center")
df_test_center = df_test_center.join(df_pRate20, on="Test Center")
df_test_center = df_test_center.join(df_pRate19, on="Test Center")
df_test_center = df_test_center.join(df_pRate18, on="Test Center")
df_test_center = df_test_center.join(df_pRate17, on="Test Center")
df_test_center = df_test_center.join(df_pRate16, on="Test Center")
df_test_center = df_test_center.join(df_pRate15, on="Test Center")
df_test_center = df_test_center.join(df_pRate14, on="Test Center")
df_test_center = df_test_center.join(df_pRate13, on="Test Center")

df_test_center

Unnamed: 0,Test Center,pRate,pRate22,pRate21,pRate20,pRate19,pRate18,pRate17,pRate16,pRate15,pRate14,pRate13
0,Aberdeen North,0.540643,0.497256,0.586460,0.595685,0.525232,0.554287,,,,0.486457,
1,Aberdeen South (Cove),0.555662,0.559935,0.610141,0.627207,0.578401,0.535264,,,,0.492954,0.485734
2,Abergavenny,0.587358,0.594071,0.606086,0.605538,0.543415,0.563244,,,,0.586518,0.612634
3,Aberystwyth (Park Avenue),0.563824,0.625290,0.614179,0.574907,0.509206,0.495951,,,,,
4,Airdrie,0.478275,0.498094,0.537110,0.489113,0.452222,0.469523,,,,0.431492,0.470373
...,...,...,...,...,...,...,...,...,...,...,...,...
452,Sheffield(Handsworth),0.547450,,,,,,,,,,0.547450
453,Stoke-On-Trent(Cobridge),0.484902,,,,,,,,,,0.484902
454,Stoke-on-Trent(Newcastle-Under-Lyme),0.472919,,,,,,,,,,0.472919
455,Weston-Super-Mare,0.532939,,,,,,,,,,0.532939


## Get the lat,lng of test center

In [7]:
def get_search_name(name):
    # delete the first letter if it is z or Z
    if name[0] in ["z", "Z"]:
        name = name[1:]

    # Removal of close and Closed
    name = name.replace("closed", " ")
    name = name.replace("Closed", " ")

    # Removal of ( and )
    name = name.replace("(", " ")
    name = name.replace(")", " ")

    # name += " driving test center,UK"
    # remove the duplicate spaces
    name = " ".join(name.split())

    return "Driving test center, " + name + ", UK"


df_test_center["Searching Name"] = df_test_center["Test Center"].apply(get_search_name)
df_test_center

Unnamed: 0,Test Center,pRate,pRate22,pRate21,pRate20,pRate19,pRate18,pRate17,pRate16,pRate15,pRate14,pRate13,Searching Name
0,Aberdeen North,0.540643,0.497256,0.586460,0.595685,0.525232,0.554287,,,,0.486457,,"Driving test center, Aberdeen North, UK"
1,Aberdeen South (Cove),0.555662,0.559935,0.610141,0.627207,0.578401,0.535264,,,,0.492954,0.485734,"Driving test center, Aberdeen South Cove, UK"
2,Abergavenny,0.587358,0.594071,0.606086,0.605538,0.543415,0.563244,,,,0.586518,0.612634,"Driving test center, Abergavenny, UK"
3,Aberystwyth (Park Avenue),0.563824,0.625290,0.614179,0.574907,0.509206,0.495951,,,,,,"Driving test center, Aberystwyth Park Avenue, UK"
4,Airdrie,0.478275,0.498094,0.537110,0.489113,0.452222,0.469523,,,,0.431492,0.470373,"Driving test center, Airdrie, UK"
...,...,...,...,...,...,...,...,...,...,...,...,...,...
452,Sheffield(Handsworth),0.547450,,,,,,,,,,0.547450,"Driving test center, Sheffield Handsworth, UK"
453,Stoke-On-Trent(Cobridge),0.484902,,,,,,,,,,0.484902,"Driving test center, Stoke-On-Trent Cobridge, UK"
454,Stoke-on-Trent(Newcastle-Under-Lyme),0.472919,,,,,,,,,,0.472919,"Driving test center, Stoke-on-Trent Newcastle-..."
455,Weston-Super-Mare,0.532939,,,,,,,,,,0.532939,"Driving test center, Weston-Super-Mare, UK"


In [8]:
def find_place(search_name):
    gmaps = googlemaps.Client(key='AIzaSyDlGirEWPGlPZKeftEvII-D_Xyx8BAxpOA')
    find_place_result = gmaps.find_place(input=search_name, input_type="textquery", fields=["geometry/location"])
    # if not find the place
    if find_place_result["status"] != "OK":
        search_name = search_name.replace("Driving test center, ", "")
        find_place_result = gmaps.find_place(input=search_name, input_type="textquery", fields=["geometry/location"])

    assert find_place_result["status"] == "OK"
    return find_place_result["candidates"][0]["geometry"]["location"]


find_place_result = {_: find_place(_) for _ in tqdm(df_test_center["Searching Name"].unique())}

  0%|          | 0/418 [00:00<?, ?it/s]

In [9]:
df_test_center['lat'] = df_test_center["Searching Name"].apply(lambda x: find_place_result[x]['lat'])
df_test_center['lng'] = df_test_center["Searching Name"].apply(lambda x: find_place_result[x]['lng'])
df_test_center

Unnamed: 0,Test Center,pRate,pRate22,pRate21,pRate20,pRate19,pRate18,pRate17,pRate16,pRate15,pRate14,pRate13,Searching Name,lat,lng
0,Aberdeen North,0.540643,0.497256,0.586460,0.595685,0.525232,0.554287,,,,0.486457,,"Driving test center, Aberdeen North, UK",57.145050,-2.090507
1,Aberdeen South (Cove),0.555662,0.559935,0.610141,0.627207,0.578401,0.535264,,,,0.492954,0.485734,"Driving test center, Aberdeen South Cove, UK",57.088442,-2.108061
2,Abergavenny,0.587358,0.594071,0.606086,0.605538,0.543415,0.563244,,,,0.586518,0.612634,"Driving test center, Abergavenny, UK",51.816040,-3.010646
3,Aberystwyth (Park Avenue),0.563824,0.625290,0.614179,0.574907,0.509206,0.495951,,,,,,"Driving test center, Aberystwyth Park Avenue, UK",52.398359,-4.068531
4,Airdrie,0.478275,0.498094,0.537110,0.489113,0.452222,0.469523,,,,0.431492,0.470373,"Driving test center, Airdrie, UK",55.866770,-3.988403
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
452,Sheffield(Handsworth),0.547450,,,,,,,,,,0.547450,"Driving test center, Sheffield Handsworth, UK",53.368875,-1.365170
453,Stoke-On-Trent(Cobridge),0.484902,,,,,,,,,,0.484902,"Driving test center, Stoke-On-Trent Cobridge, UK",53.040342,-2.187913
454,Stoke-on-Trent(Newcastle-Under-Lyme),0.472919,,,,,,,,,,0.472919,"Driving test center, Stoke-on-Trent Newcastle-...",52.996686,-2.212268
455,Weston-Super-Mare,0.532939,,,,,,,,,,0.532939,"Driving test center, Weston-Super-Mare, UK",51.343539,-2.972070


## Get the # of roundabouts within 1km, 10km, 30km of each test center

In [10]:
def count_roundabouts(lat, lng, radius):
    """
    Count the number of roundabouts within the specified radius
    :param lat: latitude
    :param lng: longitude
    :param radius: in meters
    :return: the number of roundabouts
    """

    # Define the Overpass API URL
    overpass_url = "http://overpass-api.de/api/interpreter"

    # Define the Overpass QL query to find roundabouts
    overpass_query = f"""
    [out:json];
    (
      // search for roundabouts (junction=roundabout) within the bounding box
      way["junction"="roundabout"](around:{radius},{lat},{lng});
    );
    out center;
    """

    # Send the request to the Overpass API
    response = requests.get(overpass_url, params={'data': overpass_query})
    data = response.json()

    # Count roundabouts within the specified radius
    count = 0
    for element in data['elements']:
        roundabout_center = (element['center']['lat'], element['center']['lon'])
        if geodesic((lat, lng), roundabout_center).meters <= radius:
            count += 1

    return count


radius_list = [1, 5, 10, 15, 20, 25]

for radius in radius_list:
    df_test_center[f'roundabouts{radius}Km'] = df_test_center.progress_apply(
        lambda x: count_roundabouts(x['lat'], x['lng'], radius * 1000), axis=1)

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

## Get the traffic light number within radius of each test center

In [11]:
def count_traffic_lights(lat, lng, radius):
    """
    Count the number of roundabouts within the specified radius
    :param lat: latitude
    :param lng: longitude
    :param radius: in meters
    :return: the number of traffic lights within the specified radius
    """

    # Define the Overpass API URL
    overpass_url = "http://overpass-api.de/api/interpreter"

    # Define the Overpass QL query to find roundabouts
    overpass_query = f"""
    [out:json];
    (
      // search for traffic lights (traffic_signals=traffic_lights) within the bounding box
      node[traffic_signals=traffic_lights](around:{radius},{lat},{lng});
    );
    out;
    """

    # Send the request to the Overpass API
    response = requests.get(overpass_url, params={'data': overpass_query})
    data = response.json()

    # Count traffic lights within the specified radius
    count = 0
    for element in data['elements']:
        roundabout_center = (element['lat'], element['lon'])
        if geodesic((lat, lng), roundabout_center).meters <= radius:
            count += 1

    return count


for radius in radius_list:
    df_test_center[f'traffic_lights{radius}Km'] = df_test_center.progress_apply(
        lambda x: count_traffic_lights(x['lat'], x['lng'], radius * 1000), axis=1)

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

## Get the traffic signals number within radius of each test center

In [12]:
def count_traffic_signals(lat, lng, radius):
    """
    Count the number of traffic signals within the specified radius
    :param lat: latitude
    :param lng: longitude
    :param radius: in meters
    :return: the number of traffic signals within the specified radius
    """
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
    [out:json];
    (
      node[highway=traffic_signals](around:{radius},{lat},{lng});
    );
    out;
    """
    response = requests.get(overpass_url, params={'data': overpass_query})
    data = response.json()

    count = 0
    for element in data['elements']:
        if geodesic((lat, lng), (element['lat'], element['lon'])).meters <= radius:
            count += 1

    return count


for radius in radius_list:
    df_test_center[f'traffic_signals{radius}Km'] = df_test_center.progress_apply(
        lambda x: count_traffic_signals(x['lat'], x['lng'], radius * 1000), axis=1)

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

## Get # of schools within radius of each test center

In [13]:
def count_school_zones(lat, lng, radius):
    """
    Count the number of school zone signs within the specified radius
    :param lat: latitude
    :param lng: longitude
    :param radius: in meters
    :return: the number of school zone signs within the specified radius
    """
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
    [out:json];
    (
      node[amenity=school](around:{radius},{lat},{lng});
    );
    out;
    """
    response = requests.get(overpass_url, params={'data': overpass_query})
    data = response.json()

    count = 0
    for element in data['elements']:
        if geodesic((lat, lng), (element['lat'], element['lon'])).meters <= radius:
            count += 1

    return count


for radius in radius_list:
    df_test_center[f'schools{radius}Km'] = df_test_center.progress_apply(
        lambda x: count_school_zones(x['lat'], x['lng'], radius * 1000), axis=1)

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

## Get # of bus stop within radius of each test center

In [17]:
def count_bus_stop(lat, lng, radius):
    """
    Count the number of bus stop within the specified radius
    :param lat: latitude
    :param lng: longitude
    :param radius: in meters
    :return: the number of bus stop within the specified radius
    """
    overpass_url = "http://overpass-api.de/api/interpreter"
    overpass_query = f"""
    [out:json];
    node
      [highway=bus_stop]
      (around:{radius},{lat},{lng});
    out;
    """
    response = requests.get(overpass_url, params={'data': overpass_query})
    data = response.json()

    count = 0
    for element in data['elements']:
        if geodesic((lat, lng), (element['lat'], element['lon'])).meters <= radius:
            count += 1

    return count


for radius in radius_list:
    df_test_center[f'bstop{radius}Km'] = df_test_center.progress_apply(
        lambda x: count_bus_stop(x['lat'], x['lng'], radius * 1000), axis=1)

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

  0%|          | 0/457 [00:00<?, ?it/s]

# save the cleaned dataframe

In [18]:
str_datetime = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
df_test_center.to_excel('./Dataset/test_center' + str_datetime + '.xlsx', index=False)
df_test_center.to_csv('./Dataset/test_center' + str_datetime + '.csv', index=False)

# Simple EDA

In [19]:
df_test_center

Unnamed: 0,Test Center,pRate,pRate22,pRate21,pRate20,pRate19,pRate18,pRate17,pRate16,pRate15,...,schools10Km,schools15Km,schools20Km,schools25Km,bstop1Km,bstop5Km,bstop10Km,bstop15Km,bstop20Km,bstop25Km
0,Aberdeen North,0.540643,0.497256,0.586460,0.595685,0.525232,0.554287,,,,...,2,2,2,2,100,944,1364,1595,1676,1969
1,Aberdeen South (Cove),0.555662,0.559935,0.610141,0.627207,0.578401,0.535264,,,,...,2,2,2,2,15,288,1118,1549,1696,1821
2,Abergavenny,0.587358,0.594071,0.606086,0.605538,0.543415,0.563244,,,,...,0,3,6,7,3,19,70,269,533,1073
3,Aberystwyth (Park Avenue),0.563824,0.625290,0.614179,0.574907,0.509206,0.495951,,,,...,1,1,1,1,14,109,143,168,192,223
4,Airdrie,0.478275,0.498094,0.537110,0.489113,0.452222,0.469523,,,,...,21,28,37,40,46,404,1221,2867,4847,6355
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
452,Sheffield(Handsworth),0.547450,,,,,,,,,...,2,4,5,7,51,894,3437,5302,7282,9379
453,Stoke-On-Trent(Cobridge),0.484902,,,,,,,,,...,1,2,3,3,7,432,1238,1987,2884,3766
454,Stoke-on-Trent(Newcastle-Under-Lyme),0.472919,,,,,,,,,...,1,1,3,3,28,447,965,1744,2799,3899
455,Weston-Super-Mare,0.532939,,,,,,,,,...,0,1,5,7,92,506,600,830,1145,1982
