In [None]:
import os
import numpy as np
import pandas as pd
import geopandas as gpd
import tarfile
import re

from functools import partial
# from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
from glob import glob, iglob
from pathlib import Path
from tqdm import tqdm
from unittest.mock import MagicMock

import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

from projections.shapefiles import iter_records
from projections import raster, utils


# Show every column when displaying wide DataFrames. The fully-qualified option
# name is used because the bare 'max_columns' pattern is ambiguous in
# pandas >= 1.4 (it also matches 'styler.render.max_columns') and raises
# OptionError there.
pd.set_option('display.max_columns', None)

In [2]:
def load_tifs(tars):
    """Yield paths to stable-lights tif files extracted from tar archives.

    For every archive in ``tars`` the members whose name ends in ``.gz`` and
    contains ``stable_lights`` are extracted into the module-level
    ``read_path`` directory and decompressed with ``gunzip``. Every ``*.tif``
    found in ``read_path`` afterwards is yielded as a string path. Once the
    consumer asks for the next item (or closes the generator) the yielded tif
    files and the extracted archive member are deleted again.

    Parameters
    ----------
    tars : iterable of str or path-like
        Paths of the tar archives to process.

    Yields
    ------
    str
        Path to a decompressed tif file. It is only valid while the consumer
        is processing it; it is removed afterwards.
    """
    # Imported locally so the function does not depend on an edit to the
    # notebook's import cell.
    import subprocess

    for tar in tars:
        with tarfile.open(tar, 'r') as tf:
            # Find gz in tar
            names = [x for x in tf.getnames() if x.endswith('.gz') and 'stable_lights' in x]
            if not names:
                print(f'No stable lights in {tar}')
                continue

            for name in names:
                tf.extract(name, read_path)

        for name in names:
            # Extract tif
            name_path = read_path / name
            # Pass the path as a separate argument (no shell), so paths with
            # spaces or shell metacharacters are handled correctly.
            completed = subprocess.run(['gunzip', str(name_path)])
            if completed.returncode != 0:
                # Best effort, like the original os.system call: report and
                # carry on with whatever tif files are present.
                print(f'gunzip failed for {name_path} (exit code {completed.returncode})')

            tifs = glob(str(read_path / '*.tif'))

            try:
                for tif in tifs:
                    # Yield path to tif file
                    yield tif
            finally:
                # Clean up, also when the consumer stops iterating early.
                for file in tifs + [name_path]:
                    try:
                        os.remove(file)
                    except FileNotFoundError:
                        pass
                    
                    
def save_location_mapping(row_and_path):
    """Write the raster-cell mapping of one administrative polygon to CSV.

    ``row_and_path`` is a ``(row, path)`` pair. The row's ``geometry`` is
    intersected with the module-level ``IMAGE`` raster through
    ``raster.find_subset_with_intersection_area``. When that returns ``None``
    an empty file is written to ``path``; otherwise the result is tagged with
    the row's GID_0/GID_1/GID_2 ids (as adm0/adm1/adm2) and saved to ``path``
    without the index.
    """
    record, destination = row_and_path
    overlap = raster.find_subset_with_intersection_area(IMAGE, record['geometry'])

    if overlap is not None:
        id_columns = (('adm0', 'GID_0'), ('adm1', 'GID_1'), ('adm2', 'GID_2'))
        for target_column, source_column in id_columns:
            overlap[target_column] = record[source_column]

        overlap.to_csv(destination, index=False)
        return

    # Nothing to map: leave an empty file at the destination.
    with open(destination, 'w') as marker:
        marker.write('')

In [3]:
# Input directory holding the nightlights tar archives.
read_path = Path('../Data/Nightlights/')
# NOTE(review): the output directory is spelled 'Nighlights'; left untouched
# because files may already exist under that name -- confirm before renaming.
output_path = Path('../Output/Nighlights/')

# One sub-directory of the output directory per processing stage.
partial_path, countries_path, ethnic_path, ethnic_countries_path = (
    output_path / folder
    for folder in ('partial_locs', 'countries', 'ethnic', 'ethnic_countries')
)

