In [1]:
import glob
import gzip
import os
from io import StringIO

import pandas

Download NCHS linked data (and discard non-NHIS linked data)

In [None]:
!wget --recursive ftp://ftp.cdc.gov/pub/Health_Statistics/NCHS/datalinkage/linked_mortality
!mkdir NCHS_linked_data
!mv ./ftp.cdc.gov/pub/Health_Statistics/NCHS/datalinkage/linked_mortality/NHIS_*.dat ./NCHS_linked_data/
!rm -r ./ftp.cdc.gov

Read in all the NCHS linked mortality data files

In [2]:
# Glob pattern matching every NHIS linked mortality file in ./NCHS_linked_data.
# The pattern is defined once and passed to glob so the two cannot drift apart.
nchs_file_path_pattern = './NCHS_linked_data/NHIS_*.dat'
nchs_file_paths = glob.glob(nchs_file_path_pattern)

In [3]:
# Sanity check: stop early if the glob matched no linked-data files
assert nchs_file_paths, 'No NCHS files available!'

In [4]:
# Fixed-width layout of the linked mortality files: one width per column, in file order.
nchs_column_widths = [14, 1, 1, 3, 1, 1, 1, 4, 8, 8]
nchs_column_names = ["PUBLICID", "ELIGSTAT", "MORTSTAT", "UCOD_LEADING", "DIABETES", "HYPERTEN", "DODQTR", "DODYEAR", "WGT_NEW", "SA_WGT_NEW"]

# Both lists describe the same columns, so they must stay the same length.
assert len(nchs_column_widths) == len(nchs_column_names), 'Column widths and names are out of sync'

# One dataframe per linked-data file; '.' marks a missing value in these files.
# The previous `dtype=False` argument was removed: False is not a dtype
# specification, so column types are now left to pandas' inference.
# NOTE(review): if PUBLICID must be kept as text (e.g. to preserve leading zeros),
# pass dtype={"PUBLICID": "object"} here and make sure the NHISPID merge key is
# read with a matching type - TODO confirm against the data.
nchs_dataframes = [
    pandas.read_fwf(
        file_path,
        widths=nchs_column_widths,
        names=nchs_column_names,
        na_values=['.']
    )
    for file_path
    in nchs_file_paths
]

In [5]:
# Sanity check: every line of every linked-data file should have become one dataframe row
total_row_count = sum(len(dataframe) for dataframe in nchs_dataframes)

# Count lines in Python rather than with `wc -l ... | grep total`: wc only prints
# a "total" line when it is given more than one file, so the shell pipeline
# produced no output (and the [0] lookup failed) when a single file matched.
total_line_count = 0
for file_path in nchs_file_paths:
    # Binary mode: only line boundaries matter, so no text decoding is needed
    with open(file_path, 'rb') as nchs_file:
        total_line_count += sum(1 for _ in nchs_file)

failure_message = 'Expected count of {} rows loaded to equal {} lines in linked data files'.format(total_row_count, total_line_count)
assert total_row_count == total_line_count, failure_message

In [6]:
# Debug info: rows loaded per file, then a separator and the grand total
separator = "=" * 8

for loaded_frame in nchs_dataframes:
    print("{:8d}".format(len(loaded_frame)))

print(separator)
print("{:8d}".format(total_row_count))

  120032
   89976
  116179
   62052
   75764
   93386
  116929
   74236
  101875
  119631
  102467
  112053
   88446
  122310
   75716
  122859
   92148
  100618
   98649
   97059
   63402
  108131
  109671
  100760
   94460
  103477
  128412
  104520
   98785
 2894003


Combine NCHS dataframes into one big dataframe

In [7]:
# Stack the per-file dataframes into one. ignore_index=True gives the result a
# fresh 0..n-1 index instead of repeating each file's own 0-based row labels.
nchs_data = pandas.concat(nchs_dataframes, ignore_index=True)

In [8]:
# Sanity check: concatenation must neither drop nor duplicate rows
combined_row_count = len(nchs_data)
separate_row_count = sum(map(len, nchs_dataframes))

failure_message = 'Expected count of {} rows in separate dataframes to equal {} combined rows'.format(
    separate_row_count,
    combined_row_count
)
assert separate_row_count == combined_row_count, failure_message

