In [1]:

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 35.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=81bc7f315aef6f3eb4e447401813788bb3c15aad20f7ec069e83129f933e5c3d
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [1]:
#importing relevant libs/packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType

In [2]:
# Start (or reuse, via getOrCreate) a local Spark session on all available cores.
# Eager evaluation is switched on so a DataFrame left as the last expression of
# a cell is rendered automatically.
# NOTE(review): 32G of driver memory may exceed the host's RAM — confirm it fits
# the runtime this notebook is executed on.
spark = (
    SparkSession.builder
    .appName('ISW_assessment')
    .master('local[*]')
    .config('spark.driver.memory', '32G')
    .config('spark.ui.showConsoleProgress', True)
    .config('spark.sql.repl.eagerEval.enabled', True)
    .getOrCreate()
)

In [3]:
# Load the raw fixed-width .dat file as one string row per line.
# The text reader is used instead of the CSV reader because the file is not
# delimited: the CSV reader would split any line containing a comma (or treat a
# double quote as a quote char) and silently truncate `_c0`.
# The column is renamed to `_c0` so the parsing cell works unchanged, and blank
# lines are dropped to match the CSV reader's default of skipping empty lines.
df = (
    spark.read.text("dec17pub.dat")
    .withColumnRenamed("value", "_c0")
    .filter(F.length("_c0") > 0)
)

In [4]:
# Peek at the first four raw lines (long strings are truncated in the display).
df.show(n=4, truncate=True)

+--------------------+
|                 _c0|
+--------------------+
|00000479511071912...|
|00000479511071912...|
|00007169100494112...|
|00007169100494112...|
+--------------------+
only showing top 4 rows



In [5]:
# Split each raw line into named fields by fixed character positions, according
# to the data dictionary. Positions are (1-based start, length); every parsed
# field stays a string.
field_positions = {
    "Final_outcome": (24, 3),
    "Type_of_HH_unit": (31, 2),
    "Telephone_in_house": (33, 2),
    "Access_to_Tel": (35, 2),
    "Phone_interview_acc": (37, 2),
    "Family_income_range": (39, 2),
    "Household_type": (61, 2),
    "Type_of_interview": (65, 2),
    "Geo_division": (91, 1),
    "Race": (139, 2),
}

# Household id = characters 1-15 immediately followed by characters 71-75.
household_id = F.concat_ws(
    "", F.substring("_c0", 1, 15), F.substring("_c0", 71, 5)
).alias("Household_id")

# "YYYY/Mon": characters 18-21, then characters 16-17 parsed as a month number
# and rendered as an abbreviated month name.
month_name = F.date_format(F.to_date(F.substring("_c0", 16, 2), "MM"), "MMM")
interview_time = F.concat_ws(
    "/", F.substring("_c0", 18, 4), month_name
).alias("Time_of_interview")

census = df.select(
    household_id,
    interview_time,
    *[
        F.substring("_c0", start, length).alias(name)
        for name, (start, length) in field_positions.items()
    ],
)

In [6]:
# Decode the three telephone yes/no columns: code 1 -> 'YES', code 2 -> 'NO'.
# The code is trimmed and compared by numeric value. The previous
# `LIKE '%1%'` test matched ANY value containing a "1" (e.g. "-1", "10", "11")
# and reported it as YES. Anything other than 1 or 2 now becomes null.
# NOTE: re-running this cell on its own turns the already-decoded 'YES'/'NO'
# values into null (they do not cast to int); re-run the parsing cell first.
def _decode_yes_no(col_name):
    """Decode a raw yes/no code column.

    Args:
        col_name: name of a string column holding a fixed-width numeric code.

    Returns:
        A Column that is 'YES' when the trimmed code equals 1, 'NO' when it
        equals 2, and null for any other value (blank, negative, other codes).
    """
    code = F.trim(F.col(col_name)).cast("int")
    return F.when(code == 1, "YES").when(code == 2, "NO")


census = (
    census
    .withColumn("Access_to_Tel", _decode_yes_no("Access_to_Tel"))
    .withColumn("Telephone_in_house", _decode_yes_no("Telephone_in_house"))
    .withColumn("Phone_interview_acc", _decode_yes_no("Phone_interview_acc"))
)

In [7]:
# Show the first ten parsed rows (long values are truncated in the display).
census.show(n=10, truncate=True)