# Sorted string paths of the tar archives directly inside read_path.
tars = glob(str(read_path / '*.tar'))
tars.sort()

In [4]:
# Load the preprocessed administrative boundaries.
geo_df = gpd.read_file('../Shapefiles/preprocessed/all_countries.shp')
# Fill missing lower-level ids from the level above (GID_1 <- GID_0, then
# GID_2 <- GID_1). The result is assigned back instead of calling
# fillna(inplace=True) on a column selection: that chained form does not
# update the frame when pandas Copy-on-Write is active.
geo_df['GID_1'] = geo_df['GID_1'].fillna(geo_df['GID_0'])
geo_df['GID_2'] = geo_df['GID_2'].fillna(geo_df['GID_1'])

# Map raster to polygons

In [5]:
def get_filename_from_row(row, path):
    """Return ``path / '<GID_2>.csv'`` for the given record."""
    return path / '{}.csv'.format(row["GID_2"])


def get_filename_with_portion_from_row(row, path):
    """Return the CSV path of a record, with a ``_pNNN`` suffix for portions.

    Parameters
    ----------
    row : mapping
        Record providing ``GID_2`` and ``portion``.
    path : pathlib.Path
        Directory the file lives in.

    Returns
    -------
    pathlib.Path
        ``path / '<GID_2>_pNNN.csv'`` when ``portion`` is set (zero-padded to
        three digits), otherwise ``path / '<GID_2>.csv'``.
    """
    portion = row['portion']
    # A missing portion may arrive as NaN, which is truthy and cannot be
    # converted with int(); treat it like None. A falsy portion (None, 0)
    # still means "no suffix", as before.
    has_portion = (not pd.isnull(portion)) and bool(portion)
    suffix = f"_p{int(portion):03d}" if has_portion else ''
    name = f'{row["GID_2"]}{suffix}.csv'
    return path / name


def record_exists(row, path):
    """Tell whether a mapping file for ``row`` is already present in ``path``.

    The plain ``<GID_2>.csv`` name is checked first, then the name carrying
    the portion suffix; the second check is skipped when the first succeeds.
    """
    name_builders = (get_filename_from_row, get_filename_with_portion_from_row)
    return any(build(row, path).exists() for build in name_builders)


def yield_missing_records(records, save_path):
    """Yield ``(row, output_path)`` for every record without a saved mapping.

    Rows of ``records`` for which ``record_exists`` is true are skipped. The
    output path of the remaining rows includes the portion suffix, if any.
    """
    for _, record in records.iterrows():
        if not record_exists(record, save_path):
            yield record, get_filename_with_portion_from_row(record, save_path)

In [None]:
# Only need to map the locations once
# Still loop for the clean up
n_processes = 30
# Make sure the mapping files have somewhere to go.
partial_path.mkdir(parents=True, exist_ok=True)
for tif in load_tifs(tars[:1]):
    # IMAGE is read as a module-level global by save_location_mapping.
    IMAGE = utils.read_tif(tif)
    # save_location_mapping takes a single (row, path) argument. Binding an
    # extra ``crs`` keyword with functools.partial made every call fail with
    # a TypeError, so the function is used directly.
    processing_function = save_location_mapping
    country_iterator = partial(yield_missing_records, save_path=partial_path)

    if n_processes == 1:
        for blob in tqdm(country_iterator(geo_df)):
            processing_function(blob)
    else:
        with ThreadPoolExecutor(n_processes) as tpe:
            # total is an upper bound: records that already have a file are
            # skipped by the iterator.
            for _ in tqdm(tpe.map(processing_function, country_iterator(geo_df)),
                          total=geo_df.shape[0]):
                pass

