<a href="https://colab.research.google.com/github/nicsuzor/JustWatchAPI/blob/master/classifying/Convert_JSON_to_TFRecords_for_BERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Read CSV datasets from Google Cloud Storage, pre-process them, convert them to BERT TFRecords, and store the results back in GCS.

In [1]:
import sys

import os
import subprocess

# Fetch Google's reference BERT implementation once and make it importable;
# the next cell imports run_classifier and tokenization from it.
# subprocess.run(check=True) makes a failed clone stop this cell, instead of
# surfacing later as a confusing ImportError.
# NOTE(review): this tracks the repository's default branch; consider pinning a
# commit for reproducible runs.
BERT_REPO_DIR = 'bert_repo'
if not os.path.isdir(BERT_REPO_DIR):
  subprocess.run(
      ['git', 'clone', 'https://github.com/google-research/bert', BERT_REPO_DIR],
      check=True)
if BERT_REPO_DIR not in sys.path:
  sys.path.append(BERT_REPO_DIR)

# Authenticate the Colab user (used for the gs:// reads and writes below).
from google.colab import auth
auth.authenticate_user()

Cloning into 'bert_repo'...
remote: Enumerating objects: 299, done.[K
remote: Total 299 (delta 0), reused 0 (delta 0), pack-reused 299[K
Receiving objects: 100% (299/299), 184.41 KiB | 7.09 MiB/s, done.
Resolving deltas: 100% (179/179), done.


In [2]:
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from pandas.compat import StringIO, BytesIO
import pandas as pd
from pathlib import Path
from run_classifier import InputExample, file_based_convert_examples_to_features
import tokenization 
import os 
import datetime
import csv

GCS_INPUT_PATH = 'gs://platform_governance_datasets/twitter_tested_20181221/*.csv.gz' #@param {type:"string"}
GCS_INPUT_GZIP = True #@param {type:"boolean"}
GCS_OUTPUT_PATH = 'gs://platform_governance_datasets/tfrecords_bert/twitter_tested_20181221/' #@param {type:"string"}
assert GCS_INPUT_PATH, 'Must specify an input path or glob'
assert GCS_OUTPUT_PATH, 'Must specify an existing GCS bucket name'
# Example of a narrower input pattern:
#   gs://platform_governance_datasets/twitter_tested_20181221/twitter_tested_20181221_*.csv.gz

# Available pretrained model checkpoints:
#   uncased_L-12_H-768_A-12: uncased BERT base model
#   uncased_L-24_H-1024_A-16: uncased BERT large model
#   cased_L-12_H-768_A-12: cased BERT base model
BERT_MODEL = 'uncased_L-24_H-1024_A-16' #@param {type:"string"}
BERT_PRETRAINED_DIR = 'gs://cloud-tpu-checkpoints/bert/' + BERT_MODEL
print('***** BERT pretrained directory: {} *****'.format(BERT_PRETRAINED_DIR))

MAX_SEQ_LENGTH = 64 #@param {type:"integer"}

tf.gfile.MakeDirs(GCS_OUTPUT_PATH)
print('***** TFRecords output directory: {} *****'.format(GCS_OUTPUT_PATH))

VOCAB_FILE = os.path.join(BERT_PRETRAINED_DIR, 'vocab.txt')
# Lower-case the input text only for the uncased checkpoints.
DO_LOWER_CASE = BERT_MODEL.startswith('uncased')
CONCURRENCY = 1 #@param {type:"integer"}
TEXT_FIELDS = ['text'] #@param {type:"string"}
ID_FIELD = 'id' #@param {type:"string"}
# Leave LABEL_FIELD empty for unlabelled input; every row then gets label '0'.
LABEL_FIELD = ''  #@param {type:"string"}
LABEL_LIST = ['0', '1']  #@param {type:"string"}

# These two are declared as "string" form params. If they are entered as a
# plain string (e.g. 'title, text'), treat it as a comma-separated list so the
# list operations below still work.
if isinstance(TEXT_FIELDS, str):
  TEXT_FIELDS = [name.strip() for name in TEXT_FIELDS.split(',') if name.strip()]
if isinstance(LABEL_LIST, str):
  LABEL_LIST = [name.strip() for name in LABEL_LIST.split(',') if name.strip()]
assert TEXT_FIELDS, 'Must specify at least one text field'

# Every column that must be read from the input CSVs.
ALL_FIELDS = TEXT_FIELDS + [ID_FIELD]
if LABEL_FIELD:
  ALL_FIELDS.append(LABEL_FIELD)


***** BERT pretrained directory: gs://cloud-tpu-checkpoints/bert/uncased_L-24_H-1024_A-16 *****
***** TFRecords output directory: gs://platform_governance_datasets/tfrecords_bert/twitter_tested_20181221/ *****


In [0]:
def read_data_gcs(gcs_path):
  """Read one CSV file from Google Cloud Storage into a dataframe.

  Only the columns in ALL_FIELDS are loaded, and every one of them is read as
  text (``object`` dtype). The file is decompressed as gzip when
  GCS_INPUT_GZIP is set.

  Args:
    gcs_path: path of the CSV file (e.g. a ``gs://`` URL) readable by
      ``file_io.FileIO``.

  Returns:
    A pandas DataFrame containing the ALL_FIELDS columns.
  """
  tf.logging.info(f'downloading csv file from {gcs_path}')

  # read every column as text
  dtypes = dict.fromkeys(ALL_FIELDS, object)
  compression = 'gzip' if GCS_INPUT_GZIP else None

  # Stream the file into pandas in both cases; `with` guarantees the GCS
  # handle is closed, and pandas does the UTF-8 decoding itself.
  with file_io.FileIO(gcs_path, mode='rb') as f:
    data = pd.read_csv(f, encoding='utf-8',
                       dtype=dtypes, usecols=ALL_FIELDS,
                       compression=compression)

  return data


def preprocess_df(df):
  """Wrangle a raw input dataframe into BERT's (guid, text_a, label) format.

  The input dataframe is not modified.

  Args:
    df: dataframe containing every column listed in ALL_FIELDS.

  Returns:
    A new dataframe with exactly the columns ``guid``, ``text_a`` and
    ``label``, all cast to str. The TEXT_FIELDS are joined with single spaces
    into ``text_a``. Without a LABEL_FIELD, ``label`` is the placeholder '0'.
    Rows are removed when:
      * ``guid`` is missing;
      * ``text_a`` is missing or whitespace-only;
      * LABEL_FIELD is set and ``label`` is missing;
      * the row exactly duplicates an earlier row.

  Raises:
    AssertionError: if a column from ALL_FIELDS is absent.
  """
  tf.logging.info(f'Starting to preprocess dataframe containing {df.shape[0]} rows.')

  assert all(elem in df.columns for elem in ALL_FIELDS), "Dataset appears to be missing columns - please check your inputs."

  # combine multiple text fields if necessary
  if len(TEXT_FIELDS) > 1:
    text_a = df[TEXT_FIELDS[0]].str.cat(df[TEXT_FIELDS[1:]], sep=' ', na_rep='')
  else:
    text_a = df[TEXT_FIELDS[0]]

  # With na_rep='' a row whose text fields are all missing concatenates to
  # whitespace instead of NaN, which dropna would keep. Treat blank text as
  # missing so both branches above drop such rows consistently.
  is_blank = text_a.map(lambda value: isinstance(value, str) and not value.strip())
  text_a = text_a.mask(is_blank)

  # assign() works on a copy, so the caller's dataframe is left untouched.
  out = df.assign(
      guid=df[ID_FIELD],
      text_a=text_a,
      label=df[LABEL_FIELD] if LABEL_FIELD else '0',
  )[['guid', 'text_a', 'label']]

  # drop rows with missing values; when reading labelled data, only keep
  # labelled records
  required = ['guid', 'text_a'] + (['label'] if LABEL_FIELD else [])
  out = out.dropna(axis='index', subset=required)

  out = out.drop_duplicates()

  # the BERT tokenization helpers expect text, so convert every column to str
  return out.astype(str)

def convert_df_to_examples_mp(df):
  """Convert ``df`` to ``InputExample``s using a pool of CONCURRENCY processes.

  The dataframe is split into at most CONCURRENCY contiguous row chunks. Each
  chunk is converted by ``convert_df_to_examples`` in a worker process.

  Args:
    df: dataframe with ``guid``, ``text_a`` and ``label`` columns (as
      produced by ``preprocess_df``).

  Returns:
    A flat list of ``InputExample``s in the same order as the rows of ``df``.
    For an empty dataframe this is ``[]``, and no pool is started.
  """
  import math
  import multiprocessing as mp

  n_rows = df.shape[0]
  if n_rows == 0:
    # Avoids a zero-sized chunk step, which would make range() raise.
    return []

  n_workers = max(1, int(CONCURRENCY))
  batch_size = int(math.ceil(n_rows / n_workers))
  chunks = [df.iloc[i:i + batch_size] for i in range(0, n_rows, batch_size)]
  tf.logging.info(f'Chunked dataframe into {len(chunks)} chunks.')

  with mp.Pool(processes=n_workers) as pool:
    # Pool.map returns the per-chunk results in chunk order.
    results = pool.map(convert_df_to_examples, chunks)

  tf.logging.info('Finished processing dataframe.')

  return [example for chunk in results for example in chunk]
    
def convert_df_to_examples(df):
    """Creates BERT ``InputExample``s from the dataframe rows.

    Args:
      df: dataframe with ``guid``, ``text_a`` and ``label`` columns holding
        text (as produced by ``preprocess_df``).

    Returns:
      A list with one single-sentence ``InputExample`` (``text_b=None``) per
      row of ``df``, in row order.
    """
    t0 = datetime.datetime.utcnow()

    # Zipping the three columns yields the same values as DataFrame.iterrows,
    # without building a Series object for every row. That matters for input
    # files with millions of rows.
    examples = [
        InputExample(
            guid=tokenization.convert_to_unicode(guid),
            text_a=tokenization.convert_to_unicode(text_a),
            text_b=None,
            label=tokenization.convert_to_unicode(label))
        for guid, text_a, label in zip(df['guid'], df['text_a'], df['label'])
    ]

    time_taken = datetime.datetime.utcnow() - t0
    tf.logging.info(f"Finished converting dataframe chunk in {time_taken}.")

    return examples

def save_data_gcs(gcs_path, df):
  """Write ``df`` to ``gcs_path`` as a UTF-8 CSV with every field quoted.

  ``to_csv`` is called without ``index=``, so the dataframe index is written
  as the first column (pandas' default). An existing file at ``gcs_path`` is
  opened in 'w' mode and therefore replaced.
  """
  csv_options = dict(encoding='utf-8', quoting=csv.QUOTE_ALL)
  with file_io.FileIO(gcs_path, mode='w') as out_stream:
    df.to_csv(out_stream, **csv_options)


In [0]:
# Convert every input file matching GCS_INPUT_PATH to BERT features in TFRecord
# format, plus a CSV of the ids of the rows that were kept, and save both under
# GCS_OUTPUT_PATH. Inputs whose TFRecord output already exists are skipped, so
# re-running this cell resumes an interrupted conversion.
glob_list = tf.gfile.Glob(GCS_INPUT_PATH)
tokenizer = tokenization.FullTokenizer(
    vocab_file=VOCAB_FILE, do_lower_case=DO_LOWER_CASE)

t0 = datetime.datetime.now()

for input_file in glob_list:
  t1 = datetime.datetime.now()
  # Path.stem drops only the last suffix: 'x.csv.gz' -> 'x.csv'.
  stem = Path(input_file).stem
  gcs_output_file = os.path.join(
      GCS_OUTPUT_PATH, stem + f'_{BERT_MODEL}_{MAX_SEQ_LENGTH}.tf_record')
  gcs_output_file_ids = gcs_output_file + '.ids.txt'
  # Features are written under this temporary name and only renamed to
  # gcs_output_file after the id file is also written. The existence check
  # below therefore never mistakes a partially written file from an
  # interrupted run for a finished one.
  gcs_output_file_tmp = gcs_output_file + '.incomplete'

  if tf.gfile.Exists(gcs_output_file):
    tf.logging.info(f"Output file {gcs_output_file} already exists. Skipping input from {input_file}.")
    continue

  tf.logging.info(f"Reading CSV from {input_file}.")

  df = read_data_gcs(input_file)
  df = preprocess_df(df)
  tf_examples = convert_df_to_examples(df)

  tf.logging.info(f"Tokenizing {df.shape[0]} rows from {input_file}.")
  file_based_convert_examples_to_features(
      tf_examples, LABEL_LIST, MAX_SEQ_LENGTH, tokenizer, gcs_output_file_tmp)

  tf.logging.info(f"***** Writing ID file to text {gcs_output_file_ids} *****")
  save_data_gcs(gcs_output_file_ids, df[['guid']])

  # Publish the features under their final name only once everything for
  # this input has been written.
  tf.gfile.Rename(gcs_output_file_tmp, gcs_output_file, overwrite=True)
  tf.logging.info(f"Saved features to {gcs_output_file}")

  tz = datetime.datetime.now()
  tf.logging.info(f"Finished file in: {tz-t1}")

tz = datetime.datetime.now()
tf.logging.info(f"Finished entire run in: {tz-t0}")

INFO:tensorflow:Output file gs://platform_governance_datasets/tfrecords_bert/twitter_tested_20181221/twitter_tested_20181221_000000000000.csv_uncased_L-24_H-1024_A-16_64.tf_record already exists. Skipping input from gs://platform_governance_datasets/twitter_tested_20181221/twitter_tested_20181221_000000000000.csv.gz.
INFO:tensorflow:Reading CSV from gs://platform_governance_datasets/twitter_tested_20181221/twitter_tested_20181221_000000000001.csv.gz.
INFO:tensorflow:downloading csv file from gs://platform_governance_datasets/twitter_tested_20181221/twitter_tested_20181221_000000000001.csv.gz
INFO:tensorflow:Starting to preprocess dataframe containing 3165359 rows.
INFO:tensorflow:Tokenizing 3164727 rows from gs://platform_governance_datasets/twitter_tested_20181221/twitter_tested_20181221_000000000001.csv.gz.
INFO:tensorflow:Writing example 0 of 3164727
INFO:tensorflow:*** Example ***
INFO:tensorflow:guid: 917402900497948672
INFO:tensorflow:tokens: [CLS] rt @ 1 ##d ##0 ##ex : i w a n t

[<run_classifier.InputExample at 0x7f52f6087550>,
 <run_classifier.InputExample at 0x7f53652fbe80>,
 <run_classifier.InputExample at 0x7f52f6087588>,
 <run_classifier.InputExample at 0x7f52f60875c0>,
 <run_classifier.InputExample at 0x7f52f60875f8>]

In [0]:
# Placeholder label for unlabelled data. preprocess_df already sets it when
# LABEL_FIELD is empty; the guard ensures that re-running this cell never
# overwrites real labels read from LABEL_FIELD.
if not LABEL_FIELD:
  df['label'] = '0'

In [0]:
# Rebuild the BERT InputExamples from the dataframe currently held in `df`.
tf_examples = convert_df_to_examples(df=df)

In [0]:
# Manually (re)write the features for `tf_examples` to `gcs_output_file`.
# NOTE(review): depends on kernel state from earlier cells (`tf_examples`,
# `tokenizer`, and the `gcs_output_file` assigned by the last loop iteration);
# it does not survive Restart & Run All on its own.
t_start = datetime.datetime.now()
file_based_convert_examples_to_features(
    tf_examples, LABEL_LIST, MAX_SEQ_LENGTH, tokenizer, gcs_output_file)
tf.logging.info(f"Saved features to {gcs_output_file}")

# Time only this cell's work rather than measuring from the loop's `t1`.
tz = datetime.datetime.now()
tf.logging.info(f"Finished file in: {tz-t_start}")

INFO:tensorflow:Writing example 0 of 3163860
INFO:tensorflow:*** Example ***
INFO:tensorflow:guid: 951860001273995264
INFO:tensorflow:tokens: [CLS] so ##fr ##er cal ##ada e a pi ##or co ##isa , iss ##o ta me mata ##ndo por dent ##ro . [SEP]
INFO:tensorflow:input_ids: 101 2061 19699 2121 10250 8447 1041 1037 14255 2953 2522 14268 1010 26354 2080 11937 2033 22640 15482 18499 21418 3217 1012 102 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
INFO:tensorflow:input_mask: 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
INFO:tensorflow:segment_ids: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
INFO:tensorflow:label: 0 (id = 0)
INFO:tensorflow:*** Example ***
INFO:tensorflow:guid: 929412161763467264
INFO:tensorflow:tokens: [CLS] fernando corona & amp ; gala monte ##s mu ##y bu ##en cover de cal ##ib ##

In [0]:
# Write the guid column of the dataframe currently held in `df` next to its
# TFRecord file, using the save_data_gcs helper defined with the other helper
# functions (it is deliberately not redefined here).
# NOTE(review): `gcs_output_file` and `df` come from earlier cells' kernel state.
gcs_output_file_ids = gcs_output_file + '.ids.txt'

tf.logging.info(f"***** Writing ID file to text {gcs_output_file_ids} *****")
save_data_gcs(gcs_output_file_ids, df[['guid']])
  

INFO:tensorflow:***** Writing ID file to text gs://platform_governance_datasets/tfrecords_bert/twitter_tested_20181221/twitter_tested_20181221_000000000000.csv_uncased_L-24_H-1024_A-16_64.tf_record.ids.txt *****
