In [1]:
import json

from pathlib import Path
from functools import partial
from tempfile import TemporaryDirectory

import datazimmer as dz
import geopandas as gpd
import pandas as pd

from parquetranger import TableRepo
from tqdm.notebook import tqdm


from metazimmer.gpsping.ubermedia.raw_proc import ping_table, ExtendedPing
from metazimmer.gpsping.minor_report import date_agg_table
from src.semantic_info import semantic_info_table, SemanticTemporalInfo
from src.util import filtered_count
from src.parks import write_part

In [2]:
# Raw park polygons. NOTE(review): the file name suggests Urban Atlas 2018,
# class 14100 (green urban areas), Hungary, WGS84 — confirm against the source.
PARK_ARCHIVE_NAME = "UA2018_CAT14100_HUN_WGS84.zip"
park_path = dz.get_raw_data_path(PARK_ARCHIVE_NAME)

In [3]:
# Park polygons as a GeoDataFrame; handed to `write_part` below
# (presumably to match pings against parks — see src.parks).
gdf = gpd.read_file(filename=park_path)

In [4]:
# Semantic info table for the "complete" environment, passed to `write_part`
# below. NOTE(review): presumably the source of the `last_home__*` columns
# used later — confirm in src.semantic_info / src.parks.
home_df = semantic_info_table.get_full_df(
    env="complete",
)

In [5]:
# Scratch directory for the intermediate partitioned table built below.
# Its contents live only as long as this object (and the kernel), so the
# write cell has to be re-run after a restart.
tmpdir = TemporaryDirectory()

In [7]:
# Partitioned output table inside the scratch directory, grouped by device group.
homed_pings_root = f"{tmpdir.name}/park-pings-homed"
out_trepo = TableRepo(
    homed_pings_root,
    group_cols=[ExtendedPing.device_group],
)

In [None]:
# Run `write_part` over every ping partition. It receives `out_trepo` and
# presumably writes its results there (the cell below reads `out_trepo.dfs`);
# the returned values are not used further.
# NOTE: this cell must run before the `out_trepo.dfs` cell below, otherwise
# that cell iterates over zero partitions.
partition_results = list(
    ping_table.map_partitions(
        env="complete",
        fun=partial(write_part, gdf=gdf, home_df=home_df, out_trepo=out_trepo),
        pbar=True,
        workers=4,
    )
)
# Display how many partitions were processed rather than the whole result list.
len(partition_results)

In [9]:
# Helper columns used only for partitioning / intermediate binning; not exported.
HELPER_COLS = [
    ExtendedPing.year_month,
    ExtendedPing.device_group,
    "time_bin",
    "info_name",
]


def ffill_within_device(df: pd.DataFrame) -> pd.DataFrame:
    """Drop the helper columns and forward-fill missing values per device.

    A value is only carried forward from an earlier row of the same
    ``ExtendedPing.device_id``, following the row order the partition
    already has. Rows without a device id are dropped, as the earlier
    ``groupby(...).apply(fillna)`` version did.

    Uses the vectorised ``GroupBy.ffill`` instead of a per-group ``apply`` of
    ``DataFrame.fillna(method="ffill")``. That form runs Python code per
    device, ``fillna(method=...)`` is deprecated (pandas 2.1), and ``apply``
    operating on the grouping column is deprecated (pandas 2.2). Once that
    is removed, ``device_id`` would silently vanish from the export.

    Parameters
    ----------
    df : pd.DataFrame
        One partition read from ``out_trepo``.

    Returns
    -------
    pd.DataFrame
        The partition without ``HELPER_COLS``, gaps filled within each device.
    """
    # `drop` returns a fresh frame, so assigning columns into it is safe.
    cleaned = df.drop(columns=HELPER_COLS)
    value_cols = cleaned.columns.drop(ExtendedPing.device_id).tolist()
    # GroupBy.ffill returns rows in the original order with the same index,
    # so the assignment lines up row-for-row (even with a duplicated index).
    cleaned[value_cols] = cleaned.groupby(ExtendedPing.device_id, sort=False)[
        value_cols
    ].ffill()
    return cleaned.dropna(subset=[ExtendedPing.device_id])


filled_clean_dfs = [ffill_within_device(df) for df in tqdm(out_trepo.dfs)]

0it [00:00, ?it/s]

