In [1]:
import pyspark
from pyspark.sql import SparkSession
from itertools import chain
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr, substring, create_map, lit, when

StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 3, Finished, Available, Finished)

In [2]:

spark = SparkSession.builder.appName("DataAnalysisAssessment").getOrCreate()


StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 4, Finished, Available, Finished)

In [3]:
# Define the schema with metadata
schema = StructType([
    StructField("HRHHID", StringType(), nullable=True, metadata={"description": "Household Identifier (Part 1)"}),
    StructField("HRHHID2", StringType(), nullable=True, metadata={"description": "Household Identifier (Part 2)"}),
    StructField("QSTNUM", StringType(), nullable=True, metadata={"description": "Unique Household Identifier (Valid only within a specific month)"}),
    StructField("HRMONTH", StringType(), nullable=True, metadata={"description": "Month of Interview"}),
    StructField("HRYEAR4", StringType(), nullable=True, metadata={"description": "Year of Interview"}),
    StructField("HUFINAL", StringType(), nullable=True, metadata={"description": "Final Outcome Code"}),
    StructField("HEHOUSUT", StringType(), nullable=True, metadata={"description": "Type of Housing Unit"}),
    StructField("HRHTYPE", StringType(), nullable=True, metadata={"description": "Household Type"}),
    StructField("HETELHHD", StringType(), nullable=True, metadata={"description": "Is there a telephone in this house/apartment?"}),
    StructField("HETELAVL", StringType(), nullable=True, metadata={"description": "Is there a telephone elsewhere which people in this household can be contacted?"}),
    StructField("HEPHONEO", StringType(), nullable=True, metadata={"description": "Is a telephone interview acceptable?"}),
    StructField("HUINTTYP", StringType(), nullable=True, metadata={"description": "Type of Interview"}),
    StructField("HEFAMINC", StringType(), nullable=True, metadata={"description": "Family Income"}),
    StructField("GEREG", StringType(), nullable=True, metadata={"description": "Region"}),
    StructField("GEDIV", StringType(), nullable=True, metadata={"description": "Division"}),
    StructField("PTDTRACE", StringType(), nullable=True, metadata={"description": "Race"})
])


StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 5, Finished, Available, Finished)

In [4]:
# Datapath

data_path = "./data/dec17pub.dat"

StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 6, Finished, Available, Finished)

In [None]:

# importing the zipfile module 
from zipfile import ZipFile 

try:
    # loading the temp.zip and creating a zip object 
    with ZipFile("./data/dec17pub.zip", 'r') as zObject: 
    
        # Extracting all the members of the zip  
        # into a specific location. 
        zObject.extractall( 
            path="data_path") 
except:
    pass

In [5]:
def strim_cols(df: pyspark.sql.DataFrame)->pyspark.sql.DataFrame:
    """
    Trim leading and trailing whitespace from all string columns in the DataFrame.

    This function iterates over all columns in the DataFrame and applies the `trim` function to
    columns that are of string type. Leading and trailing whitespace characters are removed from
    the values in these columns. Columns that are not of string type are left unchanged.

    Parameters:
    ----------
    df : DataFrame
        The input DataFrame on which the trimming operation is to be performed.

    Returns:
    -------
    DataFrame
        A new DataFrame with leading and trailing whitespace removed from all string columns.

    """    
    for column in df.columns:
        df = df.withColumn(column, F.trim(column))

    return df



StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 7, Finished, Available, Finished)

