<h1>Event Identification</h1>

<h2>Preprocessing data</h2>
We define the tyre compound descriptions linked to their numbers, a function to convert the driver names into their unique <i>name codes</i> via their driver numbers, and another function that helps us load the <i>timetable data</i> from the files and calls the second function to process the data into a dictionary that uses the name codes.

In [15]:
import json
import os
import random


compounds = {9: 'soft', 10: 'medium', 11: 'hard',
             5: 'intermediate', 6: 'full wet',
             0: 'undefined', 1: 'supersoft',
             2: 'soft', 3: 'medium', 4: 'hard',
             7: 'ultrasoft', 8: 'hypersoft'}


def codeFromNumber(d_nr, year, i=0):
    folder = '../DataExtraction/RaceData/Ergast_Data_Drivers/'
    try: file = [f for f in os.listdir(folder) if year in f][i]
    except IndexError: return 'XXX'
    with open(folder + file, 'r') as f: d_data = json.load(f)
    code = [d_data[d]['code'] for d in d_data
            if d_data[d]['number'] == str(d_nr)]
    if len(code) > 0: return code[0]
    elif d_nr == 51: return 'FIT'
    elif d_nr == 1: return 'VER'
    elif d_nr == 40: return 'LAW'
    elif d_nr == 5: return 'VET'
    elif d_nr == 88: return 'KUB'
    else: 
        print(d_nr, year)
        return codeFromNumber(d_nr, year, i+1)


def loadTimetables(race):
    folder = '../DataExtraction/RaceData/TFeed_Timetables/' + race + '/'
    lap_data, yr = {}, race[:4]
    # folder, lap_data, yr = './Timetables/' + race + '/', {}, race[:4]
    tts = [f for f in os.listdir(folder) if 'ipynb' not in f]
    for tt in tts:
        fn = folder + tt
        with open(fn, 'r') as f: data = json.load(f)
        d_keys, n_data = [k for k in data if k != 'version'], {}
        d_nrs = [data[d]['driver_number'] for d in d_keys]
        # n_keys, n_data = [names[d] for d in d_keys], {}
        for i in range(len(d_keys)):
            nr, d = d_nrs[i], d_keys[i]
            code = codeFromNumber(nr, yr)
            # n, d = n_keys[i], d_keys[i]
            n_data[code] = data[d]
        lap = data[d_keys[0]]['lap']
        if lap not in lap_data.keys(): lap_data[lap] = []
        lap_data[lap].append(n_data)
    return lap_data

<h2>Linking files</h2>
The files from the different sources (i.e., TFeed and LiveTiming/Ergast) use different names to represent the same races. We therefore define a function that connects the files by searching for the presence of the country or city name in the other files.

In [2]:
def loadRaces(year):
    f1 = '../DataExtraction/RaceData/TFeed_Timetables/'
    f2 = '../DataExtraction/RaceData/Ergast_Data_Circuit/'
    f3 = '../DataExtraction/RaceData/LiveTiming_Data/'
    races1 = [rf for rf in os.listdir(f1) if str(year) in rf]
    races2 = [rf for rf in os.listdir(f2) if str(year) in rf]
    races = []
    for r2 in races2:
        with open(f2 + r2, 'r') as f: data = json.load(f)
        country, city = data['country'], data['city']
        country = country.replace(' ', '_').lower()
        city = city.replace(' ', '_').lower()
        for r1 in races1:
            if city in r1 or country in r1:
                races.append([r1, r2])
    for r1, r2 in races:
        i = races.index([r1, r2])
        with open(f3 + r2, 'r') as f: data = json.load(f)
        nr_laps = list(data.keys())[-1]
        races[i].append(int(nr_laps))
    return races

<h2>Identifying events</h2>
We use different classes for different types of events, each with one or more functions that contains rules to determine whether an event occurred in a lap or not.

In [3]:
class Data:
    def __init__(self, race):
        self.data = loadTimetables(race)

    def drivers(self):
        return [d for d in self.data[1][0].keys()]