In [None]:
# Merge the per-portion mapping files (<GID_2>_pNNN.csv) of every split
# polygon into a single <GID_2>.csv file.
gid_with_portions = geo_df.loc[geo_df['portion'].notnull(), 'GID_2'].unique()
for gid in tqdm(gid_with_portions):
    files = list(partial_path.glob(f'{gid}_p*.csv'))
    if not files:
        continue

    portions = [utils.read_csv(file) for file in files]
    portions = [x for x in portions if not x.empty]

    if not portions:
        continue

    # pd.concat handles one or many portions alike. It replaces
    # DataFrame.append (removed in pandas 2.0) and the single-portion branch
    # that assigned to a misspelled variable (`country` instead of `county`).
    county = pd.concat(portions, ignore_index=True)

    # A raster cell can appear in several portions: add up its values.
    county = county.groupby(['lat', 'lon']).sum().reset_index()
    # Restore the admin ids from the first portion; the aggregation above
    # does not keep them as plain labels.
    for i in range(3):
        county[f'adm{i}'] = portions[0].iloc[0][f'adm{i}']
    # index=False matches the other mapping files, which are written without
    # an index column.
    county.to_csv(partial_path / f'{gid}.csv', index=False)

# Aggregate

In [6]:
def is_portion(name):
    """Return True when ``name`` ends with a ``_pNNN.csv`` portion suffix.

    ``NNN`` stands for exactly three digits. The pattern is a raw string (so
    ``\\d`` is not an invalid string escape) and the dot before ``csv`` is
    escaped so that it only matches a literal dot.
    """
    return re.match(r'.*_p\d{3}\.csv$', name) is not None


def get_files_by_country(path):
    """Group the non-portion CSV files found in ``path`` by country code.

    The country code is the first three characters of the file name. Files
    recognised by ``is_portion`` are left out.

    Returns a dict mapping each code to the list of its ``Path`` objects.
    """
    grouped = {}
    for csv_file in path.glob('*.csv'):
        if not is_portion(csv_file.name):
            csv_path = Path(csv_file)
            grouped.setdefault(csv_path.name[:3], []).append(csv_path)

    return grouped


def get_year_from_tif(tif):
    """Return the year encoded in characters 3-6 of the tif file name.

    For example ``F152000.v4b_web.stable_lights.avg_vis.tif`` gives ``2000``.
    """
    filename = Path(tif).name
    return int(filename[3:7])


def aggregate_and_save(results, tif, save_path, name, groupby):
    """Attach raster values to mapped cells, aggregate them and save a CSV.

    Parameters
    ----------
    results : pandas.DataFrame
        Mapped cells with at least ``lon``, ``lat``, ``intersection_area``
        and the ``groupby`` columns.
    tif : str or path-like
        Path of the tif being processed; characters 3-6 of its file name are
        taken as the year.
    save_path : pathlib.Path
        Directory the aggregated file is written to.
    name : str
        Output file name; a ``{year}`` placeholder is filled in if present.
    groupby : list of str
        Columns to aggregate by.

    Returns
    -------
    bool
        ``False`` when the output file already exists (nothing is done),
        ``True`` after a file has been written.

    Notes
    -----
    The raster itself is read from the module-level ``IMAGE`` variable.
    """
    year = Path(tif).name[3:7]

    if '{year}' in name:
        name = name.format(year=year)

    save_path = save_path / name

    if save_path.exists():
        return False

    # Get raster subset
    # Rows with lon <= -999 are excluded from the lower bounds below.
    mask = results['lon'] > -999

    if mask.sum() > 0:
        increment = raster.get_increment_from_tif(IMAGE)

        # Stand-in object exposing only a `bounds` attribute for the raster
        # helper; the bounding box is padded by one increment on every side.
        shape = MagicMock()
        shape.bounds = (
            results.loc[mask, 'lon'].min() - increment,
            results.loc[mask, 'lat'].min() - increment,
            results['lon'].max() + increment,
            results['lat'].max() + increment
        )

        subset = raster.get_df_by_maximum_bounds(IMAGE, shape, geo=False)
        # Round so the coordinates compare equal to the (rounded) ones in
        # `results` when merging.
        subset['lon'] = np.round(subset['lon'], 6)
        subset['lat'] = np.round(subset['lat'], 6)

        # Combine with results
        # NOTE(review): assumes `subset` carries the raster reading in a
        # 'value' column -- confirm against the raster helper.
        tresults = results.merge(subset, on=['lon', 'lat'], how='left')
        # This fills the -999. Assigned back rather than filled in place on a
        # column selection, which does not update the frame when pandas
        # Copy-on-Write is active.
        tresults['value'] = tresults['value'].fillna(0)
    else:
        tresults = results.copy()
        tresults['value'] = 0

    # Aggregate
    pivot = tresults.groupby(groupby)[['intersection_area', 'value']].sum()
    pivot['year'] = int(year)

    # Save
    pivot.reset_index().to_csv(save_path, index=False)

    return True


