In [19]:
import numpy as np
from trax import layers as tl
from trax import shapes
from trax import fastmath

In [20]:
relu = tl.Relu()

# Inspect properties
print("-- Properties --")
print("name :", relu.name)
print("expected inputs :", relu.n_in)
print("promised outputs :", relu.n_out, "\n")

# Inputs
x = np.array([-2, -1, 0, 1, 2])
print("-- Inputs --")
print("x :", x, "\n")

# Outputs
y = relu(x)
print("-- Outputs --")
print("y :", y)

-- Properties --
name : Relu
expected inputs : 1
promised outputs : 1 

-- Inputs --
x : [-2 -1  0  1  2] 

-- Outputs --
y : [0 0 0 1 2]


In [21]:
# Concatenate layer
concat = tl.Concatenate()

print("-- Properties --")
print("name :", concat.name)
print("expected inputs :", concat.n_in)
print("promised outputs :", concat.n_out, "\n")

# Inputs
x1 = np.array([-10, -20, -30])
x2 = x1 / -10
print("-- Inputs --")
print("x1 :", x1)
print("x2 :", x2, "\n")

# Outputs
y = concat([x1, x2])
print("-- Outputs --")
print("y :", y)

-- Properties --
name : Concatenate
expected inputs : 2
promised outputs : 1 

-- Inputs --
x1 : [-10 -20 -30]
x2 : [1. 2. 3.] 

-- Outputs --
y : [-10. -20. -30.   1.   2.   3.]


In [22]:
# Configure a concatenate layer
concat_3 = tl.Concatenate(n_items=3)  # configure the layer's expected inputs
print("-- Properties --")
print("name :", concat_3.name)
print("expected inputs :", concat_3.n_in)
print("promised outputs :", concat_3.n_out, "\n")

# Inputs
x1 = np.array([-10, -20, -30])
x2 = x1 / -10
x3 = x2 * 0.99
print("-- Inputs --")
print("x1 :", x1)
print("x2 :", x2)
print("x3 :", x3, "\n")

# Outputs
y = concat_3([x1, x2, x3])
print("-- Outputs --")
print("y :", y)

-- Properties --
name : Concatenate
expected inputs : 3
promised outputs : 1 

-- Inputs --
x1 : [-10 -20 -30]
x2 : [1. 2. 3.]
x3 : [0.99 1.98 2.97] 

-- Outputs --
y : [-10.   -20.   -30.     1.     2.     3.     0.99   1.98   2.97]


In [23]:
help(tl.Concatenate)

Help on class Concatenate in module trax.layers.combinators:

class Concatenate(trax.layers.base.Layer)
 |  Concatenate(n_items=2, axis=-1)
 |  
 |  Concatenates n tensors into a single tensor.
 |  
 |  Method resolution order:
 |      Concatenate
 |      trax.layers.base.Layer
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, n_items=2, axis=-1)
 |      Creates a partially initialized, unconnected layer instance.
 |      
 |      Args:
 |        n_in: Number of inputs expected by this layer.
 |        n_out: Number of outputs promised by this layer.
 |        name: Class-like name for this layer; for use when printing this layer.
 |        sublayers_to_print: Sublayers to display when printing out this layer;
 |          By default (when None) we display all sublayers.
 |  
 |  forward(self, xs)
 |      Computes this layer's output as part of a forward pass through the model.
 |      
 |      Authors of new layer subclasses should override this method to d

In [24]:
# Layer initialization
norm = tl.LayerNorm()
# You first must know what the input data will look like
x = np.array([0, 1, 2, 3], dtype="float")

# Use the input data signature to get shape and type for initializing weights and biases
norm.init(shapes.signature(x)) # We need to convert the input datatype from usual tuple to trax ShapeDtype

print("Normal shape:",x.shape, "Data Type:",type(x.shape))
print("Shapes Trax:",shapes.signature(x),"Data Type:",type(shapes.signature(x)))

