In [1]:
from IPython.core.display import display, HTML 
display(HTML("<style>.container { width:100% !important; }</style>")) #Make full screen width

In [2]:
_ = !pip install --ignore-installed --upgrade tensorflow==2.4.0

In [3]:
import os
import boto3
import pandas as pd; import numpy as np
import tensorflow as tf

import tqdm

In [4]:
print(tf.__version__)

2.4.0


In [5]:
rootDirectory = '/home/ec2-user/SageMaker/'
bucket_name = 'hilcorp-l48operations-plunger-lift-main'
prefix = 'DataByAPI/'

In [6]:
from sagemaker import get_execution_role
role = get_execution_role()
print(role)

arn:aws:iam::446356438225:role/service-role/hilcorp-l48operations-plunger-SagemakerPlungerRole-AC88IBLWFE8K


In [7]:
%%time
### Sync S3 Bucket wtih instance file store (Will replace when figure out how to stream S3 to tensorflow)
!aws s3 ls s3://hilcorp-l48operations-plunger-lift-main/DataByAPI --recursive --human-readable --summarize | head -5
!aws s3 sync s3://hilcorp-l48operations-plunger-lift-main/DataByAPI/ /home/ec2-user/SageMaker/DataByAPI

2021-01-11 17:28:46    8.4 MiB DataByAPI/0506705008.csv
2021-01-11 17:28:46   14.1 MiB DataByAPI/0506705009.csv
2021-01-11 17:28:46    8.3 MiB DataByAPI/0506705016.csv
2021-01-11 17:28:46    7.1 MiB DataByAPI/0506705027.csv
2021-01-11 17:28:46    8.6 MiB DataByAPI/0506705031.csv

[Errno 32] Broken pipe
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
BrokenPipeError: [Errno 32] Broken pipe
CPU times: user 36.3 ms, sys: 24.7 ms, total: 61 ms
Wall time: 3.05 s


In [8]:
#This is a new path which may be more efficient for training
def csv_to_tensor(file_path):
    #First read the file into a string
    sCsv = tf.io.read_file(file_path)#, [tf.constant('-1')]*1)#.numpy().decode('utf-8')
    #Then split the string into rows
    rowSplit = tf.strings.split(sCsv, sep=b'\r\n', maxsplit=-1, name='SplitLines')
    rowSplit = tf.strings.split(sCsv, sep=b'\n', maxsplit=-1, name='SplitLines')
    #The split that into rows and columns
    colSplit = tf.strings.split(rowSplit, sep=b',', maxsplit=-1, name='SplitLines')
    #Remove the header. The last line will be empty b/c the previous ended with the new line character. Convert to tensor
    colSplit = colSplit[1:-1,4:].to_tensor()#This removes the column header and the empty last row
    #Replace empty strings with -1
    colSplit = tf.where(tf.equal(colSplit, b''), b'-1', colSplit)
    #Convert from string to float32
    outTensor = tf.strings.to_number(colSplit, out_type = tf.dtypes.float32, name = 'f32TensorCsv')

    return outTensor

#This is a new path which may be more efficient for training
def replaceNanOrInf(X):
    bMask = tf.math.logical_or(tf.math.is_nan(X),tf.math.is_inf(X))
    return tf.where(bMask,-1.,X)

def process_path(file_path):

    # tf.print('file path: {}'.format(file_path))

    PLUNGER_SPEED_loc = 1

    GAS_PER_CYCLE_loc = 23
    FLOW_LENGTH_loc = 12
    SHUTIN_LENGTH_loc = 11
    LEAKING_VALVE_loc = 29

    FLOW_RATE_END_FLOW_loc = 2
    CS_MINUS_LN_SI_loc = 75
    PERCENT_CL_END_FLOW_loc = 76
    inputTensor = csv_to_tensor(file_path)

    inputTensor = tf.clip_by_value(inputTensor, -1e6, 1e6, name='ClippedInput')

    #Remove non-physical rows
    bMask = tf.greater(inputTensor[:,SHUTIN_LENGTH_loc],1)
    bMask = tf.logical_and(bMask,tf.greater(inputTensor[:,FLOW_LENGTH_loc],1))
    bMask = tf.logical_and(bMask,tf.greater(inputTensor[:,PLUNGER_SPEED_loc],1))
    bMask = tf.logical_and(bMask,tf.less(inputTensor[:,LEAKING_VALVE_loc],0.5))
    bMask = tf.logical_and(bMask,tf.greater(inputTensor[:,CS_MINUS_LN_SI_loc],1))

    inputTensor = tf.boolean_mask(inputTensor,bMask)

    Xpolicy = tf.stack((
                 inputTensor[1:-1,PERCENT_CL_END_FLOW_loc],
                 inputTensor[2:,CS_MINUS_LN_SI_loc]
                 ),
                 axis = 1)

    X = tf.concat((inputTensor[0:-2,:], #Results
                 Xpolicy), axis = 1) #Next Cycle's controller value

    #This calcualtes the MCFD for the cycle definition which ends with a plunger arrival.
    correctedMCFD = inputTensor[1:-1,GAS_PER_CYCLE_loc]/((inputTensor[1:-1,FLOW_LENGTH_loc]+inputTensor[2:,SHUTIN_LENGTH_loc])/86400.)

    Y = tf.stack((correctedMCFD,
                 inputTensor[2:,1]), #Plunger speed
                 axis = 1,
                #  name = r'Y_'+str(file_path)
                 )


    X = replaceNanOrInf(X)
    Y = replaceNanOrInf(Y)

    X = tf.clip_by_value(X,-1e2,1e6)
    Y = tf.clip_by_value(Y,0.,2000.)

    tf.debugging.check_numerics(X, 'X error, file: {} '.format(file_path))
    tf.debugging.check_numerics(Y, 'Y error, file: {} '.format(file_path))

    return X, Y, file_path

