This code is for a 2 state markov chain

LOAD THE REQUIRED LIBRARIES

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

DEFINE THE GENERATOR AND DISCRIMINATOR MODEL ARCHITECHTURES

In [3]:
def make_generator_model():
    model = tf.keras.Sequential()
    model.add(layers.Dense(128, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Dense(256, use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Dense(20, activation='tanh'))

    return model

In [5]:
def make_discriminator_model():
    model = tf.keras.Sequential()
    
    model.add(layers.Dense(256, input_shape=(20,)))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Dense(128))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Dense(1, activation='sigmoid'))

    return model

DEFINE THE LOSSES

In [8]:
# This method returns a helper function to compute cross entropy loss
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [9]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

In [10]:
def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

HERE I AM DEFININ THE OPTIMIZERS

In [11]:
# Define the Generator optimizer
generator_optimizer = tf.keras.optimizers.legacy.Adam(1e-4)

In [12]:

# Define the Discriminator optimizer
discriminator_optimizer = tf.keras.optimizers.legacy.Adam(1e-4)

THIS IS THE TRAIN STEP

In [13]:
# Define the training function
@tf.function
def train_step(real_sequences):

    # Generate the latent noise
    noise = tf.random.normal([real_sequences.shape[0], 100])

    # Calculate the losses
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_sequences = generator(noise, training=True)

        real_output = discriminator(real_sequences, training=True)
        fake_output = discriminator(generated_sequences, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    # Calculate the gradients
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    # Update the generator and discriminator
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))


In [14]:
# Define the number of training epochs and batch size
num_epochs = 500
batch_size = 128

GENERATING TRAINING DATA

In [15]:
# Define the training dataset
# Replace `training_data` with your own training dataset of shape (num_samples, 20)
training_data = np.random.randint(0, 10, size=(1000, 20))
print(training_data.shape)

(1000, 20)


In [16]:
#declare a transition matrix

n = 2
values = [[0.15 , 0.85], [0.4 , 0.6] ]

matrix = np.array(values)

print(matrix)

cumm_matrix = np.array([[0.15 , 1], [0.4 , 1]])

print(cumm_matrix)

[[0.15 0.85]
 [0.4  0.6 ]]
[[0.15 1.  ]
 [0.4  1.  ]]


In [17]:
import random

array = np.zeros(200000)  
random_array = np.random.rand(199999)
array[0] = 1
cnt1 = 0
cnt2 = 0

for p in range(199999):
    val = int(array[p])
    x = random_array[p]

    fin = 0
    #for q in range(2):
    if(val == -1):
        val = 0
        
    if(x <= cumm_matrix[val][0]):
        fin = -1
        cnt1 = cnt1 + 1
    else: 
        fin = 1
        cnt2 = cnt2 + 1
    

    array[p+1] = fin


print(cnt1)
print(cnt2)

63888
136111


In [18]:
zz = 0
zo = 0
oz = 0
oo = 0


for p in range(199999):
    #noise = tf.random.normal([1, 100])
    #generated_sequence = generator(noise, training=False).numpy().squeeze()

    
    if ((array[p] == -1) and (array[p+1] == -1)):
        zz = zz + 1
    if ((array[p] == -1) and (array[p+1] == 1)):
        zo = zo + 1
    if ((array[p] == 1) and (array[p+1] == -1)):
        oz = oz + 1
    if ((array[p] == 1) and (array[p+1] == 1)):
        oo = oo + 1

v1 = zz/(zz + zo)
v2 = zo/(zz + zo)

v3 = oz/(oz + oo)
v4 = oo/(oz + oo)

matrix = np.array([[v1 , v2],
                   [v3,  v4]])

print(matrix)

[[0.15073253 0.84926747]
 [0.39863053 0.60136947]]


In [19]:
training_data = array.reshape(10000, 20)

In [20]:
print(training_data)

[[ 1. -1.  1. ... -1.  1.  1.]
 [-1.  1. -1. ... -1. -1.  1.]
 [-1.  1. -1. ...  1.  1. -1.]
 ...
 [ 1. -1.  1. ... -1.  1. -1.]
 [ 1.  1.  1. ... -1.  1.  1.]
 [ 1.  1.  1. ...  1.  1.  1.]]


In [21]:
print(training_data.shape)

(10000, 20)


In [22]:
# Create TensorFlow Dataset from the training data
train_dataset = tf.data.Dataset.from_tensor_slices(training_data).shuffle(len(training_data)).batch(batch_size)


In [23]:
print(train_dataset)

<BatchDataset element_spec=TensorSpec(shape=(None, 20), dtype=tf.float64, name=None)>


INITIALIZE THE GENERATOR AND DISCRIMINATOR MODELS

In [24]:
# Instantiate the Generator and Discriminator models
generator = make_generator_model()
discriminator = make_discriminator_model()

HERE THE TRAINING OCCURS

In [25]:

