In [2]:
#@markdown <b>Run me to import underscore module</b><br/>   {display-mode: "form"}
#@markdown <small>Method signatures:</small><br/> 
#@markdown <small><small>&nbsp; &nbsp; &nbsp; _(source_path, target_path)</small></small><br/>
#@markdown <small><small>&nbsp; &nbsp; &nbsp; _set_gh_token(token)</small></small><br/>
#@markdown <small><small>&nbsp; &nbsp; &nbsp; _from_gh(user_name, repo_name, release_name) &nbsp; &nbsp; &nbsp; <b>Returns:</B> dictionary of arrays { 'array_name' : np.ndarray }</small></small><br/>
#@markdown <small><small>&nbsp; &nbsp; &nbsp; _to_gh(user_name, repo_name, release_name, split_size=600, **arr_kwargs)</small></small><br/>

!pip install -q githubrelease
import numpy as np
import os, glob, re, time
import github_release

compressed_dirs = set()


def _compress(source_path, target_path, target_dir=None):
    if target_dir:
        !mkdir -p {target_dir}
    if target_path.endswith('.tar.gz'):
        !tar -czf {target_path} -C {source_path} .
    elif target_path.endswith('.tar'):
        !tar -cf {target_path} -C {source_path} .
    elif target_path.endswith('.zip'):
        !(cd {source_path} && zip -q -r {target_path} .)


def _extract(source_path, target_path):
    !mkdir -p {target_path}
    if source_path.endswith('.tar.gz'):
        !tar -xzf {source_path} -C {target_path}
    elif source_path.endswith('.tar'):
        !tar -xf {source_path} -C {target_path}
    elif source_path.endswith('.zip'):
        !unzip -qq {source_path} -d {target_path}


