# Coding for Economists - Advanced Session 4

## 1. Setup Environment

In [1]:
# %pip install line-profiler memory-profiler dask 'dask[distributed]'

In [2]:
# Install in Google Colab
# !pip install gensim bertopic

In [3]:
import numpy as np
import pandas as pd
# Turn on copy on write
pd.options.mode.copy_on_write = True

In [4]:
# Load profilers
%load_ext line_profiler
%load_ext memory_profiler

## 2. A Slow Application

In [54]:
df = pd.read_csv('econ_panel.csv')
df = df[df['Indicator'] == 'NGDP_R']
df = df.drop(columns=['Indicator']).rename(columns={'Country':'code', 'Year':'year', 'Value':'value'})
df.head()

Unnamed: 0,code,year,value
0,AFG,2002,183.26
1,AFG,2003,198.736
2,AFG,2004,200.069
3,AFG,2005,223.737
4,AFG,2006,235.731


In [None]:
gdp['year']  = pd.to_datetime(gdp['year'], format='%Y')
gdp['value'] = pd.to_numeric(gdp['value'], errors='coerce')
# this will convert any remaining object-columns to their inferred dtypes
gdp = gdp.infer_objects()
def interpolate_country(group):
    # pick only the two columns we care about
    s = (
        group[['year', 'value']]
         .set_index('year')['value']      # annual values, DatetimeIndex
         .resample('W')                    # up-sample to weekly
         .interpolate(method='linear')     # linear fill
    )
    return s
gdp_weekly = (
    gdp
    .groupby('code', group_keys=True)
    .apply(interpolate_country)            # returns a Series named 'value'
    .reset_index()                          # brings back 'code' & 'year' as cols
)
gdp_daily.to_csv('gdp_panel.csv', index=False)

# regions = pd.read_csv('country_region.csv')
# regions.head()
# regions = regions.rename(columns={'Entity': 'country', 'Code':'code','World regions according to OWID':'region'})
# regions[['country', 'code','region']].to_csv('country_region.csv', index=False)

  .apply(interpolate_country)            # returns a Series named 'value'


### 2.1 Load Data

In [45]:
import csv

gdp = {}
path = 'gdp_panel.csv'
with open(path) as f:
    reader = csv.DictReader(f)
    for row in reader:
        country = row['code']
        value = float(row['value'])
        if country not in gdp:
            gdp[country] = []
        gdp[country].append(value)

mapping = {}
path = 'country_region.csv'
with open(path) as f:
    reader = csv.DictReader(f)
    for row in reader:
        country = row['code']
        region = row['region']
        if country not in mapping:
            mapping[country] = region

print(gdp['IRL'])

