# Overview

Objective: 
- get [Betfair historical data](https://historicdata.betfair.com/#/home) for the `match odds` (who will win) market.  

Notes:  
- get `metadata` on: available months/years, countries, number of files, sizes, etc.  

# Setup

## Imports

In [1]:
# data wrangling
import json
import numpy as np
import pandas as pd
# requests
import requests
# credentials (for the API)
import configparser
# dates
import calendar
# asynchronous requests
import asyncio
import aiohttp
import aiofiles
# files, system
import os

## Credentials

A valid [session token](https://developer.betfair.com/exchange-api/accounts-api-demo/) must be stored in `credentials.ini`, under the `token` key of the `DB` section.

In [2]:
cred = configparser.ConfigParser()
cred.read('credentials.ini')
token = cred['DB']['token']
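
The lookup `cred['DB']['token']` implies an INI file with a `DB` section. As a minimal sketch, the layout below is what such a file could look like; the token value is a placeholder, and the file is parsed from a string so the example runs without touching disk.

```python
import configparser

# Assumed layout of credentials.ini (the token value is a placeholder).
EXAMPLE_INI = """
[DB]
token = YOUR_SESSION_TOKEN
"""

example_cred = configparser.ConfigParser()
example_cred.read_string(EXAMPLE_INI)

# Same lookup as in the notebook: section 'DB', key 'token'.
example_token = example_cred['DB']['token']
print(example_token)
```

Note that `ConfigParser.read` silently skips files it cannot find, so a missing `credentials.ini` only surfaces later as a `KeyError` on `cred['DB']`.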

# API endpoints

Reference: https://historicdata.betfair.com/#/apidocs

Endpoints:  
- `GetMyData` - Returns the packages you have purchased  
- `GetCollectionOptions` - Returns the filter options for a given filter  
- `GetAdvBasketDataSize` - Returns a file count and size based on a filter  
- `DownloadListOfFiles` - Returns a list of files based on a filter  
- `DownloadFile` - Downloads a specific file

So, we will:
1. check data packages and dates available with `GetMyData`.  
2. get countries available for each month with `GetCollectionOptions`.  
3. get file counts and sizes per month of data with `GetAdvBasketDataSize`.  
4. get list of data files with `DownloadListOfFiles`.  
5. download data files with `DownloadFile`.

# Check data packages and dates available with `GetMyData`

Make sure that:  
- Packages from 04-2015 to 12-2022 are available.  
- There are no missing months.

In [26]:
# session object
session = requests.Session()

In [27]:
URL_GETMYDATA = "https://historicdata.betfair.com/api/GetMyData"

headers = {
    "ssoid": token,
    'Accept': 'application/json'
    }

In [28]:
resp = session.get(URL_GETMYDATA, headers=headers)

In [29]:
# check response code and first 100 characters
resp.status_code, resp.text[:100]

(200,
 '[{"sport":"Horse Racing","plan":"Basic Plan","forDate":"2018-10-01T00:00:00","purchaseItemId":27728}')

We seem to have received a legitimate response.

In [30]:
purchased_packages_dic = resp.json()

Let us export the data as a `JSON` file.

In [31]:
PURCHASED_PACKAGES_PATH = "../data/raw/betfair/purchased_packages_dic.json"

In [32]:
with open(PURCHASED_PACKAGES_PATH, "w") as outfile:
    json.dump(purchased_packages_dic, outfile)

Load `JSON` file.

In [33]:
with open(file=PURCHASED_PACKAGES_PATH, mode="r") as f:
    purchased_packages_dic = json.load(f)

Now we check using `pandas` which months and years are available.

In [34]:
# build pandas DataFrame
purchased_packages = pd.DataFrame(purchased_packages_dic)
purchased_packages_soccer = purchased_packages[purchased_packages['sport'] == 'Soccer'].copy()
purchased_packages_soccer['forDate'] = pd.to_datetime(purchased_packages_soccer['forDate'])
purchased_packages_soccer['month'] = purchased_packages_soccer['forDate'].dt.month
purchased_packages_soccer['year'] = purchased_packages_soccer['forDate'].dt.year
purchased_packages_soccer

| | sport | plan | forDate | purchaseItemId | month | year |
|---|---|---|---|---|---|---|
| 2 | Soccer | Basic Plan | 2015-04-01 | 32270 | 4 | 2015 |
| 3 | Soccer | Basic Plan | 2015-05-01 | 32270 | 5 | 2015 |
| 4 | Soccer | Basic Plan | 2015-06-01 | 32270 | 6 | 2015 |
| 5 | Soccer | Basic Plan | 2015-07-01 | 32269 | 7 | 2015 |
| 6 | Soccer | Basic Plan | 2015-08-01 | 29416 | 8 | 2015 |
| ... | ... | ... | ... | ... | ... | ... |
| 102 | Soccer | Basic Plan | 2023-08-01 | 120990 | 8 | 2023 |
| 103 | Soccer | Basic Plan | 2023-09-01 | 120993 | 9 | 2023 |
| 104 | Soccer | Basic Plan | 2023-10-01 | 120996 | 10 | 2023 |
| 105 | Soccer | Basic Plan | 2023-11-01 | 120999 | 11 | 2023 |


In [35]:
purchased_packages_soccer.groupby(by=['year'])['month'].aggregate(lambda x: list(x))

year
2015             [4, 5, 6, 7, 8, 9, 10, 11, 12]
2016    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2017    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2018    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2019    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2020    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2021    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2022    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2023    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Name: month, dtype: object

**Conclusion**: Packages from 04-2015 to 12-2022 are available and there are no missing months.
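
The visual check of the grouped output can also be done programmatically. The sketch below uses a hypothetical helper, `find_missing_months`, which is not part of the original notebook; it walks the expected month range and reports any `(year, month)` pair that is absent.

```python
def find_missing_months(available, start, end):
    """Return the (year, month) pairs in [start, end] that are not in `available`.

    available: iterable of (year, month) tuples.
    start, end: (year, month) tuples, both inclusive.
    """
    available = set(available)
    missing = []
    year, month = start
    while (year, month) <= end:
        if (year, month) not in available:
            missing.append((year, month))
        # advance one month, rolling over to January of the next year
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return missing


# Small illustrative input with a deliberate gap in June 2015.
sample = [(2015, 4), (2015, 5), (2015, 7)]
print(find_missing_months(sample, (2015, 4), (2015, 7)))
```

With the notebook's DataFrame, the input could be built as `zip(purchased_packages_soccer['year'], purchased_packages_soccer['month'])`, and an empty result for the range `(2015, 4)` to `(2022, 12)` would confirm the conclusion.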

# Get countries available for each month with `GetCollectionOptions`

For each month, get available countries and number of files.  
Each file is one match or part of one match.

Performing the requests synchronously with `requests` takes around 20 min, so we work asynchronously with `aiohttp` and `asyncio`.  

In [18]:
# constants
SPORT = 'Soccer'
PLAN = 'Basic Plan'
MARKET_TYPE = 'MATCH_ODDS'
FILE_TYPE = 'M'
LAST_DAY_INDEX = 1

In [19]:
# months and years
months = purchased_packages_soccer['month']
years = purchased_packages_soccer['year']

To make one POST request per `month_year` pair, we need a list of `data` dictionaries, one per pair.

In [20]:
# list of 'data' dictionaries for requests
data_list = []

for month, year in zip(months, years):
    data = {
        "sport": SPORT, 
        "plan": PLAN,
        "fromDay": 1,
        "marketTypesCollection": [MARKET_TYPE],
        "fileTypeCollection" : [FILE_TYPE],
        "fromMonth": month,
        "fromYear": year,
        "toDay": calendar.monthrange(year, month)[LAST_DAY_INDEX],
        "toMonth": month,
        "toYear": year
        }

    data_list.append(data)
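
The `toDay` field relies on `calendar.monthrange`, which returns a tuple of (weekday of the first day, number of days in the month); index 1, named `LAST_DAY_INDEX` in the notebook, is therefore the last day of the month. A small sketch, where `month_bounds` is a hypothetical helper used only for illustration:

```python
import calendar

# monthrange returns (weekday of the 1st, number of days in the month).
first_weekday, days_in_month = calendar.monthrange(2016, 2)
print(days_in_month)  # 29, since 2016 is a leap year


def month_bounds(year, month):
    """Return the date-range fields covering one full calendar month."""
    return {
        "fromDay": 1, "fromMonth": month, "fromYear": year,
        "toDay": calendar.monthrange(year, month)[1],
        "toMonth": month, "toYear": year,
    }


print(month_bounds(2015, 4))
```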

In [21]:
URL_COLLECTION_OPTIONS = "https://historicdata.betfair.com/api/GetCollectionOptions"

Define 2 functions:  
- one to asynchronously make a single POST request
- one to wrap a list of requests

In [22]:
async def async_make_post_request(session, url, data, headers, timeout, retries):
    """
    Make an asynchronous POST request with retries.
    
    Parameters:
        - session (aiohttp.ClientSession): Asynchronous session object.
        - url (str): URL for the request.
        - data (dict): Data to be sent with the request.
        - headers (dict): Headers for the request.
        - timeout (int): Timeout for the request.
        - retries (int): Number of retries if the request fails.
    
    Returns:
        - response (dict): JSON response of the request if successful.
        - None: If the request is unsuccessful.
    """
    
    retry_count = 0
    while retry_count < retries:
        try:
            async with session.post(url, json=data, headers=headers, timeout=timeout) as response:
                if response.status != 200:
                    retry_count += 1
                    continue
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError):
            retry_count += 1
    return None

In [23]:
async def async_make_multiple_post_requests(url, data_list, headers, timeout=5, retries=20):
    """
    Make multiple asynchronous POST requests with retries.
    
    Parameters:
        - url (str): URL for the requests.
        - data_list (list): list of data dictionaries to be sent with each request.
        - headers (dict): Headers for the requests.
        - timeout (int): Timeout for the requests.
        - retries (int): Number of retries if the requests fail.
    
    Returns:
        - results (list): list of JSON responses of the requests if successful.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [async_make_post_request(session, url, data, headers, timeout, retries)
                 for data in data_list]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

In [24]:
results = await async_make_multiple_post_requests(URL_COLLECTION_OPTIONS, data_list, headers)

Check if all requests were successful:

In [25]:
None in results

False

In case some requests fail, we write a function that runs retry cycles on the failed requests only.

In [21]:
def get_none_idx(results):
    """
    Get indices of failed requests in a given `results` list.
    """
    none_idx = []
    for i, r in enumerate(results):
        if r is None:
            none_idx.append(i)
    return none_idx

In [22]:
async def retry_failed_requests(url, data_list, headers, results, timeout=5, retries=20, max_retry_cycles=10):
    """
    Perform retry cycles on the failed requests.
    
    Parameters:
        - url (str): URL for the requests.
        - data_list (list): list of data dictionaries to be sent with each request.
        - headers (dict): Headers for the requests.
        - results (list): list of previous responses, with None for each failed request (updated in place).
        - timeout (int): Timeout for the requests.
        - retries (int): Number of retries per request within each cycle.
        - max_retry_cycles (int): maximum number of retry cycles while failed requests remain.
    
    Returns:
        - results (list): updated list of JSON responses of the requests if successful.
    """
    count = 0
    while ((None in results) and (count < max_retry_cycles)):
        none_idx = get_none_idx(results)
        results_retry = await async_make_multiple_post_requests(url,
                                                                [data_list[i] for i in none_idx],
                                                                headers,
                                                                timeout,
                                                                retries)
        for i, j in enumerate(none_idx):
            results[j] = results_retry[i]
        count +=1
    return results
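
The retry-cycle logic can be tried offline. The sketch below is not the notebook's code: it replaces the HTTP call with a hypothetical stub, `flaky_fetch`, that fails on the first attempt for each item, and then shows how failed positions (marked `None`) are refilled in place on the next cycle.

```python
import asyncio


async def flaky_fetch(item, attempt_log):
    """Hypothetical stand-in for a request: returns None on the first attempt per item."""
    attempt_log[item] = attempt_log.get(item, 0) + 1
    await asyncio.sleep(0)  # yield control to the event loop, as a real request would
    return None if attempt_log[item] == 1 else f"ok:{item}"


async def fetch_all(items, attempt_log):
    """Run one batch concurrently; results come back in the order of `items`."""
    return await asyncio.gather(*(flaky_fetch(i, attempt_log) for i in items))


async def fetch_with_retry_cycles(items, max_retry_cycles=10):
    attempt_log = {}
    results = await fetch_all(items, attempt_log)
    cycles = 0
    while None in results and cycles < max_retry_cycles:
        failed_idx = [i for i, r in enumerate(results) if r is None]
        retried = await fetch_all([items[i] for i in failed_idx], attempt_log)
        # write each retried result back to its original position
        for idx, value in zip(failed_idx, retried):
            results[idx] = value
        cycles += 1
    return results


# In a notebook, use `await fetch_with_retry_cycles([...])` instead of asyncio.run.
demo_results = asyncio.run(fetch_with_retry_cycles(["a", "b", "c"]))
print(demo_results)
```

Because `asyncio.gather` preserves input order, the index bookkeeping is enough to keep each result aligned with its entry in the request list.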

In [23]:
results = await retry_failed_requests(URL_COLLECTION_OPTIONS, data_list, headers, results)

In [24]:
None in results

False

All requests were successful. We needed a high value for `retries` (20) due to timeout errors.  
Now let us store the results in a dictionary with `month_year` pairs as keys.

In [25]:
keys = ['_'.join([str(m), str(y)]) for m, y in zip(months, years)]

In [26]:
collection_options = {k: r for k, r in zip(keys, results)}

Let us export the data as a `JSON` file.

In [27]:
COLLECTION_OPTIONS_PATH = "../data/raw/betfair/collection_options.json"

In [28]:
with open(COLLECTION_OPTIONS_PATH, "w") as outfile:
    json.dump(collection_options, outfile)

Let us load the `JSON` file.

In [29]:
with open(file=COLLECTION_OPTIONS_PATH, mode="r") as f:
    collection_options = json.load(f)

Let us see what a record looks like.

In [30]:
collection_options['4_2016']

{'marketTypesCollection': [{'name': 'MATCH_ODDS', 'count': 11133}],
 'countriesCollection': [{'name': 'PY', 'count': 67},
  {'name': 'BR', 'count': 467},
  {'name': 'SV', 'count': 79},
  {'name': 'CO', 'count': 110},
  {'name': 'FR', 'count': 266},
  {'name': 'IL', 'count': 200},
  {'name': 'CZ', 'count': 232},
  {'name': 'UA', 'count': 155},
  {'name': 'RU', 'count': 264},
  {'name': 'AZ', 'count': 25},
  {'name': 'IQ', 'count': 3},
  {'name': 'UZ', 'count': 49},
  {'name': 'JP', 'count': 225},
  {'name': 'IE', 'count': 80},
  {'name': 'IN', 'count': 44},
  {'name': 'CN', 'count': 59},
  {'name': 'SG', 'count': 51},
  {'name': 'GB', 'count': 1479},
  {'name': 'IT', 'count': 435},
  {'name': 'JO', 'count': 40},
  {'name': 'MY', 'count': 53},
  {'name': 'VE', 'count': 109},
  {'name': 'RO', 'count': 178},
  {'name': 'CS', 'count': 67},
  {'name': 'SK', 'count': 90},
  {'name': '', 'count': 76},
  {'name': 'KW', 'count': 37},
  {'name': 'QA', 'count': 39},
  {'name': 'FI', 'count': 77},


We do not analyze the data per country at this point, but it is stored in case we need it later in the project.
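
Should a per-country view be needed later, a record can be ranked by file count with a few lines. The sketch below uses three entries copied from the `4_2016` output above; `top_countries` is a hypothetical helper, not part of the notebook.

```python
# Three entries copied from the '4_2016' record (the full record is longer).
sample_record = {
    "marketTypesCollection": [{"name": "MATCH_ODDS", "count": 11133}],
    "countriesCollection": [
        {"name": "PY", "count": 67},
        {"name": "BR", "count": 467},
        {"name": "GB", "count": 1479},
    ],
}


def top_countries(record, n=10):
    """Return the n (country, count) pairs with the most files, largest first."""
    pairs = [(c["name"], c["count"]) for c in record["countriesCollection"]]
    return sorted(pairs, key=lambda p: p[1], reverse=True)[:n]


print(top_countries(sample_record, n=2))
```

The record shown above also contains an entry with an empty `name`, so any per-country analysis would need to decide how to treat files without a country code.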

# Get file counts and sizes per month of data with `GetAdvBasketDataSize`

For each month, get total size and number of files (before filtering).  
We want to get the order of magnitude of the total size of Betfair data files and the number of files, for the set of relevant countries.  
We use the list of relevant countries defined in the first round of the research.

In [31]:
COUNTRY_CODES_PATH = 'configuration/countryCodes.csv'

In [32]:
country_codes = list(pd.read_csv(COUNTRY_CODES_PATH)['marketDefinition.countryCode'].values)
country_codes

['AR',
 'AU',
 'BR',
 'CA',
 'CN',
 'DE',
 'ES',
 'FR',
 'GB',
 'IL',
 'IT',
 'JP',
 'NL',
 'PT',
 'RU',
 'TR',
 'US']

Similarly to the previous endpoint, we need a list of data dictionaries for the POST requests.  
For `GetAdvBasketDataSize`, we restrict the requests to the list of relevant countries. 

In [33]:
data_list_countries = []

for data in data_list:
    data_countries = data.copy()
    data_countries['countriesCollection'] = country_codes
    data_list_countries.append(data_countries)

In [34]:
URL_DATA_SIZE = "https://historicdata.betfair.com/api/GetAdvBasketDataSize"

In [35]:
results_data_size = await async_make_multiple_post_requests(URL_DATA_SIZE, data_list_countries, headers)

In [36]:
None in results_data_size

True

Not all requests were successful, so we run a function to perform retry cycles on the failed requests.

In [61]:
results_data_size = await retry_failed_requests(URL_DATA_SIZE, data_list_countries, headers, results_data_size)

In [62]:
None in results_data_size

False

All requests were successful.  
Let us see what the results look like.

In [64]:
results_data_size

[{'totalSizeMB': 0, 'fileCount': 17},
 {'totalSizeMB': 7, 'fileCount': 3028},
 {'totalSizeMB': 3, 'fileCount': 1028},
 {'totalSizeMB': 4, 'fileCount': 1702},
 {'totalSizeMB': 8, 'fileCount': 3905},
 {'totalSizeMB': 11, 'fileCount': 6242},
 {'totalSizeMB': 10, 'fileCount': 5236},
 {'totalSizeMB': 9, 'fileCount': 4706},
 {'totalSizeMB': 9, 'fileCount': 4303},
 {'totalSizeMB': 10, 'fileCount': 5255},
 {'totalSizeMB': 7, 'fileCount': 3500},
 {'totalSizeMB': 7, 'fileCount': 3700},
 {'totalSizeMB': 11, 'fileCount': 6221},
 {'totalSizeMB': 8, 'fileCount': 4235},
 {'totalSizeMB': 4, 'fileCount': 1539},
 {'totalSizeMB': 4, 'fileCount': 2012},
 {'totalSizeMB': 9, 'fileCount': 4404},
 {'totalSizeMB': 11, 'fileCount': 5521},
 {'totalSizeMB': 11, 'fileCount': 5614},
 {'totalSizeMB': 9, 'fileCount': 4609},
 {'totalSizeMB': 9, 'fileCount': 3949},
 {'totalSizeMB': 8, 'fileCount': 3927},
 {'totalSizeMB': 9, 'fileCount': 4301},
 {'totalSizeMB': 10, 'fileCount': 5043},
 {'totalSizeMB': 11, 'fileCount': 5

Now let us store the results in a dictionary with `month_year` pairs as keys.

In [66]:
sizes_counts = {k: r for k, r in zip(keys, results_data_size)}

Let us export the data as a `JSON` file.

In [67]:
SIZES_COUNTS_PATH = "../data/raw/betfair/sizes_counts.json"

In [68]:
with open(SIZES_COUNTS_PATH, "w") as outfile:
    json.dump(sizes_counts, outfile)

Let us calculate totals.

In [69]:
total_size = 0
total_files = 0

for k, v in sizes_counts.items():
    total_size += v['totalSizeMB']
    total_files += v['fileCount']
print(f'Total size: {total_size} MB.')
print(f'Total files: {total_files}.')

Total size: 712 MB.
Total files: 303894.


Load from `JSON` file.

In [70]:
with open(file=SIZES_COUNTS_PATH, mode="r") as f:
    sizes_counts = json.load(f)

We now know the order of magnitude of the total size of Betfair data files and the number of files, for the set of relevant countries.  
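
The totals can also be turned into a rough per-file size. The sketch below is illustrative only: `summarize` is a hypothetical helper, the sample holds the first two entries of the output above, and the conversion assumes 1 MB = 1024 KB, which the API documentation may define differently.

```python
def summarize(sizes_counts):
    """Return (total MB, total files, mean KB per file) for a sizes/counts dictionary."""
    total_mb = sum(v["totalSizeMB"] for v in sizes_counts.values())
    total_files = sum(v["fileCount"] for v in sizes_counts.values())
    # Assumes 1 MB = 1024 KB; guard against an empty dictionary.
    mean_kb = total_mb * 1024 / total_files if total_files else 0.0
    return total_mb, total_files, mean_kb


# First two entries of the results shown above.
sample = {
    "4_2015": {"totalSizeMB": 0, "fileCount": 17},
    "5_2015": {"totalSizeMB": 7, "fileCount": 3028},
}
total_mb, total_files, mean_kb = summarize(sample)
print(total_mb, total_files, round(mean_kb, 2))
```

Since `totalSizeMB` appears to be an integer (17 files are reported as 0 MB), the summed size is approximate. With the reported totals of 712 MB and 303894 files, the average compressed file is on the order of 2 to 3 KB.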

# Get list of data files with `DownloadListOfFiles`

For each month, get list of files for download, for the relevant set of countries.

In [71]:
URL_LIST_OF_FILES = "https://historicdata.betfair.com/api/DownloadListOfFiles"

In [72]:
results_list_of_files = await async_make_multiple_post_requests(URL_LIST_OF_FILES, data_list_countries, headers, timeout=15)

In [73]:
None in results_list_of_files

True

In [79]:
results_list_of_files = await retry_failed_requests(URL_LIST_OF_FILES, data_list_countries, headers, results_list_of_files, timeout=5, max_retry_cycles=1)

In [91]:
None in results_list_of_files

False

All requests were successful.
Now let us store the results in a dictionary with `month_year` pairs as keys.

In [92]:
lists_of_files = {k: r for k, r in zip(keys, results_list_of_files)}

Let us export the data as a `JSON` file.

In [93]:
LISTS_OF_FILES_PATH = "../data/raw/betfair/lists_of_files.json"

In [94]:
with open(LISTS_OF_FILES_PATH, "w") as outfile:
    json.dump(lists_of_files, outfile)

Let us load the `JSON` file.

In [95]:
with open(file=LISTS_OF_FILES_PATH, mode="r") as f:
    lists_of_files = json.load(f)

Let us see what a record looks like.

In [96]:
lists_of_files['10_2022'][:10]

['/xds_nfs/edp_processed/BASIC/2022/Oct/1/31772344/1.203873421.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31772343/1.203873511.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31761863/1.203677367.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31781522/1.204081256.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31772345/1.203873601.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31770605/1.203831018.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31770607/1.203830585.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31788403/1.204181749.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31781610/1.204044205.bz2',
 '/xds_nfs/edp_processed/BASIC/2022/Oct/1/31788559/1.204181204.bz2']

Check whether the number of file names obtained with `DownloadListOfFiles` matches the file count per month obtained previously with `GetAdvBasketDataSize`.

In [97]:
{k: len(v) for k, v in lists_of_files.items()} == {k: v['fileCount'] for k, v in sizes_counts.items()}

True
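
A single `True`/`False` does not say which month disagrees. If the comparison ever failed, a helper along the following lines could pinpoint the offending keys; `count_mismatches` is a hypothetical name and the sample data is made up for illustration.

```python
def count_mismatches(lists_of_files, sizes_counts):
    """Return {key: (listed, reported)} for the months where the two counts differ."""
    mismatches = {}
    for key in sorted(set(lists_of_files) | set(sizes_counts)):
        listed = len(lists_of_files.get(key, []))
        reported = sizes_counts.get(key, {}).get("fileCount")
        if listed != reported:
            mismatches[key] = (listed, reported)
    return mismatches


# Made-up sample: the second month lists one file fewer than reported.
files_sample = {"4_2015": ["a.bz2", "b.bz2"], "5_2015": ["c.bz2"]}
counts_sample = {"4_2015": {"fileCount": 2}, "5_2015": {"fileCount": 2}}
print(count_mismatches(files_sample, counts_sample))
```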

# Download data files with `DownloadFile`

Download data files with **asynchronous requests** using the endpoint `DownloadFile`.

In [98]:
# constants
BASE_PATH = '../data/raw/betfair/'
URL_DOWNLOAD = 'https://historicdata.betfair.com/api/DownloadFile'
# headers
headers = {"ssoid": token}

Define 3 asynchronous functions:  
- one to make a specific download;  
- one to wrap a list of requests (tasks);
- one to run a complete round of downloads for all lists of files (recall that we have a list of files for each month of data).

In [99]:
async def async_download_file(session, url, base_path, params, headers, previously_downloaded_filepaths, timeout, retries):
    """
    This function makes an asynchronous GET request using the aiohttp library. 
    If the filePath specified in params has already been downloaded, the function returns the filePath.
    Otherwise, the function attempts the GET request up to `retries` times (or until the request is successful).
    If the GET request is successful, the function writes the content of the response to a file under the directory specified by `base_path`. 
    The function then returns the filePath of the written file.
    If the GET request is unsuccessful, the function returns None.

    Parameters:
    session (aiohttp.ClientSession): A session instance from the aiohttp library.
    url (str): URL to make the GET request to.
    base_path (str): path to base folder where files will be written.
    params (dict): Dictionary of parameters with the file path to include in the GET request. 
        Schema: {"filePath": file_path}
    headers (dict): Dictionary of headers to include in the GET request. 
        Schema: {"ssoid": token}, where `token` is a valid session token.
    previously_downloaded_filepaths (list): List of previously downloaded file paths.
    timeout (int or float): Timeout value in seconds for the GET request.
    retries (int): Number of times to retry the GET request if it fails.

    Returns:
    str: The file path of the written file if the GET request is successful 
    (or if it has already been downloaded as per `previously_downloaded_filepaths`).
    None: If the GET request is unsuccessful.
    """

    if params['filePath'] in previously_downloaded_filepaths:
        return params['filePath']
    retry_count = 0
    while retry_count < retries:
        try:
            async with session.get(url, params=params, headers=headers, timeout=timeout) as response:
                if response.status != 200:
                    retry_count += 1
                    continue
                # strip the leading "/" so the remote path is joined under base_path
                file_path_to_write = os.path.join(base_path, params['filePath'][1:])
                os.makedirs(os.path.dirname(file_path_to_write), exist_ok=True)
                # open the file once, in write mode, so that a retry after a
                # partial download overwrites the file instead of appending to it
                async with aiofiles.open(file_path_to_write, 'wb') as f:
                    async for data in response.content.iter_chunked(1024):
                        await f.write(data)
                return params['filePath']
        except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError):
            retry_count += 1
    return None
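
The download function drops the first character of `filePath` before joining it with `base_path`. The sketch below shows why: when the second argument of a path join is absolute, the first argument is discarded. It uses `posixpath` (which is what `os.path` resolves to on Unix-like systems) so the behaviour is the same on any platform.

```python
import posixpath

base_path = "../data/raw/betfair/"
file_path = "/xds_nfs/hdfs_supreme/BASIC/2015/Apr/29/27427653/1.118400175.bz2"

# Joining with an absolute second component discards base_path entirely.
naive_path = posixpath.join(base_path, file_path)
print(naive_path)

# Dropping the leading "/" keeps the file under base_path.
local_path = posixpath.join(base_path, file_path[1:])
print(local_path)
```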

In [100]:
async def async_download_multiple_files(url, base_path, params_list, headers, previously_downloaded_filepaths, timeout=10, retries=20):
    """
    Asynchronously download multiple files from a URL.
    This function uses the aiohttp library to download multiple files in parallel. 
    If a file has already been downloaded, it is not re-downloaded. 
    The function returns one entry per requested file: its file path if it was downloaded (or previously downloaded), otherwise None.

    Parameters:
    url (str): URL to download files from.
    base_path (str): path to base folder where files will be written.
    params_list (list): List of dictionaries of parameters, each with a "filePath" key specifying the file path to include in the GET request.
         Schema: [{"filePath": file_path1}, {"filePath": file_path2}, ...]
    headers (dict): Dictionary of headers to include in the GET request. 
        Schema: {"ssoid": token}, where `token` is a valid session token.
    previously_downloaded_filepaths (list): List of paths of previously downloaded files.
    timeout (int or float): Timeout value in seconds for each GET request.
    retries (int): Number of times to retry each GET request if it fails.
    
    Returns:
    List[Optional[str]]: One entry per requested file, in the order of `params_list`:
    the file path if it was downloaded (or previously downloaded as per `previously_downloaded_filepaths`), otherwise None.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [async_download_file(session, url, base_path, params, headers, 
                                  previously_downloaded_filepaths, timeout, retries) 
                 for params in params_list]
        downloaded_files = await asyncio.gather(*tasks, return_exceptions=False)
        return downloaded_files

In [101]:
async def run_round_of_downloads(lists_of_files_to_download, lists_of_downloaded_files, url, base_path, headers):
    """
    Download multiple files in parallel for each pair `month_year` in lists_of_files_to_download.
    Keep track of downloaded files per category in lists_of_downloaded_files.

    Params:
    lists_of_files_to_download (dict): maps a pair `month_year` to a list of file paths to download.
        Schema: {'month_year': [file_path1, file_path2]}, 
        like in {'5_2015': ['/xds_nfs/hdfs_supreme/BASIC/2015/May/1/27433050/1.118512110.bz2',
                            '/xds_nfs/hdfs_supreme/BASIC/2015/May/1/27433196/1.118516429.bz2',
                            ...]}
    lists_of_downloaded_files (dict):  maps a pair `month_year` to a list of previously downloaded file paths.
        Schema: same as lists_of_files_to_download's.
    url (str): URL to make the GET request to.
    base_path (str): base file path to save downloaded files.
    headers (dict): Dictionary of headers to include in the GET request. 
        Schema: {"ssoid": token}, where `token` is a valid session token.

    Returns:
    lists_of_downloaded_files (dict): updated dictionary of downloaded files per pair `month_year`.
    """

    if not lists_of_downloaded_files:
        lists_of_downloaded_files = {}

    for k, v in lists_of_files_to_download.items():
        params_list = [{"filePath": file_path} for file_path in v]
        if k in lists_of_downloaded_files:
            previously_downloaded_filepaths = lists_of_downloaded_files[k]
        else:
            previously_downloaded_filepaths = []
        lists_of_downloaded_files[k] = await async_download_multiple_files(url, base_path, params_list, headers, previously_downloaded_filepaths)
    
    return lists_of_downloaded_files

Perform downloads. In the previous section we named the full dictionary with all lists of files as `lists_of_files`.

In [None]:
lists_of_downloaded_files = await run_round_of_downloads(lists_of_files, None, URL_DOWNLOAD, BASE_PATH, headers)

In practice, a single call of the asynchronous function `run_round_of_downloads` was not sufficient to download all files, as the connection with the API led to many timeouts.  
These were not serious issues, and with more rounds of requests and retries, all files were downloaded.  
Below is an illustrative example of a round of downloads, for 5 files.

In [109]:
lists_of_files_to_download_example = {k: v[:5] for k, v in lists_of_files.items() if k == '4_2015'}

In [110]:
lists_of_files_to_download_example

{'4_2015': ['/xds_nfs/hdfs_supreme/BASIC/2015/Apr/29/27427653/1.118400175.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/29/27425814/1.118368559.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/30/27433160/1.118515985.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/30/27428030/1.118406188.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/30/27427545/1.118394357.bz2']}

In [111]:
lists_of_downloaded_files_example = await run_round_of_downloads(lists_of_files_to_download_example, None, URL_DOWNLOAD, '../data/test/', headers)

In [112]:
lists_of_downloaded_files_example

{'4_2015': ['/xds_nfs/hdfs_supreme/BASIC/2015/Apr/29/27427653/1.118400175.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/29/27425814/1.118368559.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/30/27433160/1.118515985.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/30/27428030/1.118406188.bz2',
  '/xds_nfs/hdfs_supreme/BASIC/2015/Apr/30/27427545/1.118394357.bz2']}

All 5 files were indeed downloaded. Write dictionary as `JSON`.

In [113]:
LISTS_OF_DOWNLOADED_FILES_PATH = '../data/raw/betfair/lists_of_downloaded_files_example.json'

with open(LISTS_OF_DOWNLOADED_FILES_PATH, "w") as outfile:
    json.dump(lists_of_downloaded_files_example, outfile)
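
Because a failed download is recorded as `None`, the files still missing after a round can be derived by comparing the two dictionaries. The sketch below uses a hypothetical helper, `pending_downloads`, with made-up file names; it is one possible way to decide whether another round is needed, not the procedure actually used for this project.

```python
def pending_downloads(lists_of_files, lists_of_downloaded_files):
    """Return {key: [paths]} for the files not yet recorded as downloaded."""
    pending = {}
    for key, wanted in lists_of_files.items():
        # the downloaded list may contain None for requests that failed
        done = set(lists_of_downloaded_files.get(key, []))
        missing = [path for path in wanted if path not in done]
        if missing:
            pending[key] = missing
    return pending


# Made-up sample: one file of April failed, May has not been attempted yet.
wanted_sample = {"4_2015": ["/a.bz2", "/b.bz2"], "5_2015": ["/c.bz2"]}
done_sample = {"4_2015": ["/a.bz2", None]}
print(pending_downloads(wanted_sample, done_sample))
```

An empty result would indicate that every listed file has been recorded as downloaded.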

Now we check whether the files actually on the hard drive correspond to the full list of files we intended to download.  
We check the full list of ~300k files.

In [114]:
def list_files(file_path, file_type):
    """
    Walks through a file path and returns all files that have the specified file type.
    
    Params:
    file_path (str): The directory to search for files in and its subdirectories.
    file_type (str): The file extension to search for (e.g. '.txt', '.pdf').
    
    Returns:
    matching_files (list): A list of filenames that have the specified file type.
    """
    matching_files = []
    for dir_path, dir_names, file_names in os.walk(file_path):
        for filename in file_names:
            if filename.lower().endswith(file_type.lower()):
                matching_files.append(filename)
    return matching_files

In [115]:
# downloaded files are .bz2 files
downloaded_files_in_hd = list_files(BASE_PATH, '.bz2')

Now we compare `downloaded_files_in_hd` with the original `lists_of_files` to be downloaded.  
As `lists_of_files` is a dictionary with lists, we flatten it.

In [116]:
lists_of_files_flattened = [item.split("/")[-1] for sublist in lists_of_files.values() for item in sublist]

In [117]:
set(downloaded_files_in_hd) == set(lists_of_files_flattened)

True

Both sets match, so all files were downloaded successfully.
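
Had the two sets differed, set differences would show which files are missing and which are unexpected. A minimal sketch with a hypothetical helper, `compare_file_sets`, and made-up file names:

```python
def compare_file_sets(expected_names, found_names):
    """Return (missing, unexpected): names only in expected, and names only in found."""
    expected, found = set(expected_names), set(found_names)
    return sorted(expected - found), sorted(found - expected)


# Made-up sample: one expected file is absent and one extra file is present.
missing, unexpected = compare_file_sets(
    ["1.1.bz2", "1.2.bz2", "1.3.bz2"],
    ["1.1.bz2", "1.3.bz2", "1.9.bz2"],
)
print(missing, unexpected)
```

This comparison works on base file names only, so it does not detect empty or truncated files, nor files saved under the wrong directory.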