In [5]:
import pyspark
from pyspark import SparkContext

# Get or create a SparkContext
sc = SparkContext.getOrCreate()

# Now you can proceed with using sc
# ... your code ...




In [6]:
nums= sc.parallelize([1,2,3,4])


In [7]:
nums.take(1)

                                                                                

[1]

In [8]:
squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print(f"i is {num}")

i is 1
i is 4
i is 9
i is 16


In [9]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)



In [10]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

In [11]:
rdd = sc.parallelize(list_p)

In [12]:
rdd.map(lambda x: Row(name=x[0],age=x[1]))

PythonRDD[5] at RDD at PythonRDD.scala:53

In [16]:
data = rdd.map(lambda x: Row(name=x[0],age=int(x[1])))
data.collect()
print(data)

PythonRDD[7] at collect at /var/folders/j_/pzfz31z97kl788jqhck22hv80000gn/T/ipykernel_15816/3380909895.py:2


In [17]:
sample_re = data.take(2)

In [19]:
print(sample_re)

[Row(name='John', age=19), Row(name='Smith', age=29)]


In [20]:
# sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

In [23]:
DF_ppl.take(2)

[Row(name='John', age=19), Row(name='Smith', age=29)]

In [24]:
DF_ppl.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



Following are the steps to build a Machine Learning program with PySpark:

Step 1) Basic operation with PySpark \
Step 2) Data preprocessing\
Step 3) Build a data processing pipeline \
Step 4) Build the classifier: logistic  \
Step 5) Train and evaluate the model \
Step 6) Tune the hyperparameter

In this PySpark Machine Learning tutorial, we will use the adult dataset. The purpose of this tutorial is to learn how to use Pyspark. For more information about the dataset, refer to this tutorial.

Note that, the dataset is not significant and you may think that the computation takes a long time. Spark is designed to process a considerable amount of data. Spark’s performances increase relative to other machine learning libraries when the dataset processed grows larger.

### Step 1) Basic operation with PySpark

In [157]:
#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

24/01/25 22:42:09 WARN SparkContext: The path https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv has been added already. Overwriting of added paths is not supported in the current version.


then, you can read the cvs file with sqlContext.read.csv. You use inferSchema set to True to tell Spark to guess automatically the type of data. By default, it is turn to False.

In [158]:
df = sqlContext.read.csv(SparkFiles.get('adult_data.csv'),header =True,inferSchema = True)

In [159]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [160]:
df.show(5, truncate=False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

If you didn’t set inderShema to True, here is what is happening to the type. There are all in string.

In [161]:
df_string = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=  False)
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



To convert the continuous variable in the right format, you can use recast the columns. You can use withColumn to tell Spark which column to operate the transformation.

In [35]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



#### Select columns
You can select and show the rows with select and the names of the features. Below, age and fnlwgt are selected.

In [36]:
df.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
| 38| 89814|
| 28|336951|
| 44|160323|
| 18|103497|
+---+------+
only showing top 5 rows



##### Count by group
If you want to count the number of occurence by group, you can chain:

- groupBy()
- count()

together. In the PySpark example below, you count the number of rows by the education level.

