In [None]:
import torchtext

# Tokenizer: torchtext's built-in 'basic_english' normaliser/splitter
tokenizer = torchtext.data.get_tokenizer('basic_english')

# Read the CSV chunk and tokenize it one line at a time
with open('data/chunk_1.csv', 'r') as f:
    tokenized_data = [tokenizer(line) for line in f.read().splitlines()]

# Vocabulary built from every token seen in the file
vocab = torchtext.vocab.build_vocab_from_iterator(tokenized_data)

# Each token sequence re-expressed as a list of vocabulary indices
sequences = [
    [vocab[token] for token in token_sequence]
    for token_sequence in tokenized_data
]

In [None]:
# Show the first 5 sequences in their index form, one per line
for position in range(min(5, len(sequences))):
    print(sequences[position])

In [None]:
# Print the first 5 sequences as tokens.
# The index-to-token table is fetched once instead of once per sequence.
itos = vocab.get_itos()
for sequence in sequences[:5]:
    print([itos[index] for index in sequence])

In [None]:
import os
import torch
from torch import nn
from torch.utils.data import random_split, DataLoader
from torch.nn.utils.rnn import pad_sequence

In [None]:
# Run on the GPU when one is available, otherwise on the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Vocabulary size fixes the width of the embedding table and the output layer
vocab_size = len(vocab)
print(vocab_size)

# Model and training hyperparameters
embedding_dim, hidden_dim = 64, 128
num_epochs, batch_size = 1, 32

