In [1]:
# Librairies génériques
import numpy as np
import sys
import io
import os
import time


In [6]:
import sagemaker
import boto3
import pyspark

from pyspark import SparkContext              # 'SparkContext' permet d'instancier des RDD
from pyspark.sql import SparkSession          # 'SparkSession' permet d'instancier des objets de type DataFrame
from pyspark.sql import Row
from pyspark.sql.functions import udf

from pyspark.sql.types import ArrayType, StringType, IntegerType

# region = "eu-west-1"   # Ireland: la moins couteuse. Indique sur quel data center je suis connecté.

ModuleNotFoundError: No module named 'sagemaker'

In [None]:
# Librairies pour le traitement d'images
from PIL import Image
import cv2


### Variables

In [None]:
mybucket = "lcanonne-p8"
prefix   = "images"
folder   = "s3://{}/{}/".format(mybucket, prefix)

global image1   # globale variable to use in read_image and gest_desc functions

# avocat_data = 's3://{}/{}/{}'.format(mybucket, prefix, 'Avocado')
# banane_data = 's3://{}/{}/{}'.format(mybucket, prefix, 'Banana')
# print(avocat_data)


### Init S3_client and S3 sagemaker region  and boto3 Session

In [None]:
region = boto3.Session().region_name
session = boto3.session.Session(region_name=region)    # Ouverture d'une Session sur la région 'eu-west-1' (Ireland)
s3_client = session.client('s3')                       # indique que le client utilise le service S3 de AWS
s3 = boto3.resource("s3", region_name=region)          # référence sur le service S3

# Création d'une session customisée pour indiquer la region AWS. On ne prend pas la session par défaut.

### Fonctions (load, read, extract, desc)

In [None]:
# Création du dataframe avec la 1ere colonne qui est le nom complet de l'image dans le bucket.

def load_datas(folder, s3_client, sc):
    start_time = time.time()
    
    sub_folders = s3_client.list_objects_v2(Bucket=mybucket, Prefix=prefix)
    # Limite la réponse aux clés qui commencent par la valeur de 'prefix' sur le point d'accés de 'mybucket'
    if "Contents" not in sub_folders:
        print("dossier source non trouvé")
        sys.exit(0)

    lst_path = []

    for key in sub_folders["Contents"]:
        file = key["Key"]
        if ('jpg' in file):             # j'ai remplacé le filter avec cet if (pr filtrer que les images)
            file = file.replace(prefix + "/", "")
            lst_path.append(folder+file)
            
    print("Nombre d'images chargées :", len(lst_path))
    
    rdd = sc.parallelize(lst_path)                         # SparkContext utilisé pour créer un objet RDD
    row_rdd = rdd.map(lambda x: Row(x))                    # rdd doit être constitué d'objet de type 'Row'
    df = spark.createDataFrame(row_rdd, ["path_img"])      # Enregistrement des images dans un Dataframe PySpark
                                                           # <=> création d'un dataframe à partir d'un RDD

    print("Temps execution %sec ---" % (time.time() - start_time))
    return df


# Paramètre possible de parallelize() :  dist, list, tuple, set.  Ici c'est la 'list' qui est utilisée.
# La méthode map() est une transformation RDD qui s'applique à chaque élément du RDD via une fonction lambda

# parallelize() transforme cette list en un ensemble distribué de paths et nous offre toutes les possibilités 
#    de l’infrastructure de Spark.

In [None]:
# 2eme colonne du dataframe:  catégorie

def extract_categ(path):
    lst_file = path.split('/')
    categ  = lst_file[-2]
    return categ

In [None]:
# 3eme colonne du dataframe:  matrice de chaque image mise à plat

def read_image(img):
    start_time = time.time()
    
    global image1   # init a global variable (image1) to pass in the next function (get_dec)
    
    img = img.replace("s3://{}/".format(mybucket), "")
    s3 = boto3.resource("s3", region_name=region)
    bucket = s3.Bucket(mybucket)
    
    object = bucket.Object(img)
    response = object.get()
    file_stream = response['Body']
    image = Image.open(file_stream)
    
    if image is None:
        image = 0
    else:
        image = np.asarray(image)
        image1 = image
        image = image.flatten().tolist()

    print("Temps execution %sec ---" % (time.time() - start_time))
                           
    return image


In [None]:
def get_desc(img):
    start_time = time.time()
    
    read_image(img)
    image = image1
    
    orb = cv2.ORB_create(nfeatures=50)
    keypoints, desc = orb.detectAndCompute(image, None)
    print("Temps execution %sec ---" % (time.time() - start_time))
    
    if desc is None:
        desc = 0
    else:
        desc = desc.flatten().tolist()
        
    return desc


### Init Spark

In [None]:
sc = SparkContext.getOrCreate()   # Instantiation d'un SparkContext
sc.setLogLevel("Warn")

spark = SparkSession.builder.appName('projet8').getOrCreate()  # retourne un objet de type 'SparkSession'

# sc : client qui se connecte à un cluster Spark
# Un objet SparkSession permet d'instancier des objets de type DataFrame
# Un objet SparkContext permet d'instancier des RDD

### Load, Transform and Extract data with Spark (udf: rdd)

In [None]:
df = load_datas(folder, s3_client, sc)

In [None]:
# remplissage de la 2eme colonne 'categ'

udf_categ = udf(extract_categ, StringType())         # Crée un UDF avec la fonction 'extract_categ' avec StringType()
                                                     #   comme type de retour. C'est le type par défaut.
df = df.withColumn("categ", udf_categ("path_img"))   # Ajout de la colonne 'categ' au dataframe avec la valeur de udf_categ()


In [None]:
# remplissage de la 3eme colonne 'image'. Chaque image a été transformé en array numpy puis vecteur unitaire.

udf_image = udf(read_image, ArrayType(IntegerType()))
df = df.withColumn("image", udf_image("path_img"))

In [None]:
df = df.filter(df.image.isNotNull())

In [None]:
# remplissage de la 4eme colonne 'descriptors' avec les descripteurs de chaque image transformés en vecteur unitaire
udf_desc = udf(get_desc, ArrayType(IntegerType()))
df = df.withColumn("descriptors", udf_desc("path_img"))

In [None]:
df = df.filter(df.descriptors.isNotNull())

In [None]:
df.show(5)


### Save df in format parquet  then copy to s3 with sagemaker session

In [None]:
df.repartition(1).write.mode('overwrite').parquet('resultat')   # resultat = name  of folder where the dataframe
                                                                # will be stored in sagemaker instance

In [None]:
upload_data = sagemaker.Session().upload_data(bucket=mybucket, 
                                              path='resultat',             # local file in sagemaker instance
                                              key_prefix='outputresultat') # bucket where we stored parquet in s3
print('upload_data: {}'.format(upload_data))
