# Pre-processing for a dataset

<div dir="rtl">

در این بخش ما سعی می کنیم که هر ۱۰ دیتاست موجود طی ۱۰ سال را پیش پردازش کنیم تا دیتاست های ما برای مرحله بعدی یعنی پردازش آماده شوند.
همانطور که قبلا توضیح دادیم پیش پردازش شامل انجام متد ها و روش هایی برای تمیز کردن و پاکسازی دیتاست هاست تا قسمت هایی که به دلایلی خراب یا اشتباه هستند یا حذف شده اند یا وارد نشده اند از دیتاست حذف شوند و در نهایت ما دیتاست قابل اتکایی را برای پردازش داشته باشیم.

</div>

In [32]:
# Configuring Spark
import findspark
findspark.init()


In [33]:
# Import required modules
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col, when, substring, concat_ws, lit, rand

In [34]:
# Create a Spark Session
spark = SparkSession.builder \
    .appName("raw") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()



In [35]:
# Read CSV file
file_path = "../../RawData/Extracted_data/LFS_RawData90.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)


In [36]:
# Display DataFrame in a tabular format, showing only the first 10 rows
df.show(10, truncate=False)

+----------+---------------+---------+---------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+-------------+-------------+-------------+-------------+------+-------------+-------------+-------------+-------------+-------------+-------------+-----------+---------+------+------+------+------+------+------+------+------+---------+----------+------+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+------+------+---------+---------+------+---------+---------+------+------+---------+---------+------+------+------+------+------+------+------+----------+---------+------+------+-----------+-----------+--------------+-------------+
|pkey      |F2Kol_DJAYGOZIN|F2Kol_D20|F2Kol_D21|F2_D01|F2_D03|F2_D04|F2_D05|F2_D06|F2_D07|F2_D08|F2_D09|F2_

In [37]:
# Now, we want to drop some unnecessary columns we mentioned in meta
# List of columns to drop:
columns_to_drop = ["pkey", "F2_D01","F3_D18SHANBEH", "F3_D18YEKSHAN", "F3_D18DOSHANB", "F3_D18SESHANB", "F3_D18CHARSHA", "F3_D18PANSHAN", "F3_D18JOMEH", "IW10_Yearly", "IW15_Yearly", "ActivityStatus", "NobatAmargiri"]

# Drop the specified columns and create a new DataFrame
df_after_drop = df.drop(*columns_to_drop)

In [38]:
# Display the DataFrame after dropping columns
print("DataFrame after dropping specified columns:")
df_after_drop.show()

DataFrame after dropping specified columns:
+---------------+---------+---------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+-------------+-------------+-------------+-------------+------+---------+------+------+------+------+------+------+------+------+---------+----------+------+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+------+------+---------+---------+------+---------+---------+------+------+---------+---------+------+------+------+------+------+------+------+----------+---------+------+------+
|F2Kol_DJAYGOZIN|F2Kol_D20|F2Kol_D21|F2_D03|F2_D04|F2_D05|F2_D06|F2_D07|F2_D08|F2_D09|F2_D10|F2_D11|F2_D12|F2_D13|F2_D14|F2_D15|F2_D16|F2_D17|F2_D18|F2_D19|F3_D01|F3_D02|F3_D03|F3_D04|F3_D05|F3_D06|F3_D07|F3_D08|F3_D09|F3_D10|F3_D

In [39]:
# Then, we want to rename the columns in a way that is more understandable. First, we need to create a dictionary. 
column_name_mapping = {
    "F2Kol_DJAYGOZIN": "F2_replaced_family",
    "F2Kol_D20": "F2_completed_form",
    "F2Kol_D21": "F2_why_not_completed_form",
    "F2_D03": "F2_households",
    "F2_D04": "F2_gender",
    "F2_D05": "F2_month_birth",
    "F2_D06": "F2_year_birth",
    "F2_D07": "F2_age_at_time",
    "F2_D08": "F2_nationality",
    "F2_D09": "F2_residence_status",
    "F2_D10": "F2_time_in_place",
    "F2_D11": "F2_previous_residence",
    "F2_D12": "F2_cause_change_residence",
    "F2_D13": "F2_previous_residence_province",
    "F2_D14": "F2_country_residence",
    "F2_D15": "F2_education_status",
    "F2_D16": "F2_literacy_status",
    "F2_D17": "F2_education_degree",
    "F2_D18": "F2_education_field",
    "F2_D19": "F2_marriage_status",
    "F3_D01": "F3_wsw_salary",
    "F3_D02": "F3_wsw_money",
    "F3_D03": "F3_wsw_work_family",
    "F3_D04": "F3_wsw_product_family",
    "F3_D05": "F3_wsw_intership",
    "F3_D06": "F3_wsw_skip_work",
    "F3_D07": "F3_wsw_why_skipped",
    "F3_D08": "F3_wsw_have_another_job",
    "F3_D09": "F3_cej_job_title",
    "F3_D10": "F3_cej_workplace_characteristics",
    "F3_D11": "F3_cej_job_status",
    "F3_D12": "F3_cej_workers_number",
    "F3_D13": "F3_cej_insurance",
    "F3_D14SAL": "F3_cej_years_in_job",
    "F3_D14MAH": "F3_cej_months_in_job",
    "F3_D15SAL": "F3_cej_years_whole_work",
    "F3_D15MAH": "F3_cej_months_whole_work",
    "F3_D16SHASLIR": "F3_hwj_season_prime_days_in_week",
    "F3_D16SHASLIS": "F3_hwj_season_prime_hours_in_week",
    "F3_D16SHHAMRO": "F3_hwj_season_all_days_in_week",
    "F3_D16SHHAMSA": "F3_hwj_season_all_hours_in_week",
    "F3_D17": "F3_hwj_why_hours_in_week",
    "F3_D18JAM": "F3_hwj_sum_hours_worked",
    "F3_D19": "F3_hwj_why_less_worked",
    "F3_D20": "F3_hwj_why_more_worked",
    "F3_D21": "F3_dws_wanted_increase_hours",
    "F3_D22": "F3_dws_ability_increase_hours",
    "F3_D23": "F3_dws_how_increase_hours",
    "F3_D24": "F3_dws_search_for_work",
    "F3_D25": "F3_dws_why_search_for_work",
    "F3_D26": "F3_dws_how_search_for_work",
    "F3_D27ROZ": "F3_dws_season_desire_work_days",
    "F3_D27SAAT": "F3_dws_season_desire_work_hours",
    "F3_D28": "F3_cesj_job_title",
    "F3_D29": "F3_cesj_workplace_characteristics",
    "F3_D30": "F3_cesj_job_status",
    "F3_D31": "F3_sj_searched_in_previous_week",
    "F3_D33": "F3_sj_cause_not_searched",
    "F3_D34": "F3_sj_ability_start_job",
    "F3_D35SAL": "F3_sj_duration_searching_job_year",
    "F3_D35MAH": "F3_sj_duration_searching_job_month",
    "F3_D36": "F3_sj_previous_status",
    "F3_D37SAL": "F3_sj_worked_duration_year",
    "F3_D37MAH": "F3_sj_worked_duration_month",
    "F3_D38": "F3_sj_unemployment_insurance_status",
    "F3_D39": "F3_pje_2weeks_worked",
    "F3_D40SAL": "F3_pje_left_job_duration_year",
    "F3_D40MAH": "F3_pje_left_job_duration_month",
    "F3_D41": "F3_pje_job_title",
    "F3_D42": "F3_pje_workplace_characteristics",
    "F3_D43": "F3_pje_job_status",
    "F3_D44": "F3_pje_why_left_job",
    "F3_D45": "F3_pje_ability_start_job",
    "F3_D46": "F3_pje_why_not_start_job",
    "F3_D47": "F3_pje_previous_week_status",
    "F3_D48SAAT": "F3_cdj_work_hours",
    "F3_D48ROZ": "F3_cdj_work_days",
    "F3_D49": "F3_cdj_which_job_status",
    "F3_D50": "F3_cdj_which_industry"
}

