### Start the connection Spark

In [1]:
from pyspark.sql import SparkSession

# Create a SparkSession with specific configurations
spark = SparkSession.builder \
    .appName("Spark Application") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.cores", "2") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "4") \
    .config("spark.python.worker.timeout", "600") \
    .getOrCreate()

### Load the data

In [2]:
dataset_location = "/home/jovyan/work/adult-preprocessed.data"

In [3]:
dataset = spark.read.format('csv') \
               .option('inferSchema', True) \
               .option('header', False) \
               .option('sep', ',') \
               .load(dataset_location)

In [4]:
dataset1 = dataset \
            .withColumnRenamed('_c0', 'idade') \
            .withColumnRenamed('_c1', 'classe_trabalho') \
            .withColumnRenamed('_c2', 'final_weight') \
            .withColumnRenamed('_c3', 'escolaridade') \
            .withColumnRenamed('_c4', 'escolaridade_num') \
            .withColumnRenamed('_c5', 'estado_civil') \
            .withColumnRenamed('_c6', 'ocupacao') \
            .withColumnRenamed('_c7', 'relacionamento_householder') \
            .withColumnRenamed('_c8', 'raca') \
            .withColumnRenamed('_c9', 'sexo') \
            .withColumnRenamed('_c10', 'ganho_capital') \
            .withColumnRenamed('_c11', 'perda_capital') \
            .withColumnRenamed('_c12', 'jornada_trabalho') \
            .withColumnRenamed('_c13', 'nacionalidade') \
            .withColumnRenamed('_c14', 'renda_anual')

In [5]:
from pyspark.sql.functions import col

dataset1 = dataset1.withColumn("classe_trabalho", col("classe_trabalho").cast("string"))\
        .withColumn("escolaridade", col("escolaridade").cast("string"))\
        .withColumn("estado_civil", col("estado_civil").cast("string"))\
        .withColumn("ocupacao", col("ocupacao").cast("string"))\
        .withColumn("relacionamento_householder", col("relacionamento_householder").cast("string"))\
        .withColumn("raca", col("raca").cast("string"))\
        .withColumn("sexo", col("sexo").cast("string"))\
        .withColumn("nacionalidade", col("nacionalidade").cast("string"))\
        .withColumn("renda_anual", col("renda_anual").cast("string"))

### Data Analysis

#### Obtenha o valor máximo de capital líquido (ganho de capital - perda de capital)

In [6]:
dataset1.selectExpr("max(ganho_capital - perda_capital)").show()

+------------------------------------+
|max((ganho_capital - perda_capital))|
+------------------------------------+
|                               99999|
+------------------------------------+



#### Obtenha a idade média das pessoas viúvas com jornada de trabalho acima de 20 horas semanais

In [7]:
dataset1 \
    .filter("jornada_trabalho > 20") \
    .filter(dataset1["estado_civil"] == "Widowed") \
    .selectExpr("avg(idade)") \
    .show()

+------------------+
|        avg(idade)|
+------------------+
|56.394101876675606|
+------------------+



#### Obtendo valores distintos para a combinação de sexo e raça para pessoas com idade acima de 60 anos

In [8]:
dataset1 \
    .filter("idade > 60") \
    .select("sexo", "raca") \
    .distinct() \
    .show()

+------+------------------+
|  sexo|              raca|
+------+------------------+
|  Male|             White|
|Female|Asian-Pac-Islander|
|Female|             White|
|Female|Amer-Indian-Eskimo|
|  Male|             Other|
|  Male|             Black|
|  Male|Asian-Pac-Islander|
|  Male|Amer-Indian-Eskimo|
|Female|             Other|
|Female|             Black|
+------+------------------+



#### Obtenha a média de capital líquido (ganho de capital - perda de capital) por escolaridade de pessoas com idade acima de 30 anos

In [9]:
from pyspark.sql.functions import avg, expr

dataset1 \
    .filter('idade > 30') \
    .groupBy('escolaridade') \
    .agg(avg(expr('ganho_capital - perda_capital'))) \
    .show()

+------------+------------------------------------+
|escolaridade|avg((ganho_capital - perda_capital))|
+------------+------------------------------------+
|     Masters|                  2643.3674540682414|
|        10th|                   533.1254612546126|
|     5th-6th|                  132.26141078838174|
|  Assoc-acdm|                   732.9663072776281|
|   Assoc-voc|                   655.4662638469285|
|     7th-8th|                  196.75375939849624|
|         9th|                   430.9438202247191|
|     HS-grad|                   638.0167474048443|
|   Bachelors|                  2117.6493677555322|
|        11th|                   262.4105461393597|
|     1st-4th|                   94.40579710144928|
|   Preschool|                   72.71794871794872|
|        12th|                  468.90909090909093|
|   Doctorate|                   4751.202046035805|
|Some-college|                   776.4566966398486|
| Prof-school|                  10928.190291262135|
+-----------

