In [19]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


# Class DataPreprocessing

In [20]:
class DataPreprocessing:
    """PySpark preprocessing helpers: CSV loading, imputation, outlier removal,
    scaling and categorical encoding.

    NOTE(review): the methods of this class are written in later notebook cells.
    Jupyter compiles each cell separately, so they only become methods if the
    whole class body lives in a single cell — TODO confirm/merge before relying
    on Restart & Run All.
    """

    def __init__(self) -> None:
        """Create a stateless helper; no configuration is stored on the instance."""


# Function to Read CSV file

In [5]:
    def readCsv(self, path, spark=None):
        """Read a CSV file (header row + inferred schema) into a Spark DataFrame.

        Dots in column names are replaced with underscores, because Spark would
        otherwise treat a name such as ``a.b`` as a nested-field reference.

        Args:
            path: Path or URI of the CSV file, passed to ``spark.read.csv``.
            spark: SparkSession to use. When ``None``, a session named
                "DataPreprocessing" is obtained with ``getOrCreate()``.

        Returns:
            The loaded DataFrame with sanitised column names.
        """
        from pyspark.sql import SparkSession  # Module to Create Spark Session

        if spark is None:
            spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

        # Read the CSV file with header and infer schema
        # (inferSchema costs one extra pass over the data).
        df = spark.read.csv(path, header=True, inferSchema=True)

        # Rename every column positionally in a single projection. Unlike a
        # withColumnRenamed loop, this adds one plan node instead of one per
        # column and never matches columns by (case-insensitive) name.
        return df.toDF(*[name.replace('.', '_') for name in df.columns])

# Function to Fill Missing Values

In [6]:
    def fillMissingValues(self, df, numeric_cols, categorical_cols):
        """Impute missing values: mode for categorical columns, mean for numeric ones.

        The placeholder strings '?', 'NA', 'Nan', 'na' and 'NaN' are first
        replaced with nulls. All modes are computed in one aggregation job and
        all means in another, instead of one Spark job per column.

        Args:
            df: Input DataFrame.
            numeric_cols: Columns whose nulls are filled with the column mean.
            categorical_cols: Columns whose nulls are filled with the most
                frequent value.

        Returns:
            A new DataFrame with nulls in the listed columns filled. A column
            with no non-null values (or an empty DataFrame) has no mode/mean;
            it is left untouched instead of calling ``fillna(None)``, which
            raises.
        """
        import pyspark.sql.functions as F  # Module to perform various operations on DataFrame

        numeric_cols = list(numeric_cols)
        categorical_cols = list(categorical_cols)

        # Treat common textual "missing" markers as real nulls.
        df = df.replace(['?', 'NA', 'Nan', 'na', 'NaN'], None)

        # Categorical columns: fill with the most frequent value (single job).
        if categorical_cols:
            modes = df.agg(*[F.mode(df[c]) for c in categorical_cols]).first()
            for c, mode_value in zip(categorical_cols, modes):
                if mode_value is not None:  # all-null column: nothing to fill with
                    df = df.fillna(mode_value, subset=[c])

        # Numeric columns: fill with the mean (single job). Computed after the
        # categorical fill, matching the original per-column order.
        if numeric_cols:
            means = df.agg(*[F.mean(df[c]) for c in numeric_cols]).first()
            for c, mean_value in zip(numeric_cols, means):
                if mean_value is not None:  # all-null column: no mean exists
                    df = df.fillna(mean_value, subset=[c])

        return df

# Function to Remove Outliers using the IQR (Interquartile Range)

In [7]:
    def removeOutliers(self, df, cols, k=1.5, relative_error=0.05):
        """Drop rows outside Tukey's fences [Q1 - k*IQR, Q3 + k*IQR] for each column.

        Columns are processed in order, and each column's quartiles are computed
        on the rows kept by the previous columns' filters. Rows with a null in a
        processed column are dropped as well, because the range comparison
        evaluates to null for them.

        Args:
            df: Input DataFrame.
            cols: Numeric columns to filter on.
            k: Fence multiplier; 1.5 is the conventional (and previous) value.
            relative_error: Precision target passed to ``approxQuantile``
                (0 gives exact quantiles at a higher cost).

        Returns:
            The filtered DataFrame. A column with no non-null values is skipped.
        """
        from pyspark.sql.functions import col  # Module to perform various operations on DataFrame

        for col_name in cols:
            # Both quartiles from a single approxQuantile pass.
            quartiles = df.approxQuantile(col_name, [0.25, 0.75], relative_error)
            if len(quartiles) < 2:
                # approxQuantile returns [] when the column holds only nulls
                # (or no rows); there are no fences to apply.
                continue
            q1, q3 = quartiles
            iqr = q3 - q1
            lower_bound = q1 - k * iqr
            upper_bound = q3 + k * iqr
            # Keep only rows inside the fences
            df = df.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))
        return df