In [4]:
class Overtake:
    def __init__(self, data):
        self.data, self.drivers = data.data, data.drivers()

    def overtakes(self, lap):
        lap_data, p_pos = self.data[lap], None
        pos_ots, ots = [], []
        for ld in lap_data:
            # the data occassionally misses a driver, the exception
            # clause is used to avoid errors in such cases.
            try: pos = [(d, ld[d]['position']) for d in self.drivers]
            except KeyError: continue
            pos = sorted(pos, key=lambda x: x[1])
            if not p_pos: p_pos = pos
            if pos != p_pos:
                pos_ots += [z for z in zip(pos, p_pos)
                            if z[0] != z[1]]
            p_pos = pos
        for i in range(1, len(pos_ots)):
            p1, p2 = pos_ots[i-1], pos_ots[i]
            if p1[0][0] == p2[1][0] and p1[1][0] == p2[0][0]:
                d_adv, pos, d_dis = p1[0][0], p1[0][1], p2[0][0]
                if lap < 2: ots.append([d_adv, [d_dis, pos]])
                elif not self.__madeStop(lap_data, self.data[lap-1], d_dis):
                    ots.append([d_adv, [d_dis, pos]])
        return ots

    def __madeStop(self, cld, pld, driver):
        c_pits, p_pits = cld[-1][driver]['pits'], pld[0][driver]['pits']
        if c_pits != p_pits: return True
        return False

In [5]:
class Pitstop:
    def __init__(self, data):
        self.data, self.drivers = data.data, data.drivers()

    def pitstops(self, lap):
        if lap < 2: return None
        ld, stops = self.data[lap], []
        for d in self.drivers:
            try: pits = [ld[x][d]['pits'] for x in range(len(ld))]
            except KeyError: continue
            p_pit = self.data[lap-1][-1][d]['pits']
            pitting = [p for p in pits if p != p_pit]
            if len(pitting) == 0: continue
            # pitting = [p for p in pits if p < 0]
            # if len(pitting) == 0: continue
            t = self.data[lap][-1][d]['tyre_compound'][-1][0]
            i = pits.index(pitting[0])
            o_pos = self.data[lap-1][-1][d]['position']
            n_pos = self.data[lap][i][d]['position']
            time = abs(pitting[0]) / 1000
            stop = [d, [[o_pos, n_pos], t, time], None]
            stops.append(stop)
        if len(stops) > 0: return stops

    def undercut(self, driver, lap):
        ld = self.data[lap]
        pos, d_front = self.data[lap-1][-1][driver]['position'], None
        for d in self.drivers:
            p = self.data[lap-1][-1][d]['position']
            if p == pos - 1:
                d_front = d
                break
        if not d_front: return None
        pits_front = [ld[x][d_front]['pits'] for x in range(len(ld))]
        pc_front = np.max(pits_front)
        pc = np.max([ld[x][driver]['pits'] for x in range(len(ld))])
        if pc > pc_front: return d_front

In [6]:
import numpy as np


class UnusualTime:
    def __init__(self, data):
        self.data, self.drivers = data.data, data.drivers()
        self.ps = Pitstop(data)

    def slowSector(self, lap):
        sectors, outliers = [], []
        for s in ['s1', 's2', 's3']:
            s_times = [(d, self.data[lap][-1][d][s]) for d in self.drivers]
            s_times = [s for s in s_times if s[1] != 0]
            if len(s_times) > 0: sectors.append(s_times)
        for st in sectors:
            ols = self.__outlier(st, lap, sectors.index(st))
            [outliers.append(o) for o in ols]
        if len(outliers) > 0: return outliers

    def __outlier(self, s_times, lap, si):
        sec = 'sector ' + str(si + 1)
        stops, p_stops = self.ps.pitstops(lap), self.ps.pitstops(lap-1)
        outliers, d_stops, pd_stops = [], [], []
        if stops: d_stops = [p[0] for p in stops]
        if p_stops: pd_stops = [p[0] for p in p_stops]
        n_times = []
        for s_pair in s_times:
            # for d in d_stops: print(d)
            if s_pair[0] not in d_stops and s_pair[0] not in pd_stops:
                c_pits = self.data[lap][-1][s_pair[0]]['pits']
                p_pits = self.data[lap][0][s_pair[0]]['pits']
                if c_pits == p_pits: n_times.append(s_pair)
        t = np.array([s[1] for s in n_times])
        mean, std = np.mean(t), np.std(t)
        for pair in n_times:
            z = (pair[1] - mean) / std
            if z > 2.5: outliers.append(
                [pair[0], [sec, [round(pair[1], 2), round(mean, 2)]]])
        return outliers

