In [103]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import perf_counter
import math
import nest_asyncio
nest_asyncio.apply()

# Load the city/station table and preview its first five rows.
df = pd.read_csv(filepath_or_buffer='datasets/cities.csv')
print(df.head(n=5))

  station_id   city_name      country       state iso2 iso3   latitude  \
0      41515    Asadabad  Afghanistan       Kunar   AF  AFG  34.866000   
1      38954    Fayzabad  Afghanistan  Badakhshan   AF  AFG  37.129761   
2      41560   Jalalabad  Afghanistan   Nangarhar   AF  AFG  34.441527   
3      38947      Kunduz  Afghanistan      Kunduz   AF  AFG  36.727951   
4      38987  Qala i Naw  Afghanistan     Badghis   AF  AFG  34.983000   

   longitude  
0  71.150005  
1  70.579247  
2  70.436103  
3  68.872530  
4  63.133300  


In [104]:
#  Haversine formula for calculating distance between two points on a sphere
def haversine(coord1, coord2, radius=6371):
    """Great-circle distance between two (latitude, longitude) points given in degrees.

    Args:
        coord1: (latitude, longitude) of the first point, in degrees.
        coord2: (latitude, longitude) of the second point, in degrees.
        radius: sphere radius. The default 6371 is the Earth radius in km, so the
            result is in km unless a different radius/unit is supplied.

    Returns:
        Distance along the sphere's surface, in the same unit as ``radius``.
    """
    lat1, lon1 = coord1
    lat2, lon2 = coord2

    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)

    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2)
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal points,
    # which would make math.sqrt(1 - a) raise "math domain error".
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return radius * c

In [105]:
import threading

def worker(coord, cities_chunk, results, index):
    """Thread target: compute the distance from ``coord`` to every city in one chunk.

    For each record in ``cities_chunk`` a ``(city_name, distance)`` tuple is appended
    to ``results[index]``. The caller in this notebook passes a distinct ``index``
    per thread, so each thread appends to its own list.
    """
    for record in cities_chunk:
        lat_lon = record['latitude'], record['longitude']
        km = haversine(coord, lat_lon)
        entry = (record['city_name'], km)
        results[index].append(entry)

def manual_threading_parallelism(coord, cities, num_threads):
    """Find and print the city nearest to ``coord``, splitting the work across threads.

    ``cities`` is cut into contiguous chunks whose sizes differ by at most one. Each
    chunk is handed to a ``worker`` thread that writes ``(city_name, distance_km)``
    tuples into its own slot of ``results``. The combined results are then scanned
    for the smallest distance.

    Args:
        coord: (latitude, longitude) reference point.
        cities: sliceable sequence of city records with 'city_name', 'latitude'
            and 'longitude' keys (e.g. ``df.to_dict('records')``).
        num_threads: requested number of worker threads; capped at ``len(cities)``.

    Returns:
        The ``(city_name, distance_km)`` tuple of the nearest city.

    Raises:
        ValueError: if ``num_threads`` < 1 or ``cities`` is empty.
        RuntimeError: if no distances were produced (e.g. every worker thread failed).
    """
    if num_threads < 1:
        raise ValueError(f'num_threads must be at least 1, got {num_threads}')
    if len(cities) == 0:
        raise ValueError('cities must contain at least one city')

    # Threads beyond one per city would only receive empty chunks.
    num_threads = min(num_threads, len(cities))
    results = [[] for _ in range(num_threads)]

    # Balanced split: the first `extra` chunks get one more city than the rest,
    # instead of piling the whole remainder onto the last thread.
    base_size, extra = divmod(len(cities), num_threads)
    threads = []
    start = 0
    for i in range(num_threads):
        end = start + base_size + (1 if i < extra else 0)
        thread = threading.Thread(target=worker, args=(coord, cities[start:end], results, i))
        threads.append(thread)
        thread.start()
        start = end

    for thread in threads:
        thread.join()

    # Chunks are contiguous and concatenated in order, so `distances` follows input order.
    distances = [item for sublist in results for item in sublist]
    if not distances:
        raise RuntimeError('no distances were computed; check the worker threads for errors')

    # Serial reduction after all threads have joined, written as a manual scan
    # (per the original note: don't use min()). Ties keep the earliest city.
    closest_city = distances[0]
    for city in distances[1:]:
        if city[1] < closest_city[1]:
            closest_city = city
    print(f'The closest city to {coord} is {closest_city[0]} at {closest_city[1]:.2f} km away')
    return closest_city

# Usage: time the chunked-threading nearest-city search from a fixed reference point.
num_threads = 4
cities = df.to_dict(orient='records')  # one dict per DataFrame row