# Function to Remove Outliers using Z-Score Method

In [8]:
    def RemoveOutliers_ZScore(self, df, columns, threshold=3.0):
        """Drop rows lying more than ``threshold`` standard deviations from the mean.

        Columns are processed in order, and each column's mean and sample
        standard deviation are computed on the rows kept by the previous
        columns' filters. Rows with a null in a processed column are dropped
        too, because ``between`` evaluates to null for them.

        Args:
            df: Input DataFrame.
            columns: Numeric columns to filter on.
            threshold: Maximum allowed |z-score| (inclusive); 3 by default.

        Returns:
            The filtered DataFrame. Columns without usable statistics are
            skipped instead of raising a TypeError. This covers no non-null
            values, and a single row, for which Spark 3+ returns a null sample
            stddev. Constant columns (stddev 0) are also skipped.
        """
        import pyspark.sql.functions as F  # Aggregate functions
        from pyspark.sql.functions import col  # Module to perform various operations on DataFrame

        for col_name in columns:
            # Mean and sample standard deviation in one job. The functions API
            # also handles names that would break a SQL string in selectExpr.
            stats = df.agg(
                F.mean(col(col_name)).alias('mean'),
                F.stddev(col(col_name)).alias('stddev'),
            ).first()
            if stats['mean'] is None or stats['stddev'] is None:
                continue
            # float(): the mean of a decimal column is returned as Decimal,
            # which cannot be combined with the float stddev.
            mean_value = float(stats['mean'])
            stddev_value = float(stats['stddev'])
            if stddev_value == 0.0:
                # No spread, so there are no outliers. Filtering on
                # [mean, mean] could drop every row through float rounding.
                continue

            margin = threshold * stddev_value
            df = df.filter(col(col_name).between(mean_value - margin, mean_value + margin))

        return df

# Function to Normalize Numeric Columns using MinMaxScaler

In [9]:
    def NormalizeNumericColumns(self, df, cols, output_col="Scaledfeatures"):
        """Min-max scale the given columns into a single vector column.

        The columns are assembled into one vector and rescaled with
        ``MinMaxScaler`` (default target range [0, 1]). The result is written
        to ``output_col``; the input columns themselves are not modified.

        Args:
            df: Input DataFrame. ``cols`` should contain no nulls, since
                VectorAssembler's default handleInvalid="error" rejects them.
            cols: Numeric columns to scale.
            output_col: Name of the scaled vector column to add.

        Returns:
            ``df`` with ``output_col`` appended.
        """
        from pyspark.ml import Pipeline  # Module to create a pipeline of stages
        from pyspark.ml.feature import VectorAssembler, MinMaxScaler  # Feature engineering stages

        # Temporary assembled-vector column, dropped after scaling. It is derived
        # from output_col so it cannot clash with a user column named "features".
        assembled_col = f"{output_col}__assembled"

        assembler = VectorAssembler(inputCols=cols, outputCol=assembled_col)
        scaler = MinMaxScaler(inputCol=assembled_col, outputCol=output_col)

        pipeline_model = Pipeline(stages=[assembler, scaler]).fit(df)
        return pipeline_model.transform(df).drop(assembled_col)

# Function to Standardize Numeric Columns using Standard Scaler