In [7]:
class LapTimes:
    def __init__(self, data):
        self.data, self.drivers = data.data, data.drivers()

    def __prevBestTime(self, lap):
        prev_data = [self.data[k] for k in self.data.keys() if k < lap]
        rec = None
        for pd in prev_data:
            drivers = [d for d in self.drivers if d in pd[-1].keys()]
            pbs = [(d, pd[-1][d]['lap_time']) for d in drivers]
            pbs = [p for p in pbs if p[1] != 0]
            pbs = sorted(pbs, key=lambda x: x[1])
            if len(pbs) == 0: continue
            best_time = pbs[0]
            if not rec: rec = best_time
            if best_time[1] < rec[1]: rec = best_time
        return rec

    def fastestLap(self, lap):
        if lap < 5: return None
        pbt = self.__prevBestTime(lap)
        drivers = [d for d in self.drivers if d in self.data[lap][0].keys()]
        times = [(d, self.data[lap][0][d]['lap_time']) for d in drivers]
        times = [t for t in times if t[1] != 0]
        times = sorted(times, key=lambda x: x[1])
        if len(times) == 0: return None
        d, t = times[0]
        diff = round(pbt[1] - t, 2)
        if t < pbt[1]: return [d, [t, diff]]

    def retirement(self, lap):
        if lap < 2: return None
        cld, pld, rets = self.data[lap], self.data[lap-1], []
        for d in self.drivers:
            try:
                clp_s, clp_e = cld[0][d]['lap_pos'], cld[-1][d]['lap_pos']
                plp_s, plp_e = pld[0][d]['lap_pos'], pld[-1][d]['lap_pos']
            except KeyError: continue
            if clp_s == clp_e and plp_s != plp_e:
                loc, start = 'middle', ['start', 'beginning', 'first part']
                end = ['end', 'last part']
                if clp_e < 0.33: loc = random.choice(start)
                elif clp_e > 0.66: loc = random.choice(end)
                rets.append([d, loc])
        if len(rets) > 0: return rets

In [8]:
class Battles:
    def __init__(self, data):
        self.data, self.drivers = data.data, data.drivers()

    def approaching(self, lap):
        if lap < 5: return None
        a_events, ld = [], self.data[lap]
        drivers = [d for d in self.drivers if d in ld[0].keys()]
        iv_start = [(d, ld[0][d]['interval']) for d in drivers]
        iv_start = sorted(iv_start, key=lambda x: ld[0][x[0]]['position'])
        iv_end = [(d, ld[-1][d]['interval']) for d in drivers]
        iv_end = sorted(iv_end, key=lambda x: ld[0][x[0]]['position'])
        for ivs in iv_start:
            d = ivs[0]
            pit_s, pit_e = ld[0][d]['pits'], ld[-1][d]['pits']
            if pit_s != pit_e: continue
            if ivs[1] == 0 or ivs[1] > 5: continue
            try: ive = [i for i in iv_end if i[0] == d][0]
            except IndexError: continue
            diff = ivs[1] - ive[1]
            if diff < 1: continue
            pd_s = iv_start[iv_start.index(ivs) - 1][0]
            pd_e = iv_end[iv_end.index(ive) - 1][0]
            if pd_s != pd_e: continue
            a_events.append([d, [pd_e, round(diff, 2), round(ive[1], 2)]])
        if len(a_events) > 0:
            a_events = sorted(a_events, key=lambda x: x[1][1])
            return a_events[0]
        
    def lapped(self, lap, driver):
        ld = self.data[lap]
        d_pos, d_gap = ld[0][driver]['lap_pos'], ld[0][driver]['gap']
        o_pos = [(d, ld[0][d]['lap_pos']) for d in self.drivers if
                 (d_gap - ld[0][d]['gap']) > 30]
        if len(o_pos) == 0: return None
        # the driver with the lap position that is closest to the
        # subject driver is likely the one lapping
        o_pos = sorted(o_pos, key=lambda x: abs(x[1]-d_pos))
        return o_pos[0][0]
    
    def drsZone(self, lap):
        if lap < 5: return None
        d_events, cld, pld = [], self.data[lap], self.data[lap-1]
        in_drs = [(d, cld[-1][d]['interval']) for d in self.drivers if 
                  cld[-1][d]['interval'] < 0.7 and cld[-1][d]['position'] > 1]
        if len(in_drs) == 0: return None
        for drv, inv in in_drs:
            c_pos, p_pos = cld[-1][drv]['position'], pld[-1][drv]['position']
            try:
                c_pd = [d for d in self.drivers if 
                        cld[-1][d]['position'] == c_pos-1][0]
                p_pd = [d for d in self.drivers if 
                        pld[-1][d]['position'] == p_pos-1][0]
            except IndexError: continue
            if pld[-1][drv]['interval'] < 1 or inv < 0: continue
            if inv >= cld[0][drv]['interval'] or c_pd != p_pd: continue
            d_events.append([drv, [c_pd, round(inv, 1), c_pos-1]])
        if len(d_events) == 0: return None
        d_events = sorted(d_events, key=lambda x: x[1][2])
        # return the one with the best position
        return d_events[0]