# Rename columns using withColumnRenamed
df_renamed = df_after_drop
for current_col, new_col in column_name_mapping.items():
    df_renamed = df_renamed.withColumnRenamed(current_col, new_col)

# Display the DataFrame after renaming columns
print("DataFrame after renaming columns:")
df_renamed.show()

DataFrame after renaming columns:
+------------------+-----------------+-------------------------+-------------+---------+--------------+-------------+--------------+--------------+-------------------+----------------+---------------------+-------------------------+------------------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------+------------+------------------+---------------------+----------------+----------------+------------------+-----------------------+----------------+--------------------------------+-----------------+---------------------+----------------+-------------------+--------------------+-----------------------+------------------------+--------------------------------+---------------------------------+------------------------------+-------------------------------+------------------------+-----------------------+----------------------+----------------------+------------------

<div dir="rtl">

خب تا به اینجای کار اسم های ستون ها را عوض کردیم و به اسم هایی قابل فهم تر برای راحتی کار در حین تحلیل، تبدیل کردیم. حال فقط مانده که طول برخی از فیلد هارا عوض کنیم و چند ستون را به یک ستون کاربردی تبدیل کنیم.

</div>

In [40]:
# ONLY, and ONLY for datasets before 1397.

# Specify the column to be modified
column_to_map = "F2_education_degree"

# Define the mapping using the when function
mapping_expr = (
    when(col(column_to_map) == 11, 1)
    .when(col(column_to_map) == 21, 2)
    .when(col(column_to_map) == 31, 3)
    .when(col(column_to_map) == 41, 4)
    .when(col(column_to_map) == 51, 5)
    .when(col(column_to_map) == 52, 6)
    .when(col(column_to_map) == 53, 7)
    .when(col(column_to_map) == 61, 8)
    .when(col(column_to_map) == 71, 9)
    .otherwise(col(column_to_map))  # If no condition is met, keep the original value
)

# Apply the mapping and cast the column to IntegerType
df_mapped = df_renamed.withColumn(column_to_map, mapping_expr.cast(IntegerType()))


# Display the DataFrame after applying the mapping
print(f"DataFrame after mapping values in {column_to_map}:")
df_mapped.show()

DataFrame after mapping values in F2_education_degree:
+------------------+-----------------+-------------------------+-------------+---------+--------------+-------------+--------------+--------------+-------------------+----------------+---------------------+-------------------------+------------------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------+------------+------------------+---------------------+----------------+----------------+------------------+-----------------------+----------------+--------------------------------+-----------------+---------------------+----------------+-------------------+--------------------+-----------------------+------------------------+--------------------------------+---------------------------------+------------------------------+-------------------------------+------------------------+-----------------------+----------------------+--------------------

In [41]:
# Length of fields modifying

# Specify the columns to be modified and their new length
columns_to_modify = {
    "F2_education_degree": 1,
    "F2_education_field": 4,
    "F3_cej_workplace_characteristics": 5,
    "F3_cesj_workplace_characteristics": 5,
    "F3_pje_workplace_characteristics": 5
}

# Loop through each column and modify the length
df_modified = df_mapped

for column, new_length in columns_to_modify.items():
    # Check if the column exists in the DataFrame before attempting modification
    if column in df_modified.columns:
        # Apply the mapping and cast the column to IntegerType
        df_modified = df_modified.withColumn(column, substring(col(column), 1, new_length).cast(IntegerType()))
    else:
        print(f"Warning: Column '{column}' not found in the DataFrame. Skipping modification.")

# Display the DataFrame after modifying the column lengths
print("DataFrame after modifying column lengths:")
df_modified.show()


DataFrame after modifying column lengths:
+------------------+-----------------+-------------------------+-------------+---------+--------------+-------------+--------------+--------------+-------------------+----------------+---------------------+-------------------------+------------------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------+------------+------------------+---------------------+----------------+----------------+------------------+-----------------------+----------------+--------------------------------+-----------------+---------------------+----------------+-------------------+--------------------+-----------------------+------------------------+--------------------------------+---------------------------------+------------------------------+-------------------------------+------------------------+-----------------------+----------------------+----------------------+----------

In [42]:
# Assume these are the columns representing "yes" (1) or "no" (2) answers
columns_to_merge = ["F3_D1_32", "F3_D2_32", "F3_D3_32", "F3_D4_32", "F3_D5_32", "F3_D6_32", "F3_D7_32", "F3_D8_32", "F3_D9_32"]

# Create a new column containing a string representation of merged values
df_merged = df_modified.withColumn(
    "F3_sj_which_ways_searched",
    concat_ws(
        "",
        *[
            when(col(col_name) == 1, "1")
            .when(col(col_name) == 2, "2")
            .otherwise("0")
            for col_name in columns_to_merge
        ]
    )
)

