In [1]:
from dask.distributed import Client

**Load Data**

In [2]:
# load the country codes
from utils.geocode_utils import get_country_code_lookup

codes = get_country_code_lookup()

In [3]:
# load reviews
import kagglehub
import pandas as pd
import swifter
from timeit import timeit
import os

path = kagglehub.dataset_download("christopheiv/winemagdata130k")
fname = 'winemag-data-130k-v2.csv'
reviews = pd.read_csv(os.path.join(path, fname))
reviews.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 129971 entries, 0 to 129970
Data columns (total 14 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   Unnamed: 0             129971 non-null  int64  
 1   country                129908 non-null  object 
 2   description            129971 non-null  object 
 3   designation            92506 non-null   object 
 4   points                 129971 non-null  int64  
 5   price                  120975 non-null  float64
 6   province               129908 non-null  object 
 7   region_1               108724 non-null  object 
 8   region_2               50511 non-null   object 
 9   taster_name            103727 non-null  object 
 10  taster_twitter_handle  98758 non-null   object 
 11  title                  129971 non-null  object 
 12  variety                129970 non-null  object 
 13  winery                 129971 non-null  object 
dtypes: float64(1), int64(2), object(11)


**Method to throttle**

In [8]:
def lookup(name):
  return codes[name] if name in codes else None

### Throttling Logic

In [21]:
from timeit import default_timer as timer
start = timer()
reviews['code'] = reviews['country'].swifter.apply(lookup)
elapsed = timer() - start
print(f'{reviews.shape[0]:,d}', 'rows augmented in', f'{elapsed:.3f}', 'seconds.', f'{elapsed/reviews.shape[0]*1_000:.5f}', 'ms per row.')

Pandas Apply:   0%|          | 0/129971 [00:00<?, ?it/s]

129,971 rows augmented in 0.203 seconds. 0.00156 ms per row.


**Parallelize**

In [17]:
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 32.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:58809,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 32.00 GiB

0,1
Comm: tcp://127.0.0.1:58820,Total threads: 4
Dashboard: http://127.0.0.1:58827/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:58812,
Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-3_we7c_e,Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-3_we7c_e

0,1
Comm: tcp://127.0.0.1:58821,Total threads: 4
Dashboard: http://127.0.0.1:58824/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:58814,
Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-s86uffkp,Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-s86uffkp

0,1
Comm: tcp://127.0.0.1:58822,Total threads: 4
Dashboard: http://127.0.0.1:58825/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:58816,
Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-yv82v9_3,Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-yv82v9_3

0,1
Comm: tcp://127.0.0.1:58823,Total threads: 4
Dashboard: http://127.0.0.1:58826/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:58818,
Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-dp1d4ts4,Local directory: /var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/dask-scratch-space/worker-dp1d4ts4


In [None]:
import dask.dataframe as dd
parallelize = True
location_cols = ['winery', 'region_1', 'region_2', 'province', 'country']
d_reviews = dd.from_pandas(reviews[location_cols], npartitions=100)
d_reviews.head()


Unnamed: 0,winery,region_1,region_2,province,country
0,Nicosia,Etna,,Sicily & Sardinia,Italy
1,Quinta dos Avidagos,,,Douro,Portugal
2,Rainstorm,Willamette Valley,Willamette Valley,Oregon,US
3,St. Julian,Lake Michigan Shore,,Michigan,US
4,Sweet Cheeks,Willamette Valley,Willamette Valley,Oregon,US


In [None]:
d_reviews['code'] = d_reviews['country'].apply(lookup, meta=('country', 'object'))
codes = client.compute(d_reviews)
d_reviews = codes.result()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


In [20]:
d_reviews.head()

Unnamed: 0,winery,region_1,region_2,province,country,code
0,Nicosia,Etna,,Sicily & Sardinia,Italy,IT
1,Quinta dos Avidagos,,,Douro,Portugal,PT
2,Rainstorm,Willamette Valley,Willamette Valley,Oregon,US,US
3,St. Julian,Lake Michigan Shore,,Michigan,US,US
4,Sweet Cheeks,Willamette Valley,Willamette Valley,Oregon,US,US


In [None]:
# close the dask cluster
client.shutdown()



**Tests**

In [10]:
import unittest

class CountryCodeLookupTest(unittest.TestCase):

  def test_lookup_valid_country(self):
    self.assertEqual(lookup('France'), 'FR')
    self.assertEqual(lookup('United States Of America'), 'US')

  def test_lookup_unknown_country(self):
    self.assertIsNone(lookup('United States'))

  def test_overrides(self):
    self.assertEqual(lookup('US'), 'US')



In [11]:
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity=2, exit=False)

# https://hamatti.org/posts/unit-test-your-python-code-in-jupyter-notebooks/

test_lookup_unknown_country (__main__.CountryCodeLookupTest.test_lookup_unknown_country) ... ERROR
test_lookup_valid_country (__main__.CountryCodeLookupTest.test_lookup_valid_country) ... ERROR
test_overrides (__main__.CountryCodeLookupTest.test_overrides) ... ERROR

ERROR: test_lookup_unknown_country (__main__.CountryCodeLookupTest.test_lookup_unknown_country)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/ipykernel_16297/2337193784.py", line 10, in test_lookup_unknown_country
    self.assertIsNone(lookup('United States'))
                      ^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/y8/_pw0hcc137n3wqx4y4cz33100000gq/T/ipykernel_16297/594008575.py", line 2, in lookup
    return codes[name] if name in codes else None
                          ^^^^^^^^^^^^^
TypeError: argument of type 'Future' is not iterable

ERROR: test_lookup_valid_country (__main__.CountryCodeLook