In [10]:
# Stack the cleaned partitions into a single frame for export and summaries.
if not filled_clean_dfs:
    # pd.concat([]) would only report "No objects to concatenate".
    raise ValueError(
        "No partitions were read from out_trepo - run the "
        "ping_table.map_partitions(write_part) cell before this one."
    )
conc_df = pd.concat(filled_clean_dfs)

In [45]:
# Number of rows to be exported, in millions.
len(conc_df) / 1_000_000

28.197543

In [44]:
# Export `conc_df` as gzip CSV files of at most `chunksize` rows each.
# Files are numbered by chunk (01, 02, ...), not by row offset, so the
# names are short and sort in export order.
chunksize = 3_000_000
chunk_starts = range(0, conc_df.shape[0], chunksize)
for chunk_no, start in enumerate(tqdm(chunk_starts), start=1):
    conc_df.iloc[start : start + chunksize, :].to_csv(
        dz.get_raw_data_path(f"parks-complete-pings-w-homes-{chunk_no:02}.csv.gz"),
        index=False,  # the row index carries no information worth exporting
        compression="gzip",
    )

  0%|          | 0/10 [00:00<?, ?it/s]

In [46]:
# Per device: the share of its rows with no home latitude. Then: how many
# devices have each share (value -> number of devices).
# Vectorised boolean mean per group instead of a Python lambda per device;
# the result keeps the same values, name and index name.
by_device_miss = (
    conc_df["last_home__loc__lat"]
    .isna()
    .groupby(conc_df[ExtendedPing.device_id])
    .mean()
    .value_counts()
)

In [47]:
# Fraction of devices falling into each bin of "share of rows without a home".
# The lowest edge is -0.01 so that devices with a share of exactly 0 land in
# the first (right-closed) bin.
miss_share_edges = [-0.01, 0.2, 0.4, 0.6, 0.8, 1]
(
    by_device_miss.to_frame()
    .assign(ibin=lambda df: pd.cut(df.index, miss_share_edges))
    # Explicit observed=False: keep empty bins and avoid relying on the
    # deprecated default for categorical groupers.
    .groupby("ibin", observed=False)
    .sum()
    / by_device_miss.sum()
)

Unnamed: 0_level_0,last_home__loc__lat
ibin,Unnamed: 1_level_1
"(-0.01, 0.2]",0.164107
"(0.2, 0.4]",0.005596
"(0.4, 0.6]",0.003984
"(0.6, 0.8]",0.002824
"(0.8, 1.0]",0.823489


In [48]:
# Distinct device ids present in the park pings.
devices = {*conc_df[ExtendedPing.device_id].unique()}

In [49]:
# Where the list of park-visiting device ids is persisted.
devices_json_name = "parks-devices-appearing.json"
dpath = Path(dz.get_raw_data_path(devices_json_name))

In [50]:
# Persist the device ids as a JSON list. Sorting makes the file reproducible:
# set iteration order varies between runs (string hash randomisation).
# key=str keeps the sort total even if the ids are of mixed types.
# The trailing `;` hides the returned byte count.
dpath.write_text(json.dumps(sorted(devices, key=str)));

29310864

In [16]:
# Optional resume path: after a kernel restart, uncomment the line below
# (and run the `dpath` cell) to reload `devices` from the JSON written above
# instead of rebuilding `conc_df`.
# devices = set(json.loads(dpath.read_text()))

In [17]:
# Count pings across all partitions with `filtered_count`, restricted
# via `fset` to the ids in `devices`, and stack the per-partition results.
count_parts = ping_table.map_partitions(
    env="complete",
    fun=partial(filtered_count, fset=devices),
    pbar=True,
    workers=4,
)
full_park_ping_counts = pd.concat(count_parts)

6144it [09:11, 11.14it/s]


In [18]:
# Per-partition counts summed over the first index level (presumably the
# date, matching `date_agg_table` — confirm), prefixed "filtered_", joined
# with the full aggregate whose columns are prefixed "total_".
filtered_counts = full_park_ping_counts.groupby(level=0).sum().add_prefix("filtered_")
total_counts = date_agg_table.get_full_df("complete").add_prefix("total_")
comp_out = filtered_counts.join(total_counts)

In [19]:
# Export the filtered-vs-total count comparison.
count_csv_path = dz.get_raw_data_path("parks-complete-count.csv")
comp_out.to_csv(count_csv_path)