In [38]:
df.groupBy('education').count().sort('count',ascending = True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [39]:
df.groupBy('education').count().show()

+------------+-----+
|   education|count|
+------------+-----+
|        10th| 1389|
|     Masters| 2657|
|     5th-6th|  509|
|  Assoc-acdm| 1601|
|   Assoc-voc| 2061|
|     7th-8th|  955|
|         9th|  756|
|     HS-grad|15784|
|   Bachelors| 8025|
|        11th| 1812|
|     1st-4th|  247|
|   Preschool|   83|
|        12th|  657|
|   Doctorate|  594|
|Some-college|10878|
| Prof-school|  834|
+------------+-----+



##### Describe the data
To get a summary statistics, of the data, you can use describe(). It will compute the:

- count
- mean
- standarddeviation
- min
- max

In [40]:
df.describe().show()

24/01/25 14:03:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

                                                                                

In [41]:
df.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|                 0|
|    max|             99999|
+-------+------------------+



##### Crosstab computation
In some occasion, it can be interesting to see the descriptive statistics between two pairwise columns. For instance, you can count the number of people with income below or above 50k by education level. This operation is called a crosstab.

In [50]:
df.crosstab('education','capital-gain').show(n=20, truncate=True)

+----------------------+-----+-----+----+-----+-----+----+----+---+----+-----+----+----+-----+-----+----+----+-----+----+----+-----+-----+----+-----+----+----+----+----+----+-----+-----+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+----+----+----+----+-----+-----+----+----+----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+---+-----+
|education_capital-gain|    0|10520|1055|10566|10605|1086|1111|114|1151|11678|1173|1264|13550|14084|1409|1424|14344|1455|1471|15020|15024|1506|15831|1639|1731|1797|1831|1848|18481|20051|2009|2036|2050|2062|2105|2174|2176|2202|22040|2228|2290|2329|2346|2354|2387|2407|2414|2463|25124|25236|2538|2580|2597|2635|2653|27828|2829|2885|2907|2936|2961|

##### Drop column
There are two intuitive API to drop columns:

- drop(): Drop a column
- dropna(): Drop NA’s

Below you drop the column education_num

In [None]:
df.drop('education_num').columns

##### Filter data
You can use filter() to apply descriptive statistics in a subset of data. For instance, you can count the number of people above 40 year old

In [51]:
df.filter(df.age > 40).count()

20211

In [52]:
df.count()

48842

##### Descriptive statistics by group
Finally, you can group data by group and compute statistical operations like the mean.

In [53]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [54]:
df.groupBy('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



#### Step 2) Data preprocessing
Data processing is a critical step in machine learning. After you remove garbage data, you get some important insights.

For instance, you know that age is not a linear function with the income. When people are young, their income is usually lower than mid-age. After retirement, a household uses their saving, meaning a decrease in income. To capture this pattern, you can add a square to the age feature

##### Add age square

To add a new feature, you need to:

* Select the column
* Apply the transformation and add it to the DataFrame

In [55]:
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



You can see that age_square has been successfully added to the data frame. You can change the order of the variables with select. Below, you bring age_square right after age.


In [59]:
df.columns

['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income',
 'age_square']

In [60]:
COLUMNS = ['age',
 'age_square',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

df = df.select(COLUMNS)


In [61]:
df.first()

Row(age=25, age_square=625.0, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K')

Exclude Holand-Netherlands

When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to an error during the cross-validation.

Let’s check the origin of the household

In [63]:
df.filter(df['native-country'] == 'Holand-Netherlands').count()

1

In [64]:
df.groupBy('native-country').agg({'native-country' : 'count'}).sort(asc('count(native-country)')).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|                Laos|                   23|
|          Yugoslavia|                   23|
|Outlying-US(Guam-...|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [116]:
df_remove = df.filter(df['native-country'] != 'Holand-Netherlands')

#### Step 3) Build a data processing pipeline
Similar to scikit-learn, Pyspark has a pipeline API.

A pipeline is very convenient to maintain the structure of the data. You push the data into the pipeline. Inside the pipeline, various operations are done, the output is used to feed the algorithm.

For instance, one universal transformation in machine learning consists of converting a string to one hot encoder, i.e., one column by a group. One hot encoder is usually a matrix full of zeroes.

The steps to transform the data are very similar to scikit-learn. You need to:

- Index the string to numeric
- Create the one hot encoder
- Transform the data

Two APIs do the job: StringIndexer, OneHotEncoder

1. First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.
2. Fit the data and transform it
3. Create the news columns based on the group. For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.

In [69]:
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
#step 1
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
#step 2
model = stringIndexer.fit(df)
indexed = model.transform(df)
#step 3
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
model = encoder.fit(indexed)
encoded = model.transform(indexed)
encoded.show(2)

+---+----------+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
|age|age_square|workclass|fnlwgt|education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|workclass_encoded|workclass_vec|
+---+----------+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
| 25|     625.0|  Private|226802|     11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|              0.0|(9,[0],[1.0])|
| 38|    1444.0|  Private| 89814|  HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|      

### Build the pipeline
You will build a pipeline to convert all the precise features and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.

1. Encode the categorical data
2. Index the label feature
3. Add continuous variable
4. Assemble the steps.

Each step is stored in a list named stages. This list will tell the VectorAssembler what operation to perform inside the pipeline.

1. Encode the categorical data

This step is exaclty the same as the above example, except that you loop over all the categorical features.

In [165]:
df_remove.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'label']

In [120]:
df_remove = df_remove.drop('age_square')

In [164]:
df_remove = df_remove.withColumnRenamed('income','label')

In [168]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # stages in our Pipeline

for categorical in CATE_FEATURES:
    # Create a StringIndexer
    stringIndexer = StringIndexer(inputCol=categorical, outputCol=categorical + "Index")

    # Create a OneHotEncoder
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categorical + "classVec"])

    # Add both StringIndexer and OneHotEncoder to the stages
    stages += [stringIndexer, encoder]






In [169]:
stages

[StringIndexer_7c729bfe3dcd,
 OneHotEncoder_b19e14397e1c,
 StringIndexer_d6595d5905b3,
 OneHotEncoder_962a73c455e0,
 StringIndexer_0ffcba88f8e2,
 OneHotEncoder_2890ebbccb8f,
 StringIndexer_311fd947e472,
 OneHotEncoder_8d8e776be6b6,
 StringIndexer_2b2b3c7a00f0,
 OneHotEncoder_3d404580cb67,
 StringIndexer_f59c46803304,
 OneHotEncoder_798db67f6d1d,
 StringIndexer_ed34f5ca0ade,
 OneHotEncoder_bb3f54123b40,
 StringIndexer_992f66cdef8f,
 OneHotEncoder_53c92b5f2dba]

2. Index the label feature

Spark, like many other libraries, does not accept string values for the label. You convert the label feature with StringIndexer and add it to the list stages

In [170]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

# Now 'stages' contains all the stages for the pipeline: StringIndexers and OneHotEncoders for categorical features, and a StringIndexer for the label

In [171]:
stages

[StringIndexer_7c729bfe3dcd,
 OneHotEncoder_b19e14397e1c,
 StringIndexer_d6595d5905b3,
 OneHotEncoder_962a73c455e0,
 StringIndexer_0ffcba88f8e2,
 OneHotEncoder_2890ebbccb8f,
 StringIndexer_311fd947e472,
 OneHotEncoder_8d8e776be6b6,
 StringIndexer_2b2b3c7a00f0,
 OneHotEncoder_3d404580cb67,
 StringIndexer_f59c46803304,
 OneHotEncoder_798db67f6d1d,
 StringIndexer_ed34f5ca0ade,
 OneHotEncoder_bb3f54123b40,
 StringIndexer_992f66cdef8f,
 OneHotEncoder_53c92b5f2dba,
 StringIndexer_fa919c1c21b5]

3. Add continuous variable

The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. The code below popluate the list with encoded categorical features and the continuous features.

In [172]:
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Assemble the steps.

Finally, you pass all the steps in the VectorAssembler

In [173]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Now that all the steps are ready, you push the data to the pipeline.

In [174]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [111]:
# Example of testing an individual stage
from pyspark.ml.feature import StringIndexer

# Assuming 'yourColumnName' is a column in df_remove
stringIndexer = StringIndexer(inputCol="age_square", outputCol="outputColumn")
model = stringIndexer.fit(df_remove)
indexed = model.transform(df_remove)

indexed.show()


+---+----------+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+------------+
|age|age_square|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|outputColumn|
+---+----------+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+------------+
| 25|     625.0|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|           0|           0|            40| United-States| <=50K|        16.0|
| 38|    1444.0|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|  

In [82]:
from pyspark.sql import functions as F

# Assuming 'df' is your DataFrame
null_counts = df_remove.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_remove.columns])
null_counts.show()


