# Goals
- [x] Get ebird data (US only) from Alex
- [ ] ebird.predict_proba(loc, date) -> probs
    - [ ] Figure out how to aggregate to make data fit comfortably in mem
    - [ ] Make an out-of-core ddf pass over the cols-for-time-space-priors.tsv file to produce the aggregates
    - [ ] Use the aggregates in memory to power the priors model
- [ ] Ensemble audio probs + ebird probs (expose tuning param for weighted combination)
- [ ] See how it improves model validation (e.g. many species should be way closer to few species)

In [None]:
import geohash
from notebooks import *

In [None]:
# Register a global dask progress bar
dask_progress().register()

In [None]:
# TODO -> potoo.dask
def dd_read_parquet_sample(
    path,
    sample: Union[float, List[float]] = None,
    sample_divisions=None,
    sample_npartitions=None,
    sample_repartition_force=False,
    sample_to_parquet=dict(compression='gzip'),  # Only ever **-unpacked below, never mutated
    sample_get=None,  # Default: 'threads' scheduler if repartitioning, else 'processes'
    random_state=0,
    **kwargs,
):
    """
    dd.read_parquet with sampling, to make it easy to downsample large files for faster dev iteration
    - Same as dd.read_parquet(path, **kwargs) if sample isn't given (or is falsy)
    - sample: one fraction, or a chain of fractions [f1, ..., fn]; the result is cached at _sample_path(path, f1, ..., fn)
    - For a chain, the last fraction is applied to the previous level's cache (_sample_path(path, f1, ..., fn-1)),
      which is read directly and therefore must already exist
    - sample_divisions / sample_npartitions / sample_repartition_force: passed to ddf.repartition before the cache is
      written; no repartition happens if neither divisions nor npartitions is given
    - sample_to_parquet: extra kwargs for ddf.to_parquet when writing the cache
    - sample_get: dask `get` used to compute the cache write
    - random_state: seed passed to ddf.sample
    - kwargs: passed to dd.read_parquet, both when building the cache and when reading the result
    - NOTE The cache path depends only on (path, sample): kwargs that change what is read (e.g. columns=...) are baked
      into the cache by whichever call creates it first
    """
    if not sample:
        return dd.read_parquet(path, **kwargs)
    # Accept any bare number (e.g. the int 1), not just floats, as a single-level sample
    if isinstance(sample, (int, float)):
        sample = [sample]
    in_path = _sample_path(path, *sample[:-1])
    out_path = _sample_path(path, *sample)
    if not (Path(out_path) / '_metadata').exists():
        log.info('Caching sample: %s <- %s' % (
            _dd_read_parquet_sample_log_path(out_path),
            _dd_read_parquet_sample_log_path(in_path),
        ))
        # Read and sample
        ddf = (
            dd.read_parquet(in_path, **kwargs)
            .sample(frac=sample[-1], replace=False, random_state=random_state)
        )
        # Repartition, if requested
        repartitioning = bool(sample_divisions or sample_npartitions)
        if repartitioning:
            ddf = ddf.repartition(
                divisions=sample_divisions,
                npartitions=sample_npartitions,
                force=sample_repartition_force,
            )
        # Write cached sample
        #   - Use 'threads' if repartitioning (by divisions or npartitions), else 'processes', to avoid ipc
        #     bottlenecks from the shuffle
        sample_get = sample_get or dask_get_for_scheduler('threads' if repartitioning else 'processes')
        (ddf
            .to_parquet(out_path, **sample_to_parquet, compute=False)
            .compute(get=sample_get)
        )
    return dd.read_parquet(out_path, **kwargs)

def _dd_read_parquet_sample_log_path(path):
    """Path to show in log messages: relative to the cwd when possible, else unchanged"""
    try:
        return Path(path).relative_to(os.getcwd())
    except ValueError:
        # Not under the cwd (e.g. /tmp/...): logging must never abort the read
        return Path(path)

def _sample_path(path: str, *sample: float) -> str:
    """Cache path for a chain of sample fractions: path with each fraction appended as a '-'-separated suffix"""
    suffixes = [str(frac) for frac in sample]
    return '-'.join([path] + suffixes)