# Inspect properties
print("-- Properties --")
print("name :", norm.name)
print("expected inputs :", norm.n_in)
print("promised outputs :", norm.n_out)
# Weights and biases
print("weights :", norm.weights[0])
print("biases :", norm.weights[1], "\n")

# Inputs
print("-- Inputs --")
print("x :", x)

# Outputs
y = norm(x)
print("-- Outputs --")
print("y :", y)

Normal shape: (4,) Data Type: <class 'tuple'>
Shapes Trax: ShapeDtype{shape:(4,), dtype:float64} Data Type: <class 'trax.shapes.ShapeDtype'>
-- Properties --
name : LayerNorm
expected inputs : 1
promised outputs : 1
weights : [1. 1. 1. 1.]
biases : [0. 0. 0. 0.] 

-- Inputs --
x : [0. 1. 2. 3.]
-- Outputs --
y : [-1.3416404  -0.44721344  0.44721344  1.3416404 ]


In [25]:
help(tl.Fn)

Help on function Fn in module trax.layers.base:

Fn(name, f, n_out=1)
    Returns a layer with no weights that applies the function `f`.
    
    `f` can take and return any number of arguments, and takes only positional
    arguments -- no default or keyword arguments. It often uses JAX-numpy (`jnp`).
    The following, for example, would create a layer that takes two inputs and
    returns two outputs -- element-wise sums and maxima:
    
        `Fn('SumAndMax', lambda x0, x1: (x0 + x1, jnp.maximum(x0, x1)), n_out=2)`
    
    The layer's number of inputs (`n_in`) is automatically set to number of
    positional arguments in `f`, but you must explicitly set the number of
    outputs (`n_out`) whenever it's not the default value 1.
    
    Args:
      name: Class-like name for the resulting layer; for use in debugging.
      f: Pure function from input tensors to output tensors, where each input
          tensor is a separate positional arg, e.g., `f(x0, x1) --> x0 + x1`.
          

In [26]:
# Define a custom layer
# In this example you will create a layer to calculate the input times 2

def TimesTwo():
    layer_name = "TimesTwo" #don't forget to give your custom layer a name to identify

    # Custom function for the custom layer
    def func(x):
        return x * 2

    return tl.Fn(layer_name, func)

# Test it
times_two = TimesTwo()

# Inspect properties
print("-- Properties --")
print("name :", times_two.name)
print("expected inputs :", times_two.n_in)
print("promised outputs :", times_two.n_out, "\n")

# Inputs
x = np.array([1, 2, 3])
print("-- Inputs --")
print("x :", x, "\n")

# Outputs
y = times_two(x)
print("-- Outputs --")
print("y :", y)

-- Properties --
name : TimesTwo
expected inputs : 1
promised outputs : 1 

-- Inputs --
x : [1 2 3] 

-- Outputs --
y : [2 4 6]


In [27]:
# combinators are used to combine layers into more complex layers
# Serial Combinator is most common and easiest to use.
# help(tl.Serial)
help(tl.Parallel)

Help on class Parallel in module trax.layers.combinators:

class Parallel(trax.layers.base.Layer)
 |  Parallel(*sublayers, name=None)
 |  
 |  Combinator that applies a list of layers in parallel to its inputs.
 |  
 |  Layers in the list apply to successive spans of inputs, where the spans are
 |  determined how many inputs each layer takes. The resulting output is the
 |  (flattened) concatenation of the respective layer outputs.
 |  
 |  For example, suppose one has three layers:
 |  
 |    - F: 1 input, 1 output
 |    - G: 3 inputs, 1 output
 |    - H: 2 inputs, 2 outputs (h1, h2)
 |  
 |  Then Parallel(F, G, H) will take 6 inputs and give 4 outputs:
 |  
 |    - inputs: a, b, c, d, e, f
 |    - outputs: F(a), G(b, c, d), h1, h2
 |  
 |  As an important special case, a None argument to Parallel acts as if it takes
 |  one argument, which it leaves unchanged. (It acts as a one-arg no-op.) For
 |  example:
 |  
 |    Parallel(None, F)
 |  
 |  creates a layer that passes its first in

