In [None]:
import cudf
import numpy as np
import pyarrow as pa
from math import ceil
import graphistry as g

In [None]:
# Read the flow-meter CSV with cuDF, then convert it to a pandas DataFrame.
# `pdf` (the pandas copy) is the frame handed to g.hypergraph in a later cell.
df = cudf.read_csv('data/Thursday-01-03-2018_TrafficForML_CICFlowMeter.csv')
pdf = df.to_pandas()

In [None]:
# Sanity check of the load: number of rows, then the column labels.
print(len(df))
print(df.columns)

In [None]:
# help(g.hypergraph)

In [None]:
# WARNING -- TAKES A LONG TIME AND 55+ GiB OF HOST MEMORY
# Build a hypergraph from the pandas frame. The result `h` is indexed by
# 'nodes', 'edges' and 'graph' in the cells below.
# NOTE(review): 'EVENTID': 'Label' presumably makes each distinct Label value
# the event node that attributes link to -- confirm against the graphistry docs.
h = g.hypergraph(pdf, opts={
    'EVENTID': 'Label',
})

In [None]:
# Node table of the hypergraph (its 'nodeID' and 'type' columns are used below).
print(h['nodes'])

In [None]:
# Edge table of the hypergraph (its 'attribID' and 'Label' columns are used below).
print(h['edges'])

In [None]:
# The 'graph' entry of the hypergraph result, printed for inspection only;
# nothing later in this notebook reads it.
print(h['graph'])

In [None]:
# Move the hypergraph's node ids and types into cuDF and store both as
# categorical columns under the names `name` and `type`.
nodes_df = cudf.DataFrame.from_pandas(h['nodes'][['nodeID', 'type']])
nodes_df = cudf.DataFrame(dict(
    name=nodes_df['nodeID'].astype('category'),
    type=nodes_df['type'].astype('category'),
))

# Same treatment for the edge list: attribID becomes `src`, Label becomes `dst`.
edges_df = cudf.DataFrame.from_pandas(h['edges'][['attribID', 'Label']])
edges_df = cudf.DataFrame(dict(
    src=edges_df['attribID'].astype('category'),
    dst=edges_df['Label'].astype('category'),
))

In [None]:
# In case we need to pause and write to disk to avoid OOM'ing cuDF
# def write_arrow(df, path):
#     table = df.to_arrow()
#     writer = pa.RecordBatchStreamWriter(path, table.schema)
#     writer.write_table(table)
# #     writer.write_table(pa.Table.from_batches(table.to_batches(max_chunksize=len(table) / 1000)))
#     writer.close()

# write_arrow(nodes_df, 'data/Thursday-01-03-2018_TrafficForML_CICFlowMeter.nodes.arrow')
# write_arrow(edges_df, 'data/Thursday-01-03-2018_TrafficForML_CICFlowMeter.edges.arrow')

In [None]:
def combine_cats(*cols):
    """Re-encode several columns against one shared list of categories.

    Every input is cast to category dtype, the categories of all inputs are
    concatenated and de-duplicated, and each column is then re-coded against
    that merged list so equal values share the same code in every output.

    Parameters
    ----------
    *cols : cudf.Series
        Columns to re-encode; anything accepted by ``astype('category')``.

    Returns
    -------
    list of cudf.Series
        One categorical Series per input, in input order, all carrying the
        merged categories and codes of one common dtype.
    """
    cols = [col.astype('category') for col in cols]
    # Union of all categories, keeping first-seen order.
    cats = cudf \
        .concat([col.cat.categories for col in cols]) \
        .to_series().drop_duplicates(ignore_index=True) \
        ._column
    # np.find_common_type was deprecated in NumPy 1.25 and removed in 2.0;
    # np.result_type applies the same promotion to the integer code dtypes.
    # NOTE(review): this dtype is taken from the codes *before* they are
    # re-mapped onto the merged categories -- confirm it is always wide enough
    # to index the merged list.
    codes_dtype = np.result_type(*[col.cat.codes.dtype for col in cols])
    # From here on `cols` holds the re-coded columns returned by
    # _set_categories rather than Series.
    cols = [
        col.cat._set_categories(
            col.cat.categories, cats, is_unique=True
        ) for col in cols
    ]
    return [
        cudf.Series(cudf.core.column.build_categorical_column(
            size=col.size,
            offset=col.offset,
            mask=col.base_mask,
            ordered=col.dtype.ordered,
            categories=col.cat().categories,
            codes=col.cat().codes.astype(codes_dtype)
        )) for col in cols
    ]