In [None]:
# Downsample raw data (in shell)
#   (
#     set -eux
#     set -o pipefail
#     raw_f='ebd_US_relFeb-2017.txt.gz'
#     # 147m lines -> 14.7m lines
#     in_f="$raw_f"; out_f="$raw_f-0.1"
#     cat "$in_f" \
#       | pv -terb -s"`du -hs "$in_f" | field 0`" -cN in \
#       | gunzip \
#       | sample-lines .1 --seed=0 --keep-header \
#       | pigz \
#       | pv -terb -cN out \
#       > "$out_f"
#     # 14.7m lines -> 1.47m lines
#     in_f="$raw_f-0.1"; out_f="$raw_f-0.01"
#     cat "$in_f" \
#       | pv -terb -s"`du -hs "$in_f" | field 0`" -cN in \
#       | gunzip \
#       | sample-lines .1 --seed=0 --keep-header \
#       | pigz \
#       | pv -terb -cN out \
#       > "$out_f"
#     # 1.47m lines -> 147k lines
#     in_f="$raw_f-0.01"; out_f="$raw_f-0.001"
#     cat "$in_f" \
#       | pv -terb -s"`du -hs "$in_f" | field 0`" -cN in \
#       | gunzip \
#       | sample-lines .1 --seed=0 --keep-header \
#       | pigz \
#       | pv -terb -cN out \
#       > "$out_f"
#   )

In [None]:
# Inspect raw data
#   - The paths below are the raw dump and its 0.1 / 0.01 / 0.001 downsamples; exactly one is active
# ebd_tsv_path = f'{data_dir}/ebird/ebd_US_relFeb-2017.txt.gz'        #  18gb,  147m lines
# ebd_tsv_path = f'{data_dir}/ebird/ebd_US_relFeb-2017.txt.gz-0.1'    # 778mb, 14.7m lines
# ebd_tsv_path = f'{data_dir}/ebird/ebd_US_relFeb-2017.txt.gz-0.01'   # 100mb, 1.47m lines
ebd_tsv_path = f'{data_dir}/ebird/ebd_US_relFeb-2017.txt.gz-0.001'  #  12mb,  147k lines
# Subset of raw columns to compare against the full column set below
proj_cols = [
    'CATEGORY', 'SCIENTIFIC NAME', 'OBSERVATION COUNT', 'COUNTY CODE', 'LATITUDE', 'LONGITUDE', 'OBSERVATION DATE',
    'SAMPLING EVENT IDENTIFIER', 'ALL SPECIES REPORTED',
]
# Only the first 10000 rows are read, so the sizes and summary below describe that prefix, not the whole file
ebd_tsv_df = pd.read_csv(ebd_tsv_path, sep='\t', nrows=10000, compression='gzip')
display(
    # Size of all columns vs. only proj_cols (presumably joblib-serialized bytes — confirm joblib_dumps)
    humanize.naturalsize(len(joblib_dumps(ebd_tsv_df))),
    humanize.naturalsize(len(joblib_dumps(ebd_tsv_df[proj_cols]))),
    # df_summary(ebd_tsv_df).T,
    df_summary(ebd_tsv_df[proj_cols]).T,
)

'9.9 MB'

'1.5 MB'

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,Unnamed: 6_level_0,Unnamed: 7_level_0,min,25%,50%,75%,max
Unnamed: 0_level_1,dtype,sizeof,len,count,nunique,mean,std,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
CATEGORY,object,559319,10000,10000,7,,,domestic,species,species,species,spuh
SCIENTIFIC NAME,object,666281,10000,10000,608,,,Acanthis flammea,Charadrius semipalmatus,Lophodytes cucullatus,Sayornis phoebe,Zosterops japonicus
OBSERVATION COUNT,object,571101,10000,10000,163,,,1,1,3,X,X
COUNTY CODE,object,579150,10000,9975,1316,,,US-AK-013,US-FL-105,US-MI-077,US-OR-003,US-WY-039
LATITUDE,float64,240000,10000,10000,5304,,,19.1,35.6,39.8,42.3,71.4
LONGITUDE,float64,240000,10000,10000,5303,,,-172,-107,-88.1,-77.7,173
OBSERVATION DATE,object,590000,10000,10000,5557,,,1800-01-01,1989-04-30,1998-03-15,2002-06-15,2004-12-31
SAMPLING EVENT IDENTIFIER,object,575180,10000,10000,9883,,,S1000381,S16318134,S25555972,S4519111,S9998747
ALL SPECIES REPORTED,int64,270900,10000,10000,2,,,0,1,1,1,1


In [None]:
# Convert ebd.txt -> proj.tsv (in shell)
#   - ~10x faster to read than raw data: no gunzip + fewer cols
#   - TODO Re-run for suf=1 (no downsampling) to populate group_identifier (col 40)
#   (
#     set -eux
#     set -o pipefail
#     sufs=(
#       0.001
#       0.01
#       0.1
#       # 1
#     )
#     for suf in "${sufs[@]}"; do
#       in_f="ebd_US_relFeb-2017.txt.gz$suf"
#       out_f="derived/priors/ebd_US_relFeb-2017-0-proj.tsv-$suf"
#       cat "$in_f" \
#         | pv -terb -cN in -s"`du -hs "$in_f" | field 0`" \
#         | gunzip \
#         | cut -f4,6,9,17,25,26,27,32,39,40 \
#         | pv -terb -cN out \
#         > "$out_f"
#     done
#   )

