In [1]:
import numpy as np
import pandas as pd

import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, FloatType, IntegerType
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PCA

import cv2 as cv

local = True
write_data = True

In [2]:
spark = SparkSession.builder.appName('FruitsPreProc').config(
    'spark.hadoop.fs.s3a.impl',
    'org.apache.hadoop.fs.s3a.S3AFileSystem').getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set('fs.s3a.impl',
                                  'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set(
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.profile.ProfileCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint",
                                  "s3.eu-west-3.amazonaws.com")
spark.sparkContext._conf.getAll()


22/05/30 18:55:23 WARN Utils: Your hostname, BigDeb resolves to a loopback address: 127.0.1.1; using 192.168.0.11 instead (on interface enp3s0)
22/05/30 18:55:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/30 18:55:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/30 18:55:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


[('spark.app.id', 'local-1653929724706'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '192.168.0.11'),
 ('spark.driver.port', '36981'),
 ('spark.app.name', 'FruitsPreProc'),
 ('spark.sql.warehouse.dir',
  'file:/home/lancelot/Documents/FormationDataScientist/P8/spark-warehouse'),
 ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.app.startTime', '1653929723561'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [3]:
s3 = boto3.client('s3')


In [4]:
if local is True:
    path = './fruits-360_dataset/fruits-360/Training/'
else:
    path = 's3a://stockp8oc/fruits-360/Training/'


def load_img_data(path=path):
    ImgData = spark.read.format('binaryFile') \
                    .option('pathGlobFilter', '*.jpg') \
                    .option('recursiveFileLookup', 'true') \
                    .load(path) \
                    .select('path', 'content')
    ImgData = ImgData.withColumn('label',
                                 F.element_at(F.split(F.col('path'), '/'), -2))
    ImgData = ImgData.withColumn('TruePath',
                                 F.element_at(F.split(F.col('path'), ':'), 2))
    ImgData = ImgData.withColumn(
        'imgName',
        F.concat('label', F.lit('_'),
                 F.element_at(F.split(F.col('path'), '/'), -1)))
    ImgData = ImgData.drop('path')
    return ImgData.sample(.01)



In [5]:
ImgData = load_img_data()



                                                                                

In [6]:
def get_desc(img):
    image = cv.imread(img)
    orb = cv.ORB_create(nfeatures=100)
    keypoints_orb, desc = orb.detectAndCompute(image, None)
    if desc is None:
        desc = [np.array(32 * [0]).astype(np.float64).tolist()]
    else:
        desc = desc.astype(np.float64).tolist()
    return desc



In [7]:
udf_image = F.udf(
    get_desc,
    ArrayType(ArrayType(FloatType(), containsNull=False), containsNull=False))

ImgDesc = ImgData.drop('content').withColumn("descriptors",
                                             F.explode(udf_image("TruePath")))


In [8]:
kmean = KMeans(k=1000, featuresCol='descriptors', seed=0)
model = kmean.fit(ImgDesc)


                                                                                

In [9]:
Pred = model.transform(ImgDesc)
Pred.show(3)

+---------+--------------------+--------------------+--------------------+----------+
|    label|            TruePath|             imgName|         descriptors|prediction|
+---------+--------------------+--------------------+--------------------+----------+
|Raspberry|/home/lancelot/Do...|Raspberry_16_100.jpg|[101.0, 217.0, 20...|       488|
|Raspberry|/home/lancelot/Do...|Raspberry_16_100.jpg|[99.0, 176.0, 170...|       226|
|Raspberry|/home/lancelot/Do...|Raspberry_16_100.jpg|[48.0, 166.0, 124...|       802|
+---------+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [10]:
ImgPred = Pred.groupBy('label', 'prediction').count()

In [11]:
BoVW = ImgPred.groupBy('label').pivot('prediction').sum('count').fillna(0)
BoVW.show()


22/05/30 19:01:31 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/05/30 19:02:22 WARN DAGScheduler: Broadcasting large task binary with size 1014.2 KiB
                                                                                

+---------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---

In [12]:
VA = VectorAssembler(inputCols=BoVW.drop('label').columns,
                     outputCol='features')
pca = PCA(k=100, inputCol='features', outputCol='pca_features')
pipe = Pipeline(stages=[VA, pca])

In [13]:
pipePCA = pipe.fit(BoVW)

22/05/30 19:03:16 WARN DAGScheduler: Broadcasting large task binary with size 1081.0 KiB
22/05/30 19:03:17 WARN DAGScheduler: Broadcasting large task binary with size 1081.0 KiB
22/05/30 19:03:18 WARN DAGScheduler: Broadcasting large task binary with size 1083.1 KiB
22/05/30 19:03:19 WARN DAGScheduler: Broadcasting large task binary with size 1081.5 KiB
22/05/30 19:03:19 WARN DAGScheduler: Broadcasting large task binary with size 1082.2 KiB
22/05/30 19:03:21 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/05/30 19:03:21 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [14]:
pcaData = pipePCA.transform(BoVW)
pcaDataDF = pcaData.select(['label', 'pca_features']).toPandas()

22/05/30 19:04:17 WARN DAGScheduler: Broadcasting large task binary with size 1870.7 KiB
                                                                                

In [15]:
pcaDataDFClean = pcaDataDF.join(
    pd.DataFrame(
        pcaDataDF['pca_features'].tolist())).drop(columns='pca_features')
if write_data is True:
    pcaDataDFClean.to_csv('./featuresPCA.csv', index=False)