# Add a lit(null) to make the column nullable
df_merged = df_merged.withColumn("F3_sj_which_ways_searched", when(df_merged["F3_sj_which_ways_searched"] == "", lit(None)).otherwise(df_merged["F3_sj_which_ways_searched"]))

# Then we want to drop the unuseful "F3_D1_32" to "F3_D9_32" columns.
columns_to_drop = ["F3_D1_32", "F3_D2_32", "F3_D3_32", "F3_D4_32", "F3_D5_32", "F3_D6_32", "F3_D7_32", "F3_D8_32", "F3_D9_32"]
# Drop the specified columns and create a new DataFrame
df_merged_after_drop = df_merged.drop(*columns_to_drop)

# Display the DataFrame after merging the columns
print("DataFrame after merging columns:")
df_merged_after_drop.show()


DataFrame after merging columns:
+------------------+-----------------+-------------------------+-------------+---------+--------------+-------------+--------------+--------------+-------------------+----------------+---------------------+-------------------------+------------------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------+------------+------------------+---------------------+----------------+----------------+------------------+-----------------------+----------------+--------------------------------+-----------------+---------------------+----------------+-------------------+--------------------+-----------------------+------------------------+--------------------------------+---------------------------------+------------------------------+-------------------------------+------------------------+-----------------------+----------------------+----------------------+-------------------

<div dir="rtl">
می دانیم بسیاری از مقادیری که در داخل دیتاست داریم به صورت عددی هستند. اما این اعداد واقعا اعداد نیستند و در واقع نماینده گزینه هایی از پاسخ سوالات پرسشنامه هستند. این اعداد اگر همین طور به صورت int باقی بمانند، در اینده در هنگام انجام رگرسیون لجستیک و استفاده از متغییر های ساختگی، ما را دچار مشکل می کنند و باعث جانبداری تحلیل ما می شوند. پس همین حالا تایپ هر ستون را اصلاح می کنیم.
</div>

In [43]:
# Print the schema
df_merged_after_drop.printSchema()