[50.393, 50.39645901639344, 50.399918032786886, 50.40337704918033, 50.40683606557377, 50.41029508196721, 50.413754098360656, 50.4172131147541, 50.42067213114754, 50.42413114754098, 50.427590163934426, 50.43104918032787, 50.43450819672131, 50.43796721311475, 50.441426229508195, 50.44488524590164, 50.44834426229508, 50.45180327868852, 50.455262295081965, 50.45872131147541, 50.46218032786885, 50.46563934426229, 50.469098360655735, 50.47255737704918, 50.47601639344262, 50.47947540983606, 50.482934426229505, 50.486393442622955, 50.4898524590164, 50.49331147540984, 50.49677049180328, 50.500229508196725, 50.50368852459017, 50.50714754098361, 50.51060655737705, 50.514065573770495, 50.51752459016394, 50.52098360655738, 50.52444262295082, 50.527901639344265, 50.53136065573771, 50.53481967213115, 50.53827868852459, 50.541737704918035, 50.54519672131148, 50.54865573770492, 50.55211475409836, 50.555573770491804, 50.55903278688525, 50.56249180327869, 50.56595081967213, 50.569409836065574, 50.5728688

In [46]:
print(mapping['IRL'])

Europe


### 2.2 Compute GDP Growth and Volatility

In [47]:
from statistics import pstdev

metrics = {}
window = 365
for country, series in gdp.items():
    growth = []
    vol = []
    for i in range(window, len(series)):
        prev = series[i-window]
        curr = series[i]
        growth.append((curr - prev) / prev)
        vol.append(pstdev(series[i-window:i]))
    metrics[country] = {'growth': growth, 'vol': vol}

print(metrics['IRL'])

{'growth': [0.025053896049184524, 0.02505217644747351, 0.025023791405490287, 0.02499541025944927, 0.024967033008548703, 0.024938659651986477, 0.024910290188961275, 0.024881924618671433, 0.024853562940316074, 0.024825205153093968, 0.02479685125620468, 0.024768501248847424, 0.0247401551302222, 0.024711812899528663, 0.02468347455596725, 0.02465514009873805, 0.02462680952704195, 0.02459848284007948, 0.02457016003705195, 0.024541841117160332, 0.024513526079606378, 0.024485214923591504, 0.024456907648317892, 0.024428604252987395, 0.024400304736802634, 0.0243720090989659, 0.024343717338680255, 0.024315429455148276, 0.024287145447573755, 0.024258865315159705, 0.02423058905711006, 0.024202316672628423, 0.02417404816091916, 0.024145783521186306, 0.024117522752634676, 0.024089265854468866, 0.02406101282589371, 0.02403276366611467, 0.024004518374336863, 0.02397627694976619, 0.02394803939160821, 0.02391980569906925, 0.023891575871355302, 0.023863349907673134, 0.023835127807229177, 0.023806909569230

### 2.3 Aggregate Regions

In [48]:
# mapping: country → region
agg = {}  # plain dict

# Collect all growth & vol values by region
for country, m in metrics.items():
    region = mapping[country]
    if region not in agg:
        agg[region] = {'growth': [], 'vol': []}
    agg[region]['growth'].extend(m['growth'])
    agg[region]['vol'].extend(m['vol'])

# Compute regional averages
result = {}
for region, d in agg.items():
    total_growth = sum(d['growth'])
    total_vol    = sum(d['vol'])
    count_growth = len(d['growth'])
    count_vol    = len(d['vol'])
    result[region] = {
        'avg_growth': total_growth / count_growth if count_growth else 0,
        'avg_vol':    total_vol    / count_vol    if count_vol    else 0
    }

print(result)

{'Asia': {'avg_growth': 0.051404015722441514, 'avg_vol': 3481.3651531567803}, 'Africa': {'avg_growth': 0.04026273762645489, 'avg_vol': 53.30404019347396}, 'Europe': {'avg_growth': 0.022876323584881706, 'avg_vol': 46.059394111544194}, 'South America': {'avg_growth': 0.028355296970392158, 'avg_vol': 407.22675804007355}, 'North America': {'avg_growth': 0.029643883289800194, 'avg_vol': 11.192510511272188}, 'Oceania': {'avg_growth': 0.024703008920248598, 'avg_vol': 1.043666657089604}}


### 2.4 Time and Memory Profiling

In [49]:
def run_analysis_v0():
    # Load Data
    gdp = {}
    path = 'gdp_panel.csv'
    with open(path) as f:
        reader = csv.DictReader(f)
        for row in reader:
            country = row['code']
            value = float(row['value'])
            if country not in gdp:
                gdp[country] = []
            gdp[country].append(value)
    
    mapping = {}
    path = 'country_region.csv'
    with open(path) as f:
        reader = csv.DictReader(f)
        for row in reader:
            country = row['code']
            region = row['region']
            if country not in mapping:
                mapping[country] = region

    # Compute dgp growth rate and volitility
    metrics = {}
    window = 365
    for country, series in gdp.items():
        growth = []
        vol = []
        for i in range(window, len(series)):
            prev = series[i-window]
            curr = series[i]
            growth.append((curr - prev) / prev)
            vol.append(pstdev(series[i-window:i]))
        metrics[country] = {'growth': growth, 'vol': vol}

    # Aggregate Regions
    agg = {}  # plain dict

    for country, m in metrics.items():
        region = mapping[country]
        if region not in agg:
            agg[region] = {'growth': [], 'vol': []}
        agg[region]['growth'].extend(m['growth'])
        agg[region]['vol'].extend(m['vol'])
    
    result = {}
    for region, d in agg.items():
        total_growth = sum(d['growth'])
        total_vol    = sum(d['vol'])
        count_growth = len(d['growth'])
        count_vol    = len(d['vol'])
        result[region] = {
            'avg_growth': total_growth / count_growth if count_growth else 0,
            'avg_vol':    total_vol    / count_vol    if count_vol    else 0
        }
    return result
    
%lprun -f run_analysis_v0 run_analysis_v0()

Timer unit: 1e-09 s

Total time: 512.162 s
File: /var/folders/j4/72zz472s4sdd1r1ssyml8k5w0000gn/T/ipykernel_30612/1188526080.py
Function: run_analysis_v0 at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def run_analysis_v0():
     2                                               # Load Data
     3         1       9000.0   9000.0      0.0      gdp = {}
     4         1          0.0      0.0      0.0      path = 'gdp_panel.csv'
     5         2     481000.0 240500.0      0.0      with open(path) as f:
     6         1     109000.0 109000.0      0.0          reader = csv.DictReader(f)
     7   2518945 2995416000.0   1189.2      0.6          for row in reader:
     8   2518944  150304000.0     59.7      0.0              country = row['code']
     9   2518944  473084000.0    187.8      0.1              value = float(row['value'])
    10   2518944  172232000.0     68.4      0.0              if country not in gdp:
    11

In [50]:
import sys
!"{sys.executable}" -m memory_profiler app_v0.py

^C
Filename: app_v0.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    10   54.688 MiB   54.688 MiB           1   @profile
    11                                         def run_analysis_v0():
    12                                             # Load Data
    13   54.688 MiB    0.000 MiB           1       gdp = {}
    14   54.688 MiB    0.000 MiB           1       path = 'gdp_panel.csv'
    15  153.844 MiB    0.000 MiB           2       with open(path) as f:
    16   54.688 MiB    0.000 MiB           1           reader = csv.DictReader(f)
    17  153.844 MiB    0.156 MiB     2518945           for row in reader:
    18  153.844 MiB    0.000 MiB     2518944               country = row['code']
    19  153.844 MiB    0.000 MiB     2518944               value = float(row['value'])
    20  153.844 MiB   77.156 MiB     2518944               if country not in gdp:
    21  153.562 MiB    0.000 MiB         189                   gdp[country] = []
    22  153.844 MiB   21.844 Mi

## 3. Panda Dataframe

### 3.1 Load Data

In [12]:
import pandas as pd
gdp = pd.read_csv(
    'gdp_panel.csv',
    usecols=['code','year','value'], # Only read necessary columns
    parse_dates=['year'],
    dtype={'code':'category'}
).sort_values(['code','year'])

gdp.head()

Unnamed: 0,code,year,value
0,AFG,2002-01-01,183.26
1,AFG,2003-01-01,198.736
2,AFG,2004-01-01,200.069
3,AFG,2005-01-01,223.737
4,AFG,2006-01-01,235.731


In [13]:
mapping = pd.read_csv(
    'country_region.csv',
    usecols=['code','region'], # Only read necessary columns
    dtype={'code':'category', 'region':'category'}
).sort_values(['code'])

mapping.head()

Unnamed: 0,code,region
11,ABW,North America
0,AFG,Asia
6,AGO,Africa
7,AIA,North America
1,ALA,Europe


### 3.2 Compute GDP Growth and Volatility

In [14]:
window = 365
# pivot to wide form: rows=date, cols=country
panel = gdp.pivot(index='year', columns='code', values='value')

# rolling growth: pct_change over window periods
growth = panel.pct_change(periods=window, fill_method=None)

# rolling volatility: std over window periods
vol = panel.rolling(window=window).std()

# melt back to long form & merge region info
growth = growth.stack().rename('growth').reset_index()
vol    = vol.stack().rename('vol').reset_index()
metrics = pd.merge(growth, vol, on=['year','code'])
metrics = metrics.merge(mapping, on='code')

metrics.head()

Unnamed: 0,year,code,growth,vol,region
0,1984-01-01,AGO,0.05592,15.980693,Africa
1,1984-01-01,ALB,0.121608,8.694685,Europe
2,1984-01-01,ARE,-0.007765,18.792364,Asia
3,1984-01-01,ARG,-0.034092,8.660888,South America
4,1984-01-01,ATG,0.192771,0.063253,North America


### 3.3 Aggregate Regions

In [15]:
result = (
    metrics
    .groupby(['region'], observed=False)
    .agg(avg_growth=('growth','mean'),
         avg_vol=('vol','mean'))
)

print(result)

               avg_growth       avg_vol
region                                 
Africa           0.184300    237.669407
Asia             0.233143  15548.356671
Europe           0.100774    196.459055
North America    0.129856     48.756135
Oceania          0.106929      4.802754
South America    0.127411   1836.365379


### 3.4 Time and Memory Profiling

In [16]:
def run_analysis_v1():
    # Load Data
    gdp = pd.read_csv(
        'gdp_panel.csv',
        usecols=['code','year','value'], # Only read necessary columns
        parse_dates=['year'],
        dtype={'code':'category'}
    ).sort_values(['code','year'])

    mapping = pd.read_csv(
        'country_region.csv',
        usecols=['code','region'], # Only read necessary columns
        dtype={'code':'category', 'region':'category'}
    ).sort_values(['code'])

    window = 365
    # Compute GDP Growth and Volatility
    # pivot to wide form: rows=date, cols=country
    panel = gdp.pivot(index='year', columns='code', values='value')
    
    # rolling growth: pct_change over window periods
    growth = panel.pct_change(periods=window, fill_method=None)
    
    # rolling volatility: std over window periods
    vol = panel.rolling(window=window).std()
    
    # melt back to long form & merge region info
    growth = growth.stack().rename('growth').reset_index()
    vol    = vol.stack().rename('vol').reset_index()
    metrics = pd.merge(growth, vol, on=['year','code'])
    metrics = metrics.merge(mapping, on='code')

    # Aggregate Regions
    result = (
        metrics
        .groupby(['region'], observed=False)
        .agg(avg_growth=('growth','mean'),
             avg_vol=('vol','mean'))
    )
    return result

%lprun -f run_analysis_v1 run_analysis_v1()

Timer unit: 1e-09 s

Total time: 0.025206 s
File: /var/folders/j4/72zz472s4sdd1r1ssyml8k5w0000gn/T/ipykernel_30612/2510214605.py
Function: run_analysis_v1 at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def run_analysis_v1():
     2                                               # Load Data
     3         3   11683000.0    4e+06     46.4      gdp = pd.read_csv(
     4         1          0.0      0.0      0.0          'gdp_panel.csv',
     5         1          0.0      0.0      0.0          usecols=['code','year','value'], # Only read necessary columns
     6         1       1000.0   1000.0      0.0          parse_dates=['year'],
     7         1          0.0      0.0      0.0          dtype={'code':'category'}
     8         1     591000.0 591000.0      2.3      ).sort_values(['code','year'])
     9                                           
    10         3    1375000.0 458333.3      5.5      mapping = pd.read_c

In [17]:
import sys
!"{sys.executable}" -m memory_profiler app_v1.py

Filename: app_v1.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9  120.953 MiB  120.953 MiB           1   @profile
    10                                         def run_analysis_v1():
    11                                             # Load Data
    12  124.531 MiB    3.344 MiB           3       gdp = pd.read_csv(
    13  120.953 MiB    0.000 MiB           1           'gdp_panel.csv',
    14  120.953 MiB    0.000 MiB           1           usecols=['code','year','value'], # Only read necessary columns
    15  120.953 MiB    0.000 MiB           1           parse_dates=['year'],
    16  120.953 MiB    0.000 MiB           1           dtype={'code':'category'}
    17  124.531 MiB    0.234 MiB           1       ).sort_values(['code','year'])
    18                                         
    19  124.703 MiB    0.172 MiB           3       mapping = pd.read_csv(
    20  124.531 MiB    0.000 MiB           1           'country_region.csv',
    21  124.531 MiB    0.000 

## 4. Memory Bound -> Process by Chunk

### 4.1 Prepare Parquet Data

In [18]:
import dask.dataframe as dd

# Read the raw CSV in parallel
ddf = dd.read_csv(
    'gdp_panel.csv',
    usecols=['code','year','value'],
    parse_dates=['year'],
    dtype={'code':'category'}
)
ddf['code'] = ddf['code'].astype(str)

# Write out as partitioned Parquet
#    Here we partition by 'country' so that each country’s data lives in its own sub-folder.
ddf.to_parquet(
    'gdp_panel_parquet/',
    engine='pyarrow',
    write_index=False,         # drop the old CSV index
    partition_on=['code'],  # creates subfolders country=AAA, country=BBB, …
    compression='snappy'       # fast compression
)

# Read the raw CSV in parallel
ddf_region = dd.read_csv(
    'country_region.csv',
    usecols=['code','region'],
    dtype={'code':'category'}
)
ddf_region['code'] = ddf_region['code'].astype(str)

# Write out as partitioned Parquet
ddf_region.to_parquet(
    'country_region_parquet/',
    engine='pyarrow',
    write_index=False,         # drop the old CSV index
    partition_on=['code'],  # creates subfolders country=AAA, country=BBB, …
    compression='snappy'       # fast compression
)

### 4.2 Load Data

In [19]:
import dask.dataframe as dd
import pandas as pd

ddf = dd.read_parquet(
    'gdp_panel_parquet/',
    columns=['code','year','value'],
    engine='pyarrow'
)

regions = dd.read_parquet('country_region_parquet/')

### 4.3 Compute GDP Growth and Volatility

In [20]:
def summarize_country(df, window=365):
    # If this slice is empty, return an empty DataFrame
    if df.empty:
        return pd.DataFrame(columns=['code','mean_growth','mean_vol'])

    # Otherwise do the rolling stats
    df = df.sort_values('year')
    growth = df['value'].pct_change(periods=window, fill_method=None).mean()
    vol    = df['value'].rolling(window=window).std().mean()
    return pd.DataFrame({
        'code':        [df['code'].iat[0]],
        'mean_growth': [growth],
        'mean_vol':    [vol]
    })

# Build a proper meta so Dask knows what comes back
meta = pd.DataFrame({
    'code':        pd.Series(dtype='object'),
    'mean_growth': pd.Series(dtype='float64'),
    'mean_vol':    pd.Series(dtype='float64'),
})

country_summaries = ddf.groupby('code', observed=True).apply(
    summarize_country,
    meta=meta
)

country_summaries = country_summaries.reset_index(drop=True)

### 4.4 Aggregate Regions

In [21]:
country_summaries = country_summaries.astype({'code': 'string'})
regions = regions.astype({'code': 'string'})

country_summaries = country_summaries.merge(regions, on='code')

regional_stats = (
    country_summaries
    .groupby('region', observed=True)[['mean_growth', 'mean_vol']]
    .mean()
    .compute()
)

regional_stats

Unnamed: 0_level_0,mean_growth,mean_vol
region,Unnamed: 1_level_1,Unnamed: 2_level_1
Africa,0.183315,220.190844
Asia,0.23896,13674.881557
North America,0.129856,48.059538
Europe,0.103997,230.764849
South America,0.127411,1800.576875
Oceania,0.09984,4.007155


### 4.5 Time and Memory Profiling

In [22]:
def run_analysis_v2():
    # Load data
    ddf = dd.read_parquet(
        'gdp_panel_parquet/',
        columns=['code','year','value'],
        engine='pyarrow'
    )
    
    regions = dd.read_parquet('country_region_parquet/')

    # Compute GDP Growth and Volatility
    meta = pd.DataFrame({
        'code':        pd.Series(dtype='object'),
        'mean_growth': pd.Series(dtype='float64'),
        'mean_vol':    pd.Series(dtype='float64'),
    })
    
    country_summaries = ddf.groupby('code', observed=True).apply(
        summarize_country,
        meta=meta
    )
    
    country_summaries = country_summaries.reset_index(drop=True)

    # Aggregate regions
    country_summaries = country_summaries.astype({'code': 'string'})
    regions = regions.astype({'code': 'string'})
    
    country_summaries = country_summaries.merge(regions, on='code')
    
    result = (
        country_summaries
        .groupby('region', observed=True)[['mean_growth', 'mean_vol']]
        .mean()
        .compute()
    )
    return result

%lprun -f run_analysis_v2 run_analysis_v2()

Timer unit: 1e-09 s

Total time: 2.40633 s
File: /var/folders/j4/72zz472s4sdd1r1ssyml8k5w0000gn/T/ipykernel_30612/1602383827.py
Function: run_analysis_v2 at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def run_analysis_v2():
     2                                               # Load data
     3         2   33791000.0    2e+07      1.4      ddf = dd.read_parquet(
     4         1          0.0      0.0      0.0          'gdp_panel_parquet/',
     5         1          0.0      0.0      0.0          columns=['code','year','value'],
     6         1          0.0      0.0      0.0          engine='pyarrow'
     7                                               )
     8                                           
     9         1   32015000.0    3e+07      1.3      regions = dd.read_parquet('country_region_parquet/')
    10                                           
    11                                               # 

In [23]:
import sys
!"{sys.executable}" -m memory_profiler app_v2.py

Filename: app_v2.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25  156.203 MiB  156.203 MiB           1   @profile
    26                                         def run_analysis_v2():
    27                                             # Load data
    28  162.391 MiB    6.188 MiB           2       ddf = dd.read_parquet(
    29  156.203 MiB    0.000 MiB           1           'gdp_panel_parquet/',
    30  156.203 MiB    0.000 MiB           1           columns=['code','year','value'],
    31  156.203 MiB    0.000 MiB           1           engine='pyarrow'
    32                                             )
    33                                             
    34  163.406 MiB    1.016 MiB           1       regions = dd.read_parquet('country_region_parquet/')
    35                                         
    36                                             # Compute GDP Growth and Volatility
    37  163.406 MiB    0.000 MiB           2       meta = pd.DataFrame({


## 5. CPU Bound -> Parallel Computing

In [24]:
import os
import glob
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from dask.distributed import Client

### 5.1 Prepare Parquet Data

In [25]:
# Read the raw CSV in parallel
ddf = dd.read_csv(
    'gdp_panel.csv',
    usecols=['code','year','value'],
    parse_dates=['year'],
    dtype={'code':'category'}
)
ddf['code'] = ddf['code'].astype(str)

# Write out as partitioned Parquet
#    Here we partition by 'country' so that each country’s data lives in its own sub-folder.
ddf.to_parquet(
    'gdp_panel_parquet/',
    engine='pyarrow',
    write_index=False,         # drop the old CSV index
    partition_on=['code'],  # creates subfolders country=AAA, country=BBB, …
    compression='snappy'       # fast compression
)

### 5.2 Functions to Detect and Process Single Files

In [26]:
def discover_country_tasks(base_dir):
    tasks = []
    for entry in os.listdir(base_dir):
        full = os.path.join(base_dir, entry)
        if not (entry.startswith("code=") and os.path.isdir(full)):
            continue

        code = entry.split("=", 1)[1]
        pattern = os.path.join(full, "*.parquet")
        files = sorted(glob.glob(pattern))
        if files:
            tasks.append((code, files))
    return tasks

In [27]:
def process_country(code, parquet_paths):
    # parquet_paths: list of all .parquet files for a single country-partition
    # read them all (value only), concatenate into one 1-D numpy array
    arrays = []
    for path in parquet_paths:
        tbl = pq.read_table(path, columns=['value'])
        arrays.append(tbl.column('value').to_numpy())
    series = np.concatenate(arrays)
    # now do the window‐period rolling growth & vol
    window = 365
    growth = (series[window:] - series[:-window]) / series[:-window]
    windows = np.lib.stride_tricks.sliding_window_view(series, window)
    vol    = np.std(windows, axis=1)
    return code, (growth.mean(), vol.mean())

### 5.3 Process Files in Parallel

In [28]:
# Discover tasks
tasks = discover_country_tasks('gdp_panel_parquet/')
tasks[:5]

[('BRN', ['gdp_panel_parquet/code=BRN/part.0.parquet']),
 ('AGO', ['gdp_panel_parquet/code=AGO/part.0.parquet']),
 ('GNB', ['gdp_panel_parquet/code=GNB/part.0.parquet']),
 ('OMN', ['gdp_panel_parquet/code=OMN/part.0.parquet']),
 ('MNE', ['gdp_panel_parquet/code=MNE/part.0.parquet'])]

In [29]:
# Launch Dask
client = Client() 
print(client)

<Client: 'tcp://127.0.0.1:50715' processes=5 threads=10, memory=16.00 GiB>


In [30]:
# Submit tasks
futures = client.map(lambda args: process_country(*args), tasks)
results = client.gather(futures)

In [31]:
# Close Dask
client.close() 

In [32]:
# Build DataFrame & merge
df = pd.DataFrame(
    [(code, g, v) for code, (g, v) in results],
    columns=['code','avg_growth','avg_vol']
)
regions = pd.read_csv('country_region.csv', dtype={"code": str})
df = df.merge(regions, on="code", how="left")

### 5.4 Aggregate Region

In [33]:
# Aggregate by region
result = df.groupby("region")[["avg_growth", "avg_vol"]].mean()
result

Unnamed: 0_level_0,avg_growth,avg_vol
region,Unnamed: 1_level_1,Unnamed: 2_level_1
Africa,0.183315,190.690864
Asia,0.23896,11842.794822
Europe,0.103997,199.848221
North America,0.129856,41.62078
Oceania,0.09984,3.470298
South America,0.127411,1559.345315


### 5.5 Time and Memory Profiling

In [34]:
def run_analysis_v3():
    # Discover tasks
    tasks = discover_country_tasks('gdp_panel_parquet/')

    # Launch Dask
    client = Client() 

    # Submit Tasks
    futures = client.map(lambda args: process_country(*args), tasks)
    results = client.gather(futures)

    # Close Dask
    client.close() 

    # Build DataFrame & merge
    df = pd.DataFrame(
        [(code, g, v) for code, (g, v) in results],
        columns=['code','avg_growth','avg_vol']
    )
    regions = pd.read_csv('country_region.csv', dtype={"code": str})
    df = df.merge(regions, on="code", how="left")

    # Aggregate by region
    result = df.groupby("region")[["avg_growth", "avg_vol"]].mean()

    return result

%lprun -f run_analysis_v3 run_analysis_v3()

Timer unit: 1e-09 s

Total time: 0.938876 s
File: /var/folders/j4/72zz472s4sdd1r1ssyml8k5w0000gn/T/ipykernel_30612/188904422.py
Function: run_analysis_v3 at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def run_analysis_v3():
     2                                               # Discover tasks
     3         1   25342000.0    3e+07      2.7      tasks = discover_country_tasks('gdp_panel_parquet/')
     4                                           
     5                                               # Launch Dask
     6         1  394891000.0    4e+08     42.1      client = Client() 
     7                                           
     8                                               # Submit Tasks
     9         1    7049000.0    7e+06      0.8      futures = client.map(lambda args: process_country(*args), tasks)
    10         1  389620000.0    4e+08     41.5      results = client.gather(futures)
    11       

In [35]:
import sys
!"{sys.executable}" -m memory_profiler app_v3.py

Filename: app_v3.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    42  139.078 MiB  139.078 MiB           1   @profile
    43                                         def run_analysis_v3():
    44                                             # Discover tasks
    45  139.141 MiB    0.062 MiB           1       tasks = discover_country_tasks('gdp_panel_parquet/')
    46                                         
    47                                             # Launch Dask
    48  143.000 MiB    3.859 MiB           1       client = Client() 
    49                                         
    50                                             # Submit Tasks
    51  143.156 MiB    0.156 MiB           1       futures = client.map(lambda args: process_country(*args), tasks)
    52  145.547 MiB    2.391 MiB           1       results = client.gather(futures)
    53                                         
    54                                             # Close Dask
    55  14