In [None]:
def decode_to_strings(d):
    newmetadata = {}
    for k,v in d.items():
        newk,newv = k,v
        if isinstance(v, bytes):
            newv = v.decode("utf-8") 
        if isinstance(v, dict):
            newv = decode_to_strings(v)
        if isinstance(k, bytes):
            newk = k.decode("utf-8") 
        newmetadata[newk] = newv
    return newmetadata

with np.load('dsprites-dataset/dsprites_ndarray_co1sh3sc6or40x32y32_64x64.npz',encoding='latin1', allow_pickle=True) as data:
    labels = data['latents_values']
    with open('latents_values.pickle', 'wb') as handle:
        pickle.dump(labels, handle, protocol=pickle.HIGHEST_PROTOCOL) 

In [None]:
with open('dsprites.pickle', 'rb') as f:
    pdata = pickle.load(f)
    fname_template='dsprites-dataset/images/{cap}.npy'
    for i in range(len(pdata["dataset"])):
        with open(fname_template.format(cap=i) , 'wb') as f:
            np.save(f, pdata["dataset"][i] )
    with open('labels.pickle', 'wb') as handle:
        pickle.dump(pdata["labels"], handle, protocol=pickle.HIGHEST_PROTOCOL) 

In [None]:
#im, la = next(dataiter)

def draw_img(index, images, labels):
    images = images.view((BATCH_SIZE, 1,64,64))
    sample = images[index].view(1,1,64,64)
    model.train()
    pred = model(sample)
    model.eval()
    img_grid = torchvision.utils.make_grid(sample, nrow=16)
    print(f"({pred.tolist()}, {labels[index]})" )
    matplotlib_imshow(img_grid, one_channel=True)
    
#draw_img(5, im, la)

In [None]:


# Helper function for inline image display
def matplotlib_imshow(img):
    img = img[0]
    plt.imshow(img, cmap="Greys")

dataiter = iter(training_loader)
images, labels = next(dataiter)
images = images.view((BATCH_SIZE, 1,64,64))
# Create a grid from the images and show them
img_grid = torchvision.utils.make_grid(images, nrow=16)
matplotlib_imshow(img_grid)

In [None]:
class ShapeConvolutionalNeuralNetwork(nn.Module):

    def __init__(self):
        super(ConvolutionalNeuralNetwork, self).__init__()
        self.convolutional_layers = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=4, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(16, 32, kernel_size=8, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(32, 32, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=1),
        )
        self.linear_layers = nn.Sequential(
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64, 3),
        )

    def forward(self, x):
        x = self.convolutional_layers(x)
        x = x.view(x.size(0), -1)
        x = self.linear_layers(x)
        return x

In [None]:
# big network for orientation
class OrientationConvolutionalNeuralNetwork(nn.Module):

    def __init__(self):
        super(OrientationConvolutionalNeuralNetwork, self).__init__()
        self.convolutional_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=8, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(32, 32, kernel_size=8, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=1),
            nn.Conv2d(32, 64, kernel_size=4, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
        )
        self.linear_layers = nn.Sequential(
            nn.Linear(1600, 160),
            nn.Tanh(),
            nn.Linear(160, 40),
        )

    def forward(self, x):
        x = self.convolutional_layers(x)
        x = x.view(x.size(0), -1)
        x = self.linear_layers(x)
        return x

In [None]:
# small network for orientation
class OrientationConvolutionalNeuralNetwork(nn.Module):

    def __init__(self):
        super(OrientationConvolutionalNeuralNetwork, self).__init__()
        self.convolutional_layers = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=8, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(16, 16, kernel_size=8, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=1),
            nn.Conv2d(16, 16, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
        )
        self.linear_layers = nn.Sequential(
            nn.Linear(64, 40),
            nn.ReLU(),
            nn.Linear(40, 40),
        )

    def forward(self, x):
        x = self.convolutional_layers(x)
        x = x.view(x.size(0), -1)
        x = self.linear_layers(x)
        return x

In [None]:
# dataset for orientation (40 classes)

class DSpritesImageDataset(Dataset):
    def __init__(self, train=True, transform=None):
        self.train = train
        self.transform = transform
        self.img_dir = "dsprites-dataset/images/"
        with open('labels.pickle', 'rb') as f:
            labels = pickle.load(f)
            self.labels = labels
        #dataset_zip = np.load('dsprites-dataset/dsprites_ndarray_co1sh3sc6or40x32y32_64x64.npz',encoding='bytes', allow_pickle=True)
        #self.metadata = dataset_zip['metadata'][()]
        #self.latents_values = dataset_zip['latents_values']
        
    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        img_path = os.path.join(self.img_dir, f'{index}.npy')
        image = np.load(img_path)
        image = torch.from_numpy(np.asarray(image, dtype=np.float32)).view(1,64,64)
        target = self.labels[index][3]
        # use actual numerical value instead
        # target = float(metadata["latents_possible_values"]["orientation"][target])
        # use one-hot-encoding
        # target = one_hot(target)
        if not self.train:
            return (image, target, index)
        return (image, target)


In [None]:
# find class distribution
cond_layer = 'linear_layers.2'
labelcount = []
for blub in tqdm(range(100)):
    dataiter = iter(training_loader)
    images, labels = next(dataiter)
    images = images.view((BATCH_SIZE, 1,64,64))
    output = model(images)
    pred = output.data.max(1, keepdim=True)[1]
    preds = pred.flatten().tolist()
    res = [f'{preds[i]}_{labels[i]}' for i in range(BATCH_SIZE)]
    labelcount += res
collections.Counter(labelcount)

In [None]:
# test if biased 
is_biased = []
for k, data in tqdm(enumerate(training_loader)):
    inputs, labels, indices = data
    latents = dsprites_dataset_test.labels
    is_biased += [latents[i][4] < 16 and latents[i][5] < 16 for i in indices]
print(collections.Counter(is_biased))