In [None]:
# Define the model
class RNNModel(nn.Module):
    """Next-token model: embedding -> single-layer RNN -> linear projection.

    Args:
        vocab_size: number of rows in the embedding table and width of the
            output layer.
        embedding_dim: size of each token embedding.
        hidden_dim: size of the RNN hidden state.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.rnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x):
        """Return one row of vocabulary logits per input position."""
        embedded = self.embedding(x)
        # The RNN also returns its final hidden state, which is not needed here.
        hidden_states, _final_state = self.rnn(embedded)
        return self.fc(hidden_states)

def pad_collate(batch, padding_value=0, target_padding_value=-100):
    """Collate variable-length (input, target) pairs into two padded batches.

    Args:
        batch: iterable of ``(sequence, target)`` tensor pairs.
        padding_value: fill value for the padded input positions.
        target_padding_value: fill value for the padded target positions.
            The default of -100 is the default ``ignore_index`` of
            ``nn.CrossEntropyLoss``, so padded positions are left out of the
            loss instead of being scored as vocabulary index 0, which is a
            real token.

    Returns:
        ``(sequences_pad, targets_pad)``, both batch-first.
    """
    sequences, targets = zip(*batch)
    sequences_pad = pad_sequence(sequences, batch_first=True, padding_value=padding_value)
    targets_pad = pad_sequence(targets, batch_first=True, padding_value=target_padding_value)
    return sequences_pad, targets_pad

# Build the network and place its parameters on the selected device in one step
model = RNNModel(vocab_size, embedding_dim, hidden_dim).to(device)

In [None]:
# Set the loss function and optimizer
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

# Resume from a previously saved checkpoint when one exists.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU machine still loads on a CPU-only one.
if os.path.exists('saved_models/checkpoint2.pth'):
    checkpoint = torch.load('saved_models/checkpoint2.pth', map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Convert the sequences to integer tensors on the device.
# dtype is explicit because an empty line would otherwise become a float tensor;
# as_tensor also accepts tensors, so re-running this cell is harmless.
sequences = [torch.as_tensor(sequence, dtype=torch.long, device=device) for sequence in sequences]

# Next-token pairs: the input is every token but the last, the target is every
# token but the first. Sequences with fewer than two tokens have no pair and
# are skipped.
inputs = [sequence[:-1] for sequence in sequences if len(sequence) > 1]
targets = [sequence[1:] for sequence in sequences if len(sequence) > 1]

# Combine the inputs and targets into a single dataset
dataset = list(zip(inputs, targets))

# Split the dataset into a training set (80%) and a validation set (the rest)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Length of the longest training input; the experimental GAN cell uses it as
# the length of its noise sequences.
sequence_length = max(len(sequence) for sequence, _ in train_dataset)

# Create data loaders for the training and validation sets
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_collate)
val_loader = DataLoader(val_dataset, batch_size=batch_size, collate_fn=pad_collate)

In [None]:
# Train the model
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    num_batches = 0

    for i, (sequence, target) in enumerate(train_loader):
        sequence = sequence.to(device)
        target = target.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(sequence)

        # Flatten so each position becomes one classification example
        output = output.reshape(-1, output.shape[-1])  # shape: (sequence_length * batch_size, num_classes)
        target = target.reshape(-1)  # shape: (sequence_length * batch_size,)

        # Compute the loss
        loss = loss_function(output, target)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

        print("Step: " + str(i) + " loss: " + str(loss.item()))

    # Report the mean over the epoch rather than whatever the last batch scored
    print(f'Epoch {epoch+1}, Training Loss: {epoch_loss / max(num_batches, 1)}')

    # Validate the model
    model.eval()
    with torch.no_grad():
        val_loss = 0
        for sequence, target in val_loader:
            sequence = sequence.to(device)
            target = target.to(device)

            output = model(sequence)

            # Flatten the output and target tensors
            output = output.reshape(-1, output.shape[-1])  # shape: (sequence_length * batch_size, num_classes)
            target = target.reshape(-1)  # shape: (sequence_length * batch_size,)

            loss = loss_function(output, target)
            val_loss += loss.item()

    # val_loss is a plain Python float (a sum of .item() values), so it has no
    # .item() of its own; max() guards against an empty validation loader.
    print(f'Epoch {epoch+1}, Validation Loss: {val_loss / max(len(val_loader), 1)}')

#The training loop should execute below

In [None]:
# Predict the token that follows '[resourcetype]'.
# The input is created on the model's device, the module is invoked through
# __call__ rather than .forward(), and autograd is off because this is inference.
with torch.no_grad():
    pred = model(torch.tensor([vocab['[resourcetype]']], device=device))

In [None]:
# Vocabulary index with the highest score for the first (only) position
pred.argmax(dim=1).tolist()[0]

In [None]:
# Look the highest-scoring index up in the vocabulary's index-to-token table
top_index = pred.argmax(dim=1)[0].item()
vocab.get_itos()[top_index]

In [None]:
# Decode the most recent batch left behind by the training cell: predicted
# tokens first, then the reference tokens.
# `output` holds flattened logits (one row per position), so the prediction is
# the argmax over the last axis, not an integer cast of the logits.
itos = vocab.get_itos()
predicted_ids = output.argmax(dim=-1).tolist()
target_ids = target.tolist()
print(' '.join(itos[index] for index in predicted_ids))
# Negative ids cannot be vocabulary entries (e.g. an ignore-index padding value), so skip them.
print(' '.join(itos[index] for index in target_ids if index >= 0))

In [None]:
import os

saved_model_dir = "./saved_models/"

# exist_ok=True makes this one idempotent call, with no separate existence
# check that could race with something else creating the directory.
os.makedirs(saved_model_dir, exist_ok=True)


In [None]:

# Bundle the model weights and the optimizer state, then write them to disk
checkpoint_state = {
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
}
torch.save(checkpoint_state, 'saved_models/checkpoint2.pth')

In [None]:
# Print the first training example as tokens: its input, then its target.
# The index-to-token table is fetched once instead of once per token.
itos = vocab.get_itos()
for sequence in train_dataset[0]:
    print(' '.join(itos[index] for index in sequence))

In [None]:
# First token id of the first training example's input sequence
first_input, _first_target = train_dataset[0]
first_input[0]

## Experimental GAN section

In [None]:
# Define the generator (similar to the original RNN model)
class Generator(nn.Module):
    """Sequence generator: embedding -> single-layer GRU -> linear projection.

    Args:
        vocab_size: number of rows in the embedding table and width of the
            output layer.
        embedding_dim: size of each token embedding.
        hidden_dim: size of the GRU hidden state.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.rnn = nn.GRU(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x):
        """Return one row of vocabulary logits per input position."""
        embedded = self.embedding(x)
        # The GRU also returns its final hidden state, which is not needed here.
        hidden_states, _final_state = self.rnn(embedded)
        return self.fc(hidden_states)