def load_nl_files(files):
    """Load mapping CSV files and stack the rows with a positive area.

    Parameters
    ----------
    files : iterable of path-like
        CSV files to read. Completely empty files are skipped.

    Returns
    -------
    pandas.DataFrame
        Rows of all files whose ``intersection_area`` is greater than zero,
        with ``lon`` and ``lat`` rounded to six decimals.

    Raises
    ------
    ValueError
        If none of the files contains any data.
    """
    frames = []
    for file in files:
        try:
            df = pd.read_csv(file)
        except pd.errors.EmptyDataError:
            continue

        try:
            frames.append(df[pd.to_numeric(df['intersection_area']) > 0])
        except Exception:
            # Name the offending file, then re-raise with the original
            # traceback.
            print(file)
            raise

    if not frames:
        raise ValueError('No non-empty mapping files to load')

    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    results = pd.concat(frames, ignore_index=True)
    results['lon'] = np.round(results['lon'], 6)
    results['lat'] = np.round(results['lat'], 6)

    return results


def iter_files_by_country(files_by_country, tif, path):
    """Yield the work items whose ``<country>_<year>.csv`` is not in ``path``.

    Each item is a ``(country, files, name, tif)`` tuple, where ``name`` is
    the output file name built from the country code and the year of ``tif``.
    """
    for country in files_by_country:
        name = '{}_{}.csv'.format(country, get_year_from_tif(tif))
        if not (path / name).exists():
            yield country, files_by_country[country], name, tif

In [7]:
def load_aggregate_and_save(blob):
    """Load one country's mapping files and save its aggregation for a tif.

    ``blob`` is a ``(country, files, name, tif_name)`` tuple as produced by
    ``iter_files_by_country``. The result is written to ``countries_path``
    under ``name``, grouped by adm0/adm1/adm2.
    """
    country, files, name, tif_name = blob

    # Load polygons
    results = load_nl_files(files)
    # Assign back instead of filling in place on a column selection, which
    # does not update the frame when pandas Copy-on-Write is active.
    results['adm1'] = results['adm1'].fillna('')
    results['adm2'] = results['adm2'].fillna('')
    results = results.drop_duplicates()

    # Use the tif carried in the work item rather than the notebook-level
    # `tif` loop variable, so the function does not depend on hidden state.
    aggregate_and_save(results, tif_name, countries_path, name, groupby=['adm0', 'adm1', 'adm2'])

    
# Worker threads used for the per-country aggregation.
n_processes = 10
files_by_country = get_files_by_country(partial_path)

with ThreadPoolExecutor(n_processes) as tpe:
    for tif in load_tifs(tars):
        # IMAGE is read as a module-level global by aggregate_and_save.
        IMAGE = utils.read_tif(tif)
        work_items = iter_files_by_country(files_by_country, tif, countries_path)
        progress = tqdm(tpe.map(load_aggregate_and_save, work_items),
                        total=len(files_by_country), desc=tif)
        # Drain the iterator so every task finishes before the tif is removed.
        for _ in progress:
            pass

../Data/Nightlights/F152000.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [26:19<00:00,  6.17s/it] 
../Data/Nightlights/F101992.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [25:55<00:00,  6.08s/it] 
../Data/Nightlights/F101993.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [24:43<00:00,  5.79s/it] 
../Data/Nightlights/F121994.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [24:57<00:00,  5.85s/it] 
../Data/Nightlights/F121995.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [25:28<00:00,  5.97s/it] 
../Data/Nightlights/F121996.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [25:24<00:00,  5.96s/it] 
../Data/Nightlights/F141997.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [25:04<00:00,  5.88s/it] 
../Data/Nightlights/F141998.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 [24:36<00:00,  5.77s/it] 
../Data/Nightlights/F141999.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 256/256 

