# Data Exploration in Cloud 

In [1]:
import os
import boto3
import pickle
import requests
import pandas as pd
import numpy as np
import shutil

In [2]:
# Let's use Amazon S3
s3 = boto3.resource('s3')

elasticbeanstalk-us-east-1-727409249887
elasticbeanstalk-us-east-2-727409249887
webapp-pablo


## Downloading Dataframe from S3
Here we get the cleaned dataframe with all the metadata about all the pictures. From it we can create the file directory tree in S3, and download all the appropriate images to it.

In [15]:
s3.Bucket('pablo-mushroom-bucket').download_file(
                            'combined_dataset_df.p', 'data.p')
df = pickle.load(open('data.p','rb'), encoding='latin1')

#creating truly unique identifier for each image
df['Unique_ID'] = \
    df['Data_Source'].apply(lambda x: x + '_') \
    + df['id'].apply(lambda x: str(x)) 

We will define some useful functions for doing this. Essentially creating the appropriate file directory, generate the url, and transfer the appropriate image to S3. We may have to do this on a picture by picture basis for MO dataset. For DSA dataset, we will have to transfer the entire dataset which is currently stored locally on this EC2 instance. 

In [21]:
taxa_fields = ['Phylum','Class','Order','Family','Genus','Species']

def create_path_name(series):
    folders = [series.get(w).values[0] for w in taxa_fields]
    if not sum([not isinstance(w,str) for w in folders ]):
        return '/'.join(folders)
    else:
        return None
    
def generate_url_image_API_MO(image_id):
    if not np.isnan(image_id):
        url = 'https://images.mushroomobserver.org/640/' \
                                    + str(int(image_id)) + '.jpg'
        return url
    else:
        return None
    
def upload_image_S3_MO(cid, target):
    url = generate_url_image_API_MO(cid)
    r = requests.get(url, allow_redirects=True)
    open('pics/test.jpg', 'wb').write(r.content)

    # Upload a new file
    data = open('pics/test.jpg', 'rb')
    s3.Bucket('pablo-mushroom-bucket').put_object(Key=target, Body=data)
    
def upload_image_S3_DSA(Image, target):
    data = open(Image, 'rb')
    s3.Bucket('pablo-mushroom-bucket').put_object(Key=target, Body=data)
    
def create_target(Unique_ID):
    tmp = df[df.Unique_ID == Unique_ID]
    if create_path_name(tmp) != None:
        if tmp.shape[0] == 1:
            return 'pics/' + create_path_name(tmp) + '/' + Unique_ID + '.JPG'
        else:
            return ['pics/' + create_path_name(tmp) + '/' + Unique_ID + \
                       '_' + str(w+1) + '.JPG' for w in range(tmp.shape[0])]

def process_entry(Unique_ID):
    tmp = df[df.Unique_ID == Unique_ID]
    #for now we are being strict about picture having all fields
    if create_path_name(tmp) != None:
        if Unique_ID.startswith('D'):
            Image = df[df.Unique_ID == Unique_ID].Image[0]
            target = create_target(Unique_ID)
            upload_image_S3_DSA(Image, target)
        elif Unique_ID.startswith('M'):
            CIDs = tmp.Image.values
            targets = create_target(Unique_ID)
            if tmp.shape[0] > 1:
                for cid, target in zip(CIDs, targets):
                    upload_image_S3_MO(cid, target)
            else:
                upload_image_S3_MO(CIDs[0], targets)
        else:
            print('something went wrong...%s' %Unique_ID)
    else:
        return
    

In [None]:
display(df)

## Let the Transfer Begin
I tested each type of file individually. Now let's transfer/download our dataset. 

In [None]:
J = 7338
for i, Unique_ID in enumerate(df.Unique_ID.iloc[J:]):
    if i%5000 == 0:
        print('Working on entry %i/%i' %(i, df.Unique_ID.shape[0]))
    process_entry(Unique_ID)


Working on entry 0/716900


UPDATE: So it takes ~250ms for each process_entry(Unique_ID) call, which means the entire dataset will go for about 45 hours. This could be trouble/ I really don't think this is the proper way to use the cloud. Also, this notebook keeps disconnecting and messing up the transfer. I switched to using tmux and a script, so at least it won't get interrupted. But really, we're going to have to do this locally. I hope I don't run out of memory. 

In [75]:

    
def MO_write_image(cid, target):
    url = generate_url_image_API_MO(cid)
    r = requests.get(url, allow_redirects=True)
    open(target, 'wb').write(r.content)
    return
    
def DSA_transfer_image(Image, target):
    copyfile(Image, target)
    return