+--------------------+-----------------+-------------+---------------+------------------+-------------+-------------------+-------------------+--------------+-----------------+------------+----+
|        Household_id|Time_of_interview|Final_outcome|Type_of_HH_unit|Telephone_in_house|Access_to_Tel|Phone_interview_acc|Family_income_range|Household_type|Type_of_interview|Geo_division|Race|
+--------------------+-----------------+-------------+---------------+------------------+-------------+-------------------+-------------------+--------------+-----------------+------------+----+
|00000479511071906011|         2017/Dec|          201|              1|               YES|          YES|                YES|                  9|             1|                2|           6|   1|
|00000479511071906011|         2017/Dec|          201|              1|               YES|          YES|                YES|                  9|             1|                2|           6|   1|
|00007169100494106111|   

In [8]:
# Print the schema of the parsed census DataFrame. Every column is a string; the
# two concat_ws-built columns (Household_id, Time_of_interview) are the only
# non-nullable ones.
census.printSchema()

root
 |-- Household_id: string (nullable = false)
 |-- Time_of_interview: string (nullable = false)
 |-- Final_outcome: string (nullable = true)
 |-- Type_of_HH_unit: string (nullable = true)
 |-- Telephone_in_house: string (nullable = true)
 |-- Access_to_Tel: string (nullable = true)
 |-- Phone_interview_acc: string (nullable = true)
 |-- Family_income_range: string (nullable = true)
 |-- Household_type: string (nullable = true)
 |-- Type_of_interview: string (nullable = true)
 |-- Geo_division: string (nullable = true)
 |-- Race: string (nullable = true)



In [9]:
# Lookup table used to interpret the family income code: code N is the N-th
# label below (codes run 1..16).
income_labels = [
    "LESS THAN $5,000",
    "5,000 TO 7,499",
    "7,500 TO 9,999",
    "10,000 TO 12,499",
    "12,500 TO 14,999",
    "15,000 TO 19,999",
    "20,000 TO 24,999",
    "25,000 TO 29,999",
    "30,000 TO 34,999",
    "35,000 TO 39,999",
    "40,000 TO 49,999",
    "50,000 TO 59,999",
    "60,000 TO 74,999",
    "75,000 TO 99,999",
    "100,000 TO 149,999",
    "150,000 OR MORE",
]
family_income = list(enumerate(income_labels, start=1))
columns = ["code", "income_range"]
family_income_df = spark.createDataFrame(family_income, columns)

In [10]:
# Lookup table used to interpret the geographical division code: code N is the
# N-th division name below (codes run 1..9).
division_names = [
    "NEW ENGLAND",
    "MIDDLE ATLANTIC",
    "EAST NORTH CENTRAL",
    "WEST NORTH CENTRAL",
    "SOUTH ATLANTIC",
    "EAST SOUTH CENTRAL",
    "WEST SOUTH CENTRAL",
    "MOUNTAIN",
    "PACIFIC",
]
geo_division = list(enumerate(division_names, start=1))
columns = ["code", "geo_div"]
geo_division_df = spark.createDataFrame(geo_division, columns)

In [11]:
# Lookup table used to interpret the race code: code N is the N-th label below
# (codes run 1..26).
race_labels = [
    "White Only",
    "Black Only",
    "American Indian, Alaskan Native Only",
    "Asian Only",
    "Hawaiian/Pacific Islander Only",
    "White-Black",
    "White-AI",
    "White-Asian",
    "White-HP",
    "Black-AI",
    "Black-Asian",
    "Black-HP",
    "AI-Asian",
    "AI-HP",
    "Asian-HP",
    "W-B-AI",
    "W-B-A",
    "W-B-HP",
    "W-AI-A",
    "W-AI-HP",
    "W-A-HP",
    "B-AI-A",
    "W-B-AI-A",
    "W-AI-A-HP",
    "Other 3 Race Combinations",
    "Other 4 and 5 Race Combinations",
]
race = list(enumerate(race_labels, start=1))
columns = ["code", "races"]
race_df = spark.createDataFrame(race, columns)

In [12]:
# Register each DataFrame as a session-scoped temporary view so the questions
# below can query them by name with spark.sql.
views = {
    "census_tab": census,
    "geo_division": geo_division_df,
    "family_income": family_income_df,
    "race": race_df,
}
for view_name, frame in views.items():
    frame.createOrReplaceTempView(view_name)