In [10]:
    def StandardizeNumericColumns(self, df, cols, output_col="Scaledfeatures",
                                  with_mean=True, with_std=True):
        """Standardize the given columns into a single vector column.

        The columns are assembled into one vector and transformed with
        ``StandardScaler``: centred on the mean (``with_mean``) and scaled to
        unit standard deviation (``with_std``). The result is written to
        ``output_col``; the input columns themselves are not modified.

        Args:
            df: Input DataFrame. ``cols`` should contain no nulls, since
                VectorAssembler's default handleInvalid="error" rejects them.
            cols: Numeric columns to standardize.
            output_col: Name of the scaled vector column to add.
            with_mean: Subtract the column means (default True, as before).
            with_std: Divide by the column standard deviations (default True).

        Returns:
            ``df`` with ``output_col`` appended.
        """
        from pyspark.ml import Pipeline  # Module to create a pipeline of stages
        from pyspark.ml.feature import VectorAssembler, StandardScaler  # Feature engineering stages

        # Temporary assembled-vector column, dropped after scaling. It is derived
        # from output_col so it cannot clash with a user column named "features".
        assembled_col = f"{output_col}__assembled"

        assembler = VectorAssembler(inputCols=cols, outputCol=assembled_col)
        scaler = StandardScaler(inputCol=assembled_col, outputCol=output_col,
                                withMean=with_mean, withStd=with_std)

        pipeline_model = Pipeline(stages=[assembler, scaler]).fit(df)
        return pipeline_model.transform(df).drop(assembled_col)

# Function to Encode Categorical Columns using Label Encoding

In [11]:
    def LabelEncoding(self, df, categorical_cols, handle_invalid="error"):
        """Replace each categorical column with a numeric label index.

        Boolean columns are cast to int (False -> 0, True -> 1). Every other
        column is indexed with ``StringIndexer``; with the default ordering
        ("frequencyDesc"), the most frequent label gets index 0.0. The index
        then overwrites the original column.

        Args:
            df: Input DataFrame.
            categorical_cols: Columns to encode.
            handle_invalid: Passed to ``StringIndexer.handleInvalid``. Options:
                "error" (default, as before; fails on nulls/unseen labels),
                "skip" (drops those rows), or "keep" (puts them in an extra
                index).

        Returns:
            The DataFrame with the listed columns encoded.
        """
        from pyspark.ml.feature import StringIndexer  # Module to perform various feature engineering operations
        from pyspark.sql.types import BooleanType  # Module to specify the data type of a column

        for column in categorical_cols:
            if isinstance(df.schema[column].dataType, BooleanType):
                # Booleans already have a natural 0/1 encoding.
                df = df.withColumn(column, df[column].cast('int'))
                continue

            index_col = f"{column}Index"
            indexer = StringIndexer(inputCol=column, outputCol=index_col,
                                    handleInvalid=handle_invalid)
            # indexer_model.labels lists the category for each index if the
            # mapping is needed.
            indexer_model = indexer.fit(df)
            indexed = indexer_model.transform(df)
            df = indexed.withColumn(column, indexed[index_col]).drop(index_col)

        return df

# Function to Encode Categorical Columns using One Hot Encoding

In [12]:
    def oneHotEncoding(self, df, categorical_cols):
        """Replace each categorical column with a one-hot encoded vector column.

        Boolean columns are cast to int (0/1) instead. Every other column is
        indexed with ``StringIndexer`` and encoded with ``OneHotEncoder``. With
        the encoder's default dropLast=True, the last category maps to the
        all-zero vector.

        Args:
            df: Input DataFrame.
            categorical_cols: Columns to encode.

        Returns:
            The DataFrame with each listed column replaced by its encoding.
            Note that the result is a vector column, which StringIndexer-based
            encoders cannot process afterwards.
        """
        from pyspark.ml import Pipeline  # Module to create a pipeline of stages
        from pyspark.ml.feature import StringIndexer, OneHotEncoder  # Feature engineering stages
        from pyspark.sql.types import BooleanType  # Module to specify the data type of a column

        for column in categorical_cols:
            if isinstance(df.schema[column].dataType, BooleanType):
                # Booleans already have a natural 0/1 encoding.
                df = df.withColumn(column, df[column].cast('int'))
                continue

            index_col, vector_col = f"{column}Index", f"{column}Vec"
            pipeline = Pipeline(stages=[
                StringIndexer(inputCol=column, outputCol=index_col),
                OneHotEncoder(inputCol=index_col, outputCol=vector_col),
            ])
            encoded = pipeline.fit(df).transform(df)
            # Drop helper columns by name. Older PySpark versions only accept
            # strings when drop() is given several columns.
            df = encoded.withColumn(column, encoded[vector_col]).drop(index_col, vector_col)

        return df