#### Obtenha a combinação de escolaridade e ocupação com menor jornada de trabalho média que tenham renda maior de 50 mil dólares por ano

In [10]:
dataset1 \
    .filter(dataset1['renda_anual'] ==  '>50K') \
    .groupBy('escolaridade', 'ocupacao') \
    .agg(avg('jornada_trabalho').alias('jornada_media')) \
    .orderBy('jornada_media') \
    .limit(1) \
    .show()

+------------+-------------+-------------+
|escolaridade|     ocupacao|jornada_media|
+------------+-------------+-------------+
|     Masters|Other-service|         15.0|
+------------+-------------+-------------+



#### O atributo final_weight representa quantas vezes uma determinada leitura do censo é repetida, ou seja, o peso daquela linha no conjunto de dados. Itere sobre a lista das 5 maiores nacionalidades (baseado na soma do final weight) e para cada uma, obtenha os três estados civis com maior jornada de trabalho médio. Utilize a API SQL.

In [11]:
dataset1.createOrReplaceTempView("dataset")

In [12]:
nacionalidades = spark.sql('''
    select nacionalidade, sum(final_weight) qtde
    from dataset
    group by nacionalidade
    order by qtde desc
    limit 5
''').collect()

for row in nacionalidades:
    nacionalidade = row["nacionalidade"]
    print(f"nacionalidade: {nacionalidade}")

    res = spark.sql('''
        select estado_civil, avg(jornada_trabalho) as jornada_media
        from dataset
        where nacionalidade = '%s'
        group by estado_civil
        order by jornada_media desc
        limit 3
    ''' % nacionalidade).collect()

    for row2 in res:
        print(f"{row2['estado_civil']}: {row2['jornada_media']}")
    print("-"*50)

nacionalidade: United-States
Married-civ-spouse: 43.391457211250746
Divorced: 41.30946660259491
Married-AF-spouse: 41.130434782608695
--------------------------------------------------
nacionalidade: Mexico
Divorced: 42.45454545454545
Married-civ-spouse: 41.48874598070739
Separated: 39.75757575757576
--------------------------------------------------
nacionalidade: ?
Married-spouse-absent: 43.80952380952381
Married-civ-spouse: 43.429577464788736
Widowed: 42.92857142857143
--------------------------------------------------
nacionalidade: Philippines
Separated: 43.25
Married-civ-spouse: 40.922330097087375
Married-spouse-absent: 39.45454545454545
--------------------------------------------------
nacionalidade: El-Salvador
Separated: 38.0
Married-civ-spouse: 37.67567567567568
Divorced: 37.25
--------------------------------------------------


### Data Modeling

#### Check the columns

In [13]:
for i in dataset1.columns:
    print(f"{dataset1.select(i).dtypes}")
    print(f"{dataset1.filter(dataset1[i].isNull()).count()}")
    print("-"*20)

[('idade', 'int')]
0
--------------------
[('classe_trabalho', 'string')]
0
--------------------
[('final_weight', 'int')]
0
--------------------
[('escolaridade', 'string')]
0
--------------------
[('escolaridade_num', 'int')]
0
--------------------
[('estado_civil', 'string')]
0
--------------------
[('ocupacao', 'string')]
0
--------------------
[('relacionamento_householder', 'string')]
0
--------------------
[('raca', 'string')]
0
--------------------
[('sexo', 'string')]
0
--------------------
[('ganho_capital', 'int')]
0
--------------------
[('perda_capital', 'int')]
0
--------------------
[('jornada_trabalho', 'int')]
0
--------------------
[('nacionalidade', 'string')]
0
--------------------
[('renda_anual', 'string')]
0
--------------------


### Check the categorical labels

In [14]:
dataset1.printSchema()