root
 |-- F2_replaced_family: integer (nullable = true)
 |-- F2_completed_form: integer (nullable = true)
 |-- F2_why_not_completed_form: string (nullable = true)
 |-- F2_households: string (nullable = true)
 |-- F2_gender: integer (nullable = true)
 |-- F2_month_birth: string (nullable = true)
 |-- F2_year_birth: string (nullable = true)
 |-- F2_age_at_time: string (nullable = true)
 |-- F2_nationality: integer (nullable = true)
 |-- F2_residence_status: integer (nullable = true)
 |-- F2_time_in_place: string (nullable = true)
 |-- F2_previous_residence: string (nullable = true)
 |-- F2_cause_change_residence: string (nullable = true)
 |-- F2_previous_residence_province: string (nullable = true)
 |-- F2_country_residence: string (nullable = true)
 |-- F2_education_status: string (nullable = true)
 |-- F2_literacy_status: string (nullable = true)
 |-- F2_education_degree: integer (nullable = true)
 |-- F2_education_field: integer (nullable = true)
 |-- F2_marriage_status: string (nulla

In [44]:
# Function to convert column data types to integer
def convert_to_int(df, columns):
    for column in columns:
        df = df.withColumn(column, col(column).cast("int"))
    return df

# Function to convert column data types to string
def convert_to_string(df, columns):
    for column in columns:
        df = df.withColumn(column, col(column).cast("string"))
    return df

In [45]:
# Get a list of columns with int type
string_columns = [col_name for col_name, data_type in df_merged_after_drop.dtypes if data_type == "int"]
string_columns

['F2_replaced_family',
 'F2_completed_form',
 'F2_gender',
 'F2_nationality',
 'F2_residence_status',
 'F2_education_degree',
 'F2_education_field',
 'F3_wsw_salary',
 'F3_cej_workplace_characteristics',
 'F3_cesj_workplace_characteristics',
 'F3_pje_workplace_characteristics']

In [46]:
# List of columns to convert to string
columns_to_convert = ['F2_replaced_family',
 'F2_completed_form',
 'F2_gender',
 'F2_nationality',
 'F2_residence_status',
 'F2_education_degree',
 'F2_education_field',
 'F3_wsw_salary',
 'F3_cej_workplace_characteristics',
 'F3_cesj_workplace_characteristics',
 'F3_pje_workplace_characteristics'] 

# Convert columns and data to string
df_merged_after_drop = convert_to_string(df_merged_after_drop, columns_to_convert)

# Print the schema
df_merged_after_drop.printSchema()

root
 |-- F2_replaced_family: string (nullable = true)
 |-- F2_completed_form: string (nullable = true)
 |-- F2_why_not_completed_form: string (nullable = true)
 |-- F2_households: string (nullable = true)
 |-- F2_gender: string (nullable = true)
 |-- F2_month_birth: string (nullable = true)
 |-- F2_year_birth: string (nullable = true)
 |-- F2_age_at_time: string (nullable = true)
 |-- F2_nationality: string (nullable = true)
 |-- F2_residence_status: string (nullable = true)
 |-- F2_time_in_place: string (nullable = true)
 |-- F2_previous_residence: string (nullable = true)
 |-- F2_cause_change_residence: string (nullable = true)
 |-- F2_previous_residence_province: string (nullable = true)
 |-- F2_country_residence: string (nullable = true)
 |-- F2_education_status: string (nullable = true)
 |-- F2_literacy_status: string (nullable = true)
 |-- F2_education_degree: string (nullable = true)
 |-- F2_education_field: string (nullable = true)
 |-- F2_marriage_status: string (nullable = t

In [47]:
# Now some columns needed to be int and not string, lets correct them too.
# List of columns to convert to int
columns_to_convert = ['F2_month_birth',
 'F2_year_birth',
 'F2_age_at_time',
 'F3_cej_years_in_job',
 'F3_cej_months_in_job',
 'F3_cej_years_whole_work',
 'F3_cej_months_whole_work',
 'F3_hwj_season_prime_days_in_week',
 'F3_hwj_season_prime_hours_in_week',
 'F3_hwj_season_all_days_in_week',
 'F3_hwj_season_all_hours_in_week',
 'F3_hwj_sum_hours_worked',
 'F3_dws_season_desire_work_days',
 'F3_dws_season_desire_work_hours',
 'F3_sj_duration_searching_job_year',
 'F3_sj_duration_searching_job_month',
 'F3_sj_worked_duration_year',
 'F3_sj_worked_duration_month',
 'F3_pje_left_job_duration_year',
 'F3_pje_left_job_duration_month',
 'F3_cdj_work_hours',
 'F3_cdj_work_days'] 

# Convert columns and data to int
df_merged_after_drop = convert_to_int(df_merged_after_drop, columns_to_convert)

# Print the schema
df_merged_after_drop.printSchema()


root
 |-- F2_replaced_family: string (nullable = true)
 |-- F2_completed_form: string (nullable = true)
 |-- F2_why_not_completed_form: string (nullable = true)
 |-- F2_households: string (nullable = true)
 |-- F2_gender: string (nullable = true)
 |-- F2_month_birth: integer (nullable = true)
 |-- F2_year_birth: integer (nullable = true)
 |-- F2_age_at_time: integer (nullable = true)
 |-- F2_nationality: string (nullable = true)
 |-- F2_residence_status: string (nullable = true)
 |-- F2_time_in_place: string (nullable = true)
 |-- F2_previous_residence: string (nullable = true)
 |-- F2_cause_change_residence: string (nullable = true)
 |-- F2_previous_residence_province: string (nullable = true)
 |-- F2_country_residence: string (nullable = true)
 |-- F2_education_status: string (nullable = true)
 |-- F2_literacy_status: string (nullable = true)
 |-- F2_education_degree: string (nullable = true)
 |-- F2_education_field: string (nullable = true)
 |-- F2_marriage_status: string (nullable 

<div dir="rtl">
چون دیتابیس ما بسیار شرطی است، هر خانه ایی که داده ایی در آن نیست الزاما به معنی اشتباه بودن آن نیست و ما نمی توانیم به سادگی آن سطر را حذف کنیم. چراکه اگر سلولی خالی است، شاید شرط پر بودن آن برقرار نشده است. پس این خانه ها متفاوت هستند و باید نام متفاوتی نیز به جای null داشته باشند.
</div>

In [48]:
# Change the name
df_en = df_merged_after_drop

# Conditions

## Condition No. 1
Delete all rows that is said they are incomplete \
meaning : if "F2_completed_form" is "2" then delete that row \
\
Then drop "F2_completed_form" and "F2_why_not_completed_form"

In [49]:
# Filter out rows where F2_completed_form is not equal to '2'
df_en = df_en.filter(df_en.F2_completed_form != '2')

# Drop "F2_completed_form" and "F2_why_not_completed_form"
# List of columns to drop:
columns_to_drop = ["F2_completed_form", "F2_why_not_completed_form"]

# Drop the specified columns and create a new DataFrame
df_en = df_en.drop(*columns_to_drop)

# Show the DataFrame after filtering
df_en.show()

+------------------+-------------+---------+--------------+-------------+--------------+--------------+-------------------+----------------+---------------------+-------------------------+------------------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------+------------+------------------+---------------------+----------------+----------------+------------------+-----------------------+----------------+--------------------------------+-----------------+---------------------+----------------+-------------------+--------------------+-----------------------+------------------------+--------------------------------+---------------------------------+------------------------------+-------------------------------+------------------------+-----------------------+----------------------+----------------------+----------------------------+-----------------------------+-------------------------+-----------

## Condition No. 2
P1- if Q1 is "1" then Q2 - Q7 is "NFC" \
P2- if Q2 is "1" then Q3 - Q7 is "NFC" \
P3- if Q3 is "1" then Q4 - Q7 is "NFC" \
P4- if Q4 is "1" then Q5 - Q7 is "NFC" \
P5- if Q5 is "1" then Q6 - Q7 is "NFC" \

## Condition No. 3
if Q6 is "2" then Q7 - Q30 is "NFC"


In [50]:
# Condition No. 2
# Part 1:
# Specify the list of columns - from Q2 to Q7
list_of_columns = ["F3_wsw_money",
                   "F3_wsw_work_family",
                   "F3_wsw_product_family",
                   "F3_wsw_intership",
                   "F3_wsw_skip_work",
                   "F3_wsw_why_skipped"]

for col_name in list_of_columns:
        df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_salary") == "1") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name))) 

# Part 2:
# Specify the list of columns - from Q3 to Q7
list_of_columns = ["F3_wsw_work_family",
                   "F3_wsw_product_family",
                   "F3_wsw_intership",
                   "F3_wsw_skip_work",
                   "F3_wsw_why_skipped"]

for col_name in list_of_columns:
        df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_money") == "1") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name))) 
        
# Part 3:
# Specify the list of columns - from Q4 to Q7
list_of_columns = ["F3_wsw_product_family",
                   "F3_wsw_intership",
                   "F3_wsw_skip_work",
                   "F3_wsw_why_skipped"]

for col_name in list_of_columns:
        df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_work_family") == "1") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name))) 
        
# Part 4:
# Specify the list of columns - from Q5 to Q7
list_of_columns = ["F3_wsw_intership",
                   "F3_wsw_skip_work",
                   "F3_wsw_why_skipped"]

for col_name in list_of_columns:
        df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_product_family") == "1") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
        
# Part 5:
# Specify the list of columns - Q6 and Q7
list_of_columns = ["F3_wsw_skip_work",
                   "F3_wsw_why_skipped"]

for col_name in list_of_columns:
        df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_intership") == "1") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))


# Condition No. 3
# Specify the list of columns - from Q7 to Q30
list_of_columns = [
        "F3_wsw_why_skipped",
        "F3_wsw_have_another_job",
        "F3_cej_job_title",
        "F3_cej_workplace_characteristics",
        "F3_cej_job_status",
        "F3_cej_workers_number",
        "F3_cej_insurance",
        "F3_cej_years_in_job",
        "F3_cej_months_in_job",
        "F3_cej_years_whole_work",
        "F3_cej_months_whole_work",
        "F3_hwj_season_prime_days_in_week",
        "F3_hwj_season_prime_hours_in_week",
        "F3_hwj_season_all_days_in_week",
        "F3_hwj_season_all_hours_in_week",
        "F3_hwj_why_hours_in_week",
        "F3_hwj_sum_hours_worked",
        "F3_hwj_why_less_worked",
        "F3_hwj_why_more_worked",
        "F3_dws_wanted_increase_hours",
        "F3_dws_ability_increase_hours",
        "F3_dws_how_increase_hours",
        "F3_dws_search_for_work",
        "F3_dws_why_search_for_work",
        "F3_dws_how_search_for_work",
        "F3_dws_season_desire_work_days",
        "F3_dws_season_desire_work_hours",
        "F3_cesj_job_title",
        "F3_cesj_workplace_characteristics",
        "F3_sj_which_ways_searched",
        "F3_cesj_job_status"]

for col_name in list_of_columns:
        if df_en.schema[col_name].dataType == StringType():
            df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_skip_work") == "2") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
        elif df_en.schema[col_name].dataType == IntegerType():
            df_en = df_en.withColumn(col_name,
                    when((col("F3_wsw_skip_work") == "2") & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))    

## Condition No. 4
if "F3_wsw_have_another_job" is "2" \
then all of questions in "cesj" and "sj" and "pje" is "NFC" \

if "F3_wsw_have_another_job" is "1" \
then all of questions in "sj" and "pje" is "NFC"

if "F3_wsw_have_another_job" is "NFC" \
then all of questions in "cesj" and "cej" and "hwj" and "dws" is "NFC"

In [51]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import StringType, IntegerType

# Specify the list of columns for condition 2
list_of_columns_2 = ["F3_cesj_job_title", "F3_cesj_workplace_characteristics", "F3_cesj_job_status", "F3_sj_searched_in_previous_week", "F3_sj_which_ways_searched", "F3_sj_cause_not_searched", "F3_sj_ability_start_job", "F3_sj_previous_status", "F3_sj_unemployment_insurance_status", "F3_sj_duration_searching_job_year", "F3_sj_duration_searching_job_month", "F3_sj_worked_duration_year", "F3_sj_worked_duration_month", "F3_pje_2weeks_worked", "F3_pje_left_job_duration_year", "F3_pje_left_job_duration_month", "F3_pje_job_title", "F3_pje_workplace_characteristics", "F3_pje_job_status", "F3_pje_why_left_job", "F3_pje_ability_start_job", "F3_pje_why_not_start_job", "F3_pje_previous_week_status"]

# Specify the list of columns for condition 1
list_of_columns_1 = ["F3_sj_searched_in_previous_week", "F3_sj_which_ways_searched", "F3_sj_cause_not_searched", "F3_sj_ability_start_job", "F3_sj_previous_status", "F3_sj_unemployment_insurance_status", "F3_sj_duration_searching_job_year", "F3_sj_duration_searching_job_month", "F3_sj_worked_duration_year", "F3_sj_worked_duration_month", "F3_pje_2weeks_worked", "F3_pje_left_job_duration_year", "F3_pje_left_job_duration_month", "F3_pje_job_title", "F3_pje_workplace_characteristics", "F3_pje_job_status", "F3_pje_why_left_job", "F3_pje_ability_start_job", "F3_pje_why_not_start_job", "F3_pje_previous_week_status"]

# Specify the list of columns for condition "NFC"
list_of_columns_NFC = ["F3_cesj_job_title", "F3_cesj_workplace_characteristics", "F3_cesj_job_status", "F3_cej_job_title", "F3_cej_workplace_characteristics", "F3_cej_job_status", "F3_cej_workers_number", "F3_cej_insurance", "F3_cej_years_in_job", "F3_cej_months_in_job", "F3_cej_years_whole_work", "F3_cej_months_whole_work", "F3_hwj_season_prime_days_in_week", "F3_hwj_season_prime_hours_in_week", "F3_hwj_season_all_days_in_week", "F3_hwj_season_all_hours_in_week", "F3_hwj_why_hours_in_week", "F3_hwj_sum_hours_worked", "F3_hwj_why_less_worked", "F3_hwj_why_more_worked", "F3_dws_wanted_increase_hours", "F3_dws_ability_increase_hours", "F3_dws_how_increase_hours", "F3_dws_search_for_work", "F3_dws_why_search_for_work", "F3_dws_how_search_for_work", "F3_dws_season_desire_work_days", "F3_dws_season_desire_work_hours"]

# Update columns for condition 2
for col_name in list_of_columns_2:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name, when((col("F3_wsw_have_another_job") == "2") & (col(col_name).isNull()), "NFC").otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name, when((col("F3_wsw_have_another_job") == "2") & (col(col_name).isNull()), -1).otherwise(col(col_name)))

# Update columns for condition 1
for col_name in list_of_columns_1:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name, when((col("F3_wsw_have_another_job") == "1") & (col(col_name).isNull()), "NFC").otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name, when((col("F3_wsw_have_another_job") == "1") & (col(col_name).isNull()), -1).otherwise(col(col_name)))

# Update columns for condition "NFC"
for col_name in list_of_columns_NFC:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name, when((col("F3_wsw_have_another_job") == "NFC") & (col(col_name).isNull()), "NFC").otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name, when((col("F3_wsw_have_another_job") == "NFC") & (col(col_name).isNull()), -1).otherwise(col(col_name)))

