# Parsing Stats Can data

In [1]:
import pandas as pd
import numpy as np
import re
import os

In [2]:
pd.set_option('mode.chained_assignment', None)
pd.options.display.max_colwidth = 100

### Get list of files to parse

In [9]:
files_dir = 'dissemination_area'
files = [f'{files_dir}/' + f for f in os.listdir(files_dir) if f.startswith('98-401-X2016044')]

In [8]:
# regex to extract region from file name
file_re = re.compile("98-401-X2016044_(.+)_English_CSV_data\.csv")

In [9]:
files

['dissemination_area/98-401-X2016044_ONTARIO_English_CSV_data.csv',
 'dissemination_area/98-401-X2016044_QUEBEC_English_CSV_data.csv',
 'dissemination_area/98-401-X2016044_ATLANTIC_English_CSV_data.csv',
 'dissemination_area/98-401-X2016044_TERRITORIES_English_CSV_data.csv',
 'dissemination_area/98-401-X2016044_BRITISH_COLUMBIA_English_CSV_data.csv',
 'dissemination_area/98-401-X2016044_PRAIRIES_English_CSV_data.csv']

# functions for parsing files

##### Clean input CSV file

In [10]:
def read_and_clean_csv(file, geo_level=4):
    df = pd.read_csv(file,
                 dtype={
                     'GEO_CODE (POR)': 'int64',
                     'GEO_LEVEL': 'int64',
                     'DIM: Profile of Dissemination Areas (2247)': 'str',
                     'Member ID: Profile of Dissemination Areas (2247)': 'int64',
                     'Dim: Sex (3): Member ID: [1]: Total - Sex': 'str',
                     'Dim: Sex (3): Member ID: [2]: Male': 'str',
                     'Dim: Sex (3): Member ID: [3]: Female': 'str'
                 },
                 usecols=[
                    'GEO_CODE (POR)',
                    'GEO_LEVEL',
                    'DIM: Profile of Dissemination Areas (2247)',
                    'Member ID: Profile of Dissemination Areas (2247)',
                    'Dim: Sex (3): Member ID: [1]: Total - Sex',
                    'Dim: Sex (3): Member ID: [2]: Male',
                    'Dim: Sex (3): Member ID: [3]: Female'
                ])
    
    df = df.loc[df.GEO_LEVEL == geo_level, 
                ['GEO_CODE (POR)',
                 'DIM: Profile of Dissemination Areas (2247)',
                 'Member ID: Profile of Dissemination Areas (2247)',
                 'Dim: Sex (3): Member ID: [1]: Total - Sex',
                 'Dim: Sex (3): Member ID: [2]: Male',
                 'Dim: Sex (3): Member ID: [3]: Female'
                ]]
    
    # Rename columns for easier access
    df.columns = ['da', 'feature_desc', 'feature_num', 'total', 'male', 'female']

    #Set columns to numeric
    for c in ['total', 'male', 'female']:
        df[c] = pd.to_numeric(df[c], errors='coerce')

    return df

##### Extract relavant rows

In [11]:
def parse_features(g,
                   feature_nums, 
                   name, 
                   alt_feature_names=None, 
                   agg=False):
    df = g[g.feature_num.isin(feature_nums)]
    if alt_feature_names:
        df['feature_desc'] = alt_feature_names
    df = pd.melt(df, id_vars=['da', 'feature_desc', 'feature_num'], var_name='sex', value_name='count')
    df = df.dropna()
    df['feature'] = name
    
    if agg:
        df = df.assign(pct=df['count'] / df.groupby('sex')['count'].transform('sum'))
    return df

##### User Level features