In [9]:
class DriverInfo:
    def __init__(self, race):
        self.race = race

    def team(self, driver):
        folder = '../DataExtraction/RaceData/Ergast_Data_Drivers/'
        file = folder + self.race
        with open(file, 'r') as f: data = json.load(f)
        teams = [data[d]['team'] for d in data if data[d]['code'] == driver]
        if len(teams) > 0: return teams[0]

<h2>Interpreting events</h2>
The <i>Interpreter</i> class calls the other classes and checks for the presence of these events. The <i>RaceControl</i> class, which is responsible for extracting and understanding race control messages and is saved in a separate file due to its complexity, is imported.

In [10]:
from RaceControl import RaceControl


class Interpreter:
    def __init__(self, data, race_rc, lap):
        self.data, self.race_rc, self.lap = data, race_rc, lap

    def overtakeEvent(self):
        os, events = Overtake(self.data).overtakes(self.lap), []
        if not os: return None
        actions = ['overtake', 'pass', ('fly', 'past')]
        os = sorted(os, key=lambda x: x[1][1])
        for o in os[:2]:
            act, main = random.choice(actions), None
            if type(act) == tuple: act, main = act
            event1 = {'subject': [{'driver': o[0]}],
                      'action': act,
                      'object': {'driver': [o[1][0]]}}
            if main: event1['object']['main'] = [main]
            event2 = {'subject': [{'driver': o[0]}],
                      'action': 'move up',
                      'object': {'position': [str(o[1][1])]}}
            events += [event1, event2]
        add_os = [{'driver': o[0]} for o in os[2:]]
        if len(add_os) == 0: return events
        events.append({'subject': add_os, 'action': 'also gain',
                       'object': {'main': ['position']}})
        return events

    def fastestLapEvent(self):
        fl = LapTimes(self.data).fastestLap(self.lap)
        if not fl: return None
        return[{'subject': [{'driver': fl[0]}],
                'action': 'set',
                'object': {'main': ['new fastest lap'],
                           'lap_time': [str(fl[1][0])]}}]

    def pitstopEvent(self):
        pit = Pitstop(self.data)
        ps, events = pit.pitstops(self.lap), []
        if not ps: return None
        actions = ['pit', 'stop', 'box', ('make', 'pitstop'),
                   ('come', 'in pits'), ('enter', 'pits')]
        ps1, ps2, prev_uc = ps, None, False
        if len(ps) > 2:
            ps = sorted(ps, key=lambda x: x[1][0][0] - x[1][0][1])
            ps1, ps2 = ps[:2], ps[2:]
        for p in ps1:
            uc = pit.undercut(p[0], self.lap)
            if uc and not prev_uc and random.choice([None, None, True]):
                event1 = {'subject': [{'driver': p[0]}],
                          'action': 'pit', 'object':
                          {'main': ['from P' + str(p[1][0][0]), 'to undercut'], 'driver': [uc]}}
                event2 = {'subject': [{'driver': uc}],
                          'action': 'be', 'object': {'main': ['still out']}}
                events += [event1, event2]
                prev_uc = True
                continue
            act = random.choice(actions)
            main = ['for ' + compounds[p[1][1]] + ' tyres']
            if type(act) == tuple:
                main.insert(0, act[1])
                act = act[0]
            old_pos, new_pos = p[1][0]
            event1 = {'subject': [{'driver': p[0]}],
                      'action': act, 'object': {'main': main}}
            act = 'drop to'
            if new_pos == old_pos: act = 'stay in'
            elif new_pos < old_pos: act = 'climb up to'
            event2 = {'subject': [{'driver': p[0]}],
                      'action': act,
                      'object': {'position': [str(new_pos)]}}
            events += [event1, event2]
            # if ps.undercut(p[0], self.lap):
            #     event3 = {'subject'
        if ps2: events.append({'subject': [{'driver': [p[0] for p in ps2]}],
                               'action': 'pit', 'object': {}})
        return events

    def slowSectorEvent(self):
        ss, events = UnusualTime(self.data).slowSector(self.lap), []
        if not ss: return None
        for s in ss:
            if s[1][1][0] > s[1][1][1]: x = 'slow'
            elif s[1][1][0] <= s[1][1][1]: x = 'fast'
            event = {'subject': [{'driver': s[0]}],
                     'action': 'go',
                     'object': {'main': [x],
                                'location': [s[1][0]],
                                'sector_time': [str(s[1][1][0])]}}
            events.append(event)
        return events

    def raceControlEvent(self):
        events, bfs = RaceControl(self.race_rc, self.lap).messages(), []
        if not events: return None
        for event in events:
            try: main = event['object']['main']
            except KeyError: continue
            if 'blue' in main: bfs.append(event)
        if len(bfs) == 0: return events
        for bf in bfs:
            d = bf['subject'][0]['driver']
            l_by = Battles(self.data).lapped(self.lap, d)
            if l_by:
                event1 = {'subject': [{'driver': d}],
                          'action': 'be lapped by',
                          'object': {'driver': [l_by]}}
                event2 = {'subject': [{'driver': l_by}],
                          'action': 'lap',
                          'object': {'driver': [d]}}
                events.append(random.choice([event1, event2]))
        return events
    
    def drsEvent(self):
        drs = Battles(self.data).drsZone(self.lap)
        if not drs: return None
        d1, d2 = drs[0], drs[1][0]
        gap, pos = drs[1][1], drs[1][2]
        act = random.choice(['fight', 'battle'])
        events1 = [{'subject': [{'driver': d1}, {'driver': d2}],
                    'action': act, 'object': 
                    {'main': ['for P' + str(pos)]}},
                   {'subject': [{'other': 'gap'}],
                    'action': 'be', 'object': 
                    {'main': [random.choice(['down to', 'only'])],
                     'gap': [str(gap) + 's']}}]
        events2 = [{'subject': [{'driver': d1}],
                    'action': 'be threathening', 'object':
                    {'driver': [d2], 'main': ['for position']}},
                   {'subject': [{'driver': d1}],
                    'action': 'be', 'object':
                    {'main': [str(gap) + 's behind'],
                     'driver': [d2], 'position': [str(pos)]}}]
        act = random.choice([['be', 'in DRS zone', 'of '],
                            ['have', 'DRS advantage', 'on '],
                            ['drive', 'on tail', 'of ']])
        events3 = [{'subject': [{'driver': d1}],
                    'action': act[0], 'object':
                    {'main': [act[1]], 'driver': [act[2] + d2]}},
                   {'subject': [{'driver': d2}],
                    'action': 'might lose', 'object':
                    {'main': ['P' + str(pos)], 'driver': ['to ' + d1]}}]
        return random.choice([events1, events2, events3])

    def approachEvent(self):
        # print('loading approaching')
        ap = Battles(self.data).approaching(self.lap)
        # print('loaded approaching')
        if not ap: return None
        d1, d2, diff, gap = ap[0], ap[1][0], ap[1][1], ap[1][2]
        acts1 = ['catch', 'approach', ('close', 'gap to')]
        act1, main1 = random.choice(acts1), None
        if type(act1) == tuple: act1, main1 = act1
        if main1: obj1 = {'main': [main1], 'driver': [d2]}
        else: obj1 = {'driver': [d2]}
        event1 = {'subject': [{'driver': d1}],
                  'action': act1, 'object': obj1}
        acts2 = [('decrease', 'to'), ('be', 'down to'),
                 ('be', 'only')]
        timings = ['now', 'end of lap', 'this lap', 'currently']
        act2, main2 = random.choice(acts2)
        t2 = random.choice(timings)
        event2 = {'subject': [{'other': 'gap'}],
                  'action': act2, 'object':
                  {'main': [main2, str(gap)],
                   'driver': [d2], 'timing': [t2]}}
        return [event1, event2]

    def driverOutEvent(self):
        # print('loading driver out')
        do, events = LapTimes(self.data).retirement(self.lap), []
        # print('loaded driver out')
        if not do: return None
        for d in do:
            team = DriverInfo(self.race_rc).team(d[0])
            act = random.choice(['retire from', 'stop', 'quit', 'be out of'])
            event1 = {'subject': [{'driver': d[0]}],
                      'action': act,
                      'object': {'main': ['race']}}
            act = random.choice(['give up', 'stop', 'be stationary'])
            event2 = {'subject': [{'other': 'his ' + team + ' car'}],
                      'action': act,
                      'object': {'timing': [d[1] + ' of lap']}}
            events += [event1, event2]
        return events

    def carEvent(self):
        rc = RaceControl(self.race_rc, self.lap)
        ce = [rc.safetyCar(), rc.carEvents()]
        return [[e] for e in ce if e]