# Combine aggregations

In [8]:
# Collect every non-empty per-country/year aggregation.
country_frames = []
for file in tqdm(countries_path.glob('*.csv')):
    df = utils.read_csv(file)
    if not df.empty:
        country_frames.append(df)

# pd.concat replaces DataFrame.append (removed in pandas 2.0). With no input
# frames it raises a descriptive ValueError instead of an IndexError.
results = pd.concat(country_frames, ignore_index=True)
print(results.shape)
results.to_csv(output_path / 'nightlights.csv', index=False)

5632it [00:16, 349.05it/s]


(1027950, 6)


# Ethnic
# Map raster to ethnic locations

In [16]:
def iter_records(adm, save_path):
    """Yield ``(row, csv_path)`` for every row of ``adm`` with a progress bar.

    The file name is ``<GID_0>_<NAME>.csv`` inside ``save_path``.

    Note: this definition replaces the ``iter_records`` imported from
    ``projections.shapefiles`` at the top of the notebook.
    """
    target_dir = Path(save_path)
    for _, record in tqdm(adm.iterrows(), total=adm.shape[0]):
        yield record, target_dir / f'{record["GID_0"]}_{record["NAME"]}.csv'

def iter_and_skip_records(adm, save_path):
    """Like ``iter_records``, but leave out rows whose CSV already exists."""
    yield from (
        (record, target)
        for record, target in iter_records(adm, save_path)
        if not target.exists()
    )
        

def save_location_mapping(blob):
    """Write the raster-cell mapping of one ethnic polygon to CSV.

    ``blob`` is a ``(row, name_path)`` pair. The row's ``geometry`` is
    intersected with the module-level ``IMAGE`` raster. When no subset comes
    back a message is printed and an empty file is written; otherwise the
    subset is tagged with the row's NAME, TRIBE_CODE and GID_0 and saved
    without the index.

    Note: this definition replaces the earlier ``save_location_mapping`` used
    for the administrative polygons.
    """
    record, target = blob
    overlap = raster.find_subset_with_intersection_area(IMAGE, record['geometry'])

    if overlap is not None:
        for column in ('NAME', 'TRIBE_CODE', 'GID_0'):
            overlap[column] = record[column]

        overlap.to_csv(target, index=False)
        return

    print('No intersection found for', target)
    # Leave an empty file at the target path.
    with open(target, 'w') as marker:
        marker.write('')
    
# Load shapes
# Ethnic-group polygons; the cells below read the GID_0, NAME, TRIBE_CODE and
# geometry columns of this frame.
adm = gpd.read_file('../Shapefiles/ethnic_preprocessed/tribe_adm0_s.shp')

In [7]:
# Map locations
n_processes = 15
for tif in load_tifs(tars[:1]):
    IMAGE = utils.read_tif(tif)
    i = partial(iter_and_skip_records, save_path=ethnic_path)
    
    if n_processes == 1:
        for blob in i(adm):
            save_location_mapping(blob)
    else:
        with ThreadPoolExecutor(n_processes) as tpe:
            for _ in tqdm(tpe.map(save_location_mapping, i(adm)), total=adm.shape[0]):
                pass

100%|██████████| 5053/5053 [00:01<00:00, 3224.31it/s]
  0%|          | 0/5053 [00:00<?, ?it/s]