+---+----------+---------+------+---------+---------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+
|age|age_square|workclass|fnlwgt|education|educational-num|marital-status|occupation|relationship|race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------+---------+------+---------+---------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+
|  0|         0|        0|     0|        0|              0|             0|         0|           0|   0|     0|           0|           0|             0|             0|     0|
+---+----------+---------+------+---------+---------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+



In [83]:
total_nulls = df_remove.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_remove.columns]).agg(*[F.sum(c).alias(c) for c in df.columns])
total_nulls.show()


+---+----------+---------+------+---------+---------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+
|age|age_square|workclass|fnlwgt|education|educational-num|marital-status|occupation|relationship|race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------+---------+------+---------+---------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+
|  0|         0|        0|     0|        0|              0|             0|         0|           0|   0|     0|           0|           0|             0|             0|     0|
+---+----------+---------+------+---------+---------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+



In [175]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# List of categorical columns
categoricalColumns = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']

stages = []  # stages in the pipeline

# Convert string columns to categorical indices
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]


In [176]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

# Now 'stages' contains all the stages for the pipeline: StringIndexers and OneHotEncoders for categorical features, and a StringIndexer for the label

In [177]:
from pyspark.ml.feature import VectorAssembler

# List of continuous (numeric) feature columns
continuousColumns = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']