In [28]:
# Serial combinator
serial = tl.Serial(
    tl.LayerNorm(),         # normalize input
    tl.Relu(),              # convert negative values to zero
    times_two,              # the custom layer you created above, multiplies the input recieved from above by 2
    
    ### START CODE HERE
    tl.Dense(n_units=2),  # try adding more layers. eg uncomment these lines
    # tl.Dense(n_units=1),  # Binary classification, maybe? uncomment at your own peril
    tl.LogSoftmax()       # Yes, LogSoftmax is also a layer
    ### END CODE HERE
)

# Initialization
x = np.array([-2, -1, 0, 1, 2]) #input
serial.init(shapes.signature(x)) #initialising serial instance

print("-- Serial Model --")
print(serial,"\n")
print("-- Properties --")
print("name :", serial.name)
print("sublayers :", serial.sublayers)
print("expected inputs :", serial.n_in)
print("promised outputs :", serial.n_out)
print("weights & biases:", serial.weights, "\n")

# Inputs
print("-- Inputs --")
print("x :", x, "\n")

# Outputs
y = serial(x)
print("-- Outputs --")
print("y :", y)

-- Serial Model --
Serial[
  LayerNorm
  Relu
  TimesTwo
  Dense_2
  LogSoftmax
] 

-- Properties --
name : Serial
sublayers : [LayerNorm, Relu, TimesTwo, Dense_2, LogSoftmax]
expected inputs : 1
promised outputs : 1
weights & biases: [(DeviceArray([1, 1, 1, 1, 1], dtype=int32), DeviceArray([0, 0, 0, 0, 0], dtype=int32)), (), (), (DeviceArray([[ 0.47118202, -0.7620581 ],
             [ 0.3756661 , -0.4218606 ],
             [ 0.15808171, -0.07303074],
             [-0.2424559 , -0.12089023],
             [-0.2705172 , -0.8956146 ]], dtype=float32), DeviceArray([8.7000068e-08, 1.6511547e-06], dtype=float32)), ()] 

-- Inputs --
x : [-2 -1  0  1  2] 

-- Outputs --
y : [-0.1845535 -1.7806741]


In [29]:
# Jax
# Numpy vs fastmath.numpy have different data types
# Regular ol' numpy
x_numpy = np.array([1, 2, 3])
print("good old numpy : ", type(x_numpy), "\n")

# Fastmath and jax numpy
x_jax = fastmath.numpy.array([1, 2, 3])
print("jax trax numpy : ", type(x_jax))

good old numpy :  <class 'numpy.ndarray'> 

jax trax numpy :  <class 'jax.interpreters.xla._DeviceArray'>


# Classes and subclasses

In [30]:
class My_Class:
    x = None

instance_a = My_Class()
instance_b = My_Class()
print(str(instance_a.x))
print(str(instance_b.x))

None
None


In [31]:
instance_a.x = 5
print(instance_a.x)

5


In [32]:
class My_Class: 
    def __init__(self, y): # The __init__ method takes as input the instance to be initialized and a variable y
        self.x = y         # Sets parameter x to be equal to y

instance_c = My_Class(10)
instance_c.x

10

In [33]:
# __call__ method
class My_Class:
    def __init__(self, y):
        self.x = y
    
    def __call__(self, z):
        self.x +=  z
        print(self.x)

instance_d = My_Class(5)
instance_d(10)

15