# Dense integer ids for the node table, one per nodes_df row.
node_indices_col = cudf.core.index.RangeIndex(0, len(nodes_df))
# Re-encode both edge endpoints and the node names against one shared
# category list so the endpoint columns can be joined to the node names.
(
    src_col,
    dst_col,
    node_name_col
) = combine_cats(edges_df['src'], edges_df['dst'], nodes_df['name'])

# cuDF joins do not promise to keep the left frame's row order, and the src/dst
# lookups are paired up by row position when edges_df is rebuilt further down.
# An explicit `order` column therefore rides through each merge and the result
# is sorted back on it, so row i of src_df and row i of dst_df always describe
# the same edge.
# `columns=` makes the rename target column labels explicitly.
src_df = cudf.DataFrame({ 'src': node_name_col, 'id': node_indices_col })
src_df = cudf.DataFrame({
        'src': src_col,
        'order': cudf.core.index.RangeIndex(0, len(src_col)),
    }) \
    .merge(src_df, on='src', how='left') \
    .sort_values(by='order').reset_index(drop=True)[['src', 'id']] \
    .rename(columns={'id': 'src', 'src': 'node'})

print(src_df.head())

dst_df = cudf.DataFrame({ 'dst': node_name_col, 'id': node_indices_col })
dst_df = cudf.DataFrame({
        'dst': dst_col,
        'order': cudf.core.index.RangeIndex(0, len(dst_col)),
    }) \
    .merge(dst_df, on='dst', how='left') \
    .sort_values(by='order').reset_index(drop=True)[['dst', 'id']] \
    .rename(columns={'id': 'dst', 'dst': 'node'})

print(dst_df.head())

def type_to_color(types):
    """Map each value of a categorical Series to an int32 colour.

    The category code of every element selects an entry from a fixed
    11-entry palette; the palette is repeated as often as needed to cover
    the highest code.

    Parameters
    ----------
    types : cudf.Series
        Categorical Series (its ``.cat.codes`` are used).

    Returns
    -------
    cudf.Series
        int32 colour value per element of ``types``.
    """
    # NOTE(review): the palette entries look like packed signed 32-bit colour
    # values -- confirm the channel layout with whatever consumes them.
    palette = cudf.Series([
        -12451426,-11583787,-12358156,-10375427,
        -7610114,-4194305,-6752794,-5972565,
        -5914010,-4356046,-6140066
    ])
    codes = cudf.Series(types.cat.codes)
    n_ids = codes.max() + 1
    # Tile the palette until it has at least one entry per code, then trim.
    repeats = ceil(n_ids / len(palette))
    tiled = cudf.concat([palette] * repeats)[:n_ids]
    # A categorical column whose categories are the colours: turning it back
    # into a numerical column looks every code up in the tiled palette.
    lookup = cudf.core.column.build_categorical_column(
        ordered=True,
        codes=codes._column,
        categories=tiled,
    )
    return cudf.Series(lookup.as_numerical_column(dtype=np.int32))

# Final node table: integer id, shared-category name, type and its colour.
nodes_df = cudf.DataFrame(dict(
    id=node_indices_col,
    name=node_name_col,
    type=nodes_df['type'],
    color=type_to_color(nodes_df['type']),
))

# Final edge table: integer node ids for both endpoints, paired by row.
edges_df = cudf.DataFrame(dict(src=src_df['src'], dst=dst_df['dst']))

print(nodes_df.dtypes, nodes_df.head(), sep='\n')

print(edges_df.dtypes, edges_df.head(), sep='\n')