In [None]:
# otypes=[object] (not str): a str output array would coerce the np.nan null marker into the literal string 'nan',
# which downstream .dropna()/.isnull() would treat as a real geohash
@partial(np.vectorize, otypes=[object])
def np_geohash_encode_safe(lat, lon, **kwargs):
    """
    Elementwise geohash.encode(lat, lon, **kwargs) that maps missing coordinates to np.nan instead of failing
    - lat, lon: scalars or array-likes, broadcast against each other by np.vectorize
    - kwargs: passed to geohash.encode (e.g. precision=4)
    - Returns an object array holding a geohash str per element, or np.nan where lat or lon is null
    """
    if pd.isnull(lat) or pd.isnull(lon):
        return np.nan
    return geohash.encode(lat, lon, **kwargs)

# Elementwise worker: collect results as objects so np.nan can stand in for missing coordinates
@partial(np.vectorize, otypes=[object])
def _np_geohash_encode_uint64_or_nan(lat, lon, **kwargs):
    if pd.isnull(lat) or pd.isnull(lon):
        return np.nan
    return geohash.encode_uint64(lat, lon, **kwargs)

def np_geohash_encode_uint64_safe(lat, lon, **kwargs):
    """
    Elementwise geohash.encode_uint64(lat, lon, **kwargs) that tolerates missing coordinates
    - lat, lon: scalars or array-likes, broadcast against each other by np.vectorize
    - kwargs: passed to geohash.encode_uint64
    - Returns a uint64 array when no input element is null
    - Returns an object array (ints, with np.nan where lat or lon is null) otherwise, because uint64 has no
      missing-value representation and casting np.nan to it raises
    """
    codes = _np_geohash_encode_uint64_or_nan(lat, lon, **kwargs)
    # atleast_1d: scalar inputs produce a 0-d result
    if pd.isnull(np.atleast_1d(codes)).any():
        return codes
    return codes.astype(np.uint64)

In [None]:
# TODO TODO Prototype e2e pipeline in one go, using downsampled data
#   - Output: priors
# sample = 1      #  147m lines
# sample = 0.1    # 14.7m lines
# sample = 0.01   # 1.47m lines
sample = 0.001  #  147k lines

In [None]:
# Byte size used for the full (sample=1) proj.tsv, and the partition count to aim for at every sample level
ebd_proj_size = 12 * 1024**3
ebd_proj_npartitions = 128
ebd_proj = (
    # Read proj.tsv (raw lines with subset of cols)
    #   - sample=0.001: 147k rows, 12mb
    dd.read_csv(
        f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-0-proj.tsv-{sample}',  # 12gb / 1.2gb / 120mb / 12mb
        # Scale bytes-per-partition by `sample` so smaller samples still split into ~ebd_proj_npartitions partitions
        blocksize=int(ebd_proj_size / ebd_proj_npartitions * sample),
        sep='\t',
        dtype={
            # lonlat decimal precision [https://en.wikipedia.org/wiki/Decimal_degrees]
            #   - float32: ~5-6 decimals, ~.1-1m
            #   - float16: ~1-2 decimals, ~1-10km
            'LATITUDE': 'float32',
            'LONGITUDE': 'float32',
            'OBSERVATION COUNT': 'str',  # int | 'X'
            'ALL SPECIES REPORTED': 'float16',  # Smallest dtype for {1, 0, None}
        },
    )
    # Simplify col names
    #   - e.g. 'SCIENTIFIC NAME' -> 'scientific_name'
    .rename(columns=lambda c: c.lower().replace(' ', '_'))
    # Drop obs with no checklist id
    #   - sample=0.001: 147k -> 142k rows
    #   - FIXME Some of these nulls are due to misaligned fields (maybe from the tsv split above?)
    #   - Not clear what the other nulls are from; try backing them out to the full field set (including group_identifier)
    .pipe(lambda ddf: ddf[ddf.sampling_event_identifier.notnull()])
    # Replace sampling_event_identifier and group_identifier with checklist_id
    #   - Compress from str to int32 ('S10150441' -> 10150441) for ~46% compression (e.g. 24mb -> 13mb)
    #   - Map sampling_event_identifier to pos ('S10150441' -> 10150441) and group_identifier to neg ('G554362' -> -554362)
    #   - group_identifier wins when present (combine_first), else fall back to sampling_event_identifier
    #   - Assume no nulls (filtered out above)
    .assign(checklist_id=lambda ddf: (ddf
        .group_identifier.combine_first(ddf.sampling_event_identifier)
        .str.replace('S', '').str.replace('G', '-').astype(np.int32)
    ))
    .drop(axis=1, labels=['sampling_event_identifier', 'group_identifier'])
    # Create geohash4 from (lat, lon)
    #   - meta declares the new column up front so dask knows the output schema without computing a partition
    .pipe(lambda ddf: ddf.map_partitions(
        meta=ddf._meta.assign(
            geohash4='',
        ),
        func=lambda df: df.assign(
            geohash4=lambda df: np_geohash_encode_safe(df.latitude, df.longitude, precision=4),
        ),
    ))
    # Create categories to compact space usage via dictionary encoding
    #   - NOTE Create category dtypes after munging meta, else weird `pd.Index(None)` errors on downstream reads
    .astype({
        'category': 'category',
        'scientific_name': 'category',
        'observation_count': 'category',
        'county_code': 'category',
        'observation_date': 'category',
        'geohash4': 'category',  # Max 32**4 = ~1m values -> ~4mb dictionary (separate dictionary per partition df)
    })
    # Drop duplicate rows
    #   - sample=0.001: 142k -> 98.7k rows
    #   - The sample=0.001 raw file shows most lines occur once but ~18 lines are repeated ~2500-2800 times; drop these
    #   - The raw->proj step threw out cols, which could create false dupes, but (checklist, species) should be unique enough
    #   - TODO Bottleneck; maybe merge into the groupby we'll need at the end?
    #   - NOTE(review): sampling_event_identifier is dropped above, so if this is re-enabled here the subset
    #     presumably needs checklist_id instead — confirm
    # .pipe(lambda ddf: (ddf
    #     .drop_duplicates(
    #         subset=['sampling_event_identifier', 'scientific_name'],
    #         # split_out=1,                # sample=0.001: 2.1s (default)
    #         # split_out=ddf.npartitions,  # sample=0.001: 25.9s (hmm...)
    #     )
    # ))
)
# Show partition count, total row count, first rows, and a summary of the first 1000 rows
display(
    ebd_proj.npartitions,
    len(ebd_proj),
    ebd_proj.head(10),
    df_summary(ebd_proj.head(1000)).T,
)