root
 |-- idade: integer (nullable = true)
 |-- classe_trabalho: string (nullable = true)
 |-- final_weight: integer (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- escolaridade_num: integer (nullable = true)
 |-- estado_civil: string (nullable = true)
 |-- ocupacao: string (nullable = true)
 |-- relacionamento_householder: string (nullable = true)
 |-- raca: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- ganho_capital: integer (nullable = true)
 |-- perda_capital: integer (nullable = true)
 |-- jornada_trabalho: integer (nullable = true)
 |-- nacionalidade: string (nullable = true)
 |-- renda_anual: string (nullable = true)



In [15]:
categorical_features = ["classe_trabalho", "escolaridade", "estado_civil", 
                       "ocupacao", "relacionamento_householder", "raca",
                       "sexo", "nacionalidade", "renda_anual"]

#### Preprocessing the data

In [16]:
from pyspark.sql.functions import regexp_replace

In [17]:
from typing import List
from pyspark.sql import SparkSession, DataFrame

def remove_accents(categorical_features: List[str], dataset: DataFrame) -> DataFrame:
    for categorical_feature in categorical_features:
        if categorical_feature != "renda_anual":
            if categorical_feature == "nacionalidade" or categorical_feature == "classe_trabalho" or categorical_feature == "ocupacao":
                dataset = dataset.withColumn(
                    categorical_feature,
                    regexp_replace(categorical_feature, r'\?', 'indefinido')  # Using raw string for escaping '?'
                )
            
            # General replacement for all other features
            dataset = dataset.withColumn(
                categorical_feature,
                regexp_replace(categorical_feature, r'[^a-zA-Z0-9]', '')  # Regex to remove non-alphanumeric characters
            )
    
    return dataset

def verify_empty_values(categorical_features: List[str], dataset: DataFrame) -> DataFrame:
    for categorical_feature in categorical_features:
        for qtd_rows in range(len(dataset.select(categorical_feature).distinct().collect())):
            if len(dataset.select(categorical_feature).distinct().collect()[qtd_rows][0]) == 0:
                print(f"Column with string empty: {categorical_feature}")
                print("-"*20)

##### Remove Accents

In [18]:
dataset_ok = remove_accents(
    categorical_features = categorical_features, 
    dataset = dataset1
)

##### Verify empty values

In [19]:
verify_empty_values(
    categorical_features = categorical_features, 
    dataset = dataset_ok
)

#### Create some features

In [20]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

def create_ohe_stages(categorical_features, drop_last=False):

    stages = []
    
    for feature in categorical_features:
        # Define StringIndexer stage
        indexer = StringIndexer(
            inputCol=feature,
            outputCol=f"{feature}_index"
        )
        stages.append(indexer)
        
        # Define OneHotEncoder stage
        encoder = OneHotEncoder(
            inputCol=f"{feature}_index",
            outputCol=f"{feature}_vec",
            dropLast=drop_last
        )
        stages.append(encoder)
    
    return stages

In [21]:
numerical_features = ["idade", "final_weight", "escolaridade_num", 
                      "ganho_capital", "perda_capital", "jornada_trabalho"]

In [22]:
from pyspark.ml.feature import VectorAssembler

ohe_stages = create_ohe_stages(categorical_features)

pipeline_ohe = Pipeline(stages=ohe_stages)

dataset_transformed = pipeline_ohe.fit(dataset_ok).transform(dataset_ok)

#### Train and Test the Clustering strategy

In [23]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
    
# Define VectorAssembler to combine all features into a single vector
assembler = VectorAssembler(
    inputCols=[f"{col}_vec" for col in categorical_features] + numerical_features,
    outputCol="features"
)

kmeans = KMeans(featuresCol="features", predictionCol="prediction")

# Define pipeline
pipeline = Pipeline(stages=[assembler, kmeans])

# Define the ClusteringEvaluator
evaluator = ClusteringEvaluator(
    metricName="silhouette", 
    featuresCol="features", 
    predictionCol="prediction"
)

# Define parameter grid for KMeans
grid = ParamGridBuilder()\
    .addGrid(kmeans.k, [2, 4, 6])\
    .addGrid(kmeans.seed, [1, 20, 40])\
    .build()

# Set up CrossValidator
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=10
)

# Fit the model
cv_model = crossval.fit(dataset_transformed)

In [24]:
# Get the best model and the best K of the Kmeans
best_model = cv_model.bestModel
best_k = best_model.stages[-1].getK()

print(f"Best k: {best_k}")

Best k: 2


In [25]:
# # Extract silhouette score from the cross-validation results
avg_metrics = cv_model.avgMetrics
best_index = avg_metrics.index(max(avg_metrics))  # Index of the best silhouette score
best_silhouette = avg_metrics[best_index]

print(f"Best silhouette score: {best_silhouette}")

Best silhouette score: 0.7447907513941895