# Show the updated DataFrame for condition 2
df_en.select(*list_of_columns_2, "F3_wsw_have_another_job").show()

# Show the updated DataFrame for condition 1
df_en.select(*list_of_columns_1, "F3_wsw_have_another_job").show()

# Show the updated DataFrame for condition "NFC"
df_en.select(*list_of_columns_NFC, "F3_wsw_have_another_job").show()


+-----------------+---------------------------------+------------------+-------------------------------+-------------------------+------------------------+-----------------------+---------------------+-----------------------------------+---------------------------------+----------------------------------+--------------------------+---------------------------+--------------------+-----------------------------+------------------------------+----------------+--------------------------------+-----------------+-------------------+------------------------+------------------------+---------------------------+-----------------------+
|F3_cesj_job_title|F3_cesj_workplace_characteristics|F3_cesj_job_status|F3_sj_searched_in_previous_week|F3_sj_which_ways_searched|F3_sj_cause_not_searched|F3_sj_ability_start_job|F3_sj_previous_status|F3_sj_unemployment_insurance_status|F3_sj_duration_searching_job_year|F3_sj_duration_searching_job_month|F3_sj_worked_duration_year|F3_sj_worked_duration_month|F3_pj

## Condition No. 5
if "F3_cej_job_status" is "5" or "7" or "8" \
then "F3_cej_workers_number" is "NFC"

In [52]:
df_en = df_en.withColumn("F3_cej_workers_number",
                   when((col("F3_cej_job_status").isin("05", "07", "08")) & (col("F3_cej_workers_number").isNull()), "NFC")
                   .otherwise(col("F3_cej_workers_number")))