# Assembling mixed data types (continuous and categorical) into a feature vector
assemblerInputs = [c + "classVec" for c in categoricalColumns] + continuousColumns
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


In [178]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)  # Assuming 'df_remove' is your DataFrame
model = pipelineModel.transform(df_remove)


In [179]:
model.take(1)

[Row(age=25, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', label='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 226802.0, 96: 7.0, 98: 40.0}))]

In [180]:
model.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- workclassIndex: double (nullable = false)
 |-- workclassclassVec: vector (nullable = true)
 |-- educationIndex: double (nullable = false)
 |-- educationclassVec: vector (nullable = true)
 |-- marital-statusIndex: double (nullable = false)
 |-- marital-statusclassVec: vector (nullable = true)
 |-- occupationIndex: double (nullable = false)
 |-- occupationcla

#### Step 4) Build the classifier: logistic
To make the computation faster, you convert model to a DataFrame.

You need to select newlabel and features from model using map.

In [181]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

You are ready to create the train data as a DataFrame. You use the sqlContext

In [182]:
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

                                                                                

In [183]:
df_train.show(2)


[Stage 537:>                                                        (0 + 1) / 1]

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



                                                                                

##### Create a train/test set

You split the dataset 80/20 with randomS

In [184]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Let’s count how many people with income below/above 50k in both training and test set

In [185]:
train_data.groupby('label').agg({'label': 'count'}).show()



+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       29731|
|  1.0|        9294|
+-----+------------+



                                                                                

In [186]:
test_data.groupby('label').agg({'label': 'count'}).show()




+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7423|
|  1.0|        2393|
+-----+------------+



                                                                                

#### Build the logistic regressor
Last but not least, you can build the classifier. Pyspark has an API called LogisticRegression to perform logistic regression.

You initialize lr by indicating the label column and feature columns. You set a maximum of 10 iterations and add a regularization parameter with a value of 0.3. Note that in the next section, you will use cross-validation with a parameter grid to tune the model

In [187]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

24/01/25 22:54:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/25 22:54:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

You can see the coefficients from the regression



In [188]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.012920702062341053,-0.0737171157653256,0.017825872813620844,-0.1462968212684535,-0.03305607380614736,0.29546029790146017,0.23527107113244636,-0.3246323110859749,-0.1424754923698266,-0.03964849047232577,0.22925384009602537,0.35749331377536936,0.017396692098775382,-0.21762284336025045,0.027851671518641076,-0.21538923787633896,-0.27251992538417275,0.531533059039377,-0.2540090473830295,-0.15023729464936955,0.5335998661837736,-0.20112242801551708,-0.22431043695341493,0.40535711379766115,-0.309129603249302,-0.1742905347308898,-0.18579754276477667,-0.15370921289051548,-0.12587573851259398,0.2471040014498598,-0.04026978005757287,0.34160262924942025,-0.09801767555610637,0.06068901377182592,-0.24537767398227364,-0.15041642187142096,-0.14680879508228098,-0.0862566027670031,-0.22754246095258393,-0.2928804399943198,0.1369175896782796,0.10226671321851488,-0.268176782237584,0.34059845173423453,-0.18538112204779592,-0.2761487807095205,-0.22120317211422724,0.4461091766004014,0.0869031

##### Step 5) Train and evaluate the model
To generate prediction for your test set,

You can use linearModel with transform() on test_data