In [12]:
def parse_user_level_features(g):
    
    parsed_dict = {
        # individual stats
        'age': parse_features(g, [10,11,12,14,15,16,17,18,19,20,21,22,23,25,26,27,28,30,31,32,33], 'age', agg=True),
        
        'martial_status': parse_features(g, [61, 62, 64, 65, 66, 67], 'martial status', alt_feature_names=['Married', 'Common law', 'Never married', 'Separated', 'Divorced', 'Widowed'], agg=True),

        'official_lang': parse_features(g, [101, 102, 103, 104], 'knowledge of official language', agg=True),

        'mother_tongue': parse_features(g, [115, 116, 117, 377, 378, 379, 380], 'mother tongue', agg=True),

        'home_language': parse_features(g, [384, 385, 386, 646, 647, 648, 649], 'language spoken at home', agg=True),

        'ttl_income': parse_features(g, [695, 696, 697, 698, 699, 700, 701, 702, 703, 704, 706, 707], name='total income', agg=True),

        'employment_income': parse_features(g, [728, 729, 730, 731, 732, 733, 734, 735, 736, 738, 739, 740], name='employment income', agg=True),

        'low_income': parse_features(g, [853, 855, 856], name='low income', agg=True),

        'citizenship': parse_features(g, [1136,1139], name='citizenship', agg=True),

        'immigrants': parse_features(g, [1141, 1142, 1150], name='immigrants', agg=True),

        'minority': parse_features(g, [i for i in np.arange(1325, 1338)], name='visible minority', agg=True),

        'education': parse_features(g, [1684, 1685, 1687, 1690, 1691, 1693, 1694, 1695, 1696, 1697], name='education', agg=True),

        'employment': parse_features(g, [1867, 1868, 1867, 1869], name='employment', agg=True),
        
        
        # household stats
        'dwelling': parse_features(g, [42, 43, 44, 50], name='dwelling', agg=True),

        'household_size': parse_features(g, [52, 53, 54, 55, 56], 'household size', agg=True),

        'household_income': parse_features(g, np.concatenate([np.arange(760, 775), np.arange(776, 780)]), name='household income', agg=True),

        'family': parse_features(g, [94, 95, 98], name='family', agg=True, alt_feature_names=['Family without children', 'Family with children', 'One-person household']),
        
        # Individual, but each individual can be in more than 1 category
        'ethnicorigin': parse_features(g, [1339, 1343, 1353, 1427, 1448, 1473, 1541, 1607], name='ethnic origin', agg=True)
    }

    final_df = pd.concat(list(parsed_dict.values()))
    return final_df[['da', 'feature', 'feature_num', 'feature_desc', 'sex', 'count', 'pct']].reset_index(drop=True)

#### Overall features

In [13]:
def parse_overall_features(g):
    
    parsed_dict = {
         'overall_stats': parse_features(g, [1, 6, 7, 58, 73], 'overall stats'),
         'age': parse_features(g, [39, 40], 'age', alt_feature_names= ['Average', 'Median']),
         'indiv_income': parse_features(g, [663, 671, 674, 682], name='individual income', 
               alt_feature_names=['Median total income (2015)', 'Median employment income (2015)', 'Average total income (2015)', 'Average employment income']),
         'household_income': parse_features(g, [742, 745, 748, 751, 754, 757], name='household income',
              alt_feature_names=['Median total income', 'Median total income of one-person households', 'Median total income of two-or-more households',
                                 'Average total income', 'Average total income of one-person households', 'Average total income of two-or-more households']),
        'dwelling_value': parse_features(g, [1676, 1677], name='dwelling value', alt_feature_names=['Median', 'Average']),
        'dwelling_rent': parse_features(g, [1681, 1682], name='dwelling rent', alt_feature_names=['Median', 'Average']),
        'employment_rate': parse_features(g, [1870, 1871, 1872], name='employment_rate')
    }

    final_df = pd.concat(list(parsed_dict.values()))
    return final_df[['da', 'feature', 'feature_num', 'feature_desc', 'sex', 'count']].reset_index(drop=True)

# Clean dataframes first before processing

In [19]:
# dissemination area
for file in files:
    print(f"processing {file}")
    region = file_re.findall(file)[0]
    df = read_and_clean_csv(file, geo_level=4)
    df.to_parquet(f"{files_dir}/raw_{region}.parquet", index=False)

processing dissemination_area/98-401-X2016044_ONTARIO_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_QUEBEC_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_ATLANTIC_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_TERRITORIES_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_BRITISH_COLUMBIA_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_PRAIRIES_English_CSV_data.csv


In [10]:
# province
for file in files:
    print(f"processing {file}")
    region = file_re.findall(file)[0]
    df = read_and_clean_csv(file, geo_level=1)
    df.to_parquet(f"{files_dir}/raw_prov_{region}.parquet", index=False)

processing dissemination_area/98-401-X2016044_ONTARIO_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_QUEBEC_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_ATLANTIC_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_TERRITORIES_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_BRITISH_COLUMBIA_English_CSV_data.csv
processing dissemination_area/98-401-X2016044_PRAIRIES_English_CSV_data.csv


# Use Dask for multicore processing

In [14]:
input_files = [f for f in os.listdir(files_dir) if f.startswith('raw') and not f.startswith('raw_prov')]
input_files

['raw_PRAIRIES.parquet',
 'raw_ATLANTIC.parquet',
 'raw_TERRITORIES.parquet',
 'raw_ONTARIO.parquet',
 'raw_QUEBEC.parquet',
 'raw_BRITISH_COLUMBIA.parquet']

In [16]:
from time import time
import dask
import dask.dataframe as dd
import shutil

In [17]:
with dask.config.set({'temporary_directory': '/home/ec2-user/SageMaker/tmp'}):
    for file in input_files:
        print(f'processing {file}...')
        
        START = time()
        
        print("- reading in parquet file")
        now = time()
        region = file[4:-8]
        df = pd.read_parquet(f"{files_dir}/{file}")
        ddf = dd.from_pandas(df, npartitions=64)
        print(f"Took {time() - now:.2f} seconds")