In [9]:
#https://medium.com/radix-ai-blog/tensorflow-sagemaker-d17774417f08
#https://stackoverflow.com/questions/62513518/how-to-save-a-tensor-to-tfrecord
""" 
Converts data to TFRecords file format 
"""

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def convert_ds_to_TFRecord(ds, num_elements, name, directory):
#     num_elements = ds.reduce(np.int64(0), lambda x, _: x + 1).numpy()
    filename = os.path.join(directory, f'{name}-{num_elements}-Records.tfrecords')
    print(f'Writing {filename}')
    with tf.io.TFRecordWriter(filename) as writer: 
        for X, Y, path in tqdm.tqdm_notebook(ds):
            UWI = path.numpy()[-14:-4]
            num_time_steps = X.shape[0]
            # Serialize the tensors
            X_raw = X.numpy().tostring()
            Y_raw = Y.numpy().tostring()

            example = tf.train.Example(features=tf.train.Features(feature={
                    'UWI': _bytes_feature(UWI),
                    'X_raw': _bytes_feature(X_raw),
                    'Y_raw': _bytes_feature(Y_raw),
                    'num_time_steps': _int64_feature(num_time_steps)
                    }))

            writer.write(example.SerializeToString())

In [10]:
lFileNames = !ls /home/ec2-user/SageMaker/DataByAPI
num_examples = len(lFileNames)
print(f'{num_examples} wells to train on')

5138 wells to train on


In [11]:
DataFileNames = [rootDirectory + r'DataByAPI/*.csv']
buffer_size = 64
batch_size = 2

raw_dataset = tf.data.Dataset.list_files(DataFileNames)
allWellDs = raw_dataset.map(process_path)

In [12]:
# convert_ds_to_TFRecord(allWellDs,num_examples,'DatasetOneExamplePerWellWithUWI',r's3://hilcorp-l48operations-plunger-lift-main/TFRecordFiles/')
convert_ds_to_TFRecord(allWellDs,num_examples,'DatasetOneExamplePerWellWithUWI',r'/home/ec2-user/SageMaker/TFRecordFiles/')

Writing /home/ec2-user/SageMaker/TFRecordFiles/DatasetOneExamplePerWellWithUWI-5138-Records.tfrecords


HBox(children=(FloatProgress(value=0.0, max=5135.0), HTML(value='')))




In [1]:
# !aws s3 cp /home/ec2-user/SageMaker/TFRecordFiles/DatasetOneExamplePerWellWithUWI-5138-Records.tfrecords s3://hilcorp-l48operations-plunger-lift-main/TFRecordFiles/ 

Completed 13.4 GiB/26.1 GiB (60.3 MiB/s) with 1 file(s) remaining  

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



upload: TFRecordFiles/DatasetOneExamplePerWellWithUWI-5138-Records.tfrecords to s3://hilcorp-l48operations-plunger-lift-main/TFRecordFiles/DatasetOneExamplePerWellWithUWI-5138-Records.tfrecords


In [None]:
import pandas as pd
fname = 's3://hilcorp-l48operations-plunger-lift-main/DataByAPI/3003921089.csv'
df = pd.read_csv(fname, nrows = 100).head(1)
print(df.head())

In [None]:
from tensorflow.python.lib.io import file_io
print(file_io.stat(fname))
# print(file_io.stat('s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-1M/1gram/data')) #This is a public S3 file
# file_io.stat('s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-1M/1gram/data')