In [189]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [190]:
# You can print the elements in predictions
predictions.printSchema()



root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



You are interested by the label, prediction and the probability

In [191]:
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

[Stage 557:>                                                        (0 + 1) / 1]

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.94749111707796...|
|  0.0|       0.0|[0.94176544257712...|
|  0.0|       1.0|[0.34135333951602...|
|  0.0|       0.0|[0.93261351471201...|
|  0.0|       0.0|[0.92677852659477...|
|  0.0|       0.0|[0.68162131221420...|
|  0.0|       0.0|[0.68966231801780...|
|  0.0|       1.0|[0.41485396199618...|
|  0.0|       0.0|[0.86253410740179...|
|  0.0|       0.0|[0.87493076283524...|
|  0.0|       0.0|[0.74764395212137...|
|  0.0|       0.0|[0.86279313920564...|
|  0.0|       0.0|[0.78727785328466...|
|  0.0|       0.0|[0.84719580286115...|
|  0.0|       0.0|[0.81715479158045...|
|  0.0|       0.0|[0.84258187168281...|
|  0.0|       0.0|[0.86417222938566...|
|  0.0|       0.0|[0.89703863672127...|
|  0.0|       0.0|[0.85490713968171...|
|  0.0|       0.0|[0.63747443807667...|
+-----+----------+--------------------+
only showing top 20 rows



                                                                                

##### Evaluate the model
You need to look at the accuracy metric to see how well (or bad) the model performs. Currently, there is no API to compute the accuracy measure in Spark. The default value is the ROC, receiver operating characteristic curve. It is a different metrics that take into account the false positive rate.

Before you look at the ROC, let’s construct the accuracy measure. You are more familiar with this metric. The accuracy measure is the sum of the correct prediction over the total number of observations.

You create a DataFrame with the label and the `prediction.

In [192]:
cm = predictions.select("label", "prediction")


You can check the number of class in the label and the prediction


In [193]:
cm.groupby('label').agg({'label': 'count'}).show()



+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7423|
|  1.0|        2393|
+-----+------------+



                                                                                

In [194]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()




+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             8773|
|       1.0|             1043|
+----------+-----------------+



                                                                                

For instance, in the test set, there is 1578 household with an income above 50k and 5021 below. The classifier, however, predicted 617 households with income above 50k.

You can compute the accuracy by computing the count when the label are correctly classified over the total number of rows.

In [195]:

cm.filter(cm.label == cm.prediction).count() / cm.count()

                                                                                

0.8213121434392828

You can wrap everything together and write a function to compute the accuracy.


In [196]:

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
    
accuracy_m(model = linearModel)



Model accuracy: 82.131%


                                                                                

#### ROC metrics
The module BinaryClassificationEvaluator includes the ROC measures. The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 – specificity

In [197]:


### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

                                                                                

0.8924748465074416
areaUnderROC


In [198]:
print(evaluator.evaluate(predictions))


                                                                                

0.8924707931926159


##### Step 6) Tune the hyperparameter
Last but not least, you can tune the hyperparameters. Similar to scikit learn you create a parameter grid, and you add the parameters you want to tune.

To reduce the time of the computation, you only tune the regularization parameter with only two values.

In [199]:


from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Finally, you evaluate the model with using the cross valiation method with 5 folds. It takes around 16 minutes to train.

In [200]:


from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

                                                                                

Time to train model: 234.361 seconds


The best regularization hyperparameter is 0.01, with an accuracy of 85.316 percent.

In [203]:


accuracy_m(model = cvModel)





Model accuracy: 84.271%


                                                                                

You can exctract the recommended parameter by chaining cvModel.bestModel with extractParamMap()


In [204]:

bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_c01317f9108c', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_c01317f9108c', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_c01317f9108c', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_c01317f9108c', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_c01317f9108c', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_c01317f9108c', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_c01317f9108c', name='maxBlockSizeInMB', doc='maximum memory in MB for s

24/01/26 04:59:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 913762 ms exceeds timeout 120000 ms
24/01/26 04:59:39 WARN SparkContext: Killing executors is not supported by current scheduler.
24/01/26 04:59:43 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o

In [None]:
# Remember to stop the SparkContext when you're done
sc.stop()