In [6]:
def load_data(data_path: str = data_path)->pyspark.sql.DataFrame:
    """----------
    data_path : str
        The path to the CSV file that contains the raw data.

    Returns:
    -------
    DataFrame
        A Spark DataFrame with the following columns extracted and renamed:
        - 'HRHHID': Household Identifier (Part 1) - Extracted from characters 1 to 15.
        - 'HRHHID2': Household Identifier (Part 2) - Extracted from characters 71 to 75.
        - 'QSTNUM': Unique Household Identifier - Extracted from characters 815 to 819.
        - 'HRMONTH': Month of Interview - Extracted from characters 16 to 17.
        - 'HRYEAR4': Year of Interview - Extracted from characters 18 to 21.
        - 'HUFINAL': Final Outcome Code - Extracted from characters 24 to 26.
        - 'HEHOUSUT': Type of Housing Unit - Extracted from characters 31 to 32.
        - 'HRHTYPE': Household Type - Extracted from characters 61 to 62.
        - 'HETELHHD': Is There a Telephone in This House/Apartment? - Extracted from characters 33 to 34.
        - 'HETELAVL': Is There a Telephone Elsewhere on Which People in This Household Can Be Contacted? - Extracted from characters 35 to 36.
        - 'HEPHONEO': Is a Telephone Interview Acceptable? - Extracted from characters 37 to 38.
        - 'HUINTTYP': Type of Interview - Extracted from characters 65 to 66.
        - 'HEFAMINC': Family Income - Extracted from characters 39 to 40.
        - 'GEREG': Region - Extracted from characters 89 to 90.
        - 'GEDIV': Division - Extracted from character 91.
        - 'PTDTRACE': Race - Extracted from characters 139 to 140.

    Notes:
    -----
    - The function assumes that the fat file contains a single column named '_c0' with concatenated raw data.
    - The function drops the original raw column '_c0' after extracting the relevant substrings.
    - The function requires a Spark session to be active for reading and processing the data.
    """

    df = spark.read.format("csv").load(data_path)
    df = df.withColumn("HRHHID", substring("_c0", 1, 15))
    df = df.withColumn("HRHHID2", substring("_c0", 71, 5))
    df = df.withColumn("QSTNUM", substring("_c0", 815, 5))
    df = df.withColumn("HRMONTH", substring("_c0", 16, 2))
    df = df.withColumn("HRYEAR4", substring("_c0", 18, 4))
    df = df.withColumn("HUFINAL", substring("_c0", 24, 3))
    df = df.withColumn("HEHOUSUT", substring("_c0", 31, 2))
    df = df.withColumn("HRHTYPE", substring("_c0", 61, 2))
    df = df.withColumn("HETELHHD", substring("_c0", 33, 2))
    df = df.withColumn("HETELAVL", substring("_c0", 35, 2))
    df = df.withColumn("HEPHONEO", substring("_c0", 37, 2))
    df = df.withColumn("HUINTTYP", substring("_c0", 65, 2))
    df = df.withColumn("HEFAMINC", substring("_c0", 39, 2))
    df = df.withColumn("GEREG", substring("_c0", 89, 2))
    df = df.withColumn("GEDIV", substring("_c0", 91, 1))
    df = df.withColumn("PTDTRACE", substring("_c0", 139, 2))

    # Drop the raw column
    df = df.drop("_c0")

    return  strim_cols(df)


StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 8, Finished, Available, Finished)

In [7]:
# income range mapping
income_range_mapping = {
    "1": "Less $5,000",
    "2": "$5,000 - $7,499",
    "3": "$7,500 - $9,999",
    "4": "$10,000 - $12,499",
    "5": "$12,500 - $14,999",
    "6": "$15,000 - $19,999",
    "7": "$20,000 - $24,999",
    "8": "$25,000 - $29,999",
    "9": "$30,000 - $34,999",
    "10": "$35,000 - $39,999",
    "11": "$40,000 - $49,999",
    "12": "$50,000 - $59,999",
    "13": "$60,000 - $74,999",
    "14": "$75,000 - $99,999",
    "15": "$100,000 - $149,999",
    "16": "$150,000 or more"
}

# Define the division mapping
division_mapping = {
    "1": "New England",
    "2": "Middle Atlantic",
    "3": "East North Central",
    "4": "West North Central",
    "5": "South Atlantic",
    "6": "East South Central",
    "7": "West South Central",
    "8": "Mountain",
    "9": "Pacific"
}

# Define the race mapping
race_mapping = {
    "1": "White Only",
    "2": "Black Only",
    "3": "American Indian, Alaskan Native Only",
    "4": "Asian Only",
    "5": "Hawaiian/Pacific Islander Only",
    "6": "White-Black",
    "7": "White-AI",
    "8": "White-Asian",
    "9": "White-HP",
    "10": "Black-AI",
    "11": "Black-Asian",
    "12": "Black-HP",
    "13": "AI-Asian",
    "14": "AI-HP",
    "15": "Asian-HP",
    "16": "W-B-AI",
    "17": "W-B-A",
    "18": "W-B-HP",
    "19": "W-AI-A",
    "20": "W-AI-HP",
    "21": "W-A-HP",
    "22": "B-AI-A",
    "23": "W-B-AI-A",
    "24": "W-AI-A-HP",
    "25": "Other 3 Race Combinations",
    "26": "Other 4 and 5 Race Combinations"
}



StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 9, Finished, Available, Finished)

# Mapper

In [8]:
# defmapper(df):Mapp
    # return df.withColumn(c, when(df[c].isin(list(race_mapping.keys())),  df[c]).otherwise('Others'))

def Mapper(df:pyspark.sql.DataFrame, column_name:str , dict_mapper:dict):

    """
     Trim leading and trailing whitespace from all string columns in the DataFrame.

    This function iterates over all columns in the DataFrame and applies the `trim` function to
    columns that are of string type. Leading and trailing whitespace characters are removed from
    the values in these columns. Columns that are not of string type are left unchanged.

    Parameters:
    ----------
    df : DataFrame
        The input DataFrame on which the trimming operation is to be performed.

    Returns:
    -------
    DataFrame
        A new DataFrame with leading and trailing whitespace removed from all string columns.

    """
    # Convert the dictionary into a Spark DataFrame for the mapping
    cols = df.columns
    mapping_df = spark.createDataFrame(dict_mapper.items(), ["code", "value"])

    # Join the original DataFrame with the mapping DataFrame
    df2 = df.join(mapping_df, df[column_name] == mapping_df["code"], "left")

    # Replace null values with 'Others' for unmatched codes
    df2 = df2.withColumn(column_name, when(col("value").isNull(), 'Others').otherwise(col("value")))

    df2 = df2.select(cols)

    return df2    

StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 10, Finished, Available, Finished)

# LOad dataset from the source

In [9]:
df = load_data(data_path)

StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 11, Finished, Available, Finished)

# Income Range by responders



    Mapping Definition: The income_range_mapping dictionary provides a mapping from the numeric income codes to their descriptive ranges.

    Transform Data: Using expr in PySpark, we create a new column FamilyIncomeRange based on the HEFAMINC column values.

    Group and Count: We then group the DataFrame by the FamilyIncomeRange column and count the occurrences.

    Display Results: Finally, the results are ordered and displayed to show the count of responders per income range.

    This will give you a clear count of responders across each defined income range, making the data more interpretable.


In [10]:
column_name = 'HEFAMINC'


income_mapped = Mapper(df, column_name, income_range_mapping)

income_counts = income_mapped.groupBy(column_name).count().orderBy('count', ascending = False)

income_counts = income_counts.withColumnRenamed('coumt', 'IncomeCount')
income_counts.show(truncate=False)


StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 12, Finished, Available, Finished)

+-------------------+-----+
|HEFAMINC           |count|
+-------------------+-----+
|Others             |20391|
|$100,000 - $149,999|17794|
|$75,000 - $99,999  |16557|
|$150,000 or more   |15704|
|$60,000 - $74,999  |13442|
|$50,000 - $59,999  |9971 |
|$40,000 - $49,999  |9788 |
|$30,000 - $34,999  |6743 |
|$35,000 - $39,999  |6620 |
|$20,000 - $24,999  |6312 |
|$25,000 - $29,999  |5803 |
|$15,000 - $19,999  |4518 |
|$10,000 - $12,499  |3161 |
|Less $5,000        |3136 |
|$12,500 - $14,999  |2614 |
|$7,500 - $9,999    |2277 |
|$5,000 - $7,499    |1625 |
+-------------------+-----+



# TOP 10 count of responders per geographical division/location and race    
    
    Mappings: Define mappings for geographical divisions and races.

    Transform Data: Use expr to create new columns GeographicalDivision and Race with their descriptive names based on encoded values.

    Group and Count: Group the DataFrame by GeographicalDivision and Race, then count the occurrences.

    Display Results: The show(10, truncate=False) function displays the top 10 results without truncating the output, so you can see the full names.

    This will provide a detailed view of the number of responders grouped by geographical division and race, with meaningful names rather than encoded values.

In [11]:
divivsion_race_columns = {"GEDIV":division_mapping, "PTDTRACE": race_mapping}

division_race = df.select("HRHHID", "PTDTRACE", "GEDIV")


division_race_counts = division_race.groupBy("GEDIV", "PTDTRACE").count()

division_race_counts = Mapper(division_race_counts, "PTDTRACE", race_mapping)

division_race_counts = Mapper(division_race_counts, "GEDIV", division_mapping)


division_race_counts.orderBy("count", ascending=False).show(10)



StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 13, Finished, Available, Finished)

+------------------+----------+-----+
|             GEDIV|  PTDTRACE|count|
+------------------+----------+-----+
|    South Atlantic|White Only|16999|
|          Mountain|White Only|14343|
|           Pacific|White Only|13214|
|East North Central|White Only|11325|
|West South Central|White Only|11248|
|West North Central|White Only| 9884|
|   Middle Atlantic|White Only| 8487|
|       New England|White Only| 8410|
|East South Central|White Only| 6580|
|    South Atlantic|Black Only| 4899|
+------------------+----------+-----+
only showing top 10 rows



# Count of Responders with No Home Telephone but Access to Telephone Elsewhere and Accepting Telephone Interviews



In [12]:
telephone_access_counts = df.filter(
    (col("HETELHHD") == "2") &  # No telephone in house
    (col("HETELAVL") == "1") &  # Telephone elsewhere
    (col("HEPHONEO") == "1")    # Telephone interview accepted
).count()

print(f"Number of responders without telephone at home but can access telephone elsewhere and accept telephone interview: {telephone_access_counts}")


StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 14, Finished, Available, Finished)

Number of responders without telephone at home but can access telephone elsewhere and accept telephone interview: 633


# Number of responders who can access a telephone but telephone interview is not accepted

In [13]:
telephone_no_interview_counts = df.filter(
    (col("HETELHHD") == "1") &  # Telephone in house
    (col("HEPHONEO") == "2")    # Telephone interview not accepted
).count()

print(f"Number of responders who can access a telephone but telephone interview is not accepted: {telephone_no_interview_counts}")


StatementMeta(, 050ec653-b802-4622-a117-fb16475f7f2e, 15, Finished, Available, Finished)

Number of responders who can access a telephone but telephone interview is not accepted: 0
