## Configurando o Spark

In [None]:
## Instalar as dependências

### Instalar Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

### Realizar o download do Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

### Descompactar o arquivo baixado
!tar xf spark-3.4.0-bin-hadoop3.tgz

### Instalar findspark
!pip install -q findspark

## Configurar as variáveis de ambiente
### Importar a biblioteca os
import os

### Definir a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

### Definir a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

### Importar e iniciar a biblioteca do findspark
import findspark
findspark.init('spark-3.4.0-bin-hadoop3')

## Baixando o dataset do Kaggle

In [None]:
! pip install -q kaggle
from google.colab import files
files.upload()

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download -d tolgadincer/labeled-chest-xray-images
! mkdir labeled-chest-xray-images
! unzip -q labeled-chest-xray-images -d labeled-chest-xray-images

Downloading labeled-chest-xray-images.zip to /content
 99% 1.16G/1.17G [00:15<00:00, 127MB/s]
100% 1.17G/1.17G [00:15<00:00, 83.1MB/s]


## Checando o número de núcleos (cores) de CPU do Colab

In [None]:
from multiprocessing import cpu_count

# Logical CPUs visible to this runtime; the local[*] master configured below starts
# one Spark worker thread per logical core.
n_cores = cpu_count()
n_cores

2

## Iniciando o PySpark

In [None]:
# iniciar uma sessão local
from pyspark.sql import SparkSession
import time

import io

import numpy as np
import pandas as pd
from PIL import Image

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col, expr, pandas_udf, PandasUDFType, udf
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# Local session using every available core; the Spark UI listens on port 4050.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('ChestImages')
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

### Para visualizar a interface gráfica (Spark UI) do PySpark no Colab

In [None]:
# Optional: expose the Spark UI (port 4050, set in the SparkSession config) through ngrok.
# Never hard-code the ngrok authtoken in the notebook: export it as the NGROK_AUTHTOKEN
# environment variable first (e.g. os.environ["NGROK_AUTHTOKEN"] = getpass.getpass()).
# NOTE(review): a real authtoken was previously committed in this cell -- revoke it in the ngrok dashboard.
# !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# !unzip ngrok-stable-linux-amd64.zip
# !./ngrok authtoken $NGROK_AUTHTOKEN
# get_ipython().system_raw('./ngrok http 4050 &')
# !sleep 3
# !curl -s http://localhost:4040/api/tunnels

## Processamento dos dados de entrada

In [None]:
# Read every *.jpeg below the train/ and test/ folders of the unzipped Kaggle dataset
# as raw bytes (columns: path, modificationTime, length, content).
path = "/content/labeled-chest-xray-images/chest_xray"
path_train = f"{path}/train"
path_test = f"{path}/test"


def _read_jpeg_folder(folder):
  """Load all ``*.jpeg`` files below ``folder`` (recursively) with Spark's binaryFile source."""
  reader = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpeg")
    .option("recursiveFileLookup", "true")
  )
  return reader.load(folder)


images_df_train = _read_jpeg_folder(path_train)
images_df_test = _read_jpeg_folder(path_test)


In [None]:
# The class label is the name of the directory holding each image, i.e. the
# second-to-last component of the file path (e.g. .../train/<CLASS>/<file>.jpeg).
target_from_path = F.element_at(F.split(col("path"), "/"), -2)
images_df_train = images_df_train.withColumn("Target", target_from_path)
images_df_test = images_df_test.withColumn("Target", target_from_path)

In [None]:
# Inspect the schema: binaryFile columns plus the derived "Target" label column.
images_df_train.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- Target: string (nullable = true)



In [None]:
# Number of training images found (an action: runs a Spark job over the file listing).
images_df_train.count()

5232

## Treinamento do modelo

### Preparação

In [None]:
# ImageNet-pretrained ResNet50 without its classification head, built once on the driver.
model = ResNet50(weights="imagenet", include_top=False)

# Broadcast only the weight arrays (not the Keras model object); model_fn rebuilds
# the network wherever the UDF runs and loads these weights into it.
bc_model_weights = spark.sparkContext.broadcast(
    model.get_weights()
)

def model_fn():
  """
  Build a headless ResNet50 and load the broadcast pretrained weights into it.

  The architecture is created with ``weights=None`` (randomly initialized); the
  weights are then overwritten with the driver's copy from ``bc_model_weights``.
  """
  resnet = ResNet50(include_top=False, weights=None)
  resnet.set_weights(bc_model_weights.value)
  return resnet

def preprocess(content):
  """
  Decode raw image bytes into an array ready for ResNet50.

  The image is resized to 64x64, converted to 3-channel RGB (the X-rays may be
  single-channel -- not verified here) and passed through ResNet50's
  ``preprocess_input``.
  """
  image = Image.open(io.BytesIO(content))
  rgb = image.resize([64, 64]).convert('RGB')
  return preprocess_input(img_to_array(rgb))