def create_target(Unique_ID):
    tmp = df[df.Unique_ID == Unique_ID]
    if create_path_name(tmp) != None:
        if tmp.shape[0] == 1:
            return 'pics/' + create_path_name(tmp) + '/' + Unique_ID + '.JPG'
        else:
            return ['pics/' + create_path_name(tmp) + '/' + Unique_ID + \
                       '_' + str(w+1) + '.JPG' for w in range(tmp.shape[0])]
    else:
        return None
        
        
def process_entry_locally(Unique_ID):
    tmp = df[df.Unique_ID == Unique_ID]
    #for now we are being strict about picture having all fields
    if create_path_name(tmp) != None:
        if Unique_ID.startswith('D'):
            Image = df[df.Unique_ID == Unique_ID].Image[0]
            target = create_target(Unique_ID)
            print(Image)
            print(target)
            DSA_transfer_image(Image, target)
        elif Unique_ID.startswith('M'):
            CIDs = tmp.Image.values
            targets = create_target(Unique_ID)
            if tmp.shape[0] > 1:
                for cid, target in zip(CIDs, targets):
                    MO_write_image(cid, target)
            else:
                MO_write_image(CIDs[0], targets)
        else:
            print('something went wrong...%s' %Unique_ID)
    else:
        return
    

# MVP
For a minimal viable product, we will use tensorflow's example of retraining an Inception V3 (https://www.tensorflow.org/hub/tutorials/image_retraining) on our categories. I think it might be easier to start with DSA dataset that is already flat, and feed it as much of MO's data as we can. We will then only include categories that have at least 100 pictures, re-train, and use model to load into website. 

In [36]:
subfolders = os.listdir(os.getcwd() + '/images')
species_to_dir = {w : w.split('_')[1:] for w in subfolders}
idx = pd.IndexSlice

In [39]:
%timeit (df[['Genus','Species']].iloc[0])
%timeit ['Disciotis', 'venosa'] in species_to_dir.values()

9.96 ms ± 23.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
254 ns ± 0.0757 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [53]:
MO_only = df.loc[df.Data_Source=='MO',['Genus','Species']]
def f(x,y):
    if isinstance(x,str) and isinstance(y,str):
        return [x,y] in species_to_dir.values()
    else:
        return False
    
filter_MO_in_DSA = MO_only.apply(lambda x: f(*x), axis=1)


In [None]:
MO_only = df.loc[df.Data_Source=='MO',:]
MO_only = MO_only[filter_MO_in_DSA]
MO_only['GS_list'] = [[a,b] for a,b in zip(MO_only['Genus'],MO_only['Species'])]
display(MO_only)

In [123]:

def get_folder_species(GS_list):
    species_index = list(species_to_dir.values()).index(GS_list)
    return list(species_to_dir.keys())[species_index]

def MO_write_image(url, target):
    r = requests.get(url, allow_redirects=True)
    open(target, 'wb').write(r.content)
    return


def process_entry(i):
    Genus = MO_only.iloc[i].Genus
    Species = MO_only.iloc[i].Species
    image_id = MO_only.iloc[i].Image
    if isinstance(Genus, str) and isinstance(Species, str):
        print('_'.join([Genus, Species]))
        target_dir = os.environ['HOME'] + '/images/' \
                        + '_'.join([Genus, Species]) \
                        + '/' 
        file_name = MO_only.iloc[i].Unique_ID + '.JPG'
        if not os.path.isdir(target_dir):
            os.mkdir(target_dir)

        target = target_dir + file_name

        if not os.path.isfile(target):
            #url = generate_url_image_API_MO(image_id)
            #MO_write_image(url, target)
            pass
    return



In [98]:
subfolders = os.listdir('aside_for_now/')
not_enough = {}
for subfolder in subfolders:
    species_folder = os.listdir('images/' + subfolder)
    if len(species_folder) < 100:
        not_enough[subfolder] = len(subfolder)

In [97]:
for folder in not_enough.keys():
    shutil.move('images/' + folder, 'aside_for_now/' + folder)

In [105]:
subfolders = os.listdir('aside_for_now/')
for folder in subfolders:
    shutil.move('aside_for_now/' + folder, 'images/' + folder)

## Download the Rest of the Data
OK. Let's finish downloading the rest of the MO dataset. This should be running continuously. Once it's finished, transfer to S3. This should be left running today. 

In [None]:
import pickle
import pandas as pd
df = pickle.load(open('combined_dataset_df.p','rb'), encoding='latin1')
display(df)