# Define the discriminator (a binary classifier)
class Discriminator(nn.Module):
    """Binary sequence classifier built on a 2-layer bidirectional GRU.

    Args:
        input_dim: feature size of each timestep of the input.
        hidden_dim: hidden size of each GRU direction.
    """

    def __init__(self, input_dim, hidden_dim):
        super(Discriminator, self).__init__()
        self.rnn = nn.GRU(input_dim, hidden_dim, num_layers=2, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)

    def forward(self, x):
        """Return one real/fake logit per sequence, shape ``(batch, 1)``.

        The summary fed to the classifier is the final hidden state of each
        direction of the top layer. Reading ``output[:, -1, :]`` instead would
        take the backward direction after it has seen only one timestep, since
        the backward pass finishes at position 0.
        """
        _, final_hidden = self.rnn(x)
        # final_hidden is (num_layers * 2, batch, hidden); its last two entries
        # are the top layer's forward and backward final states.
        summary = torch.cat([final_hidden[-2], final_hidden[-1]], dim=1)
        return self.fc(summary)


In [None]:
# Set the number of training epochs and the learning rate
# NOTE(review): num_epochs and loss_function rebind names that the RNN training
# cells also use; re-running those cells after this one picks up the GAN values.
num_epochs = 50
learning_rate = 0.001

# Initialize the generator and discriminator, and move them to the device.
# The discriminator scores per-token distributions over the vocabulary, so its
# input width is vocab_size: real token ids are one-hot encoded and the
# generator's logits are turned into probabilities with a softmax. Feeding it
# raw integer ids or vocab-wide logits while it expects embedding_dim-wide
# float features would fail inside the GRU.
generator = Generator(vocab_size, embedding_dim, hidden_dim).to(device)
discriminator = Discriminator(vocab_size, hidden_dim).to(device)

# Set the loss function and the optimizers
loss_function = nn.BCEWithLogitsLoss()
optimizer_G = torch.optim.Adam(generator.parameters(), lr=learning_rate)
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    # Training
    for i, (real_sequences, _) in enumerate(train_loader):
        # Move the real sequences to the device and one-hot encode them
        real_sequences = real_sequences.to(device)
        real_one_hot = nn.functional.one_hot(real_sequences, num_classes=vocab_size).float()

        # Create random token sequences as input for the generator
        noise_sequences = torch.randint(vocab_size, (batch_size, sequence_length), device=device)

        # Use the generator to create fake sequences (per-token probabilities)
        fake_sequences = torch.softmax(generator(noise_sequences), dim=-1)

        # Train the discriminator; detach() keeps this step from updating the generator
        optimizer_D.zero_grad()
        real_output = discriminator(real_one_hot)
        fake_output = discriminator(fake_sequences.detach())
        loss_D_real = loss_function(real_output, torch.ones_like(real_output))
        loss_D_fake = loss_function(fake_output, torch.zeros_like(fake_output))
        loss_D = (loss_D_real + loss_D_fake) / 2
        loss_D.backward()
        optimizer_D.step()

        # Train the generator: it is rewarded when the discriminator says "real"
        optimizer_G.zero_grad()
        fake_output = discriminator(fake_sequences)
        loss_G = loss_function(fake_output, torch.ones_like(fake_output))
        loss_G.backward()
        optimizer_G.step()

    # Validation
    with torch.no_grad():
        val_loss_D = 0.0
        val_loss_G = 0.0
        # The loader yields (inputs, targets) pairs; only the inputs are used here
        for real_sequences, _ in val_loader:
            real_sequences = real_sequences.to(device)
            real_one_hot = nn.functional.one_hot(real_sequences, num_classes=vocab_size).float()
            noise_sequences = torch.randint(vocab_size, (batch_size, sequence_length), device=device)
            fake_sequences = torch.softmax(generator(noise_sequences), dim=-1)
            real_output = discriminator(real_one_hot)
            fake_output = discriminator(fake_sequences)
            loss_real = loss_function(real_output, torch.ones_like(real_output))
            loss_fake = loss_function(fake_output, torch.zeros_like(fake_output))
            # .item() keeps the running totals as plain floats
            val_loss_D += ((loss_real + loss_fake) / 2).item()
            val_loss_G += loss_function(fake_output, torch.ones_like(fake_output)).item()

    # max() guards against an empty validation loader
    num_val_batches = max(len(val_loader), 1)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss D: {loss_D.item()}, Loss G: {loss_G.item()}, Val Loss D: {val_loss_D / num_val_batches}, Val Loss G: {val_loss_G / num_val_batches}')



## End of experimental GAN section

In [None]:
# Seed text for generation, written the way it appears in the training rows.
# Abridged example row:
#   CarePlan,"[resourceType] CarePlan [id] 5026185d-3747-e969-6564-c4aec1b8d06e
#   [meta][profile][0] http://hl7.org/fhir/us/core/StructureDefinition/us-core-careplan
#   [text][status] generated ... [status] completed [intent] order ...
#   [activity][2][detail][location][display] NEWTON-WELLESLEY HOSPITAL"
seed_text = '[resourceType] CarePlan'

