<p style="font-size:3em"> Run a program on the cloud</p>

<br/>

This notebook corresponds to the Python script, run on **AWS SageMaker**, for the **8th project of my data scientist path** with **OpenClassrooms/CentraleSupélec**.

This project is a first step into the big data world, using **PySpark** and **AWS tools**.

This notebook preprocesses images from a Kaggle dataset (fruit360) and extracts their ORB descriptors. The project steps are:<br>

* Create a scalable Python program using Spark (DataFrames, UDFs)
* Store the dataset in an AWS S3 bucket
* Manage instance roles through AWS IAM
* Run the notebook via AWS SageMaker
* Save the results to the S3 bucket

___ 
<b>I strongly recommend using the 'Table of Contents' extension of Jupyter to navigate through this notebook.</b>

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports" data-toc-modified-id="Imports-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Imports</a></span></li><li><span><a href="#Get-data-paths-from-S3" data-toc-modified-id="Get-data-paths-from-S3-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Get data paths from S3</a></span></li><li><span><a href="#preprocessing" data-toc-modified-id="preprocessing-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>preprocessing</a></span></li><li><span><a href="#Save-on-S3" data-toc-modified-id="Save-on-S3-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Save on S3</a></span></li></ul></div>

# Imports

In [321]:
import pyspark
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,input_file_name
from pyspark.sql.types import *

import sagemaker

In [304]:
spark

In [322]:
from PIL import Image, ImageOps

In [286]:
pip install python-resize-image

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.


In [323]:
import pandas as pd 
import numpy as np 

import cv2
from resizeimage import resizeimage

import boto3
import boto.s3
import s3fs

import time
import io

# Get data paths from S3

In [325]:
''' Create the boolean "is_local" variable.
    It decides whether the program runs on data stored locally or in an S3 bucket '''

is_local = False

# Bucket name, used to read the images (S3 mode) and to save the results
bucket = 'ocr-p8-fruits'

if is_local:
    # Local folder where the fruit images are stored
    path = "./fruits-360/folder/**"
    data_location = path
else:
    # Folder of the bucket where the fruit images are stored
    folder = 'fruit_s3/'
    data_location = 's3://{}/{}'.format(bucket, folder)

In [309]:
def get_name(path):
    '''Retrieve the image file name (last path component) using str.split'''
    if len(path) > 0:
        # image file name
        return path.split('/')[-1]
    else:
        return ''

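def update_path(path):
    '''Remove the leading "file://" scheme from a local path, keeping the absolute path'''
    prefix = 'file://'
    if path.startswith(prefix):
        # a prefix check is used because str.strip('file:///') removes any of
        # these characters from both ends, not just the prefix
        return path[len(prefix):]
    else:
        return path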
    

def get_categ(path):
    '''Retrieve the image category (name of the parent folder) using str.split'''
    if len(path) > 0:
        # image category
        return path.split('/')[-2]
    else:
        return ''

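def get_s3path(data_location):
    '''Retrieve the image paths from the S3 bucket (using the s3fs library)'''
    
    # file-system style access to S3, using the credentials of the instance role
    fs = s3fs.S3FileSystem()
    paths = []

    # each sub-folder of data_location holds the images of one category
    for fold in fs.ls(data_location): 
        paths.extend(fs.ls('s3://'+fold))

    # DataFrame.append was removed in pandas 2.0, so the frame is built in one go
    path_list = pd.DataFrame(paths, columns=['path'])
    print(path_list.shape[0], " items found ! ")
    return path_list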
    
    
def load_data(path_img, is_local=True):
    '''Load data into a dataframe from either a local folder or an S3 bucket (depending on is_local).
       Use the data folder path to get the path of each stored image and its category,
       and return a Spark dataframe containing the information collected'''
    
    start = time.time()
    
    if is_local == True:
        # load the images into a dataframe
        df_img = spark.read.format("image").load(path_img) # does not work if the path contains spaces
        print('Loading achieved')

        # get the path of each image from the loaded files
        df_img = df_img.withColumn("path", input_file_name())

        udf_path = udf(update_path, StringType())
        df_img = df_img.withColumn('path', udf_path('path'))
    
    else: 
        df_img = spark.createDataFrame(get_s3path(path_img))
    
    # image category
    udf_categorie = udf(get_categ, StringType())
    df_img = df_img.withColumn('categorie', udf_categorie('path'))
    
    # image name
    udf_name = udf(get_name, StringType())
    df_img = df_img.withColumn('name', udf_name('path'))
    print('Images loading time : {} seconds \n'.format(time.strftime('%S', time.gmtime(time.time()-start))))
    
    return df_img
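
The path helpers above rely on plain string operations. Below is a minimal sketch of how the category and the file name are derived from one path, and of why a prefix check is safer than `str.strip` for removing the `file://` scheme. The sample paths are made up for illustration, and `strip_file_scheme` is a hypothetical name, not a function of this notebook.

```python
def strip_file_scheme(path):
    """Remove a leading 'file://' scheme, keeping the absolute path (hypothetical helper)."""
    prefix = 'file://'
    return path[len(prefix):] if path.startswith(prefix) else path


# Made-up local path, in the form returned by input_file_name()
sample = 'file:///data/fruits-360/Apricot/0_100.jpg'

clean = strip_file_scheme(sample)
category = clean.split('/')[-2]   # parent folder = fruit category
name = clean.split('/')[-1]       # last component = file name

# str.strip treats its argument as a set of characters (f, i, l, e, ':' and '/')
# and removes them from both ends, so it can eat the start of the real path.
stripped = 'file:///lime/1.jpg'.strip('file:///')

print(clean, category, name, stripped)
```

In this sketch the folder name `lime` loses its first two letters with `str.strip`, which is the kind of silent corruption the prefix check avoids.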

In [310]:
spark_df = load_data(data_location, is_local)
spark_df.show()

919  items found ! 
Images loading time : 00 seconds 

+--------------------+---------+-----------+
|                path|categorie|       name|
+--------------------+---------+-----------+
|ocr-p8-fruits/fru...|  Apricot|  0_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|100_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|101_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|102_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|103_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|104_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|105_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|106_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|107_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|108_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|109_100.jpg|
|ocr-p8-fruits/fru...|  Apricot| 10_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|110_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|111_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|112_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|113_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|114_100.jpg|
|ocr-p8-fruits/fru...|  Apricot|115_100.jpg|


# preprocessing

In [311]:
def init_boto3():
    # Initialize an S3 session and client, using the default region of the environment
    
    session = boto3.session.Session()
    s3_client = session.client('s3')
    
    return s3_client


def prep_and_desc_local(path_img):
    '''Get the preprocessed image and its ORB descriptors from an image stored locally'''
    
    # Open the image with Pillow, equalize its histogram and resize it to 100x100
    img = Image.open(path_img)
    width = 100
    img_equ = ImageOps.equalize(img)
    img_full = resizeimage.resize_cover(img_equ, [width, width])
    
    # A Spark column cannot hold a NumPy matrix, so the image is flattened into a list
    preproc = np.array(img_full).flatten().tolist()
    
    orb = cv2.ORB_create(nfeatures = 50)
    kp, des = orb.detectAndCompute( np.array(img_full),None)
    
    # ORB might not find any keypoint for some images
    if des is None : 
        # no keypoint found: return an empty list
        desc = []
    else : 
        # otherwise, return a flattened list of descriptors (32 values per keypoint)
        desc = des.flatten().tolist()
    
    return (preproc, desc)
    


def prep_and_desc_s3 (path_img):
    '''Get the preprocessed image and its ORB descriptors from an image stored in an S3 bucket'''

    # Open the image stored in the S3 bucket
    s3 = boto3.resource('s3', region_name=boto3.Session().region_name)
    # path_img looks like "bucket/key": split at the first slash only
    my_bucket, key = path_img.split('/', 1)

    obj = s3.Object(bucket_name =my_bucket, key=key)
    response = obj.get()
    file_stream = response['Body']
    img= Image.open(file_stream)
    
    # Preprocess the image: equalize its histogram and resize it to 100x100
    width = 100
    img_equ = ImageOps.equalize(img)
    img_full = resizeimage.resize_cover(img_equ, [width, width])
    
    # A Spark column cannot hold a NumPy matrix, so the image is flattened into a list
    preproc = np.array(img_full).flatten().tolist()
    
    orb = cv2.ORB_create(nfeatures = 50)
    kp, des = orb.detectAndCompute( np.array(img_full),None)
    
    # ORB might not find any keypoint for some images
    if des is None : 
        # no keypoint found: return an empty list
        desc = []
    else : 
        # otherwise, return a flattened list of descriptors (32 values per keypoint)
        desc = des.flatten().tolist()
    
    return (preproc, desc)
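
The S3 variant first has to separate the bucket name from the object key in a path such as `ocr-p8-fruits/fruit_s3/Apricot/0_100.jpg`. A minimal sketch of that step, assuming the path always starts with the bucket name; `split_s3_path` is a hypothetical helper name used only here.

```python
def split_s3_path(path_img):
    """Split 'bucket/key/parts' into (bucket, key) at the first slash only (hypothetical helper)."""
    bucket_name, _, key = path_img.partition('/')
    return bucket_name, key


bucket_name, key = split_s3_path('ocr-p8-fruits/fruit_s3/Apricot/0_100.jpg')
print(bucket_name)
print(key)
```

Splitting at the first slash keeps the key intact even if the bucket name happens to reappear later in the path, which a global string replacement would not guarantee.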
    



In [312]:
'''As our functions return a tuple, we need to provide a schema to the UDF
   so that we can tell which field is the preprocessed image
   and which field holds the descriptors
'''

schema = StructType([
    StructField("preproc", ArrayType(IntegerType()), False),
    StructField("desc", ArrayType(IntegerType()), False)
    ])

#Depending on is_local value we use either the local or s3 function
if is_local == True : 
    udf_local = udf(prep_and_desc_local, schema)
    spark_df = spark_df.withColumn('prep_desc', udf_local('path'))
else : 
    init_boto3()
    udf_s3 = udf(prep_and_desc_s3, schema)

    spark_df = spark_df.withColumn('prep_desc', udf_s3('path'))



spark_df.show()

+--------------------+---------+-----------+--------------------+
|                path|categorie|       name|           prep_desc|
+--------------------+---------+-----------+--------------------+
|ocr-p8-fruits/fru...|  Apricot|  0_100.jpg|[[255, 235, 227, ...|
|ocr-p8-fruits/fru...|  Apricot|100_100.jpg|[[219, 255, 223, ...|
|ocr-p8-fruits/fru...|  Apricot|101_100.jpg|[[223, 255, 234, ...|
|ocr-p8-fruits/fru...|  Apricot|102_100.jpg|[[255, 237, 255, ...|
|ocr-p8-fruits/fru...|  Apricot|103_100.jpg|[[255, 239, 247, ...|
|ocr-p8-fruits/fru...|  Apricot|104_100.jpg|[[255, 234, 255, ...|
|ocr-p8-fruits/fru...|  Apricot|105_100.jpg|[[255, 235, 255, ...|
|ocr-p8-fruits/fru...|  Apricot|106_100.jpg|[[255, 255, 230, ...|
|ocr-p8-fruits/fru...|  Apricot|107_100.jpg|[[255, 249, 233, ...|
|ocr-p8-fruits/fru...|  Apricot|108_100.jpg|[[237, 249, 226, ...|
|ocr-p8-fruits/fru...|  Apricot|109_100.jpg|[[255, 248, 235, ...|
|ocr-p8-fruits/fru...|  Apricot| 10_100.jpg|[[237, 237, 255, ...|
|ocr-p8-fr

In [313]:
spark_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- categorie: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prep_desc: struct (nullable = true)
 |    |-- preproc: array (nullable = false)
 |    |    |-- element: integer (containsNull = true)
 |    |-- desc: array (nullable = false)
 |    |    |-- element: integer (containsNull = true)



In [314]:
spark_df.select('path','categorie','prep_desc.preproc','prep_desc.desc').show()

+--------------------+---------+--------------------+--------------------+
|                path|categorie|             preproc|                desc|
+--------------------+---------+--------------------+--------------------+
|ocr-p8-fruits/fru...|  Apricot|[255, 235, 227, 2...|[202, 144, 182, 2...|
|ocr-p8-fruits/fru...|  Apricot|[219, 255, 223, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[223, 255, 234, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 237, 255, 2...|[189, 215, 111, 2...|
|ocr-p8-fruits/fru...|  Apricot|[255, 239, 247, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 234, 255, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 235, 255, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 255, 230, 2...|[207, 205, 191, 2...|
|ocr-p8-fruits/fru...|  Apricot|[255, 249, 233, 2...|[252, 36, 220, 24...|
|ocr-p8-fruits/fru...|  Apricot|[237, 249, 226, 2...|[239, 247, 245, 2...|
|ocr-p8-fruits/fru...|  A

In [315]:
preprocessed= spark_df.select('prep_desc.preproc').take(1)[0][0]
descripted= spark_df.select('prep_desc.desc').take(1)[0][0]

In [316]:
print('Length array of preprocessed image: ',len(preprocessed))
print('Length array of image descriptors: ',len(descripted))

Length array of preprocessed image:  30000
Length array of image descriptors:  224
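
These two lengths can be checked by hand: 30000 matches a 100 x 100 image with 3 colour channels, and 224 matches 7 keypoints with 32 descriptor values each. A minimal sketch of that arithmetic, which also shows how a flat descriptor list could be regrouped per keypoint; `unflatten_descriptors` is a hypothetical helper, and the values in `flat` are stand-ins, not real descriptors.

```python
width, height, channels = 100, 100, 3   # resize target, 3 channels implied by 30000
assert width * height * channels == 30000

ORB_DESCRIPTOR_BYTES = 32  # ORB produces one 32-byte descriptor per keypoint


def unflatten_descriptors(desc):
    """Group a flat descriptor list into one 32-value row per keypoint (hypothetical helper)."""
    if len(desc) % ORB_DESCRIPTOR_BYTES != 0:
        raise ValueError('length is not a multiple of 32')
    return [desc[i:i + ORB_DESCRIPTOR_BYTES]
            for i in range(0, len(desc), ORB_DESCRIPTOR_BYTES)]


flat = list(range(224))   # stand-in values with the length reported above
rows = unflatten_descriptors(flat)
print(len(rows), len(rows[0]))
```

An empty descriptor list, as returned when ORB finds no keypoint, simply gives an empty list of rows.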


# Save on S3

In [317]:
final_df = spark_df.select('path','categorie','prep_desc.preproc','prep_desc.desc')
final_df.show()

+--------------------+---------+--------------------+--------------------+
|                path|categorie|             preproc|                desc|
+--------------------+---------+--------------------+--------------------+
|ocr-p8-fruits/fru...|  Apricot|[255, 235, 227, 2...|[202, 144, 182, 2...|
|ocr-p8-fruits/fru...|  Apricot|[219, 255, 223, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[223, 255, 234, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 237, 255, 2...|[189, 215, 111, 2...|
|ocr-p8-fruits/fru...|  Apricot|[255, 239, 247, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 234, 255, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 235, 255, 2...|                  []|
|ocr-p8-fruits/fru...|  Apricot|[255, 255, 230, 2...|[207, 205, 191, 2...|
|ocr-p8-fruits/fru...|  Apricot|[255, 249, 233, 2...|[252, 36, 220, 24...|
|ocr-p8-fruits/fru...|  Apricot|[237, 249, 226, 2...|[239, 247, 245, 2...|
|ocr-p8-fruits/fru...|  A

In [318]:
# Turn spark dataframe into pandas dataframe
pandas_df = final_df.toPandas()
pandas_df

Unnamed: 0,path,categorie,preproc,desc
0,ocr-p8-fruits/fruit_s3/Apricot/0_100.jpg,Apricot,"[255, 235, 227, 247, 241, 235, 227, 249, 255, ...","[202, 144, 182, 250, 111, 244, 184, 70, 159, 9..."
1,ocr-p8-fruits/fruit_s3/Apricot/100_100.jpg,Apricot,"[219, 255, 223, 224, 255, 226, 255, 255, 240, ...",[]
2,ocr-p8-fruits/fruit_s3/Apricot/101_100.jpg,Apricot,"[223, 255, 234, 238, 255, 234, 255, 245, 234, ...",[]
3,ocr-p8-fruits/fruit_s3/Apricot/102_100.jpg,Apricot,"[255, 237, 255, 255, 237, 255, 243, 247, 248, ...","[189, 215, 111, 207, 244, 95, 239, 252, 117, 2..."
4,ocr-p8-fruits/fruit_s3/Apricot/103_100.jpg,Apricot,"[255, 239, 247, 255, 239, 247, 255, 255, 255, ...",[]
...,...,...,...,...
914,ocr-p8-fruits/fruit_s3/Avocado/r_317_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[191, 125, 239, 214, 253, 223, 94, 244, 153, 2..."
915,ocr-p8-fruits/fruit_s3/Avocado/r_318_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[191, 223, 236, 206, 253, 83, 222, 248, 219, 2..."
916,ocr-p8-fruits/fruit_s3/Avocado/r_319_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[159, 31, 105, 79, 181, 127, 222, 56, 61, 238,..."
917,ocr-p8-fruits/fruit_s3/Avocado/r_31_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[249, 84, 255, 101, 100, 110, 109, 114, 120, 2..."


In [319]:
# Save pandas dataframe in csv format on S3 Bucket
name_to_save = 'data.csv'
csv_buffer = io.StringIO()
pandas_df.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, name_to_save).put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '2A049125617F3CF1',
  'HostId': 'fFH1VC6l8m/3cHB8wr4l2VV+GY5SJ3xfG4GeQU91h2KDAPbVx57EjN1Q5AmLFxfbq/Q6I0PRYQQ=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'fFH1VC6l8m/3cHB8wr4l2VV+GY5SJ3xfG4GeQU91h2KDAPbVx57EjN1Q5AmLFxfbq/Q6I0PRYQQ=',
   'x-amz-request-id': '2A049125617F3CF1',
   'date': 'Tue, 15 Dec 2020 13:43:22 GMT',
   'etag': '"23189d0d9ea3c2198bf8ef44891128aa"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"23189d0d9ea3c2198bf8ef44891128aa"'}
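
The dictionary returned by `put` can also be checked in code rather than by eye. A minimal sketch, assuming the response has the shape printed above; `upload_succeeded` is a hypothetical helper, and the dictionary below is a trimmed copy of that output.

```python
def upload_succeeded(response):
    """Return True when an S3 put response reports HTTP 200 (hypothetical helper)."""
    metadata = response.get('ResponseMetadata', {})
    return metadata.get('HTTPStatusCode') == 200


# Trimmed copy of the response printed above
response = {
    'ResponseMetadata': {'HTTPStatusCode': 200, 'RetryAttempts': 0},
    'ETag': '"23189d0d9ea3c2198bf8ef44891128aa"',
}
print(upload_succeeded(response))
```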

## Check if data were saved properly on S3

In [327]:
# Read csv stored on S3 bucket
s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucket, Key=name_to_save)
df = pd.read_csv(io.BytesIO(obj['Body'].read()))

In [328]:
df

Unnamed: 0,path,categorie,preproc,desc
0,ocr-p8-fruits/fruit_s3/Apricot/0_100.jpg,Apricot,"[255, 235, 227, 247, 241, 235, 227, 249, 255, ...","[202, 144, 182, 250, 111, 244, 184, 70, 159, 9..."
1,ocr-p8-fruits/fruit_s3/Apricot/100_100.jpg,Apricot,"[219, 255, 223, 224, 255, 226, 255, 255, 240, ...",[]
2,ocr-p8-fruits/fruit_s3/Apricot/101_100.jpg,Apricot,"[223, 255, 234, 238, 255, 234, 255, 245, 234, ...",[]
3,ocr-p8-fruits/fruit_s3/Apricot/102_100.jpg,Apricot,"[255, 237, 255, 255, 237, 255, 243, 247, 248, ...","[189, 215, 111, 207, 244, 95, 239, 252, 117, 2..."
4,ocr-p8-fruits/fruit_s3/Apricot/103_100.jpg,Apricot,"[255, 239, 247, 255, 239, 247, 255, 255, 255, ...",[]
...,...,...,...,...
914,ocr-p8-fruits/fruit_s3/Avocado/r_317_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[191, 125, 239, 214, 253, 223, 94, 244, 153, 2..."
915,ocr-p8-fruits/fruit_s3/Avocado/r_318_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[191, 223, 236, 206, 253, 83, 222, 248, 219, 2..."
916,ocr-p8-fruits/fruit_s3/Avocado/r_319_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[159, 31, 105, 79, 181, 127, 222, 56, 61, 238,..."
917,ocr-p8-fruits/fruit_s3/Avocado/r_31_100.jpg,Avocado,"[255, 255, 255, 255, 255, 255, 255, 255, 255, ...","[249, 84, 255, 101, 100, 110, 109, 114, 120, 2..."
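
One point to keep in mind when reading the file back: CSV stores the `preproc` and `desc` lists as text, so they come back as strings rather than lists. A minimal sketch of the round trip using only the standard library; the row below is a stand-in, not data from the real file.

```python
import ast
import csv
import io

# Write one stand-in row, with the list columns stored as text
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(['path', 'categorie', 'preproc', 'desc'])
writer.writerow(['bucket/folder/Apricot/0_100.jpg', 'Apricot',
                 str([255, 235, 227]), str([])])

# Read it back: every field is a string at this point
buffer.seek(0)
rows = list(csv.DictReader(buffer))
raw = rows[0]['preproc']                    # the text '[255, 235, 227]'

# ast.literal_eval safely parses the text back into Python lists
preproc = ast.literal_eval(raw)
desc = ast.literal_eval(rows[0]['desc'])    # '[]' becomes an empty list
print(type(raw).__name__, preproc, desc)
```

With pandas, the same conversion could presumably be applied per column, for example with `df['preproc'].apply(ast.literal_eval)`.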