In [36]:
'''
First we create subdirectories for each category in flat_images. 
There are a lot of species that we are including here
'''
for i, GS_Dir in df.groupby('GS_Dir'):
    if not os.path.isdir(image_dir + '/' + i):
        os.mkdir(image_dir + '/' + i)

In [54]:
base_dir = os.environ['HOME'] 
image_dir = base_dir + '/flat_images'

def process_entry(cid):
    target = '/'.join([image_dir, df.GS_Dir.iloc[cid], df.Unique_ID.iloc[cid] + '.JPG'])
    if not os.path.isfile(target):
        if df.Data_Source.iloc[cid] == 'MO':
            
            url = generate_url_image_API_MO(df.Image.iloc[cid])
            r = requests.get(url, allow_redirects=True)
            open(target, 'wb').write(r.content)
        if df.Data_Source.iloc[cid] == 'DSA':
            if not os.path.isfile(df.Image.iloc[cid]):
                print('DSA file cannot be found: %s' %df.Image.iloc[cid])
            os.rename(df.Image.iloc[cid], target)
            

In [48]:
'''
Let's get a sense of how much time it's going to take. Since MO requires a download, whlie
DSA just a file copy, they are inherently different so we'll get different metrics for each
'''
MO_cid = 0
DSA_cid = 900001
%timeit process_entry(MO_cid)
%timeit process_entry(DSA_cid)

232 ms ± 8.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
152 µs ± 1.26 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [51]:
'''
Expected time to complete !
'''
for i,sub in df.groupby('Data_Source'):
    print(i, sub.shape[0])

hours = ((89760 * 0.000075) + 899605 * (0.25)) / (60 * 60)
print('we are looking at %1.2f hours' %hours)

DSA 89760
MO 899605
we are looking at 62.47 hours


In [55]:
import time
J = 0

In [56]:
start = time.time()
for cid in range(J, df.shape[0]):
    if J%100000 == 0:
        print('On observation %i/%i' %(J, df.shape[0]))
        print('Minutes elapsed: %1.2f' %((time.time() - start) / 60))
    process_entry(cid)
    J = cid

On observation 0/989365
Minutes elapsed: 0.00
On observation 0/989365
Minutes elapsed: 0.00
On observation 100000/989365
Minutes elapsed: 170.62
On observation 200000/989365
Minutes elapsed: 328.78
On observation 300000/989365
Minutes elapsed: 471.01
On observation 400000/989365
Minutes elapsed: 616.03
On observation 500000/989365
Minutes elapsed: 771.77
On observation 600000/989365
Minutes elapsed: 951.53
On observation 700000/989365
Minutes elapsed: 1165.61
On observation 800000/989365
Minutes elapsed: 1361.48
On observation 900000/989365
Minutes elapsed: 1562.37


## Must Redefine Downloader
Unfortunately, I did not include unique identifiers for observations with multiple images, so this whole run was skipping them. I've had to remake the indexing dataframe, and redefined the function below to download the ones i missed. 

In [2]:
df = pickle.load(open('combined_dataset_df.p','rb'), encoding='latin1')
display(df)

Unnamed: 0,id,location_id,in_location,vote_cache,when,Domain,Kingdom,Phylum,Class,Order,Family,Genus,Species,Image,Data_Source,Unique_ID,GS_Dir
0,1,214,1,1.923350,['2004-07-13'],Eukarya,Fungi,Ascomycota,Sordariomycetes,Xylariales,Xylariaceae,Xylaria,polymorpha,1,MO,MO_1,Xylaria_polymorpha
0,2,53,1,2.706040,['2004-07-17'],,,,,,,Xylaria,magnoliae,2,MO,MO_2,Xylaria_magnoliae
0,3,60,1,2.499910,['2002-01-08'],Eukarya,Fungi,Ascomycota,Sordariomycetes,Xylariales,Xylariaceae,Xylaria,hypoxylon,3,MO,MO_3,Xylaria_hypoxylon
0,4,5,1,2.499910,['1996-01-15'],Eukarya,Fungi,Ascomycota,Sordariomycetes,Xylariales,Xylariaceae,Xylaria,hypoxylon,4,MO,MO_4,Xylaria_hypoxylon
0,5,36,1,1.666610,['2002-12-28'],Eukarya,Fungi,Basidiomycota,Agaricomycetes,Agaricales,Typhulaceae,Xeromphalina,unknown,5,MO,MO_5,Xeromphalina_unknown
0,6,58,1,2.505280,['2002-01-08'],Eukarya,Fungi,Basidiomycota,Agaricomycetes,Agaricales,Typhulaceae,Xeromphalina,campanella,6,MO,MO_6,Xeromphalina_campanella
0,7,58,0,2.499910,['2005-01-07'],,,,,,,Xerocomus,zelleri,7,MO,MO_7,Xerocomus_zelleri
0,8,39,1,2.451440,['2004-11-26'],,,,,,,Xerocomus,zelleri,8,MO,MO_8,Xerocomus_zelleri
0,9,69,1,2.499910,['2003-01-03'],,,,,,,Xerocomus,subtomentosus,9,MO,MO_9,Xerocomus_subtomentosus
1,9,69,1,2.499910,['2003-01-03'],,,,,,,Xerocomus,subtomentosus,10,MO,MO_9_1,Xerocomus_subtomentosus


