In [7]:
%%bigquery flights_df --project ml-course-test --verbose

-- Builds the modelling table: one row per flight dated in 2009 that left with a
-- positive departure delay, carrying the model features and a binary label.
SELECT
    departure_delay,
    distance,
    airline,
    departure_airport,
    arrival_airport,
    -- Weekday and month are cast to STRING so they are delivered as string
    -- (categorical) columns. BigQuery's DAYOFWEEK is 1 (Sunday) .. 7 (Saturday).
    CAST(EXTRACT(DAYOFWEEK FROM departure_date) AS STRING) departure_weekday,
    CAST(EXTRACT(MONTH FROM departure_date) AS STRING) departure_month,
    -- Label: 1 when arrival_delay is at least 15 (presumably minutes -- confirm
    -- against the table schema), otherwise 0.
    IF(arrival_delay >= 15, 1, 0) delayed
FROM (
    SELECT
        departure_delay,
        -- Distance between the departure and arrival coordinates. ST_DISTANCE
        -- returns metres, so dividing by 1000 and rounding gives whole kilometres.
        ROUND(ST_DISTANCE(
            ST_GEOGPOINT(departure_lon, departure_lat),
            ST_GEOGPOINT(arrival_lon, arrival_lat)
        )/1000) distance,
        airline,
        arrival_airport,
        departure_airport,
        -- `date` is compared and parsed as a 'YYYY-MM-DD' string.
        PARSE_DATE('%Y-%m-%d', date) departure_date,
        arrival_delay
    FROM `bigquery-samples.airline_ontime_data.flights`
    WHERE
        -- Restrict to calendar year 2009 and to flights that departed late.
        date >= '2009-01-01'
        AND date <= '2009-12-31'
        AND departure_delay > 0
)

Executing query with job ID: 7d523dc5-aa11-46e9-8843-641df166ce2b
Query executing: 0.76s
Query complete after 1.08s


In [13]:
def dataframe_to_dataset(dataframe, labels='delayed', shuffle=True, batch_size=32):
    """Convert a pandas DataFrame into a batched ``tf.data.Dataset``.

    Args:
        dataframe: Source frame; it is copied, so the caller's frame is not modified.
        labels: Name of the column to split off and use as the target.
        shuffle: When true, shuffle with a buffer as large as the frame
            before batching.
        batch_size: Number of rows per emitted batch.

    Returns:
        A dataset yielding ``(features, target)`` pairs, where ``features``
        is a dict keyed by the remaining column names.
    """
    features = dataframe.copy()
    target = features.pop(labels)
    slices = tf.data.Dataset.from_tensor_slices((dict(features), target))
    if not shuffle:
        return slices.batch(batch_size)
    # A buffer covering every row gives a full shuffle of the data.
    return slices.shuffle(buffer_size=len(features)).batch(batch_size)

In [14]:
batch_size = 256
# number of rows grouped into each batch when the datasets are built

In [17]:
import tensorflow as tf

tf.keras.backend.set_floatx('float64')
# sets the default float type used by Keras, e.g. 64, 32

# Seed for the train/test split so a re-run produces the same partition.
SPLIT_SEED = 42

# Hold out a disjoint 20% of the rows for evaluation. Building both datasets
# from the full frame would evaluate the model on rows it was trained on.
# NOTE: dropping by index assumes flights_df has a unique index (the default
# RangeIndex of a freshly loaded frame satisfies this).
train_df = flights_df.sample(frac=0.8, random_state=SPLIT_SEED)
test_df = flights_df.drop(train_df.index)

train_ds = dataframe_to_dataset(train_df, batch_size=batch_size)
test_ds = dataframe_to_dataset(test_df, shuffle=False, batch_size=batch_size)
train_ds
# The repr only shows the element structure (shapes and dtypes); the rows are
# already attached to the dataset and are produced lazily when it is iterated.

<BatchDataset shapes: ({departure_delay: (None,), distance: (None,), airline: (None,), departure_airport: (None,), arrival_airport: (None,), departure_weekday: (None,), departure_month: (None,)}, (None,)), types: ({departure_delay: tf.float64, distance: tf.float64, airline: tf.string, departure_airport: tf.string, arrival_airport: tf.string, departure_weekday: tf.string, departure_month: tf.string}, tf.int64)>