#         print("- processing individual stats")
#         now = time()
#         final_df = (ddf
#                 .groupby('da')
#                 .apply(parse_user_level_features, 
#                        meta={'da':'int64', 'feature': 'object', 'feature_num': 'int64', 'feature_desc': 'object', 'sex': 'object', 'count': 'float', 'pct': 'float'}))

#         final_df = final_df.compute(scheduler='processes')
#         final_df = final_df.reset_index(drop=True)
#         final_df = final_df.assign(region=region)
#         final_df.to_parquet(f"{files_dir}/{region}.parquet", index=False)
#         del final_df
#         print(f"Took {time() - now:.2f} seconds")
        
        print("- processing overall stats")
        now = time()
        final_df2 = (ddf
                .groupby('da')
                .apply(parse_overall_features, 
                       meta={'da':'int64', 'feature': 'object', 'feature_num': 'int64', 'feature_desc': 'object', 'sex': 'object', 'count': 'float'}))

        final_df2 = final_df2.compute(scheduler='processes')
        final_df2 = final_df2.reset_index(drop=True)
        final_df2 = final_df2.assign(region=region)
        final_df2.to_parquet(f"{files_dir}/{region}_overall.parquet", index=False)
        del final_df2
        print(f"Took {time() - now:.2f} seconds")
        
        del df
        print(f"Done! {file} took {time() - START:.2f} seconds")

        # Remove temp files generated by Dask to free up space
        shutil.rmtree('/home/ec2-user/SageMaker/tmp')
        os.makedirs('/home/ec2-user/SageMaker/tmp')

processing raw_PRAIRIES.parquet...
- reading in parquet file
Took 14.08 seconds
- processing overall stats
Took 377.27 seconds
Done! raw_PRAIRIES.parquet took 391.36 seconds
processing raw_ATLANTIC.parquet...
- reading in parquet file
Took 3.84 seconds
- processing overall stats
Took 161.76 seconds
Done! raw_ATLANTIC.parquet took 165.60 seconds
processing raw_TERRITORIES.parquet...
- reading in parquet file
Took 0.22 seconds
- processing overall stats
Took 10.22 seconds
Done! raw_TERRITORIES.parquet took 10.44 seconds
processing raw_ONTARIO.parquet...
- reading in parquet file
Took 13.99 seconds
- processing overall stats
Took 749.94 seconds
Done! raw_ONTARIO.parquet took 763.95 seconds
processing raw_QUEBEC.parquet...
- reading in parquet file
Took 10.83 seconds
- processing overall stats
Took 493.63 seconds
Done! raw_QUEBEC.parquet took 504.48 seconds
processing raw_BRITISH_COLUMBIA.parquet...
- reading in parquet file
Took 7.11 seconds
- processing overall stats
Took 278.29 seconds


# Import parsed files to HDFS (Spark)

In [5]:
import sys
sys.path.insert(1, '/home/ec2-user/SageMaker')
import ClouderaSpark

In [6]:
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import col
from pyspark.sql.types import *