# Training loop
for epoch in range(num_epochs):
    for sequences in train_dataset:
        train_step(sequences)

    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}/{num_epochs} completed.")


Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Constant'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Constant'


  output, from_logits = _get_logits(


Epoch 10/500 completed.
Epoch 20/500 completed.
Epoch 30/500 completed.
Epoch 40/500 completed.
Epoch 50/500 completed.
Epoch 60/500 completed.
Epoch 70/500 completed.
Epoch 80/500 completed.
Epoch 90/500 completed.
Epoch 100/500 completed.
Epoch 110/500 completed.
Epoch 120/500 completed.
Epoch 130/500 completed.
Epoch 140/500 completed.
Epoch 150/500 completed.
Epoch 160/500 completed.
Epoch 170/500 completed.
Epoch 180/500 completed.
Epoch 190/500 completed.
Epoch 200/500 completed.
Epoch 210/500 completed.
Epoch 220/500 completed.
Epoch 230/500 completed.
Epoch 240/500 completed.
Epoch 250/500 completed.
Epoch 260/500 completed.
Epoch 270/500 completed.
Epoch 280/500 completed.
Epoch 290/500 completed.
Epoch 300/500 completed.
Epoch 310/500 completed.
Epoch 320/500 completed.
Epoch 330/500 completed.
Epoch 340/500 completed.
Epoch 350/500 completed.
Epoch 360/500 completed.
Epoch 370/500 completed.
Epoch 380/500 completed.
Epoch 390/500 completed.
Epoch 400/500 completed.
Epoch 410

In [26]:
# Generate a sequence of length 20
noise = tf.random.normal([1, 100])
generated_sequence = generator(noise, training=False).numpy().squeeze()

print("Generated Sequence:", generated_sequence)


Generated Sequence: [ 1.          1.          1.          1.          0.89844203 -1.
  0.9999987   1.         -1.          0.99999416  1.          0.9989759
  0.82211626  1.          1.         -0.9994055   0.9999714   0.99941415
  0.99965197  0.95481765]


GENERATING SEQUENCES AND FINDING THE OUTPUT TRANSITION MATRIX

In [27]:
zz = 0
zo = 0
oz = 0
oo = 0


for q in range(5000):
    # LATENT VARIABLE
    noise = tf.random.normal([1, 100])

    # USING THE GENERATOR TO GENERATE
    generated_sequence = generator(noise, training=False).numpy().squeeze()

    # ROUNDING OFF
    for p in range(20):
        if(generated_sequence[p] < 0):
            generated_sequence[p] = -1
        else:
            generated_sequence[p] = 1

    for p in range(19):
        if ((generated_sequence[p] == -1) and (generated_sequence[p+1] == -1)):
            zz = zz + 1
        if ((generated_sequence[p] == -1) and (generated_sequence[p+1] == 1)):
            zo = zo + 1
        if ((generated_sequence[p] == 1) and (generated_sequence[p+1] == -1)):
            oz = oz + 1
        if ((generated_sequence[p] == 1) and (generated_sequence[p+1] == 1)):
            oo = oo + 1

# CALCULATING THE TRANSITION MATRIX
v1 = zz/(zz + zo)
v2 = zo/(zz + zo)

v3 = oz/(oz + oo)
v4 = oo/(oz + oo)

matrix = np.array([[v1 , v2],
                   [v3,  v4]])





#print(generated_sequence)

In [28]:
values = np.array(values)

print("Original Transition matrix")
print(np.array_str(values, precision=4, suppress_small=True))

print("\nGenerated Transition Matrix")
print(np.array_str(matrix, precision=4, suppress_small=True))

Original Transition matrix
[[0.15 0.85]
 [0.4  0.6 ]]

Generated Transition Matrix
[[0.158  0.842 ]
 [0.3938 0.6062]]


In [29]:
from tabulate import tabulate

In [30]:
def print_matrix_as_table(matrix, matrix_name):
    headers = ["State {}".format(i + 1) for i in range(matrix.shape[1])]
    rows = ["State {}".format(i + 1) for i in range(matrix.shape[0])]
    table = tabulate(matrix, headers=headers, showindex=rows, tablefmt="grid")
    print(f"\n{matrix_name}:\n{table}")

# Print matrices as tables with labels
print_matrix_as_table(values, "Original Transition Matrix")
print_matrix_as_table(matrix, "Generated Transition Matrix")


Original Transition Matrix:
+---------+-----------+-----------+
|         |   State 1 |   State 2 |
| State 1 |      0.15 |      0.85 |
+---------+-----------+-----------+
| State 2 |      0.4  |      0.6  |
+---------+-----------+-----------+

Generated Transition Matrix:
+---------+-----------+-----------+
|         |   State 1 |   State 2 |
| State 1 |   0.15804 |   0.84196 |
+---------+-----------+-----------+
| State 2 |   0.3938  |   0.6062  |
+---------+-----------+-----------+