def featurize_series(model, content_series):
  """
  Featurize a pd.Series of raw images using the input model.

  :param model: Keras model used for inference (the headless ResNet50 from ``model_fn``).
  :param content_series: pd.Series of raw image bytes.
  :return: a pd.Series with one flattened feature array per input image, in input order.
  """
  if content_series.empty:
    # np.stack raises ValueError on an empty sequence; an empty batch has no features.
    return pd.Series([], dtype=object)
  batch = np.stack(content_series.map(preprocess))
  # verbose=0: do not print a Keras progress bar for every Arrow batch.
  preds = model.predict(batch, verbose=0)
  # For some layers, output features are multi-dimensional tensors.
  # Flatten them to vectors for easier storage in Spark DataFrames.
  return pd.Series([p.flatten() for p in preds])


from typing import Iterator


@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  '''
  Scalar Iterator pandas UDF wrapping the featurization function ``featurize_series``.

  The ``Iterator[pd.Series] -> Iterator[pd.Series]`` type hints select the Scalar
  Iterator flavour. This replaces ``PandasUDFType.SCALAR_ITER``, which Spark 3.x
  discourages (it warns that PandasUDFType will be deprecated). The decorator declares
  the returned Spark column type: ArrayType(FloatType).

  :param content_series_iter: iterator over batches of data, where each batch
                              is a pandas Series of raw image bytes.
  :return: iterator yielding one pd.Series of feature arrays per input batch.
  '''
  # With Scalar Iterator pandas UDFs the model is loaded once and then reused for
  # every batch, amortizing the overhead of building a large model.
  model = model_fn()
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5




In [None]:
# Pandas UDFs on large records (e.g., very large images) can run into Out Of Memory (OOM) errors.
# If you hit such errors in the cell below, try reducing the Arrow batch size via `maxRecordsPerBatch`.
# Max rows per Arrow batch handed to the pandas UDF. Each row carries a whole image, so
# if the featurization cell runs out of memory (OOM), lower this value.
ARROW_MAX_RECORDS_PER_BATCH = 512
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(ARROW_MAX_RECORDS_PER_BATCH))

In [None]:
from pyspark.ml.functions import array_to_vector


def _featurize_images(images_df):
  """Featurize ``images_df`` with ResNet50 and return columns (path, features, Target).

  ``features`` is converted from array<float> to an ML dense vector, the column type
  pyspark.ml estimators expect. The built-in ``array_to_vector`` (Spark >= 3.1) does
  this in the JVM instead of through a row-at-a-time Python UDF.
  """
  return images_df.select(
    col("path"),
    array_to_vector(featurize_udf("content")).alias("features"),
    col("Target"),
  )


features_df_train = _featurize_images(images_df_train)
features_df_test = _featurize_images(images_df_test)

# Previously `.sample(fraction=1.0, seed=42)` was applied here. Without replacement and with
# fraction 1.0, that keeps every row in its original order: it neither shuffles nor
# subsamples. The no-op is dropped and the names are kept for the cells below.
df_train = features_df_train
df_test = features_df_test

In [None]:
# Mark both feature frames for caching so the expensive ResNet50 featurization is not
# recomputed by every later action (training, prediction, evaluation). cache() is lazy:
# the data is only materialized when the first action runs.
df_train.cache()
df_test.cache()

DataFrame[path: string, features: vector, Target: string]

### Treinamento em si


In [None]:
# Map the string class names in "Target" to numeric label indices. Fitted on the same
# cached frame that is used for training (df_train), so it reuses the cached data.
labelIndexer = StringIndexer(inputCol="Target", outputCol="indexedTarget").fit(df_train)

# pyspark.ml estimators read one vector column. With a single input column this
# assembler simply copies the "features" vector into "featuresModel"; more feature
# columns can be appended to inputCols later.
vectorAssembler = VectorAssembler(inputCols=['features'], outputCol="featuresModel")

# Elastic-net regularized logistic regression (regParam=0.03, 50/50 L1/L2 mix).
lr = LogisticRegression(maxIter=10, regParam=0.03,
                        elasticNetParam=0.5, labelCol="indexedTarget", featuresCol="featuresModel")

sparkdn = Pipeline(stages=[labelIndexer, vectorAssembler, lr])

In [None]:
# Fit the whole pipeline (indexer, assembler, logistic regression) on the training features.
model = sparkdn.fit(dataset=df_train)

## Avaliação

In [None]:
# Apply the fitted pipeline to the held-out test images (adds a "prediction" column).
predictions = model.transform(dataset=df_test)

In [None]:
# Fraction of test images whose predicted index matches the indexed true label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedTarget",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy:g}")

Test Accuracy = 0.879808


In [None]:
# MulticlassMetrics expects an RDD of (prediction, label) pairs, and the order matters:
# passing (label, prediction) transposes the confusion matrix. In the resulting matrix,
# rows are true labels and columns are predictions, both ordered by label index
# (see labelIndexer.labels). No sorting is needed; the metrics do not depend on row order.
# NOTE: the output saved below this cell was produced with the columns swapped
# (label, prediction), so it is transposed. Re-run to refresh it.
preds_and_labels = predictions.select("prediction", "indexedTarget")

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())



[[382.  67.]
 [  8. 167.]]


## Referências

Referências:
- https://docs.databricks.com/applications/machine-learning/preprocess-data/transfer-learning-tensorflow.html
- https://github.com/tntn123/spark_transferlearning/blob/main/main.py