In [26]:
# Transform the dataset with the best model
predictions = best_model.transform(dataset_transformed)

#### Create a TempView

In [27]:
predictions_selected = predictions.select(
    categorical_features + numerical_features + ["prediction"]
)

In [28]:
predictions_selected = predictions_selected.withColumnRenamed(
    "prediction",
    "target"
)

In [29]:
predictions_selected.createOrReplaceTempView("predictions_selected")

# -------------------------

#### Classification Process

In [47]:
from xgboost.spark import SparkXGBClassifier

#### Prepare dataset for classification process

In [48]:
dataset_clustered = spark.sql("select * from predictions_selected")

##### Remove Accents

In [49]:
dataset_clustered_ok = remove_accents(
    categorical_features = categorical_features, 
    dataset = dataset_clustered
)

##### Verify empty values

In [50]:
verify_empty_values(
    categorical_features = categorical_features, 
    dataset = dataset_clustered_ok
)

In [51]:
from pyspark.ml.feature import VectorAssembler

ohe_stages = create_ohe_stages(categorical_features)

pipeline_ohe = Pipeline(stages=ohe_stages)

dataset_transformed = pipeline_ohe.fit(dataset_clustered_ok).transform(dataset_clustered_ok)

In [52]:
assembler = VectorAssembler(
    inputCols=[f"{col}_vec" for col in categorical_features] + numerical_features,
    outputCol="features"
)

#### Train and test Classification strategy

In [58]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define the XGBoost Classifier
xgb_classification = SparkXGBClassifier(
    features_col="features",
    label_col="target",
    num_workers=2
)

# Define the Pipeline
pipeline = Pipeline(
    stages=[assembler, xgb_classification]
)

# Define the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="target", 
    metricName="accuracy"
)

# Define parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(xgb_classification.max_depth, [5, 10])
             .addGrid(xgb_classification.n_estimators, [100, 200])
             .build())

# Define the CrossValidator
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=10
)

# Fit the model
cv_model = crossval.fit(dataset_transformed)

INFO:XGBoost-PySpark:Running xgboost-2.1.1 on 2 workers with
	booster params: {'device': 'cpu', 'objective': 'binary:logistic', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.1 on 2 workers with
	booster params: {'device': 'cpu', 'objective': 'binary:logistic', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.1 on 2 workers with
	booster params: {'device': 'cpu', 'objective': 'binary:logistic', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.1 on 2 workers with
	bo

In [61]:
# Get the best model
best_model = cv_model.bestModel

In [62]:
# Extract accuracy score from the cross-validation results
avg_metrics = cv_model.avgMetrics
best_index = avg_metrics.index(max(avg_metrics))  # Index of the best accuracy score
best_accuracy = avg_metrics[best_index]

print(f"Best accuracy score: {best_accuracy}")

Best accuracy score: 0.9988953754677633


In [68]:
# Transform the dataset with the best model
predictions = best_model.transform(dataset_transformed)

#### Extract of the accuracy score

In [67]:
performance = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = performance.evaluate(predictions)
print(accuracy)

0.9999692884125181


#### Create a TempView

In [69]:
predictions_renamed = predictions.withColumnRenamed(
    "target",
    "prediction_kmeans"
)

In [70]:
predictions_renamed = predictions_renamed.withColumnRenamed(
    "prediction",
    "prediction_xgboost"
)

In [71]:
predictions_selected = predictions_renamed.select(
    categorical_features + numerical_features + ["prediction_kmeans", "prediction_xgboost"]
)

In [73]:
predictions_selected.createOrReplaceTempView("class_predictions")

#### Select of the classification predictions

In [75]:
spark.sql("select * from class_predictions").show()

+---------------+------------+-------------------+----------------+--------------------------+----------------+------+-------------+-----------+-----+------------+----------------+-------------+-------------+----------------+-----------------+------------------+
|classe_trabalho|escolaridade|       estado_civil|        ocupacao|relacionamento_householder|            raca|  sexo|nacionalidade|renda_anual|idade|final_weight|escolaridade_num|ganho_capital|perda_capital|jornada_trabalho|prediction_kmeans|prediction_xgboost|
+---------------+------------+-------------------+----------------+--------------------------+----------------+------+-------------+-----------+-----+------------+----------------+-------------+-------------+----------------+-----------------+------------------+
|       Stategov|   Bachelors|       Nevermarried|     Admclerical|               Notinfamily|           White|  Male| UnitedStates|      <=50K|   39|       77516|              13|         2174|            0|   

### Stop the connection Spark

In [None]:
spark.stop()