In [None]:
# findspark has to be initialised before the pyspark imports in the cells
# below so that the local Spark installation's Python package can be found.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Local Spark session running on 4 threads; driver and executor memory are
# both configured to 1 GB.
spark = (
    SparkSession.builder
    .appName("AdultIncomeClassification")
    .master("local[4]")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

In [None]:
# Read the comma-separated Adult file; the first line is treated as a header
# and column types are inferred from the data.
adult = spark.read.csv(
    "/home/alper/Spark/data/adult.csv",
    sep=",",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview: limit(5) already caps the result at five rows, so the pandas
# frame is displayed directly.
adult.limit(5).toPandas()

In [None]:
# Number of rows right after loading.
adult.count()

In [None]:
# Column names and the types produced by inferSchema.
adult.printSchema()

In [None]:
# Target column names, in file order; applied positionally with toDF().
col_names = [
    "age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "sex",
    "capital_gain", "capital_loss", "hours_per_week", "native_country",
    "income",
]

In [None]:
# Rename every column positionally to the names listed in col_names.
adult = adult.toDF(*col_names)

In [None]:
# Columns treated as numeric features.
numeric = [
    "age",
    "fnlwgt",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]

In [None]:
# Columns holding string categories (the last one, "income", is the target).
categorical = [
    "workclass",
    "education",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "sex",
    "native_country",
    "income",
]

In [None]:
# Summary statistics for the numeric columns, shown as a pandas frame.
adult.describe(numeric).toPandas()

In [None]:
from pyspark.sql import functions as F

In [None]:
# Row count per workclass value.
adult.groupBy("workclass").agg({"*": "count"}).toPandas()

In [None]:
# Row count per education value.
adult.groupBy("education").agg({"*": "count"}).toPandas()

In [None]:
# Row count per marital_status value.
adult.groupBy("marital_status").agg({"*": "count"}).toPandas()

In [None]:
# Row count per occupation value.
adult.groupBy("occupation").agg({"*": "count"}).toPandas()

In [None]:
# Row count per relationship value.
adult.groupBy("relationship").agg({"*": "count"}).toPandas()

In [None]:
# Row count per race value.
adult.groupBy("race").agg({"*": "count"}).toPandas()

In [None]:
# Row count per sex value.
adult.groupBy("sex").agg({"*": "count"}).toPandas()

In [None]:
# Row count per native_country value.
adult.groupBy("native_country").agg({"*": "count"}).toPandas()

In [None]:
# Row count per income value (the class balance of the target).
adult.groupBy("income").agg({"*": "count"}).toPandas()

In [None]:
# Trim (whitespace check): strip surrounding spaces from each of the nine
# categorical columns, replacing the column in place.
for text_col in ("workclass", "education", "marital_status", "occupation",
                 "relationship", "race", "sex", "native_country", "income"):
    adult = adult.withColumn(text_col, F.trim(F.col(text_col)))

In [None]:
# Row count after trimming (trimming does not add or remove rows).
adult.count()

In [None]:
# Report, for every column, whether it contains any NULL.
#
# The NULL counts of all columns are computed in one aggregation (a single
# Spark job) instead of running a separate filter + count job per column.
# F.when(...) without otherwise() yields NULL for non-matching rows, and
# F.count() ignores NULLs, so each aggregate counts only the NULL cells.
null_totals = adult.select([
    F.count(F.when(F.col(column).isNull(), 1)).alias(column)
    for column in adult.columns
]).first().asDict()

# null_count is the 1-based position of the column in the printed list.
for null_count, column in enumerate(adult.columns, start=1):
    if null_totals[column] > 0:
        print(null_count, ". ", column, " has null.")
    else:
        print(null_count, ". ", column)

In [None]:
# Report, for every column, whether any value contains the "?" placeholder.
#
# All columns are checked in one aggregation (a single Spark job) instead of
# one filter + count job per column. Each column is cast to string explicitly
# so that the substring test is well-defined for the numeric columns too,
# rather than depending on implicit type coercion.
unknown_totals = adult.select([
    F.count(F.when(F.col(column).cast("string").contains("?"), 1)).alias(column)
    for column in adult.columns
]).first().asDict()

# unknown_count is the 1-based position of the column in the printed list.
for unknown_count, column in enumerate(adult.columns, start=1):
    if unknown_totals[column] > 0:
        print(unknown_count, ". ", column, " has unknown.")
    else:
        print(unknown_count, ". ", column)

In [None]:
# Most frequent (workclass, occupation, native_country) combinations among
# the rows where at least one of the three holds "?"; top 50 shown.
(
    adult
    .select("workclass", "occupation", "native_country")
    .filter(
        F.col("workclass").contains("?")
        | F.col("occupation").contains("?")
        | F.col("native_country").contains("?")
    )
    .groupBy("workclass", "occupation", "native_country")
    .count()
    .orderBy(F.desc("count"))
    .toPandas()
    .head(50)
)

In [None]:
# Row count before the "?" rows are removed.
adult.count()

In [None]:
# Keep only rows where none of workclass / occupation / native_country
# contains the "?" placeholder, then show the remaining row count.
adult = adult.filter(
    ~F.col("workclass").contains("?")
    & ~F.col("occupation").contains("?")
    & ~F.col("native_country").contains("?")
)

adult.count()

In [None]:
# Rows removed by the "?" filter. NOTE(review): both numbers are hard-coded,
# presumably copied from the count() outputs of an earlier run - they will
# be stale if the data or the filter changes.
32561 - 30162

In [None]:
# Remove rows belonging to a handful of categories that are excluded from
# modelling: workclass never-worked / without-pay, occupation Armed-Forces
# and native_country Holand-Netherlands.
#
# Column.contains() is case-sensitive. The workclass literals were written in
# lowercase while the other literals in this filter are capitalised, so the
# workclass conditions could silently match nothing. Both sides are compared
# in lowercase so the filter works regardless of capitalisation.
adult = adult.filter(~(
    F.lower(F.col("workclass")).contains("never-worked") |
    F.lower(F.col("workclass")).contains("without-pay") |
    F.lower(F.col("occupation")).contains("armed-forces") |
    F.lower(F.col("native_country")).contains("holand-netherlands")
))

adult.count()

In [None]:
# Rows removed by the rare-category filter. NOTE(review): both numbers are
# hard-coded, presumably copied from count() outputs of an earlier run -
# re-check them against the actual counts whenever the filter changes.
30162 - 30152

In [None]:
# Collapse fine-grained education levels into broader groups; any level not
# listed below keeps its original value.
education_level = F.col("education")
merged_level = (
    F.when(education_level.isin("1st-4th", "5th-6th", "7th-8th"), "Elementary-School")
    .when(education_level.isin("9th", "10th", "11th", "12th"), "High-School")
    .when(education_level.isin("Masters", "Doctorate"), "Postgraduate")
    .when(education_level.isin("Bachelors", "Some-college"), "Undergraduate")
    .otherwise(education_level)
)
adult = adult.withColumn("education_merged", merged_level)

In [None]:
# Preview the original and merged education columns side by side.
# limit(10) is applied in Spark so only ten rows are collected to the driver,
# instead of converting the whole DataFrame to pandas and then slicing it.
adult.select("education", "education_merged").limit(10).toPandas()

In [None]:
# Final column order: categorical columns first, then numeric ones, with the
# target ("income") last.
orders = [
    # categorical
    "workclass", "education", "education_merged", "marital_status",
    "occupation", "relationship", "race", "sex", "native_country",
    # numeric
    "age", "fnlwgt", "education_num", "capital_gain", "capital_loss",
    "hours_per_week",
    # target
    "income",
]

In [None]:
# Reorder the columns as listed in `orders`, then preview ten rows.
# limit(10) runs in Spark so only ten rows reach the driver; the previous
# toPandas().head(10) collected the entire DataFrame first.
adult = adult.select(orders)
adult.limit(10).toPandas()

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

In [None]:
# Display the list of categorical column names as a reminder before the
# indexers are defined.
categorical

In [None]:
# Map workclass strings to numeric indices; "skip" filters out rows whose
# value was not seen when the indexer was fitted.
workclass_indexer = StringIndexer(
    inputCol="workclass",
    outputCol="workclass_index",
    handleInvalid="skip",
)

In [None]:
# Map education_merged strings to numeric indices; "skip" filters out rows
# whose value was not seen when the indexer was fitted.
education_merged_indexer = StringIndexer(
    inputCol="education_merged",
    outputCol="education_merged_index",
    handleInvalid="skip",
)

In [None]:
# Map marital_status strings to numeric indices; "skip" filters out rows
# whose value was not seen when the indexer was fitted.
marital_status_indexer = StringIndexer(
    inputCol="marital_status",
    outputCol="marital_status_index",
    handleInvalid="skip",
)

In [None]:
# Map occupation strings to numeric indices; "skip" filters out rows whose
# value was not seen when the indexer was fitted.
occupation_indexer = StringIndexer(
    inputCol="occupation",
    outputCol="occupation_index",
    handleInvalid="skip",
)

In [None]:
# Map relationship strings to numeric indices; "skip" filters out rows whose
# value was not seen when the indexer was fitted.
relationship_indexer = StringIndexer(
    inputCol="relationship",
    outputCol="relationship_index",
    handleInvalid="skip",
)

In [None]:
# Map race strings to numeric indices; "skip" filters out rows whose value
# was not seen when the indexer was fitted.
race_indexer = StringIndexer(
    inputCol="race",
    outputCol="race_index",
    handleInvalid="skip",
)

In [None]:
# Map native_country strings to numeric indices; "skip" filters out rows
# whose value was not seen when the indexer was fitted.
native_country_indexer = StringIndexer(
    inputCol="native_country",
    outputCol="native_country_index",
    handleInvalid="skip",
)

In [None]:
# One-hot encode every indexed categorical column; each output column is the
# input name with an "_enc" suffix.
indexed_columns = [
    "workclass_index",
    "education_merged_index",
    "marital_status_index",
    "occupation_index",
    "relationship_index",
    "race_index",
    "native_country_index",
]
encoder = OneHotEncoder(
    inputCols=indexed_columns,
    outputCols=[name + "_enc" for name in indexed_columns],
)

In [None]:
# Inputs of the feature vector: the one-hot encoded categorical columns
# followed by the raw numeric columns.
features_list = [
    # encoded categoricals
    "workclass_index_enc",
    "education_merged_index_enc",
    "marital_status_index_enc",
    "occupation_index_enc",
    "relationship_index_enc",
    "race_index_enc",
    "native_country_index_enc",
    # numerics
    "age",
    "fnlwgt",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]

In [None]:
# Concatenate all columns in features_list into one vector column.
assembler = VectorAssembler(
    inputCols=features_list,
    outputCol="vectorized_features",
)

In [None]:
# Turn the income string into the numeric "label" column used for training.
label_indexer = StringIndexer(inputCol="income", outputCol="label")

In [None]:
# Scale the assembled vector (default StandardScaler settings) into the
# "features" column consumed by the classifier.
scaler = StandardScaler(
    inputCol="vectorized_features",
    outputCol="features",
)

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression with default hyper-parameters, reading "features" and
# "label" and writing its output to "prediction".
logreg_object = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)

In [None]:
# 80/20 train/test split; the fixed seed keeps the split repeatable.
train_df, test_df = adult.randomSplit([0.8, 0.2], seed=4242)

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Full training pipeline, in execution order: index the categorical columns,
# one-hot encode them, assemble the feature vector, index the label, scale
# the features and fit the classifier.
pipe = Pipeline(stages=[
    workclass_indexer,
    education_merged_indexer,
    marital_status_indexer,
    occupation_indexer,
    relationship_indexer,
    race_indexer,
    native_country_indexer,
    encoder,
    assembler,
    label_indexer,
    scaler,
    logreg_object,
])

In [None]:
# Fit every pipeline stage on the training split only.
model = pipe.fit(train_df)

In [None]:
# Score the held-out split and keep only the true label and the prediction;
# limit(5) already caps the preview at five rows.
scored_test = model.transform(test_df)
result = scored_test.select("label", "prediction")
result.limit(5).toPandas()

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
# Collect labels and predictions with ONE toPandas() call.
# Two separate collections would each re-run the whole pipeline transform,
# and Spark gives no guarantee that two independent jobs return rows in the
# same order; slicing a single pandas frame keeps every label paired with
# its own prediction.
result_pd = result.toPandas()
y_test = result_pd[["label"]]
y_pred = result_pd[["prediction"]]

In [None]:
# Fraction of test rows whose prediction equals the true label.
accuracy_score(y_test, y_pred)