In [13]:
# Q1: What is the count of responders per family income range (show top 10)?
# The raw income code is a fixed-width string, so it is trimmed and cast
# explicitly rather than relying on implicit string/bigint coercion in the join.
# Codes with no lookup entry drop out of the inner join. The secondary sort
# makes the top-10 cut deterministic when counts tie.
# The DataFrame is kept in `question_1` and displayed separately; chaining
# `.show()` would bind `question_1` to None.
question_1 = spark.sql("""
        SELECT f.income_range, COUNT(*) AS cnt
        FROM census_tab c
        JOIN family_income f
          ON CAST(TRIM(c.Family_income_range) AS INT) = f.code
        GROUP BY f.income_range
        ORDER BY cnt DESC, f.income_range
        LIMIT 10
""")
question_1.show()

+------------------+----+
|      income_range| cnt|
+------------------+----+
|100,000 TO 149,999|4964|
|   150,000 OR MORE|4881|
|  75,000 TO 99,999|4073|
|  60,000 TO 74,999|3526|
|  50,000 TO 59,999|2671|
|  40,000 TO 49,999|2621|
|  35,000 TO 39,999|1800|
|  20,000 TO 24,999|1793|
|  30,000 TO 34,999|1735|
|  25,000 TO 29,999|1517|
+------------------+----+



In [14]:
# Q2: What is the count of responders per geographical division/location and
# race (show top 10)?
# Codes are trimmed and cast explicitly rather than relying on implicit
# string/bigint coercion in the joins. The secondary sort keys make the top-10
# cut deterministic when counts tie.
# The DataFrame is kept in `Q2` and displayed separately; chaining `.show()`
# would bind `Q2` to None.
Q2 = spark.sql("""
        SELECT g.geo_div, r.races, COUNT(*) AS count
        FROM census_tab c
        JOIN geo_division g ON CAST(TRIM(c.Geo_division) AS INT) = g.code
        JOIN race r ON CAST(TRIM(c.Race) AS INT) = r.code
        GROUP BY g.geo_div, r.races
        ORDER BY count DESC, g.geo_div, r.races
        LIMIT 10
""")
Q2.show()

+------------------+----------+-----+
|           geo_div|     races|count|
+------------------+----------+-----+
|           PACIFIC|White Only| 9112|
|    SOUTH ATLANTIC|White Only| 8280|
|          MOUNTAIN|White Only| 3217|
|    SOUTH ATLANTIC|Black Only| 2860|
|           PACIFIC|Asian Only| 1932|
|WEST SOUTH CENTRAL|White Only| 1927|
|EAST SOUTH CENTRAL|White Only| 1834|
|       NEW ENGLAND|White Only| 1043|
|           PACIFIC|Black Only|  633|
|EAST SOUTH CENTRAL|Black Only|  583|
+------------------+----------+-----+



In [15]:
# Q3: How many responders do not have a telephone in their house, but can
# access a telephone elsewhere and accept a telephone interview?
# String literals use single quotes (standard SQL). With ANSI mode, Spark 3.4+
# can treat double-quoted text as identifiers
# (spark.sql.ansi.doubleQuotedIdentifiers).
# The DataFrame is kept in `q3` and displayed separately; chaining `.show()`
# would bind `q3` to None.
q3 = spark.sql("""
        SELECT COUNT(*) AS responders
        FROM census_tab c
        WHERE c.Telephone_in_house = 'NO'
          AND c.Access_to_Tel = 'YES'
          AND c.Phone_interview_acc = 'YES'
""")
q3.show()

+--------+
|count(1)|
+--------+
|     154|
+--------+



In [16]:
# Q4: How many responders can access a telephone, but do not accept a
# telephone interview?
# String literals use single quotes (standard SQL). With ANSI mode, Spark 3.4+
# can treat double-quoted text as identifiers
# (spark.sql.ansi.doubleQuotedIdentifiers).
# The DataFrame is kept in `q4` and displayed separately; chaining `.show()`
# would bind `q4` to None.
q4 = spark.sql("""
        SELECT COUNT(*) AS responders
        FROM census_tab c
        WHERE c.Access_to_Tel = 'YES'
          AND c.Phone_interview_acc = 'NO'
""")
q4.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