# Show the updated DataFrame
df_en.select("F3_cej_job_status", "F3_cej_workers_number").show()

+-----------------+---------------------+
|F3_cej_job_status|F3_cej_workers_number|
+-----------------+---------------------+
|                 |                     |
|               04|                    4|
|                 |                     |
|             NULL|                 NULL|
|                 |                     |
|                 |                     |
|               04|                    3|
|                 |                     |
|                 |                     |
|                 |                     |
|               04|                    2|
|               04|                    1|
|               04|                    4|
|               04|                    2|
|                 |                     |
|             NULL|                 NULL|
|               04|                    1|
|                 |                     |
|               05|                     |
|                 |                     |
+-----------------+---------------

## Condition No. 6
if "F3_hwj_season_all_hours_in_week" is more than 44 \
then "F3_hwj_why_hours_in_week" is "NFC"

In [53]:
df_en = df_en.withColumn("F3_hwj_why_hours_in_week",
                   when((col("F3_hwj_season_all_hours_in_week") >= 44) & (col("F3_hwj_why_hours_in_week").isNull()), "NFC")
                   .otherwise(col("F3_hwj_why_hours_in_week")))

# Show the updated DataFrame
df_en.select("F3_hwj_season_all_hours_in_week", "F3_hwj_why_hours_in_week").show()

+-------------------------------+------------------------+
|F3_hwj_season_all_hours_in_week|F3_hwj_why_hours_in_week|
+-------------------------------+------------------------+
|                             -1|                        |
|                             48|                        |
|                             -1|                        |
|                           NULL|                    NULL|
|                             -1|                        |
|                             -1|                        |
|                             16|                       6|
|                             -1|                        |
|                             -1|                        |
|                             -1|                        |
|                             48|                        |
|                             48|                        |
|                             40|                       1|
|                             25|                       

## Condition No. 7
if "F3_hwj_sum_hours_worked" is == "F3_hwj_season_all_hours_in_week" \
then "F3_hwj_why_less_worked" and "F3_hwj_why_more_worked" is "NFC"

if "F3_hwj_sum_hours_worked" is > "F3_hwj_season_all_hours_in_week" \
then "F3_hwj_why_less_worked" is "NFC"

if "F3_hwj_sum_hours_worked" is < "F3_hwj_season_all_hours_in_week" \
then "F3_hwj_why_more_worked" is "NFC"

In [54]:
# Specify the list of columns:
list_of_columns = ["F3_hwj_why_more_worked", "F3_hwj_why_less_worked"]

df_en = df_en.withColumn(
    list_of_columns[0],
    when(
        (col("F3_hwj_sum_hours_worked") == col("F3_hwj_season_all_hours_in_week")) & (col(list_of_columns[0]).isNull()),
        "NFC"
    ).otherwise(col(list_of_columns[0]))
)

df_en = df_en.withColumn(
    list_of_columns[1],
    when(
        (col("F3_hwj_sum_hours_worked") == col("F3_hwj_season_all_hours_in_week")) & (col(list_of_columns[1]).isNull()),
        "NFC"
    ).otherwise(col(list_of_columns[1]))
)

df_en = df_en.withColumn("F3_hwj_why_less_worked",
                   when(((col("F3_hwj_sum_hours_worked") > col("F3_hwj_season_all_hours_in_week"))) & (col("F3_hwj_why_less_worked").isNull()), "NFC")
                   .otherwise(col("F3_hwj_why_less_worked")))

df_en = df_en.withColumn("F3_hwj_why_more_worked",
                   when(((col("F3_hwj_sum_hours_worked") < col("F3_hwj_season_all_hours_in_week"))) & (col("F3_hwj_why_more_worked").isNull()), "NFC")
                   .otherwise(col("F3_hwj_why_more_worked")))

# Show the updated DataFrame
df_en.select("F3_hwj_season_all_hours_in_week", "F3_hwj_why_hours_in_week", "F3_hwj_why_less_worked", "F3_hwj_why_more_worked").show()


+-------------------------------+------------------------+----------------------+----------------------+
|F3_hwj_season_all_hours_in_week|F3_hwj_why_hours_in_week|F3_hwj_why_less_worked|F3_hwj_why_more_worked|
+-------------------------------+------------------------+----------------------+----------------------+
|                             -1|                        |                      |                      |
|                             48|                        |                      |                      |
|                             -1|                        |                      |                      |
|                           NULL|                    NULL|                  NULL|                  NULL|
|                             -1|                        |                      |                      |
|                             -1|                        |                      |                      |
|                             16|                      

## Condition No. 8
Part 1:
if "F3_dws_wanted_increase_hours" is "2" \
then "F3_dws_ability_increase_hours" and "F3_dws_how_increase_hours" is "NFC"

Part 2:
if "F3_dws_ability_increase_hours" is "2" \
then "F3_dws_how_increase_hours" is "NFC"

## Condition No. 9
if "F3_dws_search_for_work" is "2" \
then "F3_dws_why_search_for_work" and "F3_dws_how_search_for_work" is "NFC"

In [55]:
# Condition 8
# Part 1
# Specify the list of columns
list_of_columns = ["F3_dws_ability_increase_hours", "F3_dws_how_increase_hours"]

for col_name in list_of_columns:
        df_en = df_en.withColumn(col_name,
                    when((col("F3_dws_wanted_increase_hours") == "2") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name))) 

# Part 2
df_en = df_en.withColumn("F3_dws_how_increase_hours",
                        when((col("F3_dws_ability_increase_hours") == "2") & (col("F3_dws_how_increase_hours").isNull()), "NFC")
                        .otherwise(col("F3_dws_how_increase_hours")))

# Show the updated DataFrame
df_en.select("F3_dws_wanted_increase_hours", "F3_dws_wanted_increase_hours", "F3_dws_how_increase_hours").show()

# Condition 9
# Specify the list of columns:
list_of_columns = ["F3_dws_why_search_for_work", "F3_dws_how_search_for_work"]

df_en = df_en.withColumn(
    list_of_columns[0],
    when(
        (col("F3_dws_search_for_work") == "2") & (col(list_of_columns[0]).isNull()),
        "NFC"
    ).otherwise(col(list_of_columns[0]))
)

df_en = df_en.withColumn(
    list_of_columns[1],
    when(
        (col("F3_dws_search_for_work") == "2") & (col(list_of_columns[1]).isNull()),
        "NFC"
    ).otherwise(col(list_of_columns[1]))
)