Read in the NHIS data extract

In [9]:
# Set the path of your NHIS data extract: a gzip-compressed CSV
# (it is read below with compression='gzip')
nhis_file_path = '../NHIS/nhis_test.csv.gz'

In [10]:
# Open the NHIS extract lazily: with chunksize set, read_csv returns an iterator
# that yields dataframes of up to 250000 rows rather than one large dataframe
nhis_chunks = pandas.read_csv(nhis_file_path, chunksize=250000, compression='gzip')

Join the NCHS linked data with the raw NHIS data one chunk at a time, discarding unlinked data. Write merged dataframes to files.

In [11]:
# Remove per-chunk files left over from an earlier run. Without this, a run that
# produces fewer chunks than the previous one would leave stale files behind for
# the later glob over /tmp to pick up and merge into the final output.
for stale_file_path in glob.glob('/tmp/NCHS_NHIS_linked_[0-9]*.csv'):
    os.remove(stale_file_path)

# Running totals across all chunks, reported as progress and checked afterwards
total_rows_processed = 0
total_rows_merged = 0

for chunk_index, chunk in enumerate(nhis_chunks):
    # The counts printed here cover the chunks completed so far
    print('Merged {} of {} rows. Processing CHUNK {}...'.format(total_rows_merged, total_rows_processed, chunk_index))

    # Inner join: NHIS rows without a matching PUBLICID in the linked data are dropped
    merged_dataframe = pandas.merge(
        chunk,
        nchs_data,
        left_on='NHISPID',
        right_on='PUBLICID',
        how='inner'
    )

    # No need to zip these as they will be discarded later
    merged_dataframe.to_csv(
        '/tmp/NCHS_NHIS_linked_{}.csv'.format(chunk_index),
        index=False,
        header=True
    )

    total_rows_merged += len(merged_dataframe)
    total_rows_processed += len(chunk)

print('=' * 8)
print('Merging complete. Merged {} of {} rows.'.format(total_rows_merged, total_rows_processed))

Merged 0 of 0 rows. Processing CHUNK 0...


  interactivity=interactivity, compiler=compiler, result=result)


Merged 0 of 250000 rows. Processing CHUNK 1...


  interactivity=interactivity, compiler=compiler, result=result)


Merged 0 of 500000 rows. Processing CHUNK 2...


  interactivity=interactivity, compiler=compiler, result=result)


Merged 0 of 750000 rows. Processing CHUNK 3...
Merged 0 of 1000000 rows. Processing CHUNK 4...


  interactivity=interactivity, compiler=compiler, result=result)


Merged 0 of 1250000 rows. Processing CHUNK 5...
Merged 0 of 1500000 rows. Processing CHUNK 6...
Merged 0 of 1750000 rows. Processing CHUNK 7...
Merged 0 of 2000000 rows. Processing CHUNK 8...
Merged 0 of 2250000 rows. Processing CHUNK 9...
Merged 0 of 2500000 rows. Processing CHUNK 10...
Merged 0 of 2750000 rows. Processing CHUNK 11...
Merged 0 of 3000000 rows. Processing CHUNK 12...
Merged 0 of 3250000 rows. Processing CHUNK 13...
Merged 0 of 3500000 rows. Processing CHUNK 14...
Merged 0 of 3750000 rows. Processing CHUNK 15...
Merged 71826 of 4000000 rows. Processing CHUNK 16...
Merged 321826 of 4250000 rows. Processing CHUNK 17...
Merged 571826 of 4500000 rows. Processing CHUNK 18...
Merged 821826 of 4750000 rows. Processing CHUNK 19...
Merged 1071826 of 5000000 rows. Processing CHUNK 20...
Merged 1321826 of 5250000 rows. Processing CHUNK 21...
Merged 1571826 of 5500000 rows. Processing CHUNK 22...
Merged 1710059 of 5750000 rows. Processing CHUNK 23...
Merging complete. Merged 171005

In [12]:
# Sanity check: every data line of the NHIS extract should have been processed.
# Lines are counted with the gzip module instead of shelling out to `gzcat`,
# which is not installed under that name on every platform.
with gzip.open(nhis_file_path, 'rb') as nhis_file:
    nhis_file_line_count = sum(1 for _ in nhis_file)

