In [1]:
import pyarrow as pa

from pygdf.dataframe import DataFrame

import pandas as pd
import json
import numpy as np
from python_scripts.numbaHistinMem import numba_gpu_histogram

def readArrowToDF(source):
    reader = pa.RecordBatchStreamReader(source)
    pa_df = reader.read_all()
    return pa_df.to_pandas()

def arrowToDisk(df, destination):
    pa_df = pa.RecordBatch.from_pandas(df)
    path = destination+".arrow"
    file = open(path, 'wb')
    writer = pa.ipc.RecordBatchStreamWriter(file, pa_df.schema)
    writer.write_batch(pa_df)
    writer.close()
    file.close()

## Read uber-dataset-v2 arrow 

In [2]:
df = readArrowToDF("../data/uber-dataset-v2.arrow")

In [3]:
def process(df,num_of_replicas, name, num_rows = 0):
    df_final = df.append([df]*num_of_replicas, ignore_index=True)
    if num_rows == 0:
        print(len(df_final))
        arrowToDisk(df_final,'node_server/uploads/'+name)
    else:
        df_final = df_final.loc[0:num_rows]
        print(len(df_final))
        arrowToDisk(df_final,'node_server/uploads/'+name)

In [4]:
df.head()

Unnamed: 0,sourceid,dstid,hod,mean_travel_time,standard_deviation_travel_time,geometric_mean_travel_time,geometric_standard_deviation_travel_time,source_lat,source_long,dst_lat,dst_long
0,650,181,19,1187.61,481.13,1104.91,1.45,-122.402451,37.761448,-122.259369,37.816994
1,232,2221,8,1242.33,183.8,1229.94,1.15,-122.128365,37.463764,-122.030777,37.236847
2,809,2460,23,1276.63,226.23,1257.19,1.19,-122.248901,37.817375,-122.407974,37.802288
3,1838,2363,0,431.26,193.26,405.81,1.38,-122.072083,37.369617,-122.001808,37.316303
4,82,910,8,1858.73,342.7,1825.93,1.21,-122.406548,37.751953,-122.50721,37.777042


In [5]:
df.mean_travel_time.describe()

count    4.832478e+07
mean     1.456005e+03
std      8.518368e+02
min      6.380000e+00
25%      8.432700e+02
50%      1.295580e+03
75%      1.909000e+03
max      1.066286e+04
Name: mean_travel_time, dtype: float64

## Create a 1-col dataset

### non-filter

In [4]:
uber_1_col = pd.DataFrame(df[['mean_travel_time']], dtype=np.float32)
del(df)

In [5]:
uber_1_col_final = uber_1_col.append([uber_1_col]*30, ignore_index=True)
print(len(uber_1_col_final))
del(uber_1_col)

1498068149


In [6]:
arrowToDisk(uber_1_col_final,'../data/uber-1-col-nonfilter')

In [7]:
del(uber_1_col_final)

### filter

In [23]:
uber_1_col = pd.DataFrame(df[['mean_travel_time']], dtype=np.float32)

In [24]:
uber_1_col_final = uber_1_col.append([uber_1_col]*4, ignore_index=True)
print(len(uber_1_col_final))

241623895


In [25]:
arrowToDisk(uber_1_col_final,'node_server/uploads/uber-1-col')

In [26]:
del(uber_1_col_final)

## Create a 2-col dataset

In [27]:
uber_2_col = pd.DataFrame(df[['mean_travel_time','hod']], dtype=np.float32)

In [28]:
print(uber_2_col.dtypes)

#48 Million rows
len(uber_2_col)

mean_travel_time    float32
hod                 float32
dtype: object


48324779

#### Multiply the rows by replicating

In [29]:
uber_2_col_final = uber_2_col.append([uber_2_col]*3, ignore_index=True)
len(uber_2_col_final)

193299116

In [30]:
arrowToDisk(uber_2_col_final,'node_server/uploads/uber-2-col')

## Create a 3-col dataset

In [33]:
uber_3_col = pd.DataFrame(df[['mean_travel_time','hod','standard_deviation_travel_time']], dtype=np.float32)

In [34]:
print(uber_3_col.dtypes)

#48 Million rows
len(uber_3_col)

mean_travel_time                  float32
hod                               float32
standard_deviation_travel_time    float32
dtype: object


48324779

In [35]:
uber_3_col_final = uber_3_col.append([uber_3_col]*3, ignore_index=True)
len(uber_3_col_final)

193299116

In [40]:
uber_3_col_final = uber_3_col_final.loc[0:150623895]

In [41]:
arrowToDisk(uber_3_col_final,'node_server/uploads/uber-3-col')

## Create a 4-col dataset

In [24]:
uber_4_col = pd.DataFrame(df[['mean_travel_time','hod','standard_deviation_travel_time','geometric_mean_travel_time']], dtype=np.float32)

In [28]:
process(uber_4_col, 3, 'uber-4-col',130623895)

130623896


In [30]:
del(uber_4_col)

## Create a 5-col dataset

In [29]:
uber_5_col = pd.DataFrame(df[['mean_travel_time','hod','standard_deviation_travel_time','geometric_mean_travel_time','geometric_standard_deviation_travel_time']], dtype=np.float32)

In [31]:
process(uber_5_col, 3, 'uber-5-col',110623895)

110623896


In [35]:
del(uber_5_col)

## Create a 6-col dataset

In [4]:
uber_6_col = pd.DataFrame(df[['mean_travel_time','hod','source_lat','source_long','dst_lat','dst_long']], dtype=np.float32)

In [5]:
process(uber_6_col, 3, 'uber-6-col',100000000)

100000001


## Check pygdf performance

In [8]:
import pygdf

In [9]:
df1 = readArrowToDF('../data/uber-1-col-nonfilter.arrow')

In [10]:
gdf = DataFrame()

In [11]:
df1.columns[0]

'mean_travel_time'

In [12]:
gdf[df1.columns[0]] = pygdf.Series(np.array(df1[df1.columns[0]].values))

In [4]:
del(df1)

In [5]:
backup = gdf

In [6]:
gdf.columns

('mean_travel_time',)

In [7]:
len(gdf)

821521243

In [13]:
%timeit numba_gpu_histogram(gdf['mean_travel_time'].to_gpu_array(),24)

675 ms ± 248 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [50]:
gdf = backup
try:
    gdf = gdf.query('mean_travel_time>1000 and mean_travel_time<10600')
except:
    del(gdf)
    del(backup)
    print("oom")

In [20]:
def querygdf():
    

In [21]:
%timeit -n 1 querygdf()

UnboundLocalError: local variable 'backup' referenced before assignment

In [7]:
temp_gdf = gdf.loc[:,['mean_travel_time']].nlargest(1000,['mean_travel_time']).to_pandas()

GDFError: CUDA ERROR. cudaErrorMemoryAllocation: out of memory

In [15]:
temp_gdf

Unnamed: 0,mean_travel_time,hod
39521679,10578.169922,15.0
87846458,10578.169922,15.0
136171237,10578.169922,15.0
42117261,10531.169922,15.0
90442040,10531.169922,15.0
138766819,10531.169922,15.0
47813881,10481.879883,15.0
96138660,10481.879883,15.0
144463439,10481.879883,15.0
37147108,10439.000000,15.0


In [8]:
del(gdf)

In [23]:
gdf = backup

In [24]:
del(gdf)
del(backup)