# Show the updated DataFrame
df_en.select("F3_dws_why_search_for_work", "F3_dws_how_search_for_work", "F3_dws_search_for_work").show()

+----------------------------+----------------------------+-------------------------+
|F3_dws_wanted_increase_hours|F3_dws_wanted_increase_hours|F3_dws_how_increase_hours|
+----------------------------+----------------------------+-------------------------+
|                            |                            |                         |
|                           2|                           2|                         |
|                            |                            |                         |
|                        NULL|                        NULL|                     NULL|
|                            |                            |                         |
|                            |                            |                         |
|                           1|                           1|                        1|
|                            |                            |                         |
|                            |                        

## Condition No. 10
Part 1:\
if "F3_sj_searched_in_previous_week" is "2" \
then "F3_sj_which_ways_searched" is "NFC"

Part 2:\
if "F3_sj_which_ways_searched" is "00000000" \
then "F3_sj_which_ways_searched" is "NFC"

## Condition No. 11
if "F3_sj_searched_in_previous_week" is "1" \
then "F3_sj_cause_not_searched" is "NFC"

In [56]:
# Condition 10
# Part 1
df_en = df_en.withColumn("F3_sj_which_ways_searched",
                   when((col("F3_sj_searched_in_previous_week") == "2") & (col("F3_sj_which_ways_searched").isNull()), "NFC")
                   .otherwise(col("F3_sj_which_ways_searched")))

# Show the updated DataFrame
df_en.select("F3_sj_searched_in_previous_week", "F3_sj_which_ways_searched").show()

# Part 2
df_en = df_en.withColumn("F3_sj_which_ways_searched",
                   when((col("F3_sj_which_ways_searched") == "00000000"), "NFC")
                   .otherwise(col("F3_sj_which_ways_searched")))

# Show the updated DataFrame
df_en.select("F3_sj_which_ways_searched").show()



# Condition 11
df_en = df_en.withColumn("F3_sj_cause_not_searched",
                   when((col("F3_sj_searched_in_previous_week") == "1") & (col("F3_sj_cause_not_searched").isNull()), "NFC")
                   .otherwise(col("F3_sj_cause_not_searched")))

# Show the updated DataFrame
df_en.select("F3_sj_cause_not_searched", "F3_sj_searched_in_previous_week").show()

+-------------------------------+-------------------------+
|F3_sj_searched_in_previous_week|F3_sj_which_ways_searched|
+-------------------------------+-------------------------+
|                              2|                000000000|
|                               |                000000000|
|                              2|                000000000|
|                           NULL|                000000000|
|                              2|                000000000|
|                              2|                000000000|
|                               |                000000000|
|                              2|                000000000|
|                              2|                000000000|
|                              2|                000000000|
|                               |                000000000|
|                               |                000000000|
|                               |                000000000|
|                               |       

## Condition No. 12
if "F3_sj_cause_not_searched" is "03" or "04" or "05" or "06" or "07" \
then Q34 - Q44 is "NFC"

## Condition No. 13
if "F3_sj_cause_not_searched" is "08" or "09" or "10" or "11" or "12" \
then Q34 - Q46 is "NFC"

## Condition No. 14
if F3_sj_ability_start_job is "2" \
then Q35 - Q45 is "NFC"

In [57]:
# Condition No. 13
# Specify the list of columns - from Q34 to Q46
list_of_columns = ["F3_sj_ability_start_job",
                   "F3_sj_duration_searching_job_year",
                   "F3_sj_duration_searching_job_month",
                   "F3_sj_previous_status",
                   "F3_sj_worked_duration_year",
                   "F3_sj_worked_duration_month",
                   "F3_sj_unemployment_insurance_status",
                   "F3_pje_2weeks_worked",
                   "F3_pje_left_job_duration_year",
                   "F3_pje_left_job_duration_month",
                   "F3_pje_job_title",
                   "F3_pje_workplace_characteristics",
                   "F3_pje_job_status",
                   "F3_pje_why_left_job",
                   "F3_pje_ability_start_job",
                   "F3_pje_why_not_start_job"]

for col_name in list_of_columns:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_cause_not_searched").isin("08", "09", "10", "11", "12")) & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_cause_not_searched").isin("08", "09", "10", "11", "12")) & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))   

# Condition No. 12
# Specify the list of columns - from Q34 to Q44
list_of_columns = ["F3_sj_ability_start_job",
                   "F3_sj_duration_searching_job_year",
                   "F3_sj_duration_searching_job_month",
                   "F3_sj_previous_status",
                   "F3_sj_worked_duration_year",
                   "F3_sj_worked_duration_month",
                   "F3_sj_unemployment_insurance_status",
                   "F3_pje_2weeks_worked",
                   "F3_pje_left_job_duration_year",
                   "F3_pje_left_job_duration_month",
                   "F3_pje_job_title",
                   "F3_pje_workplace_characteristics",
                   "F3_pje_job_status",
                   "F3_pje_why_left_job"]

for col_name in list_of_columns:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_cause_not_searched").isin("03", "04", "05", "06", "07")) & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_cause_not_searched").isin("03", "04", "05", "06", "07")) & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))   

# Condition No. 14
# Specify the list of columns - from Q35 to Q45
list_of_columns = ["F3_sj_duration_searching_job_year",
                   "F3_sj_duration_searching_job_month",
                   "F3_sj_previous_status",
                   "F3_sj_worked_duration_year",
                   "F3_sj_worked_duration_month",
                   "F3_sj_unemployment_insurance_status",
                   "F3_pje_2weeks_worked",
                   "F3_pje_left_job_duration_year",
                   "F3_pje_left_job_duration_month",
                   "F3_pje_job_title",
                   "F3_pje_workplace_characteristics",
                   "F3_pje_job_status",
                   "F3_pje_why_left_job",
                   "F3_pje_ability_start_job"]

for col_name in list_of_columns:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_ability_start_job") == "2") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_ability_start_job") == "2") & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))  



## Condition No. 15
if "F3_sj_previous_status" is "2" or "3" or "4" or "5" or "6" or \
then "F3_sj_unemployment_insurance_status" and "F3_sj_worked_duration_year" and "F3_sj_worked_duration_month" is "NFC" or -1

## Condition No. 16
if "F3_pje_2weeks_worked" is "2" \
then Q40 - Q47 is "NFC"