def _(source_path, target_path):
    """
    Use cases:
        Movement:
            - GCS -> GCS
            - GCS -> LOCAL
            - LOCAL -> GCS
            - LOCAL -> LOCAL
            
        Compression (e.g. from dir to .tar.gz):
            - GCS -> GCS
            - GCS -> LOCAL
            - LOCAL -> GCS
            - LOCAL -> LOCAL
            
        Extraction (e.g. from .zip to dir):
            - GCS -> GCS
            - GCS -> LOCAL
            - LOCAL -> GCS
            - LOCAL -> LOCAL
            
        Extraction & compression (e.g. from .zip to .tar.gz):
            - GCS -> GCS
            - GCS -> LOCAL
            - LOCAL -> GCS
            - LOCAL -> LOCAL
    """
    COMPRESSION_FORMATS = ('zip', 'tar', 'tar.gz')
    TEMP_DIR = "/tmp_"
    LOG_TEMPLATE = "{}    from    {}    to    {}"

    # Source
    source_dir, _, source_name = source_path.rpartition('/')
    source_isgcs = source_path.startswith("gs://")
    source_islocal = not source_isgcs
    source_isprefix, source_isfile, source_ext = source_name.partition('.')
    source_isdir = not source_isfile
    source_iscompression = source_ext in COMPRESSION_FORMATS

    # Target
    target_dir, _, target_name = target_path.rpartition('/')
    target_isgcs = target_path.startswith("gs://")
    target_islocal = not target_isgcs
    target_prefix, target_isfile, target_ext = target_name.partition('.')
    target_isdir = not target_isfile
    target_iscompression = target_ext in COMPRESSION_FORMATS

    # Flags
    MOVE_ONLY = source_ext == target_ext
    GCS_ONLY = source_isgcs and target_isgcs
    RENAME = source_isprefix != target_prefix
    COMPRESSION = source_isdir and target_iscompression
    EXTRACTION = source_iscompression and target_isdir
    EXTRACTION_COMPRESSION = source_iscompression and target_iscompression and source_ext != target_ext

    # Authenticate if writing to GCS
    if target_isgcs:
        from google.colab import auth
        auth.authenticate_user()

    # Assert that subdirectories exist if target is local
    if target_islocal:
        !mkdir -p {target_dir}

    # Movement commands
    if MOVE_ONLY:
        # GCS -> GCS
        if source_isgcs and target_isgcs:
            print(LOG_TEMPLATE.format("MOVING (1/1)", source_path, target_path))
            !gsutil -m -q mv {source_path} {target_path}
        
        # LOCAL -> LOCAL
        elif source_islocal and target_islocal:
            print(LOG_TEMPLATE.format("MOVING (1/1)", source_path, target_path))
            !mv {source_path} {target_path}
        
        # GCS -> LOCAL
        elif source_isgcs and target_islocal:
            if source_isdir:
                print(LOG_TEMPLATE.format("DOWNLOADING DIR (1/1)", source_path, target_dir))
                !gsutil -m -q cp -r {source_path} {target_dir}
                if RENAME:
                    print(LOG_TEMPLATE.format("\tRENAMING DIR", source_isprefix, target_prefix))
                    !mv {target_dir}/{source_isprefix} {target_dir}/{target_prefix}
            else:
                print(LOG_TEMPLATE.format("DOWNLOADING FILE (1/1)", source_path, target_path))
                !gsutil -m -q cp {source_path} {target_path}
        
        # LOCAL -> GCS
        if source_islocal and target_isgcs:
            if source_isdir:
                print(LOG_TEMPLATE.format("UPLOADING DIR (1/1)", source_path, target_path))
                !gsutil -m -q cp -r {source_path} {target_path}
            else:
                print(LOG_TEMPLATE.format("UPLOADING FILE (1/1)", source_path, target_path))
                !gsutil -m -q cp {source_path} {target_path}
        return


    # Create directory for intermediate storage if required
    if source_isgcs or target_isgcs or EXTRACTION_COMPRESSION:
        !mkdir -p {TEMP_DIR}
    

    # For remaining operations, download GCS source to temp and treat as local
    if source_isgcs:
        if source_isdir:
            print(LOG_TEMPLATE.format("\tDOWNLOADING DIR", source_path, TEMP_DIR))
            !gsutil -m -q cp -r {source_path} {TEMP_DIR}
        else:
            print(LOG_TEMPLATE.format("\tDOWNLOADING FILE", source_path, f"{TEMP_DIR}/{source_name}"))
            !gsutil -m -q cp {source_path} {TEMP_DIR}/{source_name}
        source_path = f"{TEMP_DIR}/{source_name}"
        source_dir = TEMP_DIR

    # Compression
    if COMPRESSION:
        if target_islocal:
            print(LOG_TEMPLATE.format("COMPRESSING (1/1)", source_path, target_path))
            _compress(source_path, target_path, target_dir=target_dir)
        else:
            print(LOG_TEMPLATE.format("COMPRESSING (1/2)", source_path, f"{TEMP_DIR}/{target_name}"))
            _compress(source_path, f"{TEMP_DIR}/{target_name}")
            print(LOG_TEMPLATE.format("UPLOADING FILE (2/2)", f"{TEMP_DIR}/{target_name}", target_path))
            !gsutil -m -q cp {TEMP_DIR}/{target_name} {target_path}

    # Extraction
    elif EXTRACTION:
        if target_islocal:
            print(LOG_TEMPLATE.format("EXTRACTING (1/1)", source_path, target_path))
            _extract(source_path, target_path)
        else:
            print(LOG_TEMPLATE.format("EXTRACTING (1/2)", source_path, f"{TEMP_DIR}/{target_name}"))
            _extract(source_path, f"{TEMP_DIR}/{target_name}")
            print(LOG_TEMPLATE.format("UPLOADING DIR (2/2)", f"{TEMP_DIR}/{target_name}", target_path))
            !gsutil -m -q cp -r {TEMP_DIR}/{target_name} {target_path}

    # Extraction & compression
    elif EXTRACTION_COMPRESSION:
        if target_islocal:
            print(LOG_TEMPLATE.format("EXTRACTING (1/2)", source_path, f"{TEMP_DIR}/{target_prefix}"))
            _extract(source_path, f"{TEMP_DIR}/{target_prefix}")
            print(LOG_TEMPLATE.format("COMPRESSING (2/2)", f"{TEMP_DIR}/{target_prefix}", target_path))
            _compress(f"{TEMP_DIR}/{target_prefix}", target_path, target_dir=target_dir)
        else:
            print(LOG_TEMPLATE.format("EXTRACTING (1/3)", source_path, f"{TEMP_DIR}/{target_prefix}"))
            _extract(source_path, f"{TEMP_DIR}/{target_prefix}")
            print(LOG_TEMPLATE.format("COMPRESSING (2/3)", f"{TEMP_DIR}/{target_prefix}", f"{TEMP_DIR}/{target_name}"))
            _compress(f"{TEMP_DIR}/{target_prefix}", f"{TEMP_DIR}/{target_name}")
            print(LOG_TEMPLATE.format("UPLOADING FILE (3/3)", f"{TEMP_DIR}/{target_name}", target_path))
            !gsutil -m -q cp {TEMP_DIR}/{target_name} {target_path}
    
    # Cleanup intermediate storage
    !rm -rf {TEMP_DIR}


