## Conversion of tabular data to other formats for querying 
The purpose of this notebook is to test (memory, time) efficient methods to work with big data that don't fit into memory for queries and to run downstream analyses. In this case the data are large tabular files of mobile data. 
The two methods I will try are: 

1) To create a database [with SQLite](https://www.sqlite.org/index.html)
2) To re-write the data to a parquet format using [Apache Arrow](https://arrow.apache.org/docs/python/parquet.html)

(The second option honestly seems much faster and better than the first, at least so far in my hands.)

other resources to look into: 
- [Apache Beam](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py)
- [mobilekit POI mapping](https://mobilkit.readthedocs.io/en/latest/examples/M4R_03_POI_visit_analysis.html) and related [docs](https://mobilkit.readthedocs.io/en/latest/mobilkit.spatial.html)
- [info](https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html) about reading parquet files with dask 

### Data reading and package imports shared across methods: 

In [1]:
from dotenv import load_dotenv
load_dotenv()

import os
import glob
#from tqdm import tqdm, notebook
from tqdm.notebook import trange, tqdm

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import dask.dataframe as dd
import geopandas as gpd
from datetime import datetime as dt

import mobilkit #.loader.crop_spatial as mkcrop_spatial

# Access environment variables and define other necessary variables
data_dir = os.getenv('WORKING_DIR')
meta_dir = f'{data_dir}metadata/'

data_2019 = f'{data_dir}data/year=2019/'
data_folders = glob.glob((data_2019 + '*/'))

initial_cols=['device_id', 'id_type', 'latitude', 'longitude', 'horizontal_accuracy', 'timestamp',  'ip_address', 'device_os', 'country', 'unknown_2', 'geohash']
sel_cols = ["device_id","latitude","longitude","timestamp","geohash","horizontal_accuracy"]
final_cols = ["uid","lat","lng","datetime","geohash","horizontal_accuracy"]

# boundary box that roughly captures the larger county of Bogota
minlon = -74.453
maxlon = -73.992
minlat = 3.727
maxlat = 4.835

['/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/year=2019/month=1/', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/year=2019/month=2/']


In [2]:
#### FUNCTIONS FOR DATA PROCESSING ####

def get_days(data_folder):
    """Assuming a directory organized as a month's worth of days with files in each directory like "day=01", etc """
    day_dirs = glob.glob((data_folder + '*/'))
    return day_dirs

def get_files(data_folder, day_dir):
    """Assuming a dir corresponding to and named for a day day_dir, (e.g. "day=01") within the data_folder with that day's mobile data files."""
    day = day_dir.split(data_folder)[1]
    filepaths = glob.glob((day_dir + '*[!*.gz]')) # select all the non-zipped mobile data files
    return filepaths, day

def load_data(filepaths, initial_cols, sel_cols, final_cols): 
    """Load in the mobile data and specify the columns"""
    ddf = dd.read_csv(filepaths, names=initial_cols)
    ddf = ddf[sel_cols]
    ddf.columns = final_cols
    return ddf 

def convert_datetime(ddf: dd.DataFrame): #needs work
    """Process timestamp to datetime for dataframe with a "datatime" column with timestamp values. """
    ddf["datetime"] = dd.to_datetime(ddf["datetime"], unit='ms', errors='coerce')
    ddf["datetime"] = ddf["datetime"].dt.tz_localize('UTC').dt.tz_convert('America/Bogota')
    return ddf

def preprocess_mobile(ddf: dd.DataFrame, final_cols: list, minlon , maxlon, minlat, maxlat): #needs work
    """Select only those points within an area of interest and process timestamp to datetime 
    for dataframe with a "datatime" column with timestamp values."""
    ddf = find_within_box(ddf, minlon, maxlon, minlat, maxlat)
    ddf = convert_datetime(ddf)[final_cols]
    df = ddf.compute()
    return df

def find_within_box(ddf, minlon, maxlon, minlat, maxlat):
    """Quick way to filter out points not in a particular rectangular region."""
    box=[minlon,minlat,maxlon,maxlat]
    filtered_ddf = mobilkit.loader.crop_spatial(ddf, box).reset_index()
    return filtered_ddf

#### FUNCTIONS FOR PARQUET CONVERSION ####

def write_to_pq(df, out_dir, filename): 
    table_name = f'{out_dir}{filename}.parquet'
    table = pa.Table.from_pandas(df)
    pq.write_table(table, table_name)

def from_month_write_filter_days_to_pq(data_folder: str, month: str, year: str, out_dir:str):
    day_dirs = glob.glob((data_folder + '*/'))
    for i in tqdm(range(0,len(day_dirs)), desc=f'Files from {year} {month} processed'): 
        day_dir = day_dirs[i]
        filepaths, day = get_files(data_folder, day_dir)
        day_name = day.split('/')[0]
        ddf = load_data(filepaths, initial_cols, sel_cols, final_cols)
        df = preprocess_mobile(ddf, final_cols, minlon, maxlon, minlat, maxlat)
        filename = f'bogota_area_{year}_{month}_{day_name}'
        write_to_pq(df, out_dir, filename)
    return

## Apache Re-formatting

### Convert tabular data to parquet
For each day in each month, load the files for the Colombia mobile data, filter the pings that are roughly in the Bogota area, and process the datetime. Write each day as a parquet file with the year, month, and day information in the filename.

More information [here](https://arrow.apache.org/docs/python/parquet.html).

In [3]:
in_dir = f'{data_dir}data/'
year = 'year=2019'
data_year = f'{in_dir}{year}/'
data_folders = glob.glob((data_year + '*/'))

out_dir = f'{data_dir}data/parquet/'

for i in range(0, len(data_folders)):
    data_folder = data_folders[i]
    month = data_folder.split(data_year)[1].split('/')[0]
    from_month_write_filter_days_to_pq(data_folder, month, year, out_dir)

Files from year=2019 month=1 processed:   0%|          | 0/31 [00:00<?, ?it/s]

Files from year=2019 month=2 processed:   0%|          | 0/28 [00:00<?, ?it/s]

  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))


### Make dataset from parquet
Convert the parquet files into a dataset that is queryable

More information [here](https://arrow.apache.org/docs/python/dataset.html#dataset).

In [104]:
import pyarrow.dataset as ds
pq_dir = f'{data_dir}data/parquet/'
#pq_dir = f'{data_dir}data/test_parquet/'

dataset = ds.dataset(pq_dir, format="parquet")

As in the documentation "Creating a Dataset object does not begin reading the data itself. If needed, it only crawls the directory to find all the file and infers the dataset’s schema (by default from the first file)." 

Lets look at some properties of the dataset, including what files it would be based on and some metadata.

In [105]:
print(dataset.files)
print(dataset.schema.to_string(show_field_metadata=False))
fragments = list(dataset.get_fragments())
print(fragments)
#fragments.split_by_row_group()

['/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=01.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=02.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=03.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=04.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=05.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=06.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=07.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colombia/data/parquet/bogota_area_year=2019_month=1_day=08.parquet', '/Users/emilyrobitschek/git/ETH/SPUR/mobile_data_colomb

Using the Dataset.to_table() method we can read the dataset (or a portion of it) into a pyarrow Table. Depending on the dataset size this can require a lot of memory, so it's best to consider filtering the dataset first. For instance if we only want the user id column we can execute the following:

In [106]:
def compute_user_stats(dataset):
    table = dataset.to_table(columns=['uid', 'datetime']).to_pandas()
    table_dd = dd.from_pandas(table, npartitions=10)
    user_stats = mobilkit.stats.userStats(table_dd).compute()
    return user_stats

user_stats = compute_user_stats(dataset)
user_stats.head()

Unnamed: 0,uid,min_day,max_day,pings,daysActive,daysSpanned,pingsPerDay,avg
0,0004c9e3-5cba-42b9-93a1-9a0863cd727d,2019-01-05 00:00:00-05:00,2019-01-05 00:00:00-05:00,18,1,1,[18],18.0
1,0005e98f-60a8-4fee-aa37-3932aa9b9a09,2019-01-01 00:00:00-05:00,2019-01-24 00:00:00-05:00,14,3,23,"[7, 5, 2]",4.666667
2,00094aa7-3cff-43f2-a4ab-cf294178e973,2019-01-11 00:00:00-05:00,2019-01-30 00:00:00-05:00,108,8,19,"[9, 1, 5, 25, 5, 6, 56, 1]",13.5
3,000e37e7-925a-4ab8-b456-1ba283c2de97,2018-12-31 00:00:00-05:00,2019-01-31 00:00:00-05:00,379,11,31,"[1, 19, 16, 5, 1, 1, 242, 46, 9, 1, 38]",34.454545
4,000f520c-13ce-4e80-803a-801ea05933a8,2019-01-05 00:00:00-05:00,2019-01-29 00:00:00-05:00,133,5,24,"[1, 1, 84, 3, 44]",26.6


In [102]:
min_pings = 60
user_stats = pd.DataFrame(table.uid.value_counts()).reset_index().rename({'index': 'uid', 'uid': 'npings'}, axis=1)
user_stats_filtered = user_stats[user_stats['npings'] >= 60]
print(f'There are {len(user_stats)} users in the dataset, {len(user_stats_filtered)} of which have more than {min_pings} pings.')
user_stats_filtered.head()

There are 171381 users in the dataset, 15089 of which have more than 60 pings.


Unnamed: 0,uid,npings
0,adb6de33-05cd-452d-b0fc-2d77450776e9,13592
1,c278355a-ca73-4093-9ac0-e22a1101307d,12839
2,4a9ac0b5-7157-48c6-85d9-ec411fcc1fe1,10105
3,b7665a25-87d8-4741-8f74-d9d705ab4801,8174
4,78cef64c-8329-4834-a101-8e376db2faf4,7943


In [86]:
uid_minpings = list(user_stats_filtered['uid'])
table = dataset.to_table(filter=ds.field('uid').isin(uid_minpings)).to_pandas()

We can also filter by particular queries, which is so cool! 

In [88]:
def calculate_daysActive(dataset, uid): 
    table = dataset.to_table(filter=ds.field('uid') == uid).to_pandas()
    table['day'] = table['datetime'].apply(lambda x: str(x).split(' ', 1)[0])
    daysactive = table.nunique()['day']
    return daysactive

for i in trange(0, 10): #, desc=f'Users processed'): #len(user_stats_filtered))
    if 'daysActive' not in list(user_stats_filtered.columns):
        user_stats_filtered['daysActive'] = None
    daysactive = calculate_daysActive(dataset, user_stats_filtered['uid'][i])
    user_stats_filtered['daysActive'][i] = daysactive

user_stats_filtered.head()


  0%|          | 0/10 [00:00<?, ?it/s]

Unnamed: 0,uid,npings,daysActive
0,adb6de33-05cd-452d-b0fc-2d77450776e9,85339,31
1,78cef64c-8329-4834-a101-8e376db2faf4,61060,30
2,42149981-aa71-4323-a77a-2e174df5c4a9,55705,56
3,3f52705d-f8b9-437b-aa15-790b5aaf407a,52943,27
4,c278355a-ca73-4093-9ac0-e22a1101307d,40419,18


In [74]:
user_stats_filtered['uid'][0]

'adb6de33-05cd-452d-b0fc-2d77450776e9'

In [43]:

def calculate_daysActive(dataset, uid): 
    table = dataset.to_table(filter=ds.field('uid') == uid).to_pandas()
    table['day'] = table['datetime'].apply(lambda x: str(x).split(' ', 1)[0])
    daysactive = [table.nunique()['day']]
    return table, daysactive

uid = '1fb81f41-7260-4fa1-b061-b4ed1e482984'
table = dataset.to_table(filter=ds.field('uid') == uid).to_pandas()
table['day'] = [str(i).split(' ', 1)[0] for i in table['datetime']] #day_name returns day of the week 

user_stats = pd.DataFrame({})
user_stats['uid'] = [uid]
user_stats['daysActive'] = [table.nunique()['day']]
user_stats['npings'] = [len(table)]

user_stats.head()

Unnamed: 0,uid,daysActive,npings
0,1fb81f41-7260-4fa1-b061-b4ed1e482984,51,8231


In [22]:
pq_dir = f'{data_dir}data/test_parquet/'
ddf = dd.read_parquet(pq_dir)

In [None]:
users_stats_df.head()

## POI mapping

In [15]:
df_poi_loc = pd.read_csv(f"{meta_dir}/places/google_places.csv")
df_poi_loc = df_poi_loc.rename(columns={"lat": "poilat", "lon": "poilon"})
df_poi_loc['radius'] = 0.1 #in kilometers
df_poi_loc.head()

Unnamed: 0,name,poilat,poilon,types,status,perm_closed,filename,radius
0,Veterinaria Cani yi,4.7627,-74.0427,"['veterinary_care', 'point_of_interest', 'esta...",OPERATIONAL,False,vets,0.1
1,Veterinary Clinic La Salle University,4.7553,-74.0261,"['hospital', 'veterinary_care', 'health', 'poi...",OPERATIONAL,False,vets,0.1
2,Asistencia Médica Canina Cani Yi,4.7629,-74.043,"['veterinary_care', 'point_of_interest', 'esta...",OPERATIONAL,False,vets,0.1
3,Pet Shop Pelos Huellas y Bigotes,4.7442,-74.0989,"['veterinary_care', 'point_of_interest', 'esta...",OPERATIONAL,False,vets,0.1
4,guau guau .com,4.7493,-74.1106,"['veterinary_care', 'point_of_interest', 'esta...",OPERATIONAL,False,vets,0.1