In [22]:
base_dir = os.environ['HOME'] 
image_dir = base_dir + '/flat_images'


def process_entry(cid):
    Unique_ID = df.Unique_ID.iloc[cid]
    split_Unique_ID = Unique_ID.split('_')
    target = '/'.join([image_dir, df.GS_Dir.iloc[cid], Unique_ID + '.JPG'])
    
        
    def download_MO(cid):
        url = generate_url_image_API_MO(df.Image.iloc[cid])
        r = requests.get(url, allow_redirects=True)
        open(target, 'wb').write(r.content)
        

    if len(split_Unique_ID) == 2:
        if not os.path.isfile(target):
            if df.Data_Source.iloc[cid] == 'MO':
                download_MO(cid)
            if df.Data_Source.iloc[cid] == 'DSA':
                if not os.path.isfile(df.Image.iloc[cid]):
                    print('DSA file cannot be found: %s' %df.Image.iloc[cid])
                else:
                    os.rename(df.Image.iloc[cid], target)
    elif len(split_Unique_ID) > 2:
        assert df.Data_Source.iloc[cid] == 'MO'
        if int(split_Unique_ID[2]) == 0:
            #want to rename already downloaded image
            old_target = '_'.join(split_Unique_ID[:2])
            old_target = '/'.join([image_dir, df.GS_Dir.iloc[cid], old_target + '.JPG'])
            if os.path.isfile(old_target):
                #file already exists - just rename it
                os.rename(old_target, target)
            else:
                #actually haven't downloaded it so do so
                download_MO(cid)

        elif int(split_Unique_ID[2]) > 0:
            if not os.path.isfile(target):
                download_MO(cid)
    else:
        print('abnormal ID: %s' %Unique_ID)


In [26]:
J = 0

In [27]:
start = time.time()
for cid in range(J, df.shape[0]):
    if J%100000 == 0:
        print('On observation %i/%i' %(J, df.shape[0]))
        print('Minutes elapsed: %1.2f' %((time.time() - start) / 60))
    process_entry_second_pass(cid)
    J = cid

On observation 0/989365
Minutes elapsed: 0.00
On observation 0/989365
Minutes elapsed: 0.00
On observation 100000/989365
Minutes elapsed: 195.88
On observation 200000/989365
Minutes elapsed: 404.09
On observation 300000/989365
Minutes elapsed: 618.30
On observation 400000/989365
Minutes elapsed: 829.02
On observation 500000/989365
Minutes elapsed: 1041.98
On observation 600000/989365
Minutes elapsed: 1345.45
On observation 700000/989365
Minutes elapsed: 1720.65
On observation 800000/989365
Minutes elapsed: 2095.50
On observation 900000/989365
Minutes elapsed: 2457.83


## Transfer Dataset to S3  
Since this is taking forever, I am very worried that the EC2 instance will crash and then I will be in some deep shit. For this reason we will make a script to upload everything to S3. 

In [None]:
import boto3
#starting S3 object
s3 = boto3.resource('s3')
#total number of files that we want to transfer
total_files = sum([len(f) for r,d,f in os.walk('flat_images/')])
print('uploading %i files to S3.' %total_files)
images_transfered = 0
for (root, dirs, files) in os.walk('flat_images/'):
    if root != 'flat_images/':
        sub_files = [root + '/' + w for w in files]
        print('Working on species %s - includes %i images' %(root, len(sub_files)))
        for sf in sub_files:
            if os.path.isfile(sf):
                data = open(sf, 'rb')
                s3.Bucket('pablo-mushroom-bucket').put_object(Key=sf, Body=data)
                data.close()
                images_transfered += 1
                if images_transfered%1 == 10000:
                    print('uploaded %i entries %i/%i' %(images_transfered, total_files))

print('Went thru %i entries, and we had %i total files' %(images_transfered,total_files))