## Condition No. 17
if "F3_pje_why_left_job" is from "01" to "14" \
then "F3_pje_ability_start_job" and " F3_pje_why_not_start_job" and " F3_pje_previous_week_status" is "NFC"

## Condition No. 18
if "F3_pje_ability_start_job" is "1" \
then "F3_pje_why_not_start_job" is "NFC"

In [58]:
# Condition No. 15
# Specify the list of columns
list_of_columns = ["F3_sj_unemployment_insurance_status",
                   "F3_sj_worked_duration_year",
                   "F3_sj_worked_duration_month"]

for col_name in list_of_columns:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_previous_status").isin("2", "3", "4", "5", "6")) & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_sj_previous_status").isin("2", "3", "4", "5", "6")) & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))
        

# Condition No. 16
# Specify the list of columns - from Q40 to Q47
list_of_columns = ["F3_pje_left_job_duration_year",
                   "F3_pje_left_job_duration_month",
                   "F3_pje_job_title",
                   "F3_pje_workplace_characteristics",
                   "F3_pje_job_status",
                   "F3_pje_why_left_job",
                   "F3_pje_ability_start_job",
                   "F3_pje_why_not_start_job",
                   "F3_pje_previous_week_status"]

for col_name in list_of_columns:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_pje_2weeks_worked") == "2") & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_pje_2weeks_worked") == "2") & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))


# Condition No. 17
# Specify the list of columns
list_of_columns = ["F3_pje_ability_start_job", "F3_pje_why_not_start_job", "F3_pje_previous_week_status"]

for col_name in list_of_columns:
    if df_en.schema[col_name].dataType == StringType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_pje_why_left_job").isin("01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14")) & (col(col_name).isNull()), "NFC")
                   .otherwise(col(col_name)))
    elif df_en.schema[col_name].dataType == IntegerType():
        df_en = df_en.withColumn(col_name,
                   when((col("F3_pje_why_left_job").isin("01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14")) & (col(col_name).isNull()), -1)
                   .otherwise(col(col_name)))
        

# Condition 18
df_en = df_en.withColumn("F3_pje_why_not_start_job",
                   when((col("F3_pje_ability_start_job") == "1") & (col("F3_pje_why_not_start_job").isNull()), "NFC")
                   .otherwise(col("F3_pje_why_not_start_job")))

# Show the updated DataFrame
df_en.select("F3_pje_why_not_start_job", "F3_pje_ability_start_job").show()

+------------------------+------------------------+
|F3_pje_why_not_start_job|F3_pje_ability_start_job|
+------------------------+------------------------+
|                        |                        |
|                        |                        |
|                        |                        |
|                    NULL|                    NULL|
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                        |                        |
|                    NULL|                    NULL|
|           

In [59]:
# Use the summary() method to get statistics for all columns
all_columns_summary = df_en.summary()

# Show the results
all_columns_summary.show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+---------------------+-------------------------+------------------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+---------------------+--------------------+-------------------+------------------+-----------------------+------------------+--------------------------------+------------------+---------------------+-------------------+-------------------+--------------------+-----------------------+------------------------+--------------------------------+---------------------------------+------------------------------+-------------------------------+------------------------+-----------------------+----------------------+----------------------+--------------------------

In [60]:
# we dont consider F2 when dropping
columns_to_check = [
    "F3_wsw_salary",
    "F3_wsw_money",
    "F3_wsw_work_family",
    "F3_wsw_product_family",
    "F3_wsw_intership",
    "F3_wsw_skip_work",
    "F3_wsw_why_skipped",
    "F3_wsw_have_another_job",
    "F3_cej_job_title",
    "F3_cej_workplace_characteristics",
    "F3_cej_job_status",
    "F3_cej_workers_number",
    "F3_cej_insurance",
    "F3_cej_years_in_job",
    "F3_cej_months_in_job",
    "F3_cej_years_whole_work",
    "F3_cej_months_whole_work",
    "F3_hwj_season_prime_days_in_week",
    "F3_hwj_season_prime_hours_in_week",
    "F3_hwj_season_all_days_in_week",
    "F3_hwj_season_all_hours_in_week",
    "F3_hwj_why_hours_in_week",
    "F3_hwj_sum_hours_worked",
    "F3_hwj_why_less_worked",
    "F3_hwj_why_more_worked",
    "F3_dws_wanted_increase_hours",
    "F3_dws_ability_increase_hours",
    "F3_dws_how_increase_hours",
    "F3_dws_search_for_work",
    "F3_dws_why_search_for_work",
    "F3_dws_how_search_for_work",
    "F3_dws_season_desire_work_days",
    "F3_dws_season_desire_work_hours",
    "F3_cesj_job_title",
    "F3_cesj_workplace_characteristics",
    "F3_cesj_job_status",
    "F3_sj_searched_in_previous_week",
    "F3_sj_cause_not_searched",
    "F3_sj_ability_start_job",
    "F3_sj_duration_searching_job_year",
    "F3_sj_duration_searching_job_month",
    "F3_sj_previous_status",
    "F3_sj_worked_duration_year",
    "F3_sj_worked_duration_month",
    "F3_sj_unemployment_insurance_status",
    "F3_pje_2weeks_worked",
    "F3_pje_left_job_duration_year",
    "F3_pje_left_job_duration_month",
    "F3_pje_job_title",
    "F3_pje_workplace_characteristics",
    "F3_pje_job_status",
    "F3_pje_why_left_job",
    "F3_pje_ability_start_job",
    "F3_pje_why_not_start_job",
    "F3_pje_previous_week_status",
    "F3_cdj_work_hours",
    "F3_cdj_work_days",
    "F3_cdj_which_job_status",
    "F3_cdj_which_industry",
    "F3_sj_which_ways_searched"
]


df_en = df_en.dropna(subset=columns_to_check)

In [61]:
# Use the summary() method to get statistics for all columns
all_columns_summary = df_en.summary()

# Show the results
all_columns_summary.show()

In [None]:
# Now beacause we have 10 datasets, we must add a new column containing the year of questionnaire
# Add a nullable integer column with a default value of 90
new_column_name = "questionnaire_year"
default_value = 90
df_en = df_en.withColumn(new_column_name, lit(default_value).cast("int"))

# Display the DataFrame with the new column
#df_en.show()

In [None]:
# Save the dataframe
# Specify the output path for the CSV file
csv_path = "./Data90.csv"

# Write DataFrame to CSV with overwrite mode and header
df_en.write.mode("overwrite").option("header", "true").csv(csv_path)