# Main

In [None]:
if __name__ == "__main__":
  from pyspark.sql import SparkSession  # Module to Create Spark Session
  import matplotlib.pyplot as plt  # Module to plot graphs

  # NOTE(review): a leading "//" is unusual for a local file — confirm this
  # path resolves in the target environment.
  DATA_PATH = "//adult.csv"

  # Single-threaded local Spark session for the whole notebook.
  spark = (
      SparkSession.builder
      .master("local")
      .appName("DataPreprocessing")
      .getOrCreate()
  )

  dp = DataPreprocessing()
  df = dp.readCsv(DATA_PATH, spark)

  # The label is taken to be the last column of the CSV.
  target = df.columns[-1]

In [20]:
  # Remove an identifier column if one exists (exact, case-sensitive name match).
  df = df.drop('id') if 'id' in df.columns else df

In [None]:
  # Inspect column names and the types chosen by inferSchema.
  df.printSchema()

In [None]:
  # Class balance of the target. groupBy returns groups in arbitrary order, so
  # sort (largest class first, ties by label) to make the output reproducible.
  distinct_values = df.groupBy(target).count().orderBy(["count", target], ascending=[False, True])
  distinct_values.show()

In [23]:
  from pyspark.sql.types import BooleanType, NumericType, StringType

  # Split columns by their Spark type: strings/booleans are categorical and
  # numeric types are numeric. Anything else (dates, timestamps, arrays, ...)
  # fits neither mean nor mode imputation, so it is reported and left out
  # instead of being treated as numeric.
  numeric_cols = []
  categorical_cols = []

  for field in df.schema.fields:
    if isinstance(field.dataType, (StringType, BooleanType)):
      categorical_cols.append(field.name)
    elif isinstance(field.dataType, NumericType):
      numeric_cols.append(field.name)
    else:
      print(f"Skipping column {field.name!r} of unsupported type {field.dataType.simpleString()}")
  print(numeric_cols)
  print(categorical_cols)

['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country', 'income']


In [24]:
  # Number of rows before imputation and outlier removal.
  row_count = df.count()
  print(row_count)

32561


In [25]:
  # Impute nulls: numeric columns with the mean, categorical ones with the mode.
  df = dp.fillMissingValues(df, numeric_cols=numeric_cols, categorical_cols=categorical_cols)

In [None]:
  import pyspark.sql.functions as F  # Module to perform various operations on DataFrame

  for column in numeric_cols:
    # Mean and sample standard deviation in one Spark job. The functions API
    # also works for names that would break a selectExpr SQL string.
    stats = df.agg(F.mean(column).alias('mean'), F.stddev(column).alias('stddev')).first()
    mean_value_before, stddev_value_before = stats['mean'], stats['stddev']
    # Bring the non-null values to the driver for plotting
    # (float() also converts Decimal values).
    values = [float(v) for (v,) in df.select(column).collect() if v is not None]

    # Plotting before outlier removal
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.hist(values, bins=20, color='skyblue', edgecolor='black')
    if mean_value_before is not None:
      mean_value_before = float(mean_value_before)
      ax.axvline(x=mean_value_before, color='red', linestyle='--', label='Mean')
      if stddev_value_before is not None:
        ax.axvline(x=mean_value_before + 3 * stddev_value_before, color='orange', linestyle='--', label='3 Std Dev')
        ax.axvline(x=mean_value_before - 3 * stddev_value_before, color='orange', linestyle='--')
      ax.legend()
    ax.set_title('Distribution Before Outlier Removal')
    ax.set_xlabel(column)
    ax.set_ylabel('Frequency')
    ax.grid(True)
    plt.show()
    plt.close(fig)  # one figure per column; free it once shown

In [22]:
  # Numeric columns to filter for outliers: all of them except the target.
  columns = [name for name in numeric_cols if name != target]

In [23]:
  # Keep rows within 3 standard deviations of the mean in every feature column.
  df = dp.RemoveOutliers_ZScore(df=df, columns=columns)
  rows_after_outliers = df.count()
  print(rows_after_outliers)

29829