In [None]:
# Merge the per-portion mapping files (<GID_0>_<NAME>_pNNN.csv) of every split
# ethnic polygon into a single <GID_0>_<NAME>.csv file.
gid_with_portions = adm.loc[adm['portion'].notnull(), ['GID_0', 'NAME']].agg('_'.join, axis=1).unique()
# Label columns that the ethnic save_location_mapping writes into each file.
ethnic_columns = ['NAME', 'TRIBE_CODE', 'GID_0']
for gid in tqdm(gid_with_portions):
    files = list(ethnic_path.glob(f'{gid}_p*.csv'))
    if not files:
        continue

    portions = [utils.read_csv(file) for file in files]
    portions = [x for x in portions if not x.empty]

    if not portions:
        continue

    # pd.concat handles one or many portions alike and replaces
    # DataFrame.append (removed in pandas 2.0).
    country = pd.concat(portions, ignore_index=True)

    # A raster cell can appear in several portions: add up its values.
    country = country.groupby(['lat', 'lon']).sum().reset_index()
    # Restore the labels from the first portion. The ethnic mapping files
    # carry NAME/TRIBE_CODE/GID_0, not the adm0..adm2 columns of the country
    # files, so looking up adm{i} here raised a KeyError.
    for col in ethnic_columns:
        country[col] = portions[0].iloc[0][col]
    # Write next to the portions in ethnic_path, which is where the ethnic
    # aggregation reads from; writing to partial_path would mix these files
    # into the country mappings.
    country.to_csv(ethnic_path / f'{gid}.csv', index=False)

# Aggregate ethnic

In [17]:
def ethnic_load_aggregate_and_save(blob):
    """Load one country's ethnic mapping files and save the aggregation.

    ``blob`` is a ``(country, files, name, tif_name)`` tuple as produced by
    ``iter_files_by_country``. The result is written to
    ``ethnic_countries_path`` under ``name``, grouped by NAME, TRIBE_CODE and
    GID_0.
    """
    groupby = ['NAME', 'TRIBE_CODE', 'GID_0']
    country, files, name, tif_name = blob

    # Load polygons
    results = load_nl_files(files)
    for col in groupby:
        # Assign back instead of filling in place on a column selection,
        # which does not update the frame when pandas Copy-on-Write is active.
        results[col] = results[col].fillna('')
    results = results.drop_duplicates()

    aggregate_and_save(results, tif_name, ethnic_countries_path, name, groupby=groupby)


files_by_country = get_files_by_country(ethnic_path)

# Worker threads used for the per-country ethnic aggregation.
n_processes = 10
with ThreadPoolExecutor(n_processes) as tpe:
    for tif in load_tifs(tars):
        # IMAGE is read as a module-level global by aggregate_and_save.
        IMAGE = utils.read_tif(tif)
        work_items = iter_files_by_country(files_by_country, tif, ethnic_countries_path)
        progress = tqdm(tpe.map(ethnic_load_aggregate_and_save, work_items),
                        total=len(files_by_country), desc=tif)
        # Drain the iterator so every task finishes before the tif is removed.
        for _ in progress:
            pass

../Data/Nightlights/F101992.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.83s/it]
../Data/Nightlights/F101993.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.84s/it]
../Data/Nightlights/F121994.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.84s/it]
../Data/Nightlights/F121995.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.83s/it]
../Data/Nightlights/F121996.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.83s/it]
../Data/Nightlights/F141997.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.83s/it]
../Data/Nightlights/F141998.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.83s/it]
../Data/Nightlights/F141999.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:44<00:00,  1.83s/it]
../Data/Nightlights/F152000.v4b_web.stable_lights.avg_vis.tif: 100%|██████████| 57/57 [01:43<00:00,  1.82s/it]
.

In [18]:
# Collect every per-country/year ethnic aggregation, skipping empty files.
ethnic_frames = []
for file in tqdm(ethnic_countries_path.glob('*.csv')):
    try:
        ethnic_frames.append(pd.read_csv(file))
    except pd.errors.EmptyDataError:
        continue

# pd.concat replaces DataFrame.append (removed in pandas 2.0). With no input
# frames it raises a descriptive ValueError instead of an IndexError.
results = pd.concat(ethnic_frames, ignore_index=True)
print(results.shape)
# Keep one row per (NAME, GID_0, year) in the saved file.
results.drop_duplicates(['NAME', 'GID_0', 'year']).to_csv(output_path / 'nightlights_ethnic.csv', index=False)

1254it [00:02, 469.53it/s]


(31064, 6)