<h2>Saving filtered data</h2>
In each year between 2018 and 2023, we take two random races to use for generating blog posts. We <i>exclude</i> 2020, since this was a Covid year with many cancelled races.

The lists contain the filename for the TFeed timetables, the filename for all the other data files, and the number of laps.

In [29]:
races = []
for year in range(2018, 2024):
    if year == 2020: continue
    races_y = loadRaces(year)
    races += random.sample(races_y, 2)

In [20]:
races = [['2018_monaco', '2018_monaco.json', 78],
         ['2018_china', '2018_shanghai.json', 56],
         ['2019_singapore', '2019_marina_bay.json', 61],
         ['2019_spain', '2019_catalunya.json', 66],
         ['2021_baku', '2021_baku.json', 51],
         ['2021_austin', '2021_americas.json', 56],
         ['2022_spa', '2022_spa.json', 44],
         ['2022_saudi_arabia', '2022_jeddah.json', 50],
         ['2023_baku', '2023_baku.json', 51],
         ['2023_suzuka', '2023_suzuka.json', 53]]

In [31]:
for r in races: print(r)

['2018_monaco', '2018_monaco.json', 78]
['2018_china', '2018_shanghai.json', 56]
['2019_singapore', '2019_marina_bay.json', 61]
['2019_spain', '2019_catalunya.json', 66]
['2021_baku', '2021_baku.json', 51]
['2021_austin', '2021_americas.json', 56]
['2022_spa', '2022_spa.json', 44]
['2022_saudi_arabia', '2022_jeddah.json', 50]
['2023_baku', '2023_baku.json', 51]
['2023_suzuka', '2023_silverstone.json', 52]