In [None]:
  import pyspark.sql.functions as F  # Module to perform various operations on DataFrame

  for column in numeric_cols:
    # Mean and sample standard deviation in one Spark job. The functions API
    # also works for names that would break a selectExpr SQL string.
    stats = df.agg(F.mean(column).alias('mean'), F.stddev(column).alias('stddev')).first()
    mean_value_after, stddev_value_after = stats['mean'], stats['stddev']
    # Bring the non-null values to the driver for plotting
    # (float() also converts Decimal values).
    values = [float(v) for (v,) in df.select(column).collect() if v is not None]

    # Plotting after outlier removal
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.hist(values, bins=20, color='skyblue', edgecolor='black')
    if mean_value_after is not None:
      mean_value_after = float(mean_value_after)
      ax.axvline(x=mean_value_after, color='red', linestyle='--', label='Mean')
      if stddev_value_after is not None:
        ax.axvline(x=mean_value_after + 3 * stddev_value_after, color='orange', linestyle='--', label='3 Std Dev')
        ax.axvline(x=mean_value_after - 3 * stddev_value_after, color='orange', linestyle='--')
      ax.legend()
    ax.set_title('Distribution after Outlier Removal')
    ax.set_xlabel(column)
    ax.set_ylabel('Frequency')
    ax.grid(True)
    plt.show()
    plt.close(fig)  # one figure per column; free it once shown

In [None]:
  # Preview the first 20 rows of the current DataFrame.
  df.show()

In [None]:
  # One-hot encoding is an alternative to the label encoding below. It turns
  # each categorical column into a vector, and StringIndexer (used by
  # LabelEncoding) cannot index vector columns. So keep its result in a
  # separate DataFrame instead of overwriting df. The target is left out so the
  # label stays a single column rather than a vector.
  df_onehot = dp.oneHotEncoding(df, [c for c in categorical_cols if c != target])
  df_onehot.show()

In [None]:
  # Preview the first 20 rows of the current DataFrame.
  df.show()

In [26]:
  # Replace every categorical column (target included) with its numeric label index.
  df = dp.LabelEncoding(df=df, categorical_cols=categorical_cols)

In [None]:
  # Preview the first 20 rows of the current DataFrame.
  df.show()

In [28]:
  # Feature set = non-target numeric columns + categorical columns, minus the
  # target. Rebuilt as a new, de-duplicated list rather than extend()-ing in
  # place, so re-running this cell does not append the categorical columns again.
  columns = [c for c in dict.fromkeys(columns + categorical_cols) if c != target]

  # Drop any "Scaledfeatures" left by a previous run; the scaler refuses to
  # write to an output column that already exists.
  df = dp.NormalizeNumericColumns(df.drop("Scaledfeatures"), columns)

In [None]:
  # Show the scaled feature vectors; truncate=False prints each vector in full.
  df.select("Scaledfeatures").show(truncate=False)

In [None]:
  import pyspark.sql.functions as F  # Module to perform various operations on DataFrame

  # NormalizeNumericColumns writes the scaled values to the "Scaledfeatures"
  # vector column and leaves these raw numeric columns untouched. This plot
  # therefore shows the unscaled values after encoding/normalization.
  for column in numeric_cols:
    # Mean and sample standard deviation in one Spark job.
    stats = df.agg(F.mean(column).alias('mean'), F.stddev(column).alias('stddev')).first()
    mean_value_after, stddev_value_after = stats['mean'], stats['stddev']
    # Bring the non-null values to the driver for plotting
    # (float() also converts Decimal values).
    values = [float(v) for (v,) in df.select(column).collect() if v is not None]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.hist(values, bins=20, color='skyblue', edgecolor='black')
    if mean_value_after is not None:
      mean_value_after = float(mean_value_after)
      ax.axvline(x=mean_value_after, color='red', linestyle='--', label='Mean')
      if stddev_value_after is not None:
        ax.axvline(x=mean_value_after + 3 * stddev_value_after, color='orange', linestyle='--', label='3 Std Dev')
        ax.axvline(x=mean_value_after - 3 * stddev_value_after, color='orange', linestyle='--')
      ax.legend()
    ax.set_title('Distribution after Normalization (unscaled column values)')
    ax.set_xlabel(column)
    ax.set_ylabel('Frequency')
    ax.grid(True)
    plt.show()
    plt.close(fig)  # one figure per column; free it once shown

In [None]:
  # Release the SparkSession's resources now that processing is finished.
  spark.stop()