# Tokenize the seed with the same tokenizer that built the vocabulary, so the
# seed tokens have the same form (lowercased, split on whitespace) as its entries.
seed_sequence = tokenizer(seed_text)

# Fail with a readable message if any seed token is missing from the vocabulary
missing_tokens = [token for token in seed_sequence if token not in vocab]
if missing_tokens:
    raise ValueError(f"Seed tokens not in vocabulary: {missing_tokens}")

# Convert the seed sequence to a (1, seed_length) tensor of indices
seed_tensor = torch.tensor([[vocab[token] for token in seed_sequence]], device=device)

# Set the model to evaluation mode
model.eval()

# Start the generated sequence from a copy, so the seed list itself is not mutated
generated_sequence = list(seed_sequence)

# Fetch the index-to-token table once rather than on every step
itos = vocab.get_itos()

# Generate 100 further tokens, greedily taking the highest-scoring one each step
with torch.no_grad():
    for _ in range(100):
        # Get the model's prediction for every position of the sequence so far
        output = model(seed_tensor)

        # Index of the predicted token at the last position
        predicted_index = output.argmax(dim=-1)[0, -1].item()

        # Convert the index to a token and record it
        predicted_token = itos[predicted_index]
        generated_sequence.append(predicted_token)

        # Extend the model input with the predicted index
        next_index = torch.tensor([[predicted_index]], device=device)
        seed_tensor = torch.cat([seed_tensor, next_index], dim=1)

# Print the generated sequence
print(' '.join(generated_sequence))

In [None]:
import re
import json

def string_to_dict(s):
    """Parse a flattened ``[key][sub][0] value`` string into nested dicts/lists.

    Each entry is a bracketed key path, one whitespace character, and a value
    that runs up to the next ``[``. Numeric path segments address list
    positions; all other segments address dict keys.

    Args:
        s: the flattened text, e.g.
            ``"[resourceType] CarePlan [meta][profile][0] http://x"``.

    Returns:
        A dict mirroring the key paths, with every value stored as a
        whitespace-stripped string. Text that matches no entry (for example
        leading or stray whitespace) is ignored.
    """
    result = {}

    # Group 1 is the key path without its outer brackets, group 2 the raw value.
    # Capturing both directly avoids re-splitting the match on '] ', which broke
    # on bare-whitespace matches and on values that themselves contain '] '.
    for match in re.finditer(r'\[(.*?)\]\s([^\[]*)', s):
        keys = match.group(1).split('][')
        value = match.group(2).strip()

        temp = result
        # enumerate gives the true position of each segment; looking it up with
        # keys.index(k) returned the first occurrence and so picked the wrong
        # neighbour whenever a segment name repeated within one path.
        for position, k in enumerate(keys[:-1]):
            if not k.isdigit():
                if k not in temp:
                    # A key followed by a number holds a list, otherwise a dict
                    temp[k] = [] if keys[position + 1].isdigit() else {}
                temp = temp[k]
            else:
                index = int(k)
                # Grow the list up to the requested position so that a skipped
                # index does not raise IndexError
                while len(temp) <= index:
                    temp.append({})
                temp = temp[index]

        last_key = keys[-1]
        if isinstance(temp, list):
            index = int(last_key)
            # Pad with None so the value lands at the index it was written for
            while len(temp) < index:
                temp.append(None)
            if len(temp) == index:
                temp.append(value)
            else:
                temp[index] = value  # the index already exists: replace it
        else:
            temp[last_key] = value

    return result



def dict_to_json(dict_obj):
    """Serialize ``dict_obj`` to a JSON string using ``json.dumps`` defaults."""
    serialized = json.dumps(dict_obj)
    return serialized

In [None]:
# generated_sequence is a list of tokens, but string_to_dict runs a regex over
# a single string, so join the tokens back into text first.
fhir_dict = string_to_dict(' '.join(generated_sequence))
json_string = dict_to_json(fhir_dict)
print(json_string)

