# Generate a smaller dataset from a larger one
The source dataset contains about 170 million records representing household units and persons
from the 1940 US Census. The data can be obtained from IPUMS because the records are now public
(individual census records are released 72 years after the census is taken). This notebook builds a
smaller dataset for selected states. The dataset is in fixed-width format.

The first character is either "H" for a household unit record or "P" for a person record.
The household record is followed by the person records belonging to that household.
Household and person records have different fields but are linked by a serial field.
The serial field is 8 digits long and occupies character positions 8-15 (1-based, i.e. it follows the first 7 characters).
The household record also has a field for the state FIPS code. The FIPS code is 2 digits
and starts at position 54 (1-based). Wyoming's FIPS code is "56".

We use PySpark DataFrames and a left semi join (`how="left_semi"`) to keep every record whose serial matches a selected household.

In [1]:
import operator
import os

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, count

In [2]:
# Full 1940 census extract: fixed-width text, one household or person record per line.
IPUMS_FILE = "/data/input/EXT1940USCB.dat"

In [3]:
# Connect to the Spark cluster. The master URL can be overridden with the
# SPARK_MASTER environment variable; the default is the original cluster address.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "spark://das11.mitre.org:7077")

spark = (
    SparkSession.builder
    .appName("Jay")
    .master(SPARK_MASTER)
    .config("spark.executor.memory", "24g")
    .config("spark.executor.instances", "3")
    .config("spark.executor.cores", "6")
    .getOrCreate()
)
spark

In [4]:
# Load the raw file: each line becomes one row in a single string column named "value".
df = spark.read.format("text").load(IPUMS_FILE)
df.take(5)

