In [1]:
# Convert CSV file exported from bigquery table into BigTable avro format that can be ingested using Dataflow template

In [14]:
import json
from datetime import datetime

import pandas as pd
from avro.schema import Parse
from avro.io import DatumWriter
from avro.datafile import DataFileWriter

In [15]:
df = pd.read_csv('feature_table.csv')
df.head()

Unnamed: 0,user_id,channel,history,zip_code,created_timestamp
0,UID0896,Web,141.38,Rural,2022-04-30T00:00:00Z
1,UID5256,Web,121.47,Rural,2022-04-30T00:00:00Z
2,UID7512,Web,49.75,Rural,2022-04-30T00:00:00Z
3,UID3136,Web,447.09,Rural,2022-04-30T00:00:00Z
4,UID2096,Web,63.72,Rural,2022-04-30T00:00:00Z


In [16]:
bigtable_schema = {
    "name" : "BigtableRow",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "key", "type" : "bytes"},
      { "name" : "cells",
        "type" : {
          "type" : "array",
          "items": {
            "name": "BigtableCell",
            "type": "record",
            "fields": [
              { "name" : "family", "type" : "string"},
              { "name" : "qualifier", "type" : "bytes"},
              { "name" : "timestamp", "type" : "long", "logicalType" : "timestamp-micros"},
              { "name" : "value", "type" : "bytes"}
            ]
          }
        }
      }
   ]
}

In [17]:
parsed_schema = Parse(json.dumps(bigtable_schema))

In [25]:
row_key = 'user_id'
family_name = 'feature_name'
feature_list = ['channel', 'zip_code', 'history']

In [27]:
with open('features.avro', 'wb') as f:

    writer = DataFileWriter(f, DatumWriter(), parsed_schema)

    for item in df.iterrows():

        row = item[1]
        ts =  int(datetime.now().timestamp()) * 1000 * 1000

        for feat in feature_list:

            writer.append({
                "key": row[row_key].encode('utf-8'),
                "cells": [{"family": family_name, "qualifier": feat.encode('utf-8'), "timestamp": ts, "value": str(row[feat]).encode('utf-8')}]
            })
        
    writer.close()


In [49]:
!gsutil cp features.avro gs://mitochondrion-bucket-id/features6.avro

Copying file://features.avro [Content-Type=application/octet-stream]...
/ [1 files][  690.0 B/  690.0 B]                                                
Operation completed over 1 objects/690.0 B.                                      