In [None]:
def check_cardinality(value, min, max):
    """
    Ensure ``value`` contains an allowed number of items.

    A list counts as ``len(value)`` items; anything else counts as one.
    ``max`` may be ``'*'`` for "no upper limit"; otherwise both bounds are
    converted with ``int``. Raises ``ValueError`` when the count falls outside
    the inclusive range.
    """
    # '*' means unbounded
    upper_bound = float('inf') if max == '*' else int(max)

    count = len(value) if isinstance(value, list) else 1
    lower_bound = int(min)

    if not (lower_bound <= count <= upper_bound):
        raise ValueError(f"Cardinality {count} not in range {min}..{upper_bound}")

def check_type(value, expected_type):
    """
    Check that the value is of one of the expected FHIR primitive types.

    Args:
        value: the parsed JSON value to check.
        expected_type: list of dicts, each with a ``'code'`` entry naming a
            FHIR data type.

    Raises:
        ValueError: if every expected code is a known primitive and the
            value's Python type matches none of them.

    The check is skipped when no type is declared or when any declared code is
    not in the primitive map (a complex type such as a reference or a coded
    concept): the value may legitimately be of that unmapped type, so it
    cannot be rejected on the basis of the primitive map alone.
    """
    # Get the actual type of the value
    actual_type = type(value).__name__

    # The expected type is a list of dictionaries, extract the code field
    expected_type_codes = [t['code'] for t in expected_type]

    # FHIR primitive type code -> names of the Python types its JSON form parses to
    string_like = ('str',)
    type_mapping = {
        'boolean': ('bool',),
        'integer': ('int',),
        'positiveInt': ('int',),
        'unsignedInt': ('int',),
        # A decimal written without a fraction parses to int
        'decimal': ('float', 'int'),
        'string': string_like,
        'code': string_like,
        'id': string_like,
        'uri': string_like,
        'url': string_like,
        'canonical': string_like,
        'markdown': string_like,
        'oid': string_like,
        'uuid': string_like,
        'date': string_like,
        'dateTime': string_like,
        'instant': string_like,
        'time': string_like,
        'base64Binary': string_like,
    }

    if not expected_type_codes:
        return
    if any(code not in type_mapping for code in expected_type_codes):
        return

    expected_python_types = []
    for code in expected_type_codes:
        for type_name in type_mapping[code]:
            if type_name not in expected_python_types:
                expected_python_types.append(type_name)

    if actual_type not in expected_python_types:
        raise ValueError(f"Type {actual_type} not in {expected_python_types}")
      
def validate_resource(resource, structure_definitions):
    """
    Validate a parsed resource against the matching structure definition.

    Args:
        resource: dict holding the resource; ``resource['resourceType']``
            selects the definition to validate against.
        structure_definitions: iterable of dicts, each with a ``'type'`` and an
            ``'element'`` mapping of field name to a definition that has
            ``'min'``, ``'max'`` and ``'type'`` entries.

    Raises:
        ValueError: if no definition matches the resource type, a field is not
            declared, a field fails its cardinality or type check, or a
            required field is missing.
    """
    # Identify the resource type
    resource_type = resource['resourceType']

    # Find the corresponding StructureDefinition; a default keeps a missing
    # definition from leaking StopIteration out of next().
    structure_definition = next(
        (sd for sd in structure_definitions if sd['type'] == resource_type),
        None,
    )
    if structure_definition is None:
        raise ValueError(f"No StructureDefinition found for resource type {resource_type}")

    elements = structure_definition['element']

    # Check each field in the resource
    for field, value in resource.items():
        if field not in elements:
            # 'resourceType' is the discriminator read above, not a data field,
            # so it is only validated when the definition declares it.
            if field == 'resourceType':
                continue
            raise ValueError(f"Unexpected field {field}")
        element_definition = elements[field]
        check_cardinality(value, element_definition['min'], element_definition['max'])
        check_type(value, element_definition['type'])

    # Check for missing fields; int() accepts 'min' given as a number or a
    # numeric string, matching how check_cardinality treats it.
    for field, element_definition in elements.items():
        if int(element_definition['min']) > 0 and field not in resource:
            raise ValueError(f"Missing required field {field}")

In [None]:
# NOTE(review): these arguments do not match validate_resource as defined above.
# json_string is the str returned by dict_to_json (json.dumps), but the function
# indexes resource['resourceType'] and iterates resource.items(), i.e. it needs
# the parsed dict (fhir_dict). The second argument is iterated and each item is
# read as sd['type'] / sd['element'], so it must be a collection of structure
# definition dicts, not the string 'CarePlan'. No such collection is defined in
# the visible cells - TODO load one and pass it here.
validate_resource(json_string, 'CarePlan')