In [None]:
pip install pyspark



In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Spark entry point for every cell below; getOrCreate() reuses an existing session if one is active.
spark = (
    SparkSession.builder
    .appName('crew_requirement')
    .getOrCreate()
)

In [None]:
# Load the cruise-ship table; the header row supplies column names and types are inferred.
df = spark.read.csv(
    '/content/cruise_ship_info.csv',
    header=True,
    inferSchema=True,
)

In [None]:
df.show(n=5)  # preview the first five ships

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
only showing top 5 rows



In [None]:
# Print the schema produced by inferSchema=True so the column types can be checked.
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the Cruise_line string as a numeric index (most frequent line -> 0.0).
indexer = StringIndexer(inputCol='Cruise_line', outputCol="Cruise_line_index")

df_indexed = indexer.fit(dataset=df).transform(dataset=df)

In [None]:
df_indexed.show(n=10)  # first ten rows with the new Cruise_line_index column

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_index|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|              1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|              1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|              1.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|       

In [None]:
# Mapping from cruise line name to its assigned index.
df_indexed.select('Cruise_line', 'Cruise_line_index').distinct().show(20)

+-----------------+-----------------+
|      Cruise_line|Cruise_line_index|
+-----------------+-----------------+
|            Costa|              5.0|
|        Norwegian|              4.0|
|              MSC|              7.0|
|           Orient|             19.0|
|Regent_Seven_Seas|             10.0|
|           Disney|             18.0|
|         Windstar|             15.0|
|              P&O|              8.0|
|  Royal_Caribbean|              0.0|
|         Seabourn|             14.0|
|             Star|              9.0|
|         Princess|              2.0|
|          Oceania|             13.0|
|          Azamara|             16.0|
| Holland_American|              3.0|
|           Cunard|             12.0|
|        Celebrity|              6.0|
|        Silversea|             11.0|
|          Crystal|             17.0|
|         Carnival|              1.0|
+-----------------+-----------------+



In [None]:
# Number of ships per cruise line.
df_indexed.groupBy("Cruise_line").count().show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
|        Silversea|    4|
|         Seabourn|    3|
| Holland_American|   14|
|         Windstar|    3|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|             Star|    6|
|  Royal_Caribbean|   23|
+-----------------+-----+



In [None]:
list(df_indexed.columns)

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_line_index']

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack the predictors into a single `features` vector; `crew` (the label) is left out.
# NOTE(review): Cruise_line_index is an arbitrary ordinal code, so a linear model treats
# it as if the index order carried meaning; consider one-hot encoding.
assembler = VectorAssembler(
    inputCols=['Age', 'Tonnage', 'passengers', 'length', 'cabins',
               'passenger_density', 'Cruise_line_index'],
    outputCol='features',
)

output = assembler.transform(dataset=df_indexed)