[Row(value='H19400200000001022600000001000002700000000001198642810230001100000009999000000100000990000840199990011100000000200002000011000100107000000001000200000000100020'),
 Row(value='P19400200000001000100000001000000000000100101000000000000000009999010101207399500000110000000530530000000000000000000000000010114140050501102071275201198361125260009999999999990000000907302210077016402700009990000999999979999990000000F92A1965-DD8D-4DA5-9EDA-4DEF7D1F338C010'),
 Row(value='P1940020000000100020000000100000000000010020100000000000000000999912120310699960000011000000400400000000000000000000040000001011212003030110209889700001105220000000999999999999099999812008175039700300251000999000099999997999999000000069FA0B5D-4C58-4AD5-BDAC-0CA2EE215777020'),
 Row(value='H19400200000002047600000001000004300000000001198642810230001100000009999000000100000990000840199990011100000000500003001022200100107000000002000400000000200040'),
 Row(value='P19400200000002000100000001000000000000100102000020100000000

In [5]:
# Number of lines (household + person records) in the full extract.
total: int = df.count()
total

170897729

In [26]:
# Wyoming FIPS Code is 56.
# Keep only household records: record type "H" at character 1 and state FIPS
# code "56" at characters 54-55 (Spark's substr is 1-based).
is_household = df.value.substr(1, 1) == "H"
is_wyoming = df.value.substr(54, 2) == "56"
wy_unit_df = df.filter(is_household & is_wyoming)
wy_unit_df.take(5)

[Row(value='H19400238412818036000000001000003100000000001198641685600101100000009999000000100000500000840199990012200020999999901011122100100102000000001000300000000100030'),
 Row(value='H19400238412819022800000001000002100000000011198641685600101100000009999000000100000500000840199990012200015999999901010011000100101000000002000200000000200020'),
 Row(value='H19400238412820018800000001000001600000000001198641685600101100000009999000000100000500000840199990012200010999999901000011000100101000000003000100000000300010'),
 Row(value='H19400238412821049600000001000004100000000001198641685600101100000009999000000100000500000840199990012200030999999901011122100100101000000004000400000000400040'),
 Row(value='H19400238412822024900000001000002100000000001198641685600101100000009999000000100000500000840199990012200030999999901010011000100101000000005000200000000500020')]

In [28]:
# Key the selected households by SERIAL (characters 8-15, 1-based).
# RECORD is the untouched line. trim() removes leading/trailing spaces, which may be
# significant padding in a fixed-width layout, so it is applied to SERIAL only.
wy_unit_df2 = wy_unit_df.select(
    trim(wy_unit_df.value.substr(8, 8)).alias("SERIAL"),
    wy_unit_df.value.alias("RECORD"),
)
wy_unit_df2.take(5)

[Row(SERIAL='38412818', RECORD='H19400238412818036000000001000003100000000001198641685600101100000009999000000100000500000840199990012200020999999901011122100100102000000001000300000000100030'),
 Row(SERIAL='38412819', RECORD='H19400238412819022800000001000002100000000011198641685600101100000009999000000100000500000840199990012200015999999901010011000100101000000002000200000000200020'),
 Row(SERIAL='38412820', RECORD='H19400238412820018800000001000001600000000001198641685600101100000009999000000100000500000840199990012200010999999901000011000100101000000003000100000000300010'),
 Row(SERIAL='38412821', RECORD='H19400238412821049600000001000004100000000001198641685600101100000009999000000100000500000840199990012200030999999901011122100100101000000004000400000000400040'),
 Row(SERIAL='38412822', RECORD='H19400238412822024900000001000002100000000001198641685600101100000009999000000100000500000840199990012200030999999901010011000100101000000005000200000000500020')]

In [29]:
# Tag every record (household and person) with its SERIAL (characters 8-15, 1-based)
# so it can be matched against the selected households.
# RECORD is the untouched line, because it is what gets written out. trim() would drop
# leading/trailing spaces, which may be significant padding in a fixed-width file.
df2 = df.select(
    trim(df.value.substr(8, 8)).alias("SERIAL"),
    df.value.alias("RECORD"),
)
df2.take(2)

[Row(SERIAL='00000001', RECORD='H19400200000001022600000001000002700000000001198642810230001100000009999000000100000990000840199990011100000000200002000011000100107000000001000200000000100020'),
 Row(SERIAL='00000001', RECORD='P19400200000001000100000001000000000000100101000000000000000009999010101207399500000110000000530530000000000000000000000000010114140050501102071275201198361125260009999999999990000000907302210077016402700009990000999999979999990000000F92A1965-DD8D-4DA5-9EDA-4DEF7D1F338C010')]

In [30]:
# Left semi join: keep the rows of df2 whose SERIAL appears among the selected
# households. Only df2's columns (SERIAL, RECORD) are returned.
wy_serials = wy_unit_df2.select("SERIAL")
df3 = df2.join(wy_serials, "SERIAL", "left_semi")
df3.take(20)

[Row(SERIAL='38412929', RECORD='H19400238412929015000000001000001600000001001198641685600102100000009999001060600106500001840199990011100000000400001000011000600101000000112000100000011200010'),
 Row(SERIAL='38412929', RECORD='P1940023841292900010000000100000000000012010100000000000000000999901010120899950000011000000023023000000000000000000000000001011515006060331109959990995000000000000999999999999000000020000000999999999999110990999919999500219999000000083E70BBF-17E1-4A00-BF43-F43FBC76D736010'),
 Row(SERIAL='38413026', RECORD='H19400238413026027400000001000002100000002001198641685600102100000009999001060600106500001840199990012200035999999901010011000600102000000209000200000020900020'),
 Row(SERIAL='38413026', RECORD='P19400238413026000100000001000000000000120102000000000000210009999010101107299100000110000004144140000000000000000000200000010101010000021102074077001278962225264059999999999990001200219091610315004301461109909999199995002199990000000D685F0E1-89E2-4D60-955B-90B3AF5B55B

In [None]:
# Write the selected records, restoring the hierarchical layout: each household ("H")
# record immediately followed by its person ("P") records. The join does not keep the
# input row order (the sample above starts at serial 38412929, not the first Wyoming
# household 38412818), so sort explicitly.
# - SERIAL is a zero-padded 8-digit string, so lexicographic order groups each
#   household's records together.
# - Within a household, "H" < "P" puts the household record first.
# - Person records are then ordered by their full text. This presumably follows the
#   person-number field that comes after the serial; confirm against the IPUMS layout.
# The output path is created as a directory of part files whose name order matches the
# sort order. The default save mode fails if the path already exists.
df3.orderBy("SERIAL", "RECORD").select("RECORD").write.save("/data/input/wyoming.dat", format="text")