In [20]:
# The dataset yields raw columns only; no feature transformations are defined yet.
# Take one batch and keep element [0], the features dict ([1] holds the labels).
example_batch = next(iter(train_ds))[0]

departure_delay = tf.feature_column.numeric_column('departure_delay')
# numeric feature column that reads the 'departure_delay' key of the features dict
feature_layer_demo = tf.keras.layers.DenseFeatures(departure_delay)
feature_layer_demo(example_batch).numpy()[:5]
# the feature layer is called like any other Keras layer; the first five rows
# of its dense output are displayed

array([[ 2.],
       [ 1.],
       [27.],
       [ 2.],
       [68.]], dtype=float32)

In [22]:
# Bucketize the numeric delay: the 5 boundaries define 6 intervals, and each
# row is one-hot encoded by the interval its value falls into (hence the
# 6-wide rows in the output).
departure_delay_bucketized = tf.feature_column.bucketized_column(departure_delay, boundaries=[2,3,6,9,13])

feature_layer_demo = tf.keras.layers.DenseFeatures(departure_delay_bucketized)
# Show the encoded form of the first five rows of the example batch.
feature_layer_demo(example_batch).numpy()[:5]

array([[0., 1., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1.],
       [0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1.]], dtype=float32)

In [24]:
# Bucket edges for the departure delay, and the weekday strings to one-hot encode.
# NOTE(review): the vocabulary spans '0'..'7'; confirm whether '0' can actually
# occur in departure_weekday or whether it is an always-empty slot.
departure_delay_bins = [2, 3, 6, 9, 13, 19, 28, 44, 72]
weekday_voc = list(map(str, range(8)))

# TODO: the column construction below should be made into a function to avoid repetition.

# Numeric value -> bucketized feature column.
departure_delay = tf.feature_column.numeric_column('departure_delay')
departure_delay_buckets = tf.feature_column.bucketized_column(
    departure_delay,
    boundaries=departure_delay_bins,
)

# String value -> vocabulary lookup -> indicator (one-hot) feature column.
weekdays = tf.feature_column.categorical_column_with_vocabulary_list(
    'departure_weekday',
    weekday_voc,
)
weekday_dummy = tf.feature_column.indicator_column(weekdays)

# Every feature column the feature layers are built from.
feature_columns = [departure_delay_buckets, weekday_dummy]

In [25]:
# Apply both feature columns to the example batch and check the output width:
# 10 delay buckets (9 boundaries) + 8 weekday vocabulary entries = 18 columns.
feature_layer_demo = tf.keras.layers.DenseFeatures(feature_columns)
feature_layer_demo(example_batch).shape

TensorShape([256, 18])

In [26]:
# Encoded form of the first row: one active delay bucket and one active weekday slot.
feature_layer_demo(example_batch).numpy()[:1]

array([[0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.,
        0., 0.]], dtype=float32)

In [27]:
# define the model

In [30]:
# Regularizers add "penalties" to the loss during training to keep values from
# becoming too large or erratic, which helps to avoid overfitting:
#   - kernel regularizers penalise the layer's WEIGHTS (used here: L2, factor 1e-4)
#   - bias regularizers penalise the bias, pulling the fitted function towards
#     the origin
#   - activity regularizers penalise the layer's output, keeping it small
weight_penalty = tf.keras.regularizers.l2(0.0001)

# Input stage: turns the raw features dict into one dense tensor.
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

# Output stage: a single sigmoid unit, i.e. logistic regression on the features.
output_layer = tf.keras.layers.Dense(
    1,
    activation='sigmoid',
    kernel_regularizer=weight_penalty,
)

model_normal = tf.keras.models.Sequential([feature_layer, output_layer])

In [31]:
# Configure training: Adam optimizer, binary cross-entropy loss to match the
# single sigmoid output, and accuracy reported as the metric.
model_normal.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

In [32]:
# DISTRIBUTION

In [33]:
## Mirrored Strategy

In [34]:
# Create the distribution strategy. The log line below lists the devices it
# picked up; in the recorded run that is only the local CPU.
distribute = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [35]:
# Placeholder showing where the model code goes when using the strategy.
with distribute.scope():
    # define model
    # compile model
    
    # the model code itself is the same as in the non-distributed case;
    # only its placement inside the strategy scope differs
    pass

In [36]:
# better for large networks