# Wall-clock timing covers thread start-up/joins and the result print, not just the math.
start_time = perf_counter()
# NOTE(review): `distances` only holds the function's return value; the function
# reports the nearest city itself via print.
distances = manual_threading_parallelism(
    coord=(40.7128, -74.0060),  # presumably New York City (lat, lon) — confirm
    cities=cities,
    num_threads=num_threads,
)
end_time = perf_counter()
execution_time_threading = end_time - start_time
print(f'Execution time with {num_threads} threads: {execution_time_threading:.5f} s')

The closest city to (40.7128, -74.006) is Trenton at 83.25 km away
Execution time with 4 threads: 0.00495 s


In [106]:
from queue import Queue
import queue

def queue_worker(coord, work_queue, result_queue):
    """Drain ``work_queue``, putting ``(city_name, distance_km)`` tuples on ``result_queue``.

    Intended as a ``threading.Thread`` target. Several workers may share the same
    queues, and each one returns as soon as it finds the work queue empty.

    Args:
        coord: (latitude, longitude) reference point passed to ``haversine``.
        work_queue: queue of city records with 'latitude', 'longitude' and
            'city_name' keys.
        result_queue: receives one ``(city_name, distance)`` tuple per processed city.
    """
    while True:
        # get_nowait() is the termination test on its own. A separate empty() check
        # is redundant, because another worker can take the last item in between.
        try:
            city = work_queue.get_nowait()
        except queue.Empty:
            break
        try:
            distance = haversine(coord, (city['latitude'], city['longitude']))
            result_queue.put((city['city_name'], distance))
        finally:
            # Always mark the item done so a work_queue.join() cannot block forever
            # when a record fails to process.
            work_queue.task_done()

def queue_based_threading_parallelism(coord, cities, num_threads):
    """Find and print the city nearest to ``coord`` using threads that share a work queue.

    Every city record is enqueued up front. ``queue_worker`` threads then pull
    records until the queue is empty and push ``(city_name, distance_km)`` tuples
    onto a result queue, which is drained and scanned for the smallest distance.

    Args:
        coord: (latitude, longitude) reference point.
        cities: sequence of city records with 'city_name', 'latitude' and
            'longitude' keys (e.g. ``df.to_dict('records')``).
        num_threads: requested number of worker threads; capped at ``len(cities)``.

    Returns:
        The ``(city_name, distance_km)`` tuple of the nearest city.

    Raises:
        ValueError: if ``num_threads`` < 1 or ``cities`` is empty.
        RuntimeError: if no distances were produced (e.g. every worker thread failed).
    """
    if num_threads < 1:
        raise ValueError(f'num_threads must be at least 1, got {num_threads}')
    if len(cities) == 0:
        raise ValueError('cities must contain at least one city')

    work_queue = Queue()
    result_queue = Queue()

    # Enqueue everything before starting workers: a worker exits once it sees an empty queue.
    for city in cities:
        work_queue.put(city)

    # More threads than cities would only start workers that exit immediately.
    threads = [
        threading.Thread(target=queue_worker, args=(coord, work_queue, result_queue))
        for _ in range(min(num_threads, len(cities)))
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # All workers have joined, so nothing else touches result_queue while it is drained.
    distances = []
    while not result_queue.empty():
        distances.append(result_queue.get_nowait())
    if not distances:
        raise RuntimeError('no distances were computed; check the worker threads for errors')

    # Serial reduction written as a manual scan (per the original note: don't use min()).
    # Result order depends on thread scheduling, so ties go to whichever was queued first.
    closest_city = distances[0]
    for city in distances[1:]:
        if city[1] < closest_city[1]:
            closest_city = city
    print(f'The closest city to {coord} is {closest_city[0]} at {closest_city[1]:.2f} km away')
    return closest_city


# Usage: time the shared-queue nearest-city search from the same reference point.
num_threads = 4
cities = df.to_dict(orient='records')  # one dict per DataFrame row

# Wall-clock timing covers enqueueing, thread start-up/joins and the result print.
start_time = perf_counter()
# NOTE(review): `distances` only holds the function's return value; the function
# reports the nearest city itself via print.
distances = queue_based_threading_parallelism(
    coord=(40.7128, -74.0060),  # presumably New York City (lat, lon) — confirm
    cities=cities,
    num_threads=num_threads,
)
end_time = perf_counter()
execution_time_queue_based = end_time - start_time
print(f'Execution time with queue-based threading and {num_threads} threads: {execution_time_queue_based:.5f} s')


The closest city to (40.7128, -74.006) is Trenton at 83.25 km away
Execution time with queue-based threading and 4 threads: 0.01032 s