def _set_gh_token(token):
    os.environ["GITHUB_TOKEN"] = token


def _export_array(array, release_name, prefix="", splits=3):
    dir_path = f"/tmp_/{release_name}"
    !mkdir -p {dir_path}
    n_digits = len(str(splits - 1))
    subarrays = np.array_split(array, splits)
    for i, subarray in enumerate(subarrays):
        filename = f"{prefix}__{str(i).zfill(n_digits)}.npy"
        np.save(f"{dir_path}/{filename}", subarray)


def _concat_arrays(paths):
    return np.concatenate([np.load(path, allow_pickle=True) for path in sorted(paths)])


def _to_gh(user_name, repo_name, release_name, split_size=600, **arr_kwargs):
    # Assert that GitHub Auth token is set
    if "GITHUB_TOKEN" not in os.environ:
        print("GitHub authentication token is not set.")
        print("Set token using the '_set_gh_token(token_string)' method.")
        print("Minimal required auth scope is 'repo/public_repo' for public repositories.")
        print("URL: https://github.com/settings/tokens/new")
        return

    # Split arrays
    for prefix, array in arr_kwargs.items():
        splits = int((array.nbytes/1_000_000) // split_size) + 1
        _export_array(array, release_name, prefix=prefix, splits=splits)

    # Upload arrays
    github_release.gh_release_create(
        f"{user_name}/{repo_name}", 
        release_name, 
        publish=True, 
        name=release_name, 
        asset_pattern=f"/tmp_/{release_name}/*"
    )
    !rm -rf /tmp_/*


def _from_gh(user_name, repo_name, release_name):
    # Download release to temporary directory
    print("Downloading dataset in parallell ... ", end='\t')
    t0 = time.perf_counter()
    assets = github_release.get_assets(f"{user_name}/{repo_name}", tag_name=release_name)
    download_urls = [asset['browser_download_url'] for asset in assets]
    urls_str = " ".join(download_urls)
    !echo {urls_str} | xargs -n 1 -P 8 wget -q -P /tmp_/{release_name}_dl/
    t1 = time.perf_counter()
    print(f"done! ({t1 - t0:.3f} seconds)")

    # Load data into numpy arrays
    paths = glob.glob(f"/tmp_/{release_name}_dl/*.npy")
    groups = {}
    for path in paths:
        match = re.match(r".*/(.*)__[0-9]*\.npy", path)
        if match:
            prefix = match.group(1)
            groups[prefix] = groups.get(prefix, []) + [path]
    arrays_dict = {name: _concat_arrays(paths) for name, paths in groups.items()}
    !rm -rf /tmp_/*
    return arrays_dict
    

def _log_to_gh(user, repo, tag, log_dir="/tmp/logs"):
    # Create temporary directory for compressed logs
    !mkdir -p /tmp/compressed_logs
    
    # Compress all directories in log dir
    for dirname in os.listdir(log_dir):
        # Skip files
        if "." in dirname or dirname in compressed_dirs:
            continue

        # Compress
        _(f"{log_dir}/{dirname}", f"/tmp/compressed_logs/{dirname}.tar.gz")
        compressed_dirs.add(dirname)

    # Upload compressed logs to GitHub
    github_release.gh_asset_upload(f"{user}/{repo}", tag, f"/tmp/compressed_logs/*.tar.gz")

    # Cleanup compressed logs
    !rm -rf /tmp/compressed_logs/*

### Link to Google Drive: https://drive.google.com/drive/folders/1Pnuo1tB1XtiDjMa7eXmuUctL2XX9emU7
### Link to Tableau Online: https://dub01.online.tableau.com/#/site/telenordashboard/projects/123532
### Link to Google Cloud Storage: https://console.cloud.google.com/storage/browser?project=telenor-data-science
### Link to Google Cloud BigQuery: https://console.cloud.google.com/bigquery?project=telenor-data-science&p=telenor-data-science&d=telenor_dataset&page=dataset

In [3]:
import glob
import os
import pandas as pd

In [5]:
# Project info
PROJECT_ID = 'telenor-data-science'
DATASET = 'location_dataset.tar.gz'

In [6]:
# Download dataset from GCS
!gsutil -q cp gs://{PROJECT_ID}/datasets/{DATASET} /tmp
!mkdir -p data
!tar -zxf /tmp/{DATASET} -C /content/data

In [69]:
paths = glob.glob("/content/data/**/*.parquet", recursive=True)
dataframes = {}
summary_dfs = []

prefixes = ["MET", "KV", "PRA", "NEA"]

# Load all parquet files as dataframes
for path in paths:
    dir_name, sep, file_name = path.rpartition(os.sep)
    location = file_name.split('.')[0]
    location_df = pd.read_parquet(path)
    #print(f"Location: {location}")
    
    location_data = []
    col_groups = {prefix: [col for col in df.columns if col.startswith(prefix)] for prefix in prefixes}
    for prefix, col_group in col_groups.items():
        location_data.append(df[col_group].isnull().sum().sum()/df[col_group].size)
    
    summary_dfs.append([location] + location_data)

    #print(col_groups)
    
    
    #print("\n"*3)
    #df.isna().describe()
"""
    df_tuple = (df, os.path.splitext(path)[0][-4:])
    df_old = dataframes.get(dir_name, [])
    df_old.append(df_tuple)
    dataframes[dir_name] = df_old

## Create mapping {location -> [[df1, df2, df3, ...], [year1, year2, year3, ...]]}
##for k, v in dataframes.items():
#    dataframes[k] = list(zip(*v))

#df2 = pd.concat([df.drop(['location', 'year'], axis=1).isnull().sum(), df.describe().transpose()], axis=1)
#df2.hist(column=0, bins=20)
""";
pd.DataFrame(summary_dfs)



Unnamed: 0,0,1,2,3,4
0,bryn_skole,0.011385,,0.138893,0.507519
1,klosterhaugen,0.011385,,0.138893,0.507519
2,kransen,0.011385,,0.138893,0.507519
3,vangsveien__hamar,0.011385,,0.138893,0.507519
4,rådal,0.011385,,0.138893,0.507519
5,kannik,0.011385,,0.138893,0.507519
6,alnabru,0.011385,,0.138893,0.507519
7,moheia_vest,0.011385,,0.138893,0.507519
8,elgeseter,0.011385,,0.138893,0.507519
9,e6_tiller,0.011385,,0.138893,0.507519


In [None]:
# Print years and number of features
template = "{:20}\t{:>10}\t{}"
print(template.format("Location", "# columns", "Years"))
for location, data in dataframes.items():
    dfs, years = data
    years = ', '.join(sorted(years))

     # Count number of unique columns
    location_columns = set()
    for df in dfs:
        location_columns.update(df.columns)
    num_of_columns = len(location_columns)
    print(template.format(location, num_of_columns, years))


Location            	 # columns	Years
E6-Tiller           	        45	2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020
Alvim               	        41	2015, 2016, 2017, 2018, 2019, 2020
Sofienbergparken    	        58	2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020
Manglerud           	       104	2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020
Leiret              	        54	2016, 2017, 2018, 2019, 2020
Minnesundvegen, Gjøvik	        42	2014, 2015, 2016, 2017, 2018, 2019, 2020
Vangsveien, Hamar   	        43	2014, 2015, 2016, 2017, 2018, 2019, 2020
Rv 4, Aker sykehus  	       106	2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020
Ringsakervegen      	        30	2018, 2019, 2020
Bakke kirke         	        50	2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020
Mo

In [None]:
'''
def rename_col(name):
    """Conversion to make column names compatible with BigQuery."""
    problematic_chars = ' .,:;{}()='
    for c in problematic_chars:
        name = name.replace(c, '_')
    return name
'''

import re
def rename_col(name):
    """Conversion to make column names compatible with BigQuery."""
    name = re.sub("(PRA\.(.+)\.(.+)\.(.+))", r'PRA_\3__\4', name)
    name = re.sub("(MET\.[a-zA-Z0-9]*:0\.)", r'MET_', name)
    name = re.sub(r"(NEA\..*\.(NOx|PM2_5|PM10|NO2|PM1|NO))", r'NEA_\2', name)
    name = re.sub(r"(Kystverket\.[a-zA-Z0-9_-]*\.(stationary|moving))", r'KV_\2', name)
    return name

def rename_file(name):
    name = name.lower()
    for c in " ,.-:;{}[]()=":
        name = name.replace(c, '_')
    return name

In [None]:
base_dfs = []

for location, lists in dataframes.items():
    dfs, years = lists
    dfs = list(dfs)
    # Rename all columns (Thanks telenor :))

    for i, df in enumerate(dfs):
        translation = {col: rename_col(col) for col in df.columns}
        dfs[i] = df.rename(columns=translation)

    # Create new root dataframe for directory
    base_df = pd.DataFrame()

    # Add location and years data to dataframe
    base_df['year'] = None
    
    # Identify all unique columns
    location_columns = set()
    for df in dfs:
        location_columns.update(df.columns)
    
    # Generate colummns in base dataframe
    for column_name in location_columns:
        base_df[column_name] = None

    # Add all rows of data to base dataframe
    for df, year in zip(dfs, years):
        df['year'] = year
        base_df = base_df.append(df)
    
    

    # Add location to all rows
    base_df.insert(1, 'location', location)
    base_df = base_df.reindex(sorted(base_df.columns), axis=1)
    base_dfs.append(base_df)

In [None]:
# Total number of rows in all datasets
import numpy as np
print(np.array([df.shape for df in base_dfs]).sum(axis=0))

[3700794    2557]


In [None]:
summary_dfs = []

# Export .parquet files to local storage
for location, base_df in zip(dataframes.keys(), base_dfs):
    prefixes = ["MET", "KV", "PRA", "NEA"]
    
    location_data = []
    col_groups = {prefix: [col for col in df.columns if col.startswith(prefix)] for prefix in prefixes}
    for prefix, col_group in col_groups.items():
        location_data.append(df[col_group].isnull().sum().sum()/df[col_group].size)
    
    summary_dfs.append([location] + location_data)

pd.DataFrame(summary_dfs)

In [None]:
# Upload dataset to Google Cloud Storage
from google.colab import auth
auth.authenticate_user()

FROM = "/tmp/location_dataset"
TO = "gs://telenor-data-science/datasets/location_dataset"
IS_DIR = True

if IS_DIR:
    !gsutil -m cp -r {FROM} {TO}
else:
    !gsutil cp {FROM} {TO}

# Compress and upload compresed version to GCS
!tar -czvf /tmp/location_dataset.tar.gz /tmp/location_dataset
!gsutil cp /tmp/location_dataset.tar.gz gs://telenor-data-science/datasets/location_dataset.tar.gz

In [None]:
# DOESNT WORK - Type conflict on parquet inferred schema vs. df index :( 
#root_df = pd.DataFrame()
root_df = base_dfs[0].copy().iloc[0:0]
#root_df = root_df.set_index(base_dfs[0].iloc[0:0].index)
# Identify all unique columns
root_columns = set()
for df in base_dfs:
    root_columns.update(df.columns)

# Generate colummns in root dataframe
for column_name in root_columns:
    root_df[column_name] = ''

# Rearrange columns
root_df = root_df.reindex(sorted(root_df.columns), axis=1)

#Change col types
#root_df.append(base_dfs[0])
#root_df = root_df.iloc[0:0]
root_df = root_df.astype(float)
root_df = root_df.astype({'year': 'str', 'location': 'str'})

root_df.to_parquet(f"/tmp/location_dataset/all_locations.parquet")
!gsutil cp /tmp/location_dataset/all_locations.parquet gs://telenor-data-science/datasets/location_dataset/all_locations.parquet
# Create table
!bq load --source_format=PARQUET --ignore_unknown_values --autodetect telenor-data-science:location_dataset.all_locations gs://telenor-data-science/datasets/location_dataset/*.parquet 

Copying file:///tmp/location_dataset/all_locations.parquet [Content-Type=application/octet-stream]...
/ [0 files][    0.0 B/ 38.7 KiB]                                                / [1 files][ 38.7 KiB/ 38.7 KiB]                                                
Operation completed over 1 objects/38.7 KiB.                                     
Waiting on bqjob_r1b00740624a4021c_0000017533a7e616_1 ... (6s) Current status: DONE   
BigQuery error in load operation: Error processing job 'telenor-data-
science:bqjob_r1b00740624a4021c_0000017533a7e616_1': Error while reading data,
error message: incompatible types for field 'year': INT32 in Parquet vs. string
in schema


In [None]:
root_df.dtypes

KV_moving_0m_to_100m          float64
KV_moving_10000m_to_30000m    float64
KV_moving_1000m_to_3000m      float64
KV_moving_100m_to_300m        float64
KV_moving_3000m_to_10000m     float64
                               ...   
PRA_9__from5_6To7_6           float64
PRA_9__from7_6To12_5          float64
PRA_9__upTo5_6                float64
location                       object
year                           object
Length: 141, dtype: object