In [18]:
def getEvents(race, data, lap):
    ip, actions = Interpreter(data, race[1], lap), []
    main_acts = [ip.pitstopEvent(), ip.overtakeEvent()]
    acts = [ma for ma in main_acts if ma]
    if len(acts) > 0: actions.append(acts)
    actions += ip.carEvent()
    do = ip.driverOutEvent()
    if do: actions.append([do])
    rnd = random.randint(1, 20)
    if len(acts) == 0 or rnd >= 15:
        sub_acts = [ip.fastestLapEvent(), ip.raceControlEvent()]
        acts = [sa for sa in sub_acts if sa]
        if len(acts) > 0: actions.append(acts)
    if len(acts) == 0 or rnd <= 5:
        subsub_acts = [ip.drsEvent(), ip.approachEvent(), ip.slowSectorEvent()]
        acts = [sa for sa in subsub_acts if sa]
        if len(acts) > 1: actions.append([random.choice(acts)])
        elif len(acts) > 0: actions.append(acts)
    if len(actions) > 0: return actions

In [21]:
for race in races:
    data, events = Data(race[0]), {}
    fn = './Events/' + race[1]

    for lap in range(1, race[2]):
        try: acts = getEvents(race, data, lap)
        except KeyError: continue
        if acts: events['Lap ' + str(lap)] = acts

    with open(fn, 'w') as f: json.dump(events, f)
    print('Saved', race[0])

Saved 2023_suzuka