In [None]:
output.show(20)  # rows with the assembled feature vector

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_index|            features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|[6.0,30.276999999...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|[6.0,30.276999999...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|              1.0|[26.0,47.262,14.8...|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|              1.0|[11.0,110.0,29.74...|
|    Destiny|   Carnival| 17|           101.353|

In [None]:
# 80/20 train/test split. The fixed seed makes the split, and every metric computed
# from it below, reproducible across Restart & Run All.
train_data, test_data = output.randomSplit([.8, .2], seed=1234)
train_data.describe().show()

+-------+---------+-----------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
|summary|Ship_name|Cruise_line|              Age|          Tonnage|        passengers|            length|           cabins|passenger_density|             crew|Cruise_line_index|
+-------+---------+-----------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+
|  count|      125|        125|              125|              125|               125|               125|              125|              125|              125|              125|
|   mean|     NULL|       NULL|           16.024|71.55742400000001|18.649600000000007| 8.135919999999997|8.913360000000004|         39.62568|7.772080000000005|             5.04|
| stddev|     NULL|       NULL|8.028642274643552|38.48742195882334|10.047567704384653|1.8111180364447494|4.614

In [None]:
from pyspark.ml.regression import LinearRegression
# Ordinary linear regression predicting crew size from the assembled features.
crew_req = LinearRegression(labelCol='crew', featuresCol='features')

model = crew_req.fit(dataset=train_data)



In [None]:
result = model.evaluate(dataset=train_data)  # in-sample fit summary

In [None]:
# R^2 of the linear model on its own training data.
result.r2

0.9498470837158337

In [None]:
pred = model.transform(dataset=test_data)  # adds a `prediction` column to the test rows

In [None]:
result_test = model.evaluate(dataset=test_data)  # held-out fit summary

In [None]:
# R^2 on the held-out test split (compare with the training R^2 above).
result_test.r2

0.8410521293456138

In [None]:
pred.show(20)  # actual crew vs. predicted crew on test ships

+------------+-----------------+---+------------------+----------+------+------+-----------------+-----+-----------------+--------------------+------------------+
|   Ship_name|      Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density| crew|Cruise_line_index|            features|        prediction|
+------------+-----------------+---+------------------+----------+------+------+-----------------+-----+-----------------+--------------------+------------------+
|   Caribbean|         Princess|  9|             116.0|      26.0|  9.51|  13.0|            44.62| 11.0|              2.0|[9.0,116.0,26.0,9...| 10.93086351574195|
|    Conquest|         Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99| 19.1|              1.0|[11.0,110.0,29.74...| 11.81333995623822|
|        Dawn|        Norwegian| 11|              90.0|      22.4|  9.65|  11.2|            40.18| 11.0|              4.0|[11.0,90.0,22.4,9...| 9.946650129430145|
|        Dawn|        

In [None]:
pip install ucimlrepo



In [None]:
from ucimlrepo import fetch_ucirepo

# fetch dataset
# Download the UCI "Adult" (census income) dataset, id=2, as pandas frames.
# NOTE(review): the Spark section below reads /content/adult.data instead; in the visible
# notebook X is only used for a preview and y is unused.
adult = fetch_ucirepo(id=2)
X, y = adult.data.features, adult.data.targets

# Dataset metadata, then the per-variable description table.
print(adult.metadata)
print(adult.variables)


{'uci_id': 2, 'name': 'Adult', 'repository_url': 'https://archive.ics.uci.edu/dataset/2/adult', 'data_url': 'https://archive.ics.uci.edu/static/public/2/data.csv', 'abstract': 'Predict whether income exceeds $50K/yr based on census data. Also known as "Census Income" dataset. ', 'area': 'Social Science', 'tasks': ['Classification'], 'characteristics': ['Multivariate'], 'num_instances': 48842, 'num_features': 14, 'feature_types': ['Categorical', 'Integer'], 'demographics': ['Age', 'Income', 'Education Level', 'Other', 'Race', 'Sex'], 'target_col': ['income'], 'index_col': None, 'has_missing_values': 'yes', 'missing_values_symbol': 'NaN', 'year_of_dataset_creation': 1996, 'last_updated': 'Mon Aug 07 2023', 'dataset_doi': '10.24432/C5XW20', 'creators': ['Barry Becker', 'Ronny Kohavi'], 'intro_paper': None, 'additional_info': {'summary': 'Extraction was done by Barry Becker from the 1994 Census database.  A set of reasonably clean records was extracted using the following conditions: ((AAG

In [None]:
X.head(5)

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba


In [None]:
# adult.data has no header row, so Spark names the columns _c0.._c14 and reads them as
# strings. Leading spaces are kept in the values (e.g. " State-gov").
data = spark.read.format('csv').load('/content/adult.data')
data.show(20)

+---+-----------------+-------+-------------+---+--------------------+------------------+--------------+-------------------+-------+------+----+----+--------------+------+
|_c0|              _c1|    _c2|          _c3|_c4|                 _c5|               _c6|           _c7|                _c8|    _c9|  _c10|_c11|_c12|          _c13|  _c14|
+---+-----------------+-------+-------------+---+--------------------+------------------+--------------+-------------------+-------+------+----+----+--------------+------+
| 39|        State-gov|  77516|    Bachelors| 13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|  2174|   0|  40| United-States| <=50K|
| 50| Self-emp-not-inc|  83311|    Bachelors| 13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|     0|   0|  13| United-States| <=50K|
| 38|          Private| 215646|      HS-grad|  9|            Divorced| Handlers-cleaners| Not-in-family|              White|   Male|     0| 

In [None]:
# Descriptive names for the 15 positional columns, in file order.
labels = ('age workclass fnlwgt education numbers marital occupation relation '
          'race gender gain loss hourlypay country income').split()

In [None]:
# Replace the positional _c0.._c14 column names with the descriptive names in `labels`.
df = data.toDF(*labels)


In [None]:
df.show(n=5)

+---+-----------------+-------+----------+-------+-------------------+------------------+--------------+------+-------+-----+----+---------+--------------+------+
|age|        workclass| fnlwgt| education|numbers|            marital|        occupation|      relation|  race| gender| gain|loss|hourlypay|       country|income|
+---+-----------------+-------+----------+-------+-------------------+------------------+--------------+------+-------+-----+----+---------+--------------+------+
| 39|        State-gov|  77516| Bachelors|     13|      Never-married|      Adm-clerical| Not-in-family| White|   Male| 2174|   0|       40| United-States| <=50K|
| 50| Self-emp-not-inc|  83311| Bachelors|     13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|    0|   0|       13| United-States| <=50K|
| 38|          Private| 215646|   HS-grad|      9|           Divorced| Handlers-cleaners| Not-in-family| White|   Male|    0|   0|       40| United-States| <=50K|
| 53|          Private

In [None]:
# Every column is a string (the CSV was read without inferSchema), so numeric fields need explicit casts.
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- numbers: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gain: string (nullable = true)
 |-- loss: string (nullable = true)
 |-- hourlypay: string (nullable = true)
 |-- country: string (nullable = true)
 |-- income: string (nullable = true)



In [None]:
from pyspark.sql.functions import col
new_df = df.withColumn('age', df['age'].cast('integer'))  # age: string -> int

In [None]:
# `age` is now integer; the other numeric columns are still strings at this point.
new_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- numbers: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gain: string (nullable = true)
 |-- loss: string (nullable = true)
 |-- hourlypay: string (nullable = true)
 |-- country: string (nullable = true)
 |-- income: string (nullable = true)



In [None]:

# Cast the remaining numeric columns to integer (withColumn keeps each column's position).
for numeric_name in ('fnlwgt', 'numbers', 'gain', 'loss', 'hourlypay'):
  new_df = new_df.withColumn(numeric_name, col(numeric_name).cast('integer'))

In [None]:
# Explicit imports: `import *` from pyspark.sql.functions shadows Python builtins such as
# sum, max, min and round for the rest of the notebook.
from pyspark.sql.functions import col, count, when

# Null count per column: when() yields a non-null literal only for null cells, and count()
# counts non-null values, so each result is that column's number of nulls.
new_df.select([count(when(col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [None]:
# Distinct workclass values; note the " ?" placeholder used for missing data.
df.select('workclass').distinct().show(20)

+-----------------+
|        workclass|
+-----------------+
|        State-gov|
|      Federal-gov|
| Self-emp-not-inc|
|        Local-gov|
|          Private|
|                ?|
|     Self-emp-inc|
|      Without-pay|
|     Never-worked|
+-----------------+



In [None]:
# Turn the " ?" missing-value marker (leading space included) into real nulls.
df = new_df.replace(to_replace=" ?", value=None)

In [None]:
# Explicit imports instead of `import *`, which shadows builtins such as sum/max/min/round.
from pyspark.sql.functions import col, count, when

# Null count per column after converting " ?" to null: when() emits a non-null literal only
# for null cells, and count() counts non-nulls.
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|     1836|     0|        0|      0|      0|      1843|       0|   0|     0|   0|   0|        0|    583|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [None]:
# Occupation frequencies (NULL row = former " ?" entries); used to choose the fill value.
df.groupBy('occupation').count().show()

+------------------+-----+
|        occupation|count|
+------------------+-----+
|   Farming-fishing|  994|
|              NULL| 1843|
| Handlers-cleaners| 1370|
|    Prof-specialty| 4140|
|      Adm-clerical| 3770|
|   Exec-managerial| 4066|
|      Craft-repair| 4099|
|             Sales| 3650|
|      Tech-support|  928|
|  Transport-moving| 1597|
|   Protective-serv|  649|
|      Armed-Forces|    9|
| Machine-op-inspct| 2002|
|     Other-service| 3295|
|   Priv-house-serv|  149|
+------------------+-----+



In [None]:
df = df.fillna({'country': " United-States"})  # impute with a fixed value (leading space matches raw data)

In [None]:
df = df.fillna({'workclass': " Private"})  # impute with a fixed value (leading space matches raw data)

In [None]:
df = df.fillna({'occupation': " Prof-specialty"})  # most frequent occupation per the count above

In [None]:
# Re-check: every column should now report zero nulls.
df.select([count(when(col(name).isNull(), name)).alias(name) for name in df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer , VectorAssembler



In [None]:
list(df.columns)

['age',
 'workclass',
 'fnlwgt',
 'education',
 'numbers',
 'marital',
 'occupation',
 'relation',
 'race',
 'gender',
 'gain',
 'loss',
 'hourlypay',
 'country',
 'income']

In [None]:
# String columns that must be StringIndexer-encoded before assembly.
categorical_cols = 'workclass education marital occupation relation race gender country'.split()
# Columns already cast to integer above.
numeric_cols = ['age', 'fnlwgt', 'numbers', 'gain', 'loss', 'hourlypay']
label = 'income'


In [None]:
# One StringIndexer per categorical column; 'keep' sends unseen/null values to an extra index.
# NOTE(review): this name is later reused by `def indexer(...)`, which shadows this list.
indexer = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid='keep')
    for name in categorical_cols
]


In [None]:
# Encode the income string (<=50K / >50K) as the numeric `label` column.
label_indexer = StringIndexer(
    inputCol='income',
    outputCol='label',
    handleInvalid='keep',
)

In [None]:
# Feature vector = the indexed categorical columns followed by the raw numeric columns.
assembler = VectorAssembler(
    inputCols=[f'{c}_index' for c in categorical_cols] + numeric_cols,
    outputCol='features',
)

In [None]:
lr = LogisticRegression(labelCol='label', featuresCol='features')


In [None]:
# Stage order: feature indexers -> assembler -> label indexer -> classifier.
pipeline = Pipeline(stages=[*indexer, assembler, label_indexer, lr])

In [None]:
# 80/20 split with a fixed seed so the logistic-regression accuracy below is reproducible.
train_data, test_data = df.randomSplit([.8, .2], seed=1234)

In [None]:
model = pipeline.fit(dataset=train_data)  # fits every indexer and the classifier on training rows only

In [None]:
prediction = model.transform(dataset=test_data)

In [None]:
prediction.select('label', 'prediction').show(20)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 20 rows



In [None]:
# Confusion-matrix cells as (label, prediction, count) rows.
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  807|
|  0.0|       1.0|  313|
|  1.0|       0.0|  744|
|  0.0|       0.0| 4716|
+-----+----------+-----+



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='label',
    predictionCol='prediction',
)

In [None]:
evaluator.evaluate(dataset=prediction)  # test-set accuracy of the logistic regression

0.8393617021276596

In [None]:
df.show(20)

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male| 2174|   0|       40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311|    Bachelors|     13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0|   0|       13| United-States| <=50K|
| 38|          Private|215646|      HS-grad|      9|            Divorced| Handlers-cleaners| Not-in-famil

In [None]:
# NOTE(review): duplicates the earlier categorical_cols / numeric_cols / label cell; keep the
# two in sync or drop one.
categorical_cols = ['workclass', 'education', 'marital', 'occupation',
                    'relation', 'race', 'gender', 'country']
numeric_cols = 'age fnlwgt numbers gain loss hourlypay'.split()
label = 'income'


In [None]:
def indexer(df, col):
  """Fit a StringIndexer on column `col` of `df` and return `df` plus a `<col>_index` column.

  handleInvalid='keep' places unseen or null values in an extra bucket instead of failing.
  Spark raises if `<col>_index` already exists in `df`.
  """
  string_indexer = StringIndexer(inputCol=col, outputCol=f'{col}_index', handleInvalid='keep')
  fitted = string_indexer.fit(df)
  return fitted.transform(df)



In [None]:
indexer(df, 'workclass').show(20)  # one-off check of the helper; `df` itself is unchanged

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|workclass_index|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male| 2174|   0|       40| United-States| <=50K|            3.0|
| 50| Self-emp-not-inc| 83311|    Bachelors|     13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0|   0|       13| United-States| <=50K|            1.0|
| 38|          Private|21

In [None]:
df.show(20)

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male| 2174|   0|       40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311|    Bachelors|     13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0|   0|       13| United-States| <=50K|
| 38|          Private|215646|      HS-grad|      9|            Divorced| Handlers-cleaners| Not-in-famil

In [None]:
# Index every categorical column plus the income label, starting from the cleaned `df`
# (" ?" -> null -> filled). The original loop read the uncleaned `new_df` and was not
# idempotent: a re-run raised "Output column workclass_index already exists". It also never
# created `income_index`, which the model cells below select. The loop variable is not
# named `col`, so pyspark.sql.functions.col is not shadowed.
new_df = df
for cat_col in categorical_cols + ['income']:
  new_df = indexer(new_df, cat_col)

IllegalArgumentException: requirement failed: Output column workclass_index already exists.

In [None]:
new_df.show(20)

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|workclass_index|education_index|marital_index|occupation_index|relation_index|race_index|gender_index|country_index|income_index|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|   

In [None]:
from pyspark.ml.stat import Correlation


In [None]:
# Same feature layout as the pipeline's assembler: indexed categoricals, then numerics.
index_cols = [f'{c}_index' for c in categorical_cols]
assembler = VectorAssembler(inputCols=index_cols + numeric_cols, outputCol='features')

In [None]:
vector = assembler.transform(dataset=new_df)

In [None]:
vector.show(20)

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+--------------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|workclass_index|education_index|marital_index|occupation_index|relation_index|race_index|gender_index|country_index|income_index|            features|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+--------------------+
| 39|        

In [None]:
# Names of the assembled feature slots, in the same order as the VectorAssembler inputCols.
# Bound to `feature_cols` rather than `col`, which would shadow pyspark.sql.functions.col and
# break the cast / null-count cells on re-run.
feature_cols = [f'{c}_index' for c in categorical_cols] + numeric_cols

In [None]:
# Pearson correlation matrix of the 14 feature slots (first row holds the matrix).
# NOTE(review): the categorical slots are arbitrary index codes, so their Pearson values are
# of limited meaning.
r1 = Correlation.corr(dataset=vector, column='features').head()

In [None]:
print("Pearson correlation matrix:\n", r1[0], sep="")

Pearson correlation matrix:
DenseMatrix([[ 1.00000000e+00,  2.50587933e-02, -2.09477645e-02,
               4.22182173e-03, -4.52925664e-02,  1.89958265e-02,
              -1.64130127e-02, -2.13924109e-02,  1.02229290e-01,
               1.54389798e-01, -3.21605500e-02,  1.27681211e-01,
               4.18082447e-02,  2.39291045e-02,  1.36880859e-02],
             [ 2.50587933e-02,  1.00000000e+00, -5.43462520e-03,
              -1.64882302e-02, -1.40352218e-02,  3.20378119e-02,
              -4.02158184e-02,  8.29998874e-02,  4.24489521e-02,
               7.61125290e-02,  3.44838605e-02, -1.70081765e-01,
               5.92847625e-02,  2.19380216e-02, -7.92952053e-03],
             [-2.09477645e-02, -5.43462520e-03,  1.00000000e+00,
               4.46472295e-02,  4.11140727e-01,  6.89439876e-02,
               4.08301403e-01,  2.15264796e-02, -3.11287762e-01,
               3.05672475e-02,  5.44716467e-03, -1.04875923e-01,
              -5.77145430e-02, -5.33565354e-02, -1.46710908e

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Multinomial Naive Bayes with Laplace smoothing; requires non-negative feature values.
nb = NaiveBayes(
    modelType="multinomial",
    smoothing=1.0,
    featuresCol='features',
    labelCol='label',
)

In [None]:
# Split the assembled feature frame. The raw CSV `data` only has _c0.._c14, so the
# NaiveBayes / DecisionTree models below (labelCol='label') had no `features` or `label`
# column to train on. income_index is exposed under the `label` name those cells expect.
splits = (
    vector
    .select('features', 'income_index')
    .withColumnRenamed('income_index', 'label')
    .randomSplit([0.6, 0.4], 1234)
)
train = splits[0]
test = splits[1]

In [None]:
# `data` here is still the raw CSV (only _c0.._c14), so selecting features/label from it
# raised AnalysisException. Build the (features, label) frame from the assembled `vector`.
data = vector.select('features', vector['income_index'].alias('label'))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `features` cannot be resolved. Did you mean one of the following? [`_c0`, `_c1`, `_c10`, `_c11`, `_c12`].;
'Project ['features, 'label]
+- Relation [_c0#6193,_c1#6194,_c2#6195,_c3#6196,_c4#6197,_c5#6198,_c6#6199,_c7#6200,_c8#6201,_c9#6202,_c10#6203,_c11#6204,_c12#6205,_c13#6206,_c14#6207] csv


In [None]:
# NOTE(review): rebinds `label`, which previously held the string 'income'.
label = StringIndexer(
    inputCol='income',
    outputCol='label',
    handleInvalid='keep',
)

In [None]:
# Index `income` from the cleaned frame `df`; the raw CSV `data` has no `income` column
# (its columns are _c0.._c14).
data = label.fit(df).transform(df)

In [None]:
data.show(20)

In [None]:
model = nb.fit(dataset=train)

In [None]:
predictions = model.transform(dataset=test)
predictions.show(20)

In [None]:
# Test-set accuracy of the Naive Bayes model.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy, sep="")

In [None]:
from pyspark.ml.classification import *

In [None]:
# Entropy-split decision tree, depth 8. maxBins=45 presumably covers the largest indexed
# categorical feature (country); confirm against the category counts.
dt = pyspark.ml.classification.DecisionTreeClassifier(
    labelCol='label',
    featuresCol='features',
    impurity='entropy',
    maxDepth=8,
    maxBins=45,
)

In [None]:
model = dt.fit(dataset=train)

In [None]:
predictions = model.transform(dataset=test)
predictions.show(20)

In [None]:
# Test-set accuracy of the decision tree.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy, sep="")

In [None]:
vector.show(20)

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+--------------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|workclass_index|education_index|marital_index|occupation_index|relation_index|race_index|gender_index|country_index|income_index|            features|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+--------------------+
| 39|        

In [None]:
# Model-ready frame: feature vector plus the indexed income label.
data = vector.select(['features', 'income_index'])

In [None]:
data.show(20)

+--------------------+------------+
|            features|income_index|
+--------------------+------------+
|[4.0,2.0,1.0,3.0,...|         0.0|
|(14,[0,1,3,8,9,10...|         0.0|
|(14,[2,3,4,8,9,10...|         0.0|
|(14,[1,3,5,8,9,10...|         0.0|
|[0.0,2.0,0.0,0.0,...|         0.0|
|(14,[1,3,4,6,8,9,...|         0.0|
|[0.0,10.0,5.0,5.0...|         0.0|
|(14,[0,3,8,9,10,1...|         1.0|
|[0.0,3.0,1.0,0.0,...|         1.0|
|(14,[1,3,8,9,10,1...|         1.0|
|(14,[1,3,5,8,9,10...|         1.0|
|(14,[0,1,5,7,8,9,...|         1.0|
|[0.0,2.0,1.0,3.0,...|         0.0|
|[0.0,6.0,1.0,4.0,...|         0.0|
|(14,[1,3,5,7,8,9,...|         1.0|
|(14,[1,3,5,7,8,9,...|         0.0|
|(14,[0,2,3,4,8,9,...|         0.0|
|(14,[2,3,4,8,9,10...|         0.0|
|(14,[1,3,8,9,10,1...|         0.0|
|[1.0,3.0,2.0,2.0,...|         1.0|
+--------------------+------------+
only showing top 20 rows



In [None]:
# Seeded 80/20 split shared by the RF, GBT, XGBoost and MLP cells below.
splits = data.randomSplit([0.8, 0.2], seed=1234)
train, test = splits

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Small 5-tree random forest; maxBins matches the decision-tree cell.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="income_index",
    numTrees=5,
    maxBins=45,
)

In [None]:
model = rf.fit(dataset=train)

In [None]:
predictions = model.transform(dataset=test)

In [None]:
# Test-set accuracy of the random forest.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="income_index", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy, sep="")

Test set accuracy = 0.846802637632265


In [None]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees, 10 boosting iterations.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="income_index",
    maxIter=10,
    maxBins=45,
)

In [None]:
# Fit GBT, score the test split and report accuracy.
model = gbt.fit(dataset=train)
predictions = model.transform(dataset=test)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="income_index", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy, sep="")

Test set accuracy = 0.8579972396871646


In [None]:
from xgboost.spark import SparkXGBClassifier

In [None]:
# Distributed XGBoost over 2 Spark workers.
# NOTE(review): despite "reg" in the name this is a classifier (objective binary:logistic in the log output).
spark_reg_estimator = SparkXGBClassifier(
  num_workers=2,
  label_col="income_index",
  features_col="features",
)

In [None]:
# Fit XGBoost, score the test split and report accuracy.
model = spark_reg_estimator.fit(train)
predictions = model.transform(test)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="income_index", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy, sep="")

INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


Test set accuracy = 0.8760926238307009


In [None]:
df.show(20)

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male| 2174|   0|       40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311|    Bachelors|     13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0|   0|       13| United-States| <=50K|
| 38|          Private|215646|      HS-grad|      9|            Divorced| Handlers-cleaners| Not-in-famil

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [None]:
# MLP topology: 14 inputs (8 indexed categoricals + 6 numerics), hidden layers 8-6-4,
# and 2 output classes (<=50K / >50K).
layers = [14] + [8, 6, 4] + [2]

In [None]:
# Feed-forward network over the assembled features; the seed fixes weight initialisation.
# NOTE(review): no scaling step exists in this notebook (fnlwgt values are ~1e5). That may
# explain the lower accuracy vs. the tree models; consider StandardScaler.
trainer = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='income_index',
    layers=layers,
    blockSize=128,
    maxIter=100,
    seed=1234,
)

In [None]:
# Fit the MLP, score the test split and report accuracy.
model = trainer.fit(dataset=train)
predictions = model.transform(dataset=test)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="income_index", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = ", accuracy, sep="")

Test set accuracy = 0.7636865511424629