In [None]:
def add_edge_bundles(edges):
    """Annotate an edge list with bundle size and per-bundle position.

    Edges joining the same two vertices, in either direction, form one
    bundle. Each edge receives the size of its bundle and its own position
    inside that bundle.

    Parameters
    ----------
    edges : cudf.DataFrame
        Edge list with ``src`` and ``dst`` columns. Its (unnamed) index is
        turned into the edge id.

    Returns
    -------
    cudf.DataFrame
        Columns ``id``, ``src``, ``dst``, ``index`` (int32 position of the
        edge within its bundle) and ``count`` (int32 bundle size), sorted by
        ``id``.
    """
    # `columns=` makes every rename in this function target column labels
    # explicitly; a bare positional mapper is applied to index labels under
    # pandas-compatible rename semantics and would leave the column untouched.
    edges = edges.reset_index().rename(columns={'index': 'eid'}, copy=False)
    # Create a duplicate table with:
    # * all the [src, dst] in the upper half
    # * all the [dst, src] pairs as the lower half, but flipped so dst->src, src->dst
    bundles = cudf.DataFrame({
        'eid': cudf.concat([edges['eid'], edges['eid']]),
        # concat [src, dst] into the 'src' column
        'src': cudf.concat([edges['src'], edges['dst']]),
        # concat [dst, src] into the 'dst' column
        'dst': cudf.concat([edges['dst'], edges['src']])
    })

    # Group the duplicated edgelist by [src, dst] and get the min edge id.
    # Since all the [dst, src] pairs have been flipped to [src, dst], each
    # edge with the same [src, dst] or [dst, src] vertices will be assigned
    # the same bundle id
    bundles = bundles \
        .groupby(['src', 'dst']).agg({'eid': 'min'}) \
        .reset_index().rename(columns={'eid': 'bid'}, copy=False)

    # Join the bundle ids into the edgelist
    edges = edges.merge(bundles, on=['src', 'dst'], how='inner')

    # Determine each bundle's size and relative offset. The sizes are pinned
    # to ascending bundle-id order with sort_index(), and the ids are read
    # back from that same Series, so ids, sizes and offsets stay aligned
    # whatever order value_counts() happens to emit its groups in.
    lengths = edges['bid'].value_counts(sort=False).sort_index()
    offsets = lengths.cumsum() - lengths
    # Join the bundle segment lengths + offsets into the edgelist
    edges = edges.merge(cudf.DataFrame({
        'bid': lengths.index.to_series().reset_index(drop=True),
        'start': offsets.reset_index(drop=True).astype(np.int32),
        'count': lengths.reset_index(drop=True).astype(np.int32),
    }), on='bid', how='left')

    # Determine each edge's index relative to its bundle: once the rows are
    # sorted by bundle id, row position minus the bundle's start offset is the
    # edge's position inside the bundle.
    edges = edges.sort_values(by='bid').reset_index(drop=True)
    edges['index'] = cudf.core.index.RangeIndex(0, len(edges)) - edges['start']
    edges['index'] = edges['index'].astype(np.int32)

    # Re-sort the edgelist by edge id and cleanup
    edges = edges.sort_values('eid').reset_index(drop=True)
    edges = edges.rename(columns={'eid': 'id'}, copy=False)

    return edges[['id', 'src', 'dst', 'index', 'count']]

edges_df_2 = add_edge_bundles(edges_df)
print(edges_df_2.dtypes)
print(edges_df_2)

# Flatten the per-edge pairs into two long int32 columns: stack() lays each
# row's [src, dst] (resp. [index, count]) values out one after the other.
# The values are converted with astype() rather than by re-wrapping the raw
# data buffer as int32: re-wrapping reinterprets the bytes, which is only
# correct when the source columns are already 32-bit integers.
edges_df_2 = cudf.DataFrame({
    'edge': edges_df_2[['src', 'dst']]
        .reset_index(drop=True).stack().reset_index(drop=True)
        .astype(np.int32),
    'bundle': edges_df_2[['index', 'count']]
        .reset_index(drop=True).stack().reset_index(drop=True)
        .astype(np.int32),
})

print(edges_df_2.dtypes)
print(edges_df_2)

def write_arrow(df, path):
    """Write ``df`` to ``path`` with an Arrow record-batch stream writer.

    Parameters
    ----------
    df : object exposing ``to_arrow()``
        Frame to serialize; ``to_arrow()`` must return a table with a
        ``schema`` attribute.
    path : str or file-like
        Destination, passed unchanged to ``pa.RecordBatchStreamWriter``.

    Raises
    ------
    Whatever ``to_arrow``, the writer constructor or ``write_table`` raise;
    the writer is closed before the error propagates.
    """
    table = df.to_arrow()
    writer = pa.RecordBatchStreamWriter(path, table.schema)
    try:
        writer.write_table(table)
    finally:
        # Release the sink even when the write fails part-way, so a failed
        # export does not leave the output handle open.
        writer.close()

# Export the three tables through write_arrow (Arrow record-batch stream files)
# under data/: node id + colour, the src/dst edge pairs, and the flattened
# edge/bundle list.
# NOTE(review): the "webgl" file names suggest a WebGL renderer consumes these
# -- confirm the expected column order and dtypes with that consumer.
write_arrow(nodes_df[['id', 'color']], 'data/01032018-webgl-nodes.arrow')
write_arrow(edges_df[['src', 'dst']], 'data/01032018-webgl-edges.arrow')
write_arrow(edges_df_2[['edge', 'bundle']], 'data/01032018-webgl-edgelist.arrow')