In [7]:
spark = (SparkSession.builder
    .master("yarn")
    .appName("stats_can")
    .config("spark.driver.memory", "8g")
    .config("spark.rpc.message.maxSize", "1024")
    .config("spark.sql.execution.arrow.enabled", "true")
    .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")
hc = HiveContext(spark.sparkContext)

#### Upload individual stats

In [31]:
f = 'QUEBEC_overall.parquet'

In [39]:
f[-15:-8]

'overall'

In [19]:
parsed_files = [f for f in os.listdir(files_dir) if f.endswith('.parquet') and f[:3] != 'raw' and f[-15:-8] != 'overall' and f[:4] != 'prov']
parsed_files

['PRAIRIES.parquet',
 'BRITISH_COLUMBIA.parquet',
 'ONTARIO.parquet',
 'QUEBEC.parquet',
 'TERRITORIES.parquet',
 'ATLANTIC.parquet']

In [22]:
mySchema = StructType([StructField("da", IntegerType(), True),
                       StructField("feature", StringType(), True),
                       StructField("feature_num", IntegerType(), True),
                       StructField("feature_desc", StringType(), True),
                       StructField("sex", StringType(), True),
                       StructField("count", FloatType(), True),
                       StructField("pct", FloatType(), True),
                       StructField("region", StringType(), True)])

In [23]:
hc.sql('DROP TABLE IF EXISTS hiveaeprodanuser.statscan_demographics_prov')

for file in parsed_files:
    print(f"uploading {file}")
    df = pd.read_parquet(f"{files_dir}/{file}")
    s_df = spark.createDataFrame(df, schema=mySchema)
    s_df.write.mode('append').saveAsTable('hiveaeprodanuser.statscan_demographics_prov')

uploading prov_ONTARIO.parquet
uploading prov_ATLANTIC.parquet
uploading prov_TERRITORIES.parquet
uploading prov_PRAIRIES.parquet
uploading prov_QUEBEC.parquet
uploading prov_BRITISH_COLUMBIA.parquet


#### Upload overall

In [13]:
overall_parsed_files = [f for f in os.listdir(files_dir) if f.endswith('.parquet') and f[:3] != 'raw' and f[-15:-8] == 'overall' and f[:4] != 'prov']
overall_parsed_files

['QUEBEC_overall.parquet',
 'TERRITORIES_overall.parquet',
 'ONTARIO_overall.parquet',
 'BRITISH_COLUMBIA_overall.parquet',
 'ATLANTIC_overall.parquet',
 'PRAIRIES_overall.parquet']

In [14]:
mySchema = StructType([StructField("da", IntegerType(), True),
                       StructField("feature", StringType(), True),
                       StructField("feature_num", IntegerType(), True),
                       StructField("feature_desc", StringType(), True),
                       StructField("sex", StringType(), True),
                       StructField("count", FloatType(), True),
                       StructField("region", StringType(), True)])

In [22]:
hc.sql('DROP TABLE IF EXISTS hiveaeprodanuser.statscan_demographics_overall')


DataFrame[]

In [26]:

for file in overall_parsed_files:
    print(f"uploading {file}")
    df = pd.read_parquet(f"{files_dir}/{file}")
    s_df = spark.createDataFrame(df, schema=mySchema)
    s_df.write.mode('append').saveAsTable('hiveaeprodanuser.statscan_demographics_overall')

uploading QUEBEC_overall.parquet
uploading TERRITORIES_overall.parquet
uploading ONTARIO_overall.parquet
uploading BRITISH_COLUMBIA_overall.parquet
uploading ATLANTIC_overall.parquet
uploading PRAIRIES_overall.parquet


In [27]:
x = hc.table('hiveaeprodanuser.statscan_demographics_overall')

In [28]:
x.select('feature').distinct().show()

+-----------------+
|          feature|
+-----------------+
| household income|
|  employment_rate|
|   dwelling value|
|individual income|
|    overall stats|
|              age|
|    dwelling rent|
+-----------------+



### Test and QA

In [47]:
pccf = hc.table('hiveaeprodref.stats_can_pccf')
statscan = hc.table('hiveaeprodanuser.statscan_demographics')

In [48]:
statscan.groupby('region').count().orderBy('count').show()

+----------------+-------+
|          region|  count|
+----------------+-------+
|     TERRITORIES|  65315|
|        ATLANTIC|1570275|
|BRITISH_COLUMBIA|2595644|
|        PRAIRIES|3547586|
|          QUEBEC|4860946|
|         ONTARIO|7198246|
+----------------+-------+



In [49]:
# What dissemination areas are missing?
pccf_da = pccf.selectExpr('cast(dissemination_area_uid as int) as da').distinct()
sc_da = statscan.select('da').distinct()

In [50]:
print(f"There are {pccf_da.count()} unique DAs in the PCCF table")

There are 47429 unique DAs in the PCCF table


In [51]:
print(f"There are {sc_da.count()} unique DAs in the parsed statscan demographics table")

There are 55044 unique DAs in the parsed statscan demographics table


In [52]:
# Number of dissemination areas that aren't in the parsed Census data
missing_das = pccf_da.subtract(sc_da)
print(f"There are {missing_das.count()} DAs missing from the parsed demographics table that are in the PCCF table")

There are 147 DAs missing from the parsed demographics table that are in the PCCF table


In [53]:
affected_row_cnt = pccf.where(col('dissemination_area_uid').isin([x.da for x in missing_das.collect()])).count()
print(f"This affects {affected_row_cnt} rows in the PCCF table.")

This affects 2158 rows in the PCCF table.


In [54]:
pccf.count()

862778

In [55]:
statscan.groupby('region').count().orderBy('count').show()

+----------------+-------+
|          region|  count|
+----------------+-------+
|     TERRITORIES|  65315|
|        ATLANTIC|1570275|
|BRITISH_COLUMBIA|2595644|
|        PRAIRIES|3547586|
|          QUEBEC|4860946|
|         ONTARIO|7198246|
+----------------+-------+



In [19]:
import pandas as pd

In [20]:
d = pd.read_parquet(f"{files_dir}/QUEBEC_overall.parquet")

In [21]:
d.feature.unique()

array(['overall stats', 'age', 'individual income', 'household income',
       'dwelling value', 'dwelling rent', 'employment_rate'], dtype=object)