[                                                                                          ] | 0% Completed |  0.0s

[#                                                                                         ] | 1% Completed |  0.1s

[####                                                                                      ] | 5% Completed |  0.2s

[######                                                                                    ] | 7% Completed |  0.3s

[#########                                                                                 ] | 10% Completed |  0.4s

[###########                                                                               ] | 12% Completed |  0.5s

[##############                                                                            ] | 15% Completed |  0.7s

[################                                                                          ] | 18% Completed |  0.8s

[##################                                                                        ] | 21% Completed |  0.9s

[#####################                                                                     ] | 24% Completed |  1.0s

[########################                                                                  ] | 27% Completed |  1.1s

[###########################                                                               ] | 30% Completed |  1.2s

[#############################                                                             ] | 32% Completed |  1.3s

[################################                                                          ] | 36% Completed |  1.4s

[###################################                                                       ] | 39% Completed |  1.5s

[######################################                                                    ] | 42% Completed |  1.6s

[#########################################                                                 ] | 45% Completed |  1.8s

[############################################                                              ] | 49% Completed |  1.9s

[##############################################                                            ] | 51% Completed |  2.0s

[#################################################                                         ] | 54% Completed |  2.1s

[###################################################                                       ] | 57% Completed |  2.2s

[######################################################                                    ] | 60% Completed |  2.3s

[#########################################################                                 ] | 63% Completed |  2.4s

[###########################################################                               ] | 66% Completed |  2.5s

[###############################################################                           ] | 70% Completed |  2.6s

[#################################################################                         ] | 73% Completed |  2.7s

[#####################################################################                     ] | 76% Completed |  2.8s

[#######################################################################                   ] | 79% Completed |  2.9s

[###########################################################################               ] | 83% Completed |  3.0s

[##############################################################################            ] | 87% Completed |  3.2s

[#################################################################################         ] | 91% Completed |  3.3s

[####################################################################################      ] | 94% Completed |  3.4s

[#######################################################################################   ] | 97% Completed |  3.5s

[##########################################################################################] | 100% Completed |  3.6s




[                                                                                          ] | 0% Completed |  0.0s

[##########################################################################################] | 100% Completed |  0.1s




[                                                                                          ] | 0% Completed |  0.0s

[##########################################################################################] | 100% Completed |  0.1s




124

142242

Unnamed: 0,category,scientific_name,observation_count,county_code,latitude,longitude,observation_date,all_species_reported,checklist_id,geohash4
0,species,Sterna forsteri,12,US-CA-087,36.9,-122.0,1996-07-03,0.0,14820275,9q93
1,species,Larus delawarensis,1,US-OH-171,41.5,-84.5,2004-08-13,0.0,4066497,dp7g
2,species,Sayornis saya,1,US-CA-087,36.9,-122.0,1988-11-02,0.0,12524570,9q93
3,species,Cardinalis cardinalis,X,US-NE-067,40.3,-96.8,1968-03-30,1.0,10521006,9z5j
4,species,Anas discors,3,US-IL-097,42.4,-88.1,1997-08-13,1.0,11895058,dp93
5,species,Aythya marila,18,US-CA-087,36.9,-122.0,1995-01-24,0.0,11638008,9q93
6,species,Setophaga citrina,1,US-AR-099,33.7,-93.1,2004-07-13,0.0,12124092,9vvz
7,species,Leucophaeus atricilla,2,US-MD-019,38.6,-75.7,2000-04-29,0.0,-554362,dqf5
8,species,Larus delawarensis,1,US-VA-810,37.0,-76.1,2001-02-11,0.0,5669109,dq9f
9,species,Icterus spurius,X,US-IL-031,42.1,-87.8,1966-05-22,1.0,12340445,dp3x


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,Unnamed: 6_level_0,Unnamed: 7_level_0,min,25%,50%,75%,max
Unnamed: 0_level_1,dtype,sizeof,len,count,nunique,mean,std,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
category,category,55899,1000,1000,6,,,domestic,species,species,species,spuh
scientific_name,category,66477,1000,1000,323,,,Accipiter cooperii,Circus cyaneus,Megaceryle alcyon,Riparia riparia,Zosterops japonicus
observation_count,category,57139,1000,1000,61,,,1,1,3,X,X
county_code,category,58000,1000,1000,342,,,US-AK-261,US-DE-001,US-MA-021,US-OH-035,US-WY-013
latitude,float32,24000,1000,1000,581,38.45,5.196,19.6,35.5,39.4,42.3,62.9
longitude,float32,24000,1000,1000,582,-95.69,17.68,-156,-114,-88.1,-82.3,-67.1
observation_date,category,59000,1000,1000,897,,,1891-04-23,1990-04-19,1999-02-06,2002-09-28,2004-12-31
all_species_reported,float16,24000,1000,1000,2,0.7842,0.4116,0,1,1,1,1
checklist_id,int32,28000,1000,1000,983,10160000.0,8642000.0,-2264427,2527863,8740569,14560999,35084168
geohash4,category,53000,1000,1000,446,,,8e3p,9tnr,djf8,dp9s,f800


In [None]:
# TODO TODO Old stuff below

In [None]:
# Inspect proj.tsv
#   - The paths below are the projected tsv at each sample level; exactly one is active
# ebd_proj_tsv_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-0-proj.tsv-1'      #  12gb   147m lines
# ebd_proj_tsv_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-0-proj.tsv-0.1'    # 1.2gb  14.7m lines
# ebd_proj_tsv_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-0-proj.tsv-0.01'   # 120mb, 1.47m lines
ebd_proj_tsv_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-0-proj.tsv-0.001'  #  12mb,  147k lines
# Factory for a fresh ddf over ebd_proj_tsv_path with simplified col names
#   - kwargs are passed to dd.read_csv; sep and dtype are fixed here, so passing them again would be a duplicate keyword
#   - ebd_proj_tsv_path is read when the factory is called, not when it is defined
_ebd_proj_tsv_ddf = lambda **kwargs: (
    dd.read_csv(ebd_proj_tsv_path, **kwargs, sep='\t', dtype={
        # lonlat decimal precision [https://en.wikipedia.org/wiki/Decimal_degrees]
        #   - float32: ~5-6 decimals, ~.1-1m
        #   - float16: ~1-2 decimals, ~1-10km
        'LATITUDE': 'float32',
        'LONGITUDE': 'float32',
        'OBSERVATION COUNT': 'str',  # int | 'X'
        'ALL SPECIES REPORTED': 'float16',  # Smallest dtype for {1, 0, None}
    })
    # e.g. 'SCIENTIFIC NAME' -> 'scientific_name'
    .rename(columns=lambda c: c.lower().replace(' ', '_'))
)
ebd_proj_tsv_ddf = _ebd_proj_tsv_ddf()
# Show partition count, first rows, and a summary of the first 1000 rows
display(
    ebd_proj_tsv_ddf.npartitions,
    ebd_proj_tsv_ddf.head(10),
    df_summary(ebd_proj_tsv_ddf.head(1000)).T,
)

In [None]:
# TODO Dedupe sightings by group_identifier (obs from group checklists get repeated |observers| many times):
#   .assign(checklist_id=group_identifier or sampling_event_identifier)
#   .drop(axis=1, labels=['sampling_event_identifier', 'group_identifier'])
#   .drop_duplicates()

In [None]:
# ebd_proj_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-1-proj.parquet'  # 190 parts, gz [~19s read .category]
# ebd_proj_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-2-proj.parquet'  # 16 parts [~30s read .category]
# ebd_proj_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-3-proj.parquet'  # 190 parts, gz, cats
ebd_proj_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-4-proj.parquet'  # 190 parts, gz, cats, geohash

In [None]:
# Convert proj.tsv -> proj.parquet
#   - Reads ebd_proj_tsv_path, adds a geohash col, casts the str cols to category, and writes to ebd_proj_path
#   - [5m41s] 1-proj
#   - [6m14s] 3-proj
#   - [7m15s] 4-proj
(
    _ebd_proj_tsv_ddf()
    # .head(1000, compute=False)  # Faster dev
    # Compress checklist id from str to uint32 (e.g. 'S10150441' -> 10150441)
    #   - FIXME Barfs on Nones
    # .assign(
    #     sampling_event_identifier=lambda ddf: ddf.sampling_event_identifier.str[1:].astype(np.uint32),
    # )
    # Create geohash from (lat, lon)
    #   - No precision is passed, so the geohash library's default precision applies
    .pipe(lambda ddf: ddf.map_partitions(
        meta=ddf._meta.assign(
            geohash='',
        ),
        func=lambda df: df.assign(
            geohash=lambda df: np_geohash_encode_safe(df.latitude, df.longitude),
        ),
    ))
    # Create categories to tell fastparquet to write these columns using dictionary encoding (more compact)
    #   - NOTE Create category dtypes after munging meta, else weird `pd.Index(None)` errors on downstream reads
    .astype({
        'category': 'category',
        'scientific_name': 'category',
        'observation_count': 'category',
        'county_code': 'category',
        'observation_date': 'category',
        'geohash': 'category',
    })
    # Debug output: partition count and dtypes (presumably puts/tap print and pass the ddf through — confirm)
    .pipe(puts, lambda ddf: ddf.npartitions)
    .pipe(tap, lambda ddf: pp(dict(**ddf.dtypes)))
    # compute=False defers the write so the scheduler can be chosen in .compute below
    .to_parquet(ebd_proj_path,
        compression='gzip',
        # compression=None,
        compute=False,
    )
    # .compute(get=dask_get_for_scheduler('synchronous'))
    # .compute(get=dask_get_for_scheduler('threads'))
    .compute(get=dask_get_for_scheduler('processes'))
)

In [None]:
# Test reading the file we just wrote
#   - Reads ebd_proj_path back and summarizes its first 1000 rows
(dd.read_parquet(ebd_proj_path)
    # .head(1000, compute=False)
    # .pipe(tap, lambda ddf: pp(list(ddf.dtypes)))
    # .categorize() to upgrade unknown->known categoricals (requires an extra pass over the input)
    # .pipe(lambda ddf: ddf.categorize(columns=list(
    #     ddf.dtypes[lambda s: s == 'category'].index
    #     # ['category', 'scientific_name']
    # )))
    # .pipe(tap, lambda ddf: pp(list(ddf.dtypes)))
    # Keep the head lazy, then materialize it as a pandas df for df_summary
    .head(1000, compute=False)
    .compute()
    .pipe(df_summary).T
)

In [None]:
# TODO TODO Did the above .assign(geohash=...) work?

In [None]:
# TODO TODO How big is each col, really? (cf. df_summary above)
# Write each column of a sample of ebd_proj_path to its own uncompressed parquet file under /tmp, one file per col
# cols = dd.read_parquet(ebd_proj_path).columns
cols = [
    # 'category', 'latitude', 'sampling_event_identifier',
    *dd.read_parquet(ebd_proj_path).columns,
]
for i, col in enumerate(cols):
    log.info('col %s/%s: %s' % (i+1, len(cols), col))
    (
        # Row counts below are relative to the 147,750,723 rows measured for the full file in the count cell
        dd_read_parquet_sample(
            ebd_proj_path,
            # sample=None,  # 147m
            # sample=.1, sample_npartitions=8,  # 14.7m
            sample=[.1, .1],  # 1.47m
            # sample=[.1, .1, .1],  # 147k
            # columns=[col],  # XXX
        )
        # Drop obs with no checklist id
        #   - FIXME Some of these nulls are due to misaligned fields (maybe from the tsv split above?)
        #   - Not clear what the other nulls are from; try backing them out to the full field set (including group_identifier)
        # .pipe(lambda ddf: ddf[ddf.sampling_event_identifier.notnull()])  # ~96.3% (147 -> 142)
        # TODO De-dupe by group_identifier (requires re-running pipeline above to add group_identifier)
        # Compress checklist id from str (e.g. 'S10150441') to uint32 (e.g. 10150441), for ~46% compression (e.g. 24mb -> 13mb)
        #   - Assumes no nulls (filtered out above)
        # .assign(sampling_event_identifier=lambda ddf: ddf.sampling_event_identifier.str[1:].astype(np.uint32))
        # Project to a single-column ddf ([[col]] keeps it a dataframe, not a series)
        [[col]]
        .to_parquet(f'/tmp/junk-ebd-col-{col}.parquet', compute=False,
            # compression='gzip',
            compression=None,
        )
        # .compute(get=dask_get_for_scheduler('threads'))
        .compute(get=dask_get_for_scheduler('processes'))
    )

In [None]:
# TODO Prototype (tail of) pipeline
#   - Result of the cell is the row count after filtering (via len)
#   - Row counts below are relative to the 147,750,723 rows measured for the full file in the count cell
(
    dd_read_parquet_sample(ebd_proj_path,
        # sample=None,      # 147m,  len[55s], no_checklist[.037]
        # sample=.1,        # 14.7m, len[7.1s], no_checklist[.037]
        sample=[.1, .1],  # 1.47m, len[1.1s], no_checklist[.037]
    )
    # Drop obs with no checklist id
    #   - FIXME Some of these nulls are due to misaligned fields (maybe from the tsv split above?)
    #   - Not clear what the other nulls are from; try backing them out to the full field set (including group_identifier)
    .pipe(lambda ddf: ddf[ddf.sampling_event_identifier.notnull()])  # ~96.3% (147 -> 142)
    # TODO De-dupe by group_identifier (requires re-running pipeline above to add group_identifier)
    # Compress checklist id from str (e.g. 'S10150441') to uint32 (e.g. 10150441), for ~46% compression (e.g. 24mb -> 13mb)
    #   - Assumes no nulls (filtered out above)
    #   - .str[1:] strips the one-char prefix before the int cast
    .assign(sampling_event_identifier=lambda ddf: ddf.sampling_event_identifier.str[1:].astype(np.uint32))
    # [:200]
    .pipe(len)
)

In [None]:
# Compress checklist id from str (e.g. 'S10150441') to uint32 (e.g. 10150441), for ~46% compression (e.g. 24mb -> 13mb)
#   - Assumes no nulls (filtered out above)
#   - Input is the single-column file /tmp/junk-ebd-col-sampling_event_identifier.parquet
#   - Output is written uncompressed next to it with a -uint32 suffix, so the two sizes can be compared
col = 'sampling_event_identifier'
(dd.read_parquet(f'/tmp/junk-ebd-col-{col}.parquet')
    # Nulls can't be cast to uint32, so drop them first
    .pipe(lambda ddf: ddf[ddf.sampling_event_identifier.notnull()])
    .assign(
        sampling_event_identifier=lambda ddf: ddf.sampling_event_identifier.str[1:].astype(np.uint32),
    )
    .to_parquet(f'/tmp/junk-ebd-col-{col}-uint32.parquet', compute=False, compression=None)
    # .to_parquet(f'/tmp/junk-ebd-col-{col}-str.parquet', compute=False, compression=None)
    # .query('sampling_event_identifier != sampling_event_identifier')
    # [:20]
    .compute()
)

[                                                                                          ] | 0% Completed |  0.0s

[#                                                                                         ] | 2% Completed |  0.1s

[##################                                                                        ] | 20% Completed |  0.3s

[###############################                                                           ] | 34% Completed |  0.4s

[##########################################                                                ] | 46% Completed |  0.7s

[#######################################################                                   ] | 61% Completed |  0.8s

[################################################################                          ] | 71% Completed |  1.0s

[####################################################################################      ] | 93% Completed |  1.2s

[##########################################################################################] | 100% Completed |  1.3s




In [None]:
# TODO TODO Add a .repartition somewhere above so that part files are uniformly sized (currently weirdly bimodal)

In [None]:
# ebd_geo_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-4-geo.parquet'
ebd_geo_path = f'{data_dir}/ebird/derived/priors/ebd_US_relFeb-2017-5-geo.parquet'

In [None]:
# %%prun -l30
# Convert proj.parquet -> geo.parquet
#   - Reads only latitude/longitude from ebd_proj_path, adds a geohash col, and writes the result to ebd_geo_path
#   - 4-geo: geohash:str - 2m29s, 264mb
#   - 5-geo: geohash:cat - 1m58s, 219mb
(
    dd.read_parquet(ebd_proj_path, columns=['latitude', 'longitude'])
    # .sample(frac=.1, random_state=0)
    # .head(1_000_000, compute=False)
    # Create geohash from (lat, lon); meta declares the new column so dask knows the output schema up front
    .pipe(lambda ddf: ddf.map_partitions(
        meta=ddf._meta.assign(
            geohash='',
            # geohash=np.uint64(),
            # geohash4='',
        ),
        func=lambda df: df.assign(
            # geohash4=None,
            # geohash=lambda df: np.vectorize(geohash.encode, [str])(df.latitude, df.longitude),
            geohash=lambda df: np_geohash_encode_safe(df.latitude, df.longitude),
        ),
    ))
    # Create categories to tell fastparquet to write these columns using dictionary encoding (more compact)
    #   - NOTE Create category dtypes after munging meta, else weird `pd.Index(None)` errors on downstream reads
    .astype({
        'geohash': 'category',
    })
    # Debug output: partition count and dtypes (presumably puts/tap print and pass the ddf through — confirm)
    .pipe(puts, lambda ddf: ddf.npartitions)
    .pipe(tap, lambda ddf: pp(dict(**ddf.dtypes)))
    # compute=False defers the write so the scheduler can be chosen in .compute below
    .to_parquet(ebd_geo_path,
        compression='gzip',
        # compression=None,
        compute=False,
    )
    # .compute(get=dask_get_for_scheduler('synchronous'))
    # .compute(get=dask_get_for_scheduler('threads'))
    .compute(get=dask_get_for_scheduler('processes'))
)

In [None]:
# Test reading the file we just wrote
#   - NOTE(review): this reads ebd_proj_path, but the preceding cell writes ebd_geo_path — looks like a copy-paste
#     of the earlier read-back cell; confirm whether ebd_geo_path was intended
(dd.read_parquet(ebd_proj_path)
    # .head(1000, compute=False)
    # .pipe(tap, lambda ddf: pp(list(ddf.dtypes)))
    # .categorize() to upgrade unknown->known categoricals (requires an extra pass over the input)
    # .pipe(lambda ddf: ddf.categorize(columns=list(
    #     ddf.dtypes[lambda s: s == 'category'].index
    #     # ['category', 'scientific_name']
    # )))
    # .pipe(tap, lambda ddf: pp(list(ddf.dtypes)))
    # Keep the head lazy, then materialize it as a pandas df for df_summary
    .head(1000, compute=False)
    .compute()
    .pipe(df_summary).T
)

In [None]:
# Count stuff
#   - Scratch cell: exactly one of the aggregations below is uncommented at a time
#   - The trailing comment on each line records the value observed when that line was last run
(
    dd.read_parquet(ebd_proj_path)
    # .head(100_000, compute=False)
    # .all_species_reported.size  # 147750723  # FIXME .size (cells) -> len (rows)
    # .all_species_reported.count()  # 142317658
    # .all_species_reported.isnull().mean()  # .037
    # .latitude.isnull().mean()  # .018
    # Currently active: fraction of rows with a null longitude
    .longitude.isnull().mean()  # .018
    # .category.nunique()  # 9
    # .scientific_name.nunique()  # 1650
    # .observation_count.nunique()  # 8813
    # .county_code.nunique()  # 3139
    # .observation_date.nunique()  # 35722
    # .all_species_reported.nunique()  # 2
    # .sampling_event_identifier.nunique()  # (Big...)
    .compute(get=dask_get_for_scheduler('processes'))
)

In [None]:
# TODO Do the geo agg
#   - Counts observations per (geohash, scientific_name) on a small random sample of ebd_proj_path
#   - Result of the cell is the per-column non-null count of the aggregated frame
(
    dd.read_parquet(ebd_proj_path, columns=['category', 'geohash', 'scientific_name'])
    # .pipe(puts, lambda ddf: ddf.head(10))
    # .head(1_000, compute=False).sample(frac=1., random_state=0)  # Faster
    # Data volume
    #   prec         in     out    time
    #      4        100     145    7.0s
    #      4      1_000     660    7.0s
    #      4     10_000    6404    7.8s
    #      4    100_000   50491   18s
    #      4  1_000_000  253161  119s
    # frac = target row count / total rows in the file (147750723), so the sample has roughly that many rows
    .sample(frac=(
        # 100
        1_000
        # 10_000
        # 100_000
        # 1_000_000
    ) / 147750723, random_state=0)  # Finds more bugs (e.g. nulls)
    # Drop obs without location
    .dropna(subset=['geohash'])
    # Drop obs without a well defined species [https://help.ebird.org/customer/portal/articles/1006825]
    .pipe(lambda ddf: ddf[ddf.category.isin(['species', 'domestic', 'issf', 'form'])])
    # Count rows per (geohash, scientific_name) by summing a constant 1
    .assign(n=1).groupby(['geohash', 'scientific_name']).n.sum()
    # .compute(get=dask_get_for_scheduler('threads'))  # For %debug
    .compute(get=dask_get_for_scheduler('processes'))
    # From here on this is a pandas object: flatten the group index back into columns
    .reset_index()
    .count()
    # .pipe(df_summary).T
    # [:20]
)