failure_message = 'Only processed {} rows out of {} lines of {}'.format(total_rows_processed, nhis_file_line_count, nhis_file_path)
# Subtract one for the CSV header line
assert total_rows_processed == (nhis_file_line_count - 1), failure_message

Combine the merged per-chunk files into one big gzip-compressed CSV and discard the temporary CSV files.

In [13]:
# Match only the numbered per-chunk files (NCHS_NHIS_linked_0.csv, _1.csv, ...).
# This is the same [0-9]* pattern the cleanup step deletes, so any other
# NCHS_NHIS_linked_*.csv file in /tmp is not read back by mistake.
merged_data_file_paths_pattern = '/tmp/NCHS_NHIS_linked_[0-9]*.csv'
merged_data_file_paths = glob.glob(merged_data_file_paths_pattern)

In [14]:
# Sanity check: stop early if the glob matched no per-chunk merged files
assert merged_data_file_paths, 'No merged files available!'

In [15]:
# Load each per-chunk merged CSV back into its own dataframe
merged_dataframes = list(map(pandas.read_csv, merged_data_file_paths))

In [16]:
# Sanity check: the rows loaded should equal the data lines in the per-chunk files.
# Lines are counted in Python rather than with `wc -l ... | grep total`: wc only
# prints a "total" line for more than one file, so the shell pipeline produced
# no output (and the [0] lookup failed) when a single chunk file existed.
total_line_count = 0
for file_path in merged_data_file_paths:
    with open(file_path, 'rb') as merged_chunk_file:
        # Each per-chunk file was written with one header line, which is not a data row
        total_line_count += sum(1 for _ in merged_chunk_file) - 1

total_separate_row_count = sum(len(dataframe) for dataframe in merged_dataframes)

failure_message = 'Expected count of {} rows loaded to equal {} lines in merged data files'.format(total_separate_row_count, total_line_count)
assert total_line_count == total_separate_row_count, failure_message

In [17]:
# Stack the per-chunk dataframes into one. ignore_index=True gives the result a
# fresh 0..n-1 index instead of repeating each chunk file's own 0-based row labels.
combined_merged_dataframe = pandas.concat(merged_dataframes, ignore_index=True)

In [18]:
# Sanity check: concatenation must neither drop nor duplicate rows
total_combined_row_count = len(combined_merged_dataframe)
total_separate_row_count = sum(map(len, merged_dataframes))

failure_message = 'Expected count of {} rows in separate dataframes to equal {} combined rows'.format(
    total_separate_row_count,
    total_combined_row_count
)
assert total_separate_row_count == total_combined_row_count, failure_message

In [20]:
# Write the combined frame as a single gzip-compressed CSV with a header row
# and without the dataframe index
merged_file_path = '/tmp/NCHS_NHIS_linked_data.csv.gz'
combined_merged_dataframe.to_csv(merged_file_path, compression='gzip', header=True, index=None)

In [21]:
# Sanity check: the written file should hold one header line plus one line per row
total_combined_row_count = combined_merged_dataframe.shape[0]

# Count lines with the gzip module instead of shelling out to `gzcat`,
# which is not installed under that name on every platform
with gzip.open(merged_file_path, 'rb') as merged_file:
    merged_file_line_count = sum(1 for _ in merged_file)

# Arguments follow the placeholder order (rows, then lines); they were previously swapped
failure_message = 'Expected count of {} rows loaded to equal {} lines in {}'.format(total_combined_row_count, merged_file_line_count, merged_file_path)
# Subtract one for the CSV header line
assert total_combined_row_count == (merged_file_line_count - 1), failure_message

In [None]:
# Cleanup
# Delete the numbered per-chunk CSVs; the [0-9]* pattern does not match the
# combined NCHS_NHIS_linked_data.csv.gz output, so that file is kept
!rm /tmp/NCHS_NHIS_linked_[0-9]*.csv

In [None]:
# Report where the final merged output was written
completion_message = 'Merging of NCHS and NHIS data completed. Output file: {}'.format(merged_file_path)
print(completion_message)