# Tabula Muris Ingest

Ingest, wrangle and store [Tabula Muris](http://tabula-muris.ds.czbiohub.org/) into a [TFRecordDataset](https://docs.google.com/presentation/d/16kHNtQslt-yuJ3w8GIx-eEH6t_AvFeQOchqGRFpAD7U/edit) optimized for multi-GPU distributed machine learning. See the [GitHub repository](https://github.com/czbiohub/tabula-muris) for details on the data.

Note: Initially only the FACS dataset is ingested, because the droplet dataset has significant QC dropout and is missing several tissues.

In [11]:
import os
import re
import glob
import json
import numpy as np
import pandas as pd
import tensorflow as tf

np.random.seed(42)  # reproducibility

## Download

Download the dataset and unpack it into a local directory. This could be /tmp if we only plan to use the S3 copy. I usually symlink ~/data to a local scratch disk, both for speed and to keep home directories on the shared big-memory servers small. For a much larger dataset whose files can be downloaded separately, it would be better to download each file directly into memory, convert it to tfrecord and push it to S3.
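
The in-memory route mentioned above could look roughly like the following. This is only a sketch using the standard library: `build_demo_archive`, `read_counts_from_bytes` and the `Demo-counts.csv` member are hypothetical stand-ins for a real download, and the csv layout (genes as rows, cells as columns, empty first header cell) is an assumption based on how the files are loaded later in this notebook.

```python
import csv
import io
import zipfile


def build_demo_archive():
    """Build a tiny zip archive in memory (hypothetical stand-in for a download)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr(
            "FACS/Demo-counts.csv",
            ",cell_a,cell_b\ngene_1,0,3\ngene_2,5,0\n",
        )
    return buffer.getvalue()


def read_counts_from_bytes(zip_bytes, member):
    """Parse one csv member of a zip archive without writing anything to disk."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        with archive.open(member) as raw:
            text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
            rows = list(csv.reader(text))
    cells, body = rows[0][1:], rows[1:]
    # Map gene -> {cell: count}, keeping the genes-as-rows layout of the csv
    return {row[0]: dict(zip(cells, map(int, row[1:]))) for row in body}


counts = read_counts_from_bytes(build_demo_archive(), "FACS/Demo-counts.csv")
print(counts)
```

In a real pipeline the bytes would come from the HTTP response body instead of `build_demo_archive`, and each parsed table would be converted and uploaded before the next one is fetched.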

In [12]:
# Switch to data directory for storing download and tfrecords
!mkdir -p ~/data/tabula-muris
os.chdir(os.path.expanduser("~/data/tabula-muris"))

In [13]:
# Download the FACS dataset and unzip it into per-tissue expression csv files
!wget --no-clobber --output-document data.zip https://ndownloader.figshare.com/articles/5715040/versions/1
!if [ ! -f FACS.zip ]; then unzip data.zip; fi
!if [ ! -d FACS ]; then unzip FACS.zip; fi

--2018-10-11 02:46:25--  https://ndownloader.figshare.com/articles/5715040/versions/1
Resolving ndownloader.figshare.com (ndownloader.figshare.com)... 34.248.163.7, 34.250.206.8, 34.255.241.113, ...
Connecting to ndownloader.figshare.com (ndownloader.figshare.com)|34.248.163.7|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 308503289 (294M) [application/zip]
Saving to: 'data.zip'


2018-10-11 02:46:56 (9.46 MB/s) - 'data.zip' saved [308503289/308503289]

Archive:  data.zip
 extracting: FACS.zip                
 extracting: metadata_FACS.csv       
 extracting: annotations_FACS.csv    
Archive:  FACS.zip
   creating: FACS/
  inflating: FACS/Lung-counts.csv    
  inflating: FACS/Spleen-counts.csv  
  inflating: FACS/Trachea-counts.csv  
  inflating: FACS/Brain_Microglia-counts.csv  
  inflating: FACS/Kidney-counts.csv  
  inflating: FACS/Pancreas-counts.csv  
  inflating: FACS/Skin-counts.csv    
  inflating: FACS/Tongue-counts.csv  
  inflating: FACS/Colon-coun

In [14]:
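# Extract the tissue name from each counts file name (the dot in ".csv" is escaped so it matches literally)
tissues = sorted([re.findall(r"FACS/(\w.+?)-counts\.csv", f)[0] for f in glob.glob("FACS/*-counts.csv")])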
print("Found {} tissues: {}".format(len(tissues), tissues))

Found 18 tissues: ['Bladder', 'Brain_Microglia', 'Brain_Neurons', 'Colon', 'Fat', 'Heart', 'Kidney', 'Liver', 'Lung', 'Mammary', 'Marrow', 'Muscle', 'Pancreas', 'Skin', 'Spleen', 'Thymus', 'Tongue', 'Trachea']
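
The position of each tissue in this sorted list is what the wrangling step uses as its integer label (via `tissues.index(tissue)`), and the same list is saved to the metadata file so labels can be decoded later. A minimal sketch of that round trip, using a shortened, illustrative tissue list and a hypothetical `label_of` lookup:

```python
import json

tissues = sorted(["Lung", "Bladder", "Kidney"])  # shortened list for illustration

# Integer label = position in the sorted list, as tissues.index(tissue) returns
label_of = {name: index for index, name in enumerate(tissues)}

# Round-trip the list through JSON as a metadata file would, then decode a label
metadata = json.loads(json.dumps({"tissues": tissues}))
decoded = metadata["tissues"][label_of["Kidney"]]
print(label_of, decoded)
```

Because the labels depend on sort order, adding or removing a tissue shifts the labels of the tissues after it, so the saved list should always travel with the records.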


## Wrangle
Split each tissue's counts csv into train and test sets and store each split in a .tfrecord file.

In [15]:
from sklearn.model_selection import train_test_split

def write_tfrecord(writer, features, labels):
    writer.write(tf.train.Example(features=tf.train.Features(
        feature = {
            "features": tf.train.Feature(float_list=tf.train.FloatList(value=features.values)),
            "labels": tf.train.Feature(int64_list=tf.train.Int64List(value=labels))
        })).SerializeToString())

num_train_samples = 0
num_test_samples = 0
for tissue in tissues:
    print("{}: ".format(tissue), end="")
    # Load expression for a single tissue and transpose so rows=cells and columns=features (genes)
    expression = pd.read_csv("FACS/{}-counts.csv".format(tissue), index_col=0) \
        .astype(np.int32).T.sort_index(axis="columns")
    print(expression.shape[0], end=" ")
    
    train, test = train_test_split(expression, test_size=0.2)
    num_train_samples += train.shape[0]
    num_test_samples += test.shape[0]

    # Compress with GZIP, which saves significant space because the data is sparse:
    # Bladder: csv (full) = 81M, tfrecord (train) = 118M, tfrecord.gzip (train) = 14M
    options = tf.python_io.TFRecordOptions(
        compression_type=tf.python_io.TFRecordCompressionType.GZIP)

    # The label is the tissue's index in the sorted tissues list
    with tf.python_io.TFRecordWriter("FACS/{}.train.gzip.tfrecord".format(tissue),
                                     options=options) as writer:
        for (_, features) in train.iterrows():
            write_tfrecord(writer, features, [tissues.index(tissue)])

    with tf.python_io.TFRecordWriter("FACS/{}.test.gzip.tfrecord".format(tissue),
                                     options=options) as writer:
        for (_, features) in test.iterrows():
            write_tfrecord(writer, features, [tissues.index(tissue)])

            
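# Save metadata. The gene list comes from the last tissue loaded, so this
# assumes every tissue file has the same genes (json is imported at the top)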
with open("FACS/metadata.json", "w") as f:
    f.write(json.dumps({
        "tissues": tissues,
        "num_train_samples": num_train_samples,
        "num_test_samples": num_test_samples,
        "genes": expression.columns.tolist()}))

print("\nFinished. \nTotal Training Samples: {}, \nTotal Test Samples: {}".format(
    num_train_samples, num_test_samples))

Bladder: 1638 Brain_Microglia: 4762 Brain_Neurons: 5799 Colon: 4149 Fat: 5862 Heart: 7115 Kidney: 865 Liver: 981 Lung: 1923 Mammary: 2663 Marrow: 5355 Muscle: 2102 Pancreas: 1961 Skin: 2464 Spleen: 1718 Thymus: 1580 Tongue: 1432 Trachea: 1391 
Finished. 
Total Training Samples: 43001, 
Total Test Samples: 10759
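
As a sanity check, the totals above can be reproduced from the per-tissue cell counts. The sketch below assumes that a fractional `test_size` of 0.2 is rounded up to the next whole sample per tissue, which is how scikit-learn's `train_test_split` behaves as far as I know; `split_sizes` is a hypothetical helper, and integer ceiling division is used to avoid floating-point rounding.

```python
# Per-tissue cell counts copied from the output above
cells_per_tissue = {
    "Bladder": 1638, "Brain_Microglia": 4762, "Brain_Neurons": 5799,
    "Colon": 4149, "Fat": 5862, "Heart": 7115, "Kidney": 865, "Liver": 981,
    "Lung": 1923, "Mammary": 2663, "Marrow": 5355, "Muscle": 2102,
    "Pancreas": 1961, "Skin": 2464, "Spleen": 1718, "Thymus": 1580,
    "Tongue": 1432, "Trachea": 1391,
}


def split_sizes(n_cells, test_denominator=5):
    """Return (n_train, n_test) for a 1/test_denominator test split, rounding the test set up."""
    n_test = -(-n_cells // test_denominator)  # ceiling division
    return n_cells - n_test, n_test


sizes = [split_sizes(n) for n in cells_per_tissue.values()]
total_train = sum(train for train, _ in sizes)
total_test = sum(test for _, test in sizes)
print(total_train, total_test)  # 43001 10759
```

Because the rounding happens per tissue, the test fraction of the whole dataset ends up slightly above 20%.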


## Upload
Upload the tfrecord and metadata files to S3 so we can train on them from pods in the k8s cluster. For a large dataset we could fold this into the tfrecord generation above to reduce the amount stored on disk.
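
The sync command used below skips the raw csv files with an `--exclude "*.csv"` filter. To preview which local files such a filter would leave for upload, something like the following could be used. It is only an approximation with Python's `fnmatch`, not the aws cli's own matching code, and the file names are illustrative.

```python
from fnmatch import fnmatch

# Illustrative directory listing, relative to the FACS/ source directory
local_files = [
    "Bladder-counts.csv",
    "Bladder.train.gzip.tfrecord",
    "Bladder.test.gzip.tfrecord",
    "metadata.json",
]

exclude_pattern = "*.csv"

# Keep everything that does not match the exclude pattern
to_upload = [name for name in local_files if not fnmatch(name, exclude_pattern)]
print(to_upload)
```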

In [None]:
# Delete all s3 files so we start clean
# !aws s3 rm --recursive s3://stuartlab/tabula-muris/ --profile prp --endpoint https://s3.nautilus.optiputer.net

In [20]:
# Use the aws cli's rsync-like sync command, skipping the raw csv files
!aws --profile {os.getenv("AWS_PROFILE")} --endpoint {os.getenv("AWS_S3_ENDPOINT")} \
    s3 sync FACS/ s3://stuartlab/tabula-muris/dataset --exclude "*.csv"

upload: FACS/Bladder.test.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Bladder.test.gzip.tfrecord
upload: FACS/Brain_Microglia.test.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Brain_Microglia.test.gzip.tfrecord
upload: FACS/Brain_Neurons.test.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Brain_Neurons.test.gzip.tfrecord
upload: FACS/Brain_Microglia.train.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Brain_Microglia.train.gzip.tfrecord
upload: FACS/Bladder.train.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Bladder.train.gzip.tfrecord
upload: FACS/Colon.test.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Colon.test.gzip.tfrecord
upload: FACS/Brain_Neurons.train.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Brain_Neurons.train.gzip.tfrecord
upload: FACS/Kidney.train.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Kidney.train.gzip.tfrecord
upload: FACS/Liver.test.gzip.tfrecord to s3://stuartlab/tabula-muris/dataset/Liver.test.gzip.t

In [None]:
# Copy each file via boto3...preserved for reference when/if we overlap with tfrecord creation above
# import glob
# import boto3

# bucket_name = "stuartlab"
# project_name = "tabula-muris"  # Dataset folder and output location

# session = boto3.session.Session(profile_name=os.getenv("AWS_PROFILE"))
# s3 = session.resource("s3", endpoint_url=os.getenv("AWS_S3_ENDPOINT"))
# bucket = s3.Bucket(bucket_name)

# # Upload the json and tfrecord files to s3. TFRecordWriter doesn't appear to support
# # writing directly to S3, otherwise we could read, wrangle and write to s3 from memory
# for path in glob.glob("FACS/*.tfrecord") + glob.glob("FACS/*.json"):
#     print(path)
#     bucket.Object("{}/{}".format(project_name, path)).upload_file(path, ExtraArgs={'ACL':'public-read'})
# print("Done.")

In [21]:
!aws --profile {os.getenv("AWS_PROFILE")} --endpoint {os.getenv("AWS_S3_ENDPOINT")} \
    s3 ls s3://stuartlab/tabula-muris/dataset/ 

2018-10-11 03:04:32    3601253 Bladder.test.gzip.tfrecord
2018-10-11 03:04:34   13949030 Bladder.train.gzip.tfrecord
2018-10-11 03:04:33    6087427 Brain_Microglia.test.gzip.tfrecord
2018-10-11 03:04:34   24800122 Brain_Microglia.train.gzip.tfrecord
2018-10-11 03:04:33    6953732 Brain_Neurons.test.gzip.tfrecord
2018-10-11 03:04:35   27735153 Brain_Neurons.train.gzip.tfrecord
2018-10-11 03:04:34    8509884 Colon.test.gzip.tfrecord
2018-10-11 03:04:37   33913093 Colon.train.gzip.tfrecord
2018-10-11 03:04:37    9294719 Fat.test.gzip.tfrecord
2018-10-11 03:04:38   36352355 Fat.train.gzip.tfrecord
2018-10-11 03:04:37    8437405 Heart.test.gzip.tfrecord
2018-10-11 03:04:40   33972222 Heart.train.gzip.tfrecord
2018-10-11 03:04:37     656415 Kidney.test.gzip.tfrecord
2018-10-11 03:04:37    2540177 Kidney.train.gzip.tfrecord
2018-10-11 03:04:37    1459875 Liver.test.gzip.tfrecord
2018-10-11 03:04:38    5672397 Liver.train.gzip.tfrecord
2018-10-11 03:04:38    2785052 Lung.test.g