In [34]:
class My_Class: 
    def __init__(self, y, z): #Initialization of x_1 and x_2 with arguments y and z
        ### START CODE HERE (2 lines) ### 
        self.x_1 = y
        self.x_2 = z
        ### END CODE HERE ###
    def __call__(self):       #When called, adds the values of parameters x_1 and x_2, prints and returns the result 
        ### START CODE HERE (1 line) ### 
        result = self.x_1 + self.x_2 
        ### END CODE HERE ### 
        print("Addition of {} and {} is {}".format(self.x_1,self.x_2,result))
        return result

instance_e = My_Class(10,15)
def test_class_definition():
    
    assert instance_e.x_1 == 10, "Check the value assigned to x_1"
    assert instance_e.x_2 == 15, "Check the value assigned to x_2"
    assert instance_e() == 25, "Check the __call__ method"
    
    print("\033[92mAll tests passed!")
    
test_class_definition()

Addition of 10 and 15 is 25
[92mAll tests passed!


In [35]:
class My_Class: 
    def __init__(self, y, z): #Initialization of x_1 and x_2 with arguments y and z
        self.x_1 = y
        self.x_2 = z
    def __call__(self):       #Performs an operation with x_1 and x_2, and returns the result
        a = self.x_1 - 2*self.x_2 
        return a
    def my_method(self, w):   #Multiplies x_1 and x_2, adds argument w and returns the result
        result = self.x_1*self.x_2 + w
        return result

instance_f = My_Class(1, 10)
instance_f.my_method(16)

26

In [36]:
# hidden-cell
class My_Class: 
    def __init__(self, y, z):      #Initialization of x_1 and x_2 with arguments y and z
        self.x_1 = y
        self.x_2 = z
    def __call__(self):            #Performs an operation with x_1 and x_2, and returns the result
        a = self.x_1 - 2*self.x_2 
        return a
    def my_method(self, w):        #Multiplies x_1 and x_2, adds argument w and returns the result
        b = self.x_1*self.x_2 + w
        return b
    def new_method(self, v):       #Calls My_method with argument v
        result = self.my_method(v)
        return result

instance_g = My_Class(1,10)
print(instance_g.my_method(16))
print(instance_g.new_method(16))

26
26


In [37]:
# Part 2: Subclasses and Inheritance
class sub_c(My_Class):           #Subclass sub_c from My_class
    def additional_method(self): #Prints the value of parameter x_1
        print(self.x_1)

In [38]:
instance_sub_a = sub_c(1,10)
print('Parameter x_1 of instance_sub_a: ' + str(instance_sub_a.x_1))
print('Parameter x_2 of instance_sub_a: ' + str(instance_sub_a.x_2))
print("Output of my_method of instance_sub_a:",instance_sub_a.my_method(16))

Parameter x_1 of instance_sub_a: 1
Parameter x_2 of instance_sub_a: 10
Output of my_method of instance_sub_a: 26


In [39]:
class sub_c(My_Class):           #Subclass sub_c from My_class
    def my_method(self):         #Multiplies x_1 and x_2 and returns the result
        ### START CODE HERE (1 line) ###
        b = self.x_1*self.x_2 
        ### END CODE HERE ###
        return b

test = sub_c(3,10)
assert test.my_method() == 30, "The method my_method should return the product between x_1 and x_2"

print("Output of overridden my_method of test:",test.my_method())

Output of overridden my_method of test: 30


In [40]:
y,z= 1,10
instance_sub_a = sub_c(y,z)
instance_a = My_Class(y,z)
print('My_method for an instance of sub_c returns: ' + str(instance_sub_a.my_method()))
print('My_method for an instance of My_Class returns: ' + str(instance_a.my_method(10)))

My_method for an instance of sub_c returns: 10
My_method for an instance of My_Class returns: 20


# Data generators

In [45]:
# Generator is like an iterator, it returns the net item. Useful to handle loading and transforming data for different applications

import random
import numpy as np

# Traversing a list of indexes to create a circular list
a = [1, 2, 3, 4]
b = [0] * 10

a_size = len(a)
b_size = len(b)
lines_index = [*range(a_size)]
index = 0
for i in range(b_size): # b is longer than a, forcing a wrap
    if index >= a_size:
        index = 0

    b[i] = a[lines_index[index]]
    index += 1

print(b)


[1, 2, 3, 4, 1, 2, 3, 4, 1, 2]


In [48]:
# Shuffling the data order
# Example of traversing a list of indexes to create a circular list
a = [1, 2, 3, 4]
b = []

a_size = len(a)
b_size = 10
lines_index = [*range(a_size)]
print("Original order of index:",lines_index)

# if we shuffle the index_list we can change the order of our circular list
# without modifying the order or our original data
random.shuffle(lines_index) # Shuffle the order
print("Shuffled order of index:",lines_index)

print("New value order for first batch:",[a[index] for index in lines_index])
batch_counter = 1
index = 0                # similar to index in data_generator below
for i in range(b_size):  # `b` is longer than `a` forcing a wrap
    # We wrap by resetting index to 0
    if index >= a_size:
        index = 0
        batch_counter += 1
        random.shuffle(lines_index) # Re-shuffle the order
        print("\nShuffled Indexes for Batch No.{} :{}".format(batch_counter,lines_index))
        print("Values for Batch No.{} :{}".format(batch_counter,[a[index] for index in lines_index]))
    
    b.append(a[lines_index[index]])     #  `indexes_list[index]` point to a index of a. Store the result in b
    index += 1
print()    
print("Final value of b:",b)

Original order of index: [0, 1, 2, 3]
Shuffled order of index: [1, 2, 0, 3]
New value order for first batch: [2, 3, 1, 4]

Shuffled Indexes for Batch No.2 :[0, 2, 3, 1]
Values for Batch No.2 :[1, 3, 4, 2]

Shuffled Indexes for Batch No.3 :[1, 0, 2, 3]
Values for Batch No.3 :[2, 1, 3, 4]

Final value of b: [2, 3, 1, 4, 1, 3, 4, 2, 2, 1]


In [49]:
# Implement a data generator function that takes in batch_size, x, y shuffle
def data_generator(batch_size, data_x, data_y, shuffle=True):
    data_lng = len(data_x)
    index_list = [*range(data_lng)]

    if shuffle:
        random.shuffle(index_list)
    
    index = 0
    while True:
        X = [None] * batch_size
        Y = [None] * batch_size

        for i in range(batch_size):
            #Wrap the index each time that we reach the end of the list
            if index >= data_lng:
                index=0
                if shuffle:
                    random.shuffle(index_list)

            X[i] = data_x[index_list[index]]
            Y[i] = data_y[index_list[index]]

            index += 1
        yield((X,Y))

In [52]:
def test_data_generator():
    x = [1, 2, 3, 4]
    y = [xi ** 2 for xi in x]
    
    generator = data_generator(3, x, y, shuffle=False)

    assert np.allclose(next(generator), ([1, 2, 3], [1, 4, 9])),  "First batch does not match"
    assert np.allclose(next(generator), ([4, 1, 2], [16, 1, 4])), "Second batch does not match"
    assert np.allclose(next(generator), ([3, 4, 1], [9, 16, 1])), "Third batch does not match"
    assert np.allclose(next(generator), ([2, 3, 4], [4, 9, 16])), "Fourth batch does not match"

    print("\033[92mAll tests passed!")

test_data_generator()

[92mAll tests passed!


In [53]:
import base64

solution = "ZGVmIGRhdGFfZ2VuZXJhdG9yKGJhdGNoX3NpemUsIGRhdGFfeCwgZGF0YV95LCBzaHVmZmxlPVRydWUpOgoKICAgIGRhdGFfbG5nID0gbGVuKGRhdGFfeCkgIyBsZW4oZGF0YV94KSBtdXN0IGJlIGVxdWFsIHRvIGxlbihkYXRhX3kpCiAgICBpbmRleF9saXN0ID0gWypyYW5nZShkYXRhX2xuZyldICMgQ3JlYXRlIGEgbGlzdCB3aXRoIHRoZSBvcmRlcmVkIGluZGV4ZXMgb2Ygc2FtcGxlIGRhdGEKICAgIAogICAgIyBJZiBzaHVmZmxlIGlzIHNldCB0byB0cnVlLCB3ZSB0cmF2ZXJzZSB0aGUgbGlzdCBpbiBhIHJhbmRvbSB3YXkKICAgIGlmIHNodWZmbGU6CiAgICAgICAgcm5kLnNodWZmbGUoaW5kZXhfbGlzdCkgIyBJbnBsYWNlIHNodWZmbGUgb2YgdGhlIGxpc3QKICAgIAogICAgaW5kZXggPSAwICMgU3RhcnQgd2l0aCB0aGUgZmlyc3QgZWxlbWVudAogICAgd2hpbGUgVHJ1ZToKICAgICAgICBYID0gWzBdICogYmF0Y2hfc2l6ZSAjIFdlIGNhbiBjcmVhdGUgYSBsaXN0IHdpdGggYmF0Y2hfc2l6ZSBlbGVtZW50cy4gCiAgICAgICAgWSA9IFswXSAqIGJhdGNoX3NpemUgIyBXZSBjYW4gY3JlYXRlIGEgbGlzdCB3aXRoIGJhdGNoX3NpemUgZWxlbWVudHMuIAogICAgICAgIAogICAgICAgIGZvciBpIGluIHJhbmdlKGJhdGNoX3NpemUpOgogICAgICAgICAgICAKICAgICAgICAgICAgIyBXcmFwIHRoZSBpbmRleCBlYWNoIHRpbWUgdGhhdCB3ZSByZWFjaCB0aGUgZW5kIG9mIHRoZSBsaXN0CiAgICAgICAgICAgIGlmIGluZGV4ID49IGRhdGFfbG5nOgogICAgICAgICAgICAgICAgaW5kZXggPSAwCiAgICAgICAgICAgICAgICAjIFNodWZmbGUgdGhlIGluZGV4X2xpc3QgaWYgc2h1ZmZsZSBpcyB0cnVlCiAgICAgICAgICAgICAgICBpZiBzaHVmZmxlOgogICAgICAgICAgICAgICAgICAgIHJuZC5zaHVmZmxlKGluZGV4X2xpc3QpICMgcmUtc2h1ZmZsZSB0aGUgb3JkZXIKICAgICAgICAgICAgCiAgICAgICAgICAgIFhbaV0gPSBkYXRhX3hbaW5kZXhfbGlzdFtpbmRleF1dIAogICAgICAgICAgICBZW2ldID0gZGF0YV95W2luZGV4X2xpc3RbaW5kZXhdXSAKICAgICAgICAgICAgCiAgICAgICAgICAgIGluZGV4ICs9IDEKICAgICAgICAKICAgICAgICB5aWVsZCgoWCwgWSkp"

# Print the solution to the given assignment
print(base64.b64decode(solution).decode("utf-8"))

def data_generator(batch_size, data_x, data_y, shuffle=True):

    data_lng = len(data_x) # len(data_x) must be equal to len(data_y)
    index_list = [*range(data_lng)] # Create a list with the ordered indexes of sample data
    
    # If shuffle is set to true, we traverse the list in a random way
    if shuffle:
        rnd.shuffle(index_list) # Inplace shuffle of the list
    
    index = 0 # Start with the first element
    while True:
        X = [0] * batch_size # We can create a list with batch_size elements. 
        Y = [0] * batch_size # We can create a list with batch_size elements. 
        
        for i in range(batch_size):
            
            # Wrap the index each time that we reach the end of the list
            if index >= data_lng:
                index = 0
                # Shuffle the index_list if shuffle is true
                if shuffle:
                    rnd.shuffle(index_list) # re-shuffle the order
            
            X[i] = data_x[index_list[in