In [None]:
import pyspark
import datetime
from datetime import timedelta,date
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
import pyspark
import psycopg2
import sys
from delta.tables import *
import pandas as pd
import urllib 
import io
import re
from zipfile import ZipFile
import google.cloud
from google.cloud import bigquery
from google.oauth2 import service_account

In [None]:
# State / territory names (underscores instead of spaces): 50 states plus
# District_of_Columbia and Puerto_Rico.
us_states = ['Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado', 'Connecticut', 'Delaware', 'District_of_Columbia', 'Florida', 'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana', 'Maine', 'Maryland', 'Massachusetts', 'Michigan', 'Minnesota', 'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada', 'New_Hampshire', 'New_Jersey', 'New_Mexico', 'New_York', 'North_Carolina', 'North_Dakota', 'Ohio', 'Oklahoma', 'Oregon', 'Pennsylvania', 'Puerto_Rico', 'Rhode_Island', 'South_Carolina', 'South_Dakota', 'Tennessee', 'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington', 'West_Virginia', 'Wisconsin', 'Wyoming']
# Lower-case two-letter abbreviations, index-aligned with `us_states`.
us_states_abb = ['al', 'ak', 'az', 'ar', 'ca', 'co', 'ct', 'de', 'dc', 'fl', 'ga', 'hi', 'id', 'il', 'in', 'ia', 'ks', 'ky', 'la', 'me', 'md', 'ma', 'mi', 'mn', 'ms', 'mo', 'mt', 'ne', 'nv', 'nh', 'nj', 'nm', 'ny', 'nc', 'nd', 'oh', 'ok', 'or', 'pa', 'pr', 'ri', 'sc', 'sd', 'tn', 'tx', 'ut', 'vt', 'va', 'wa', 'wv', 'wi', 'wy']
# List of (name, abbreviation) pairs built from the two lists above.
full_states_tuple = list(zip(us_states, us_states_abb))

# File-name suffix of the per-state archives (e.g. "al" + "2020.pl.zip").
zip_file = '2020.pl.zip'

# Census Bureau index of the 2020 redistricting (P.L. 94-171) files.
ftp_index_url = 'https://www2.census.gov/programs-surveys/decennial/2020/data/01-Redistricting_File--PL_94-171/'

# dbutils.fs resolves paths against the DBFS root, whereas "/dbfs/..." is the
# driver-local mount of that same root. Passing "/dbfs/census20_tmp/" here
# would create dbfs:/dbfs/census20_tmp/ instead of the dbfs:/census20_tmp/
# directory that the pandas code (via /dbfs/census20_tmp/) and the Spark
# readers (via /census20_tmp/) actually use.
dbutils.fs.mkdirs("dbfs:/census20_tmp/")

In [None]:
# everything has to be read as a string - too many columns to specify schema individually
# only return pl_1 and geo because OOM otherwise
def extract_zip(input_zips_list, us_segment, section = '12020.pl', output_dir = "/dbfs/census20_tmp/"):
    """Combine one section of every archive into a single CSV file.

    Each archive in ``input_zips_list`` is opened and every member whose name
    contains ``section`` is parsed as a pipe-delimited, header-less table with
    all columns read as strings. The parsed tables from *all* archives are
    concatenated and written to
    ``<output_dir>/<us_segment>_<first five characters of section>.csv``
    (pandas default CSV settings, so the index is written as the first column).

    Parameters
    ----------
    input_zips_list : iterable of str
        Paths of the zip archives to read.
    us_segment : str
        Prefix used in the output file name.
    section : str, default '12020.pl'
        Substring that selects the archive members to extract.
    output_dir : str, default "/dbfs/census20_tmp/"
        Directory the combined CSV is written to; it must already exist.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If no archive contains a member matching ``section``.
    """
    import os

    frames = []
    for location in input_zips_list:
        # Context manager closes the archive handle even if parsing fails.
        with ZipFile(location) as archive:
            for name in archive.namelist():
                if section in name:
                    raw_bytes = archive.read(name)
                    frame = pd.read_table(io.StringIO(raw_bytes.decode('ISO-8859-1')),
                                          delimiter = "|",
                                          header = None,
                                          dtype = str)
                    print(name)
                    frames.append(frame)

    if not frames:
        raise ValueError(
            "No archive member matching {!r} found in the supplied zip files".format(section)
        )

    # Concatenate and write only after every archive has been visited;
    # doing this inside the loop would stop after the first archive.
    combined = pd.concat(frames, ignore_index = True)
    combined.to_csv(os.path.join(output_dir, us_segment + "_" + section[0:5] + ".csv"))
    return

def dataset_check_or_create(dataset_name):
    """Make sure a BigQuery dataset exists, creating it when it is missing.

    Credentials are loaded from the service-account key file on DBFS and the
    dataset is looked up as ``<credentials project id>.<dataset_name>``.

    Parameters
    ----------
    dataset_name : str
        Dataset name without the project prefix.

    Returns
    -------
    bool
        True if the dataset already existed, False if it was just created
        (in the "US" location).

    Raises
    ------
    Any error from the lookup other than "not found" (authentication,
    permission or network failures) propagates to the caller instead of being
    mistaken for a missing dataset.
    """
    # Function-scope import: only the narrow "not found" error should trigger
    # dataset creation.
    from google.cloud.exceptions import NotFound

    key_path = "/dbfs/FileStore/tables/fidap_301014_de8cdbcaf7a0.json"

    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )

    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    dataset_id = credentials.project_id + "." + dataset_name
    try:
        client.get_dataset(dataset_id)  # Make an API request.
    except NotFound:
        print("Dataset {} is not found".format(dataset_id))
        dataset = bigquery.Dataset(dataset_id)
        # TODO(developer): Specify the geographic location where the dataset should reside.
        dataset.location = "US"
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
        return False

    print("Dataset {} already exists".format(dataset_id))
    return True

def write_to_bq_new(df , dataset_name, table_name, mode = "overwrite"):
    """Write a Spark DataFrame to a BigQuery table.

    The target dataset is checked (and created if missing) via
    ``dataset_check_or_create`` before the write. Progress messages are
    printed before and after the save.

    Parameters
    ----------
    df : Spark DataFrame to write.
    dataset_name : BigQuery dataset that receives the table.
    table_name : name of the destination table.
    mode : Spark save mode, "overwrite" by default.
    """
    print(f"Writing {table_name} with {mode} - mode into BigQuery ")
    dataset_check_or_create(dataset_name)

    # Connector options, applied in this order.
    connector_options = {
        "credentialsFile": "/dbfs/FileStore/tables/fidap_301014_de8cdbcaf7a0.json",
        "temporaryGcsBucket": "databricks-fidap",
        "parentProject": 'fidap-301014',
        "dataset": dataset_name,
        "table": table_name,
    }

    writer = df.write.format("bigquery")
    for option_name, option_value in connector_options.items():
        writer = writer.option(option_name, option_value)
    writer.mode(mode).save(table_name)

    print(f"Successfully saved {table_name} with {mode} - mode into BigQuery ")
    
def add_dataset_from_csv(csv, dataset_schema_name, table_name, is_csv_pyspark_df = False):
    """
    Load tabular data into BigQuery and display it.

    csv: the data to upload. When is_csv_pyspark_df is False it is handed to
         spark.createDataFrame first; otherwise it is used as-is.
    dataset_schema_name: BigQuery dataset that receives the table.
    table_name: destination table; it is written in "overwrite" mode.
    is_csv_pyspark_df: set to True when csv is already a Spark DataFrame.

    Returns whatever display() returns for the resulting Spark DataFrame.
    """
    # Explicit comparison with False is intentional: only a False-equal flag
    # triggers the conversion.
    pyspark_df = spark.createDataFrame(csv) if is_csv_pyspark_df == False else csv

    # Push the frame to BigQuery, replacing any existing table.
    write_to_bq_new(pyspark_df, dataset_schema_name, table_name, mode = "overwrite")

    return display(pyspark_df)

In [None]:
# One archive path per state abbreviation, addressed through the /dbfs mount.
tmp_location = [
    "/dbfs/census20_tmp/{}".format(states_abb + "2020.pl.zip")
    for states_abb in us_states_abb
]

# Parse the segment-1 member of the archives and write the combined CSV.
extract_zip(tmp_location, us_segment = "us_states_abb", section = '12020.pl')

In [None]:
sqlContext = SQLContext(sc)


def _load_census_csv(path_pattern):
    """Read comma-separated file(s) with a header row and drop the `_c0` column."""
    csv_reader = spark.read.format("csv").option("header", "true").option("sep",",")
    return csv_reader.load(path_pattern).drop('_c0')


# Segment 1 (single combined file).
all_pl_1_filepath = '/census20_tmp/us_states_abb_12020.csv'
pl_1_df = _load_census_csv(all_pl_1_filepath)
pl_1_df.count()

# Segment 2 (every file matching the glob).
all_pl_2_filepath = '/census20_tmp/*22020.csv'
pl_2_df = _load_census_csv(all_pl_2_filepath)

# Segment 3 (every file matching the glob).
all_pl_3_filepath = '/census20_tmp/*32020.csv'
pl_3_df = _load_census_csv(all_pl_3_filepath)

# Geographic header records.
geo_filepath = '/census20_tmp/us_*_geo.csv'
geo_df = _load_census_csv(geo_filepath)

In [None]:
# Column names for the geographic header frame, in file order (97 fields).
geo_cols = [
    "FILEID", "STUSAB", "SUMLEV", "GEOVAR", "GEOCOMP", "CHARITER", "CIFSN", "LOGRECNO",
    "GEOID", "GEOCODE", "REGION", "DIVISION", "STATE", "STATENS",
    "COUNTY", "COUNTYCC", "COUNTYNS",
    "COUSUB", "COUSUBCC", "COUSUBNS",
    "SUBMCD", "SUBMCDCC", "SUBMCDNS",
    "ESTATE", "ESTATECC", "ESTATENS",
    "CONCIT", "CONCITCC", "CONCITNS",
    "PLACE", "PLACECC", "PLACENS",
    "TRACT", "BLKGRP", "BLOCK",
    "AIANHH", "AIHHTLI", "AIANHHFP", "AIANHHCC", "AIANHHNS",
    "AITS", "AITSFP", "AITSCC", "AITSNS",
    "TTRACT", "TBLKGRP",
    "ANRC", "ANRCCC", "ANRCNS",
    "CBSA", "MEMI", "CSA", "METDIV", "NECTA", "NMEMI", "CNECTA", "NECTADIV",
    "CBSAPCI", "NECTAPCI",
    "UA", "UATYPE", "UR",
    "CD116", "CD118", "CD119", "CD120", "CD121",
    "SLDU18", "SLDU22", "SLDU24", "SLDU26", "SLDU28",
    "SLDL18", "SLDL22", "SLDL24", "SLDL26", "SLDL28",
    "VTD", "VTDI", "ZCTA", "SDELM", "SDSEC", "SDUNI", "PUMA",
    "AREALAND", "AREAWATR", "BASENAME", "NAME", "FUNCSTAT", "GCUNI",
    "POP100", "HU100", "INTPTLAT", "INTPTLON", "LSADC", "PARTFLAG", "UGA",
]


def _table_fields(table, cell_count):
    """Return `<table>0001` .. `<table><cell_count>` with a zero-padded 4-digit cell number."""
    return [f"{table}{cell:04d}" for cell in range(1, cell_count + 1)]


# Identification columns that open every data segment.
first_cols = ["FILEID", "STUSAB", "CHARITER", "CIFSN", "LOGRECNO"]
suffix_pl1_1 = _table_fields("P001", 71)   # P0010001 .. P0010071
suffix_pl1_2 = _table_fields("P002", 73)   # P0020001 .. P0020073
suffix_pl2_1 = _table_fields("P003", 71)   # P0030001 .. P0030071
suffix_pl2_2 = _table_fields("P004", 73)   # P0040001 .. P0040073
suffix_pl2_3 = _table_fields("H001", 3)    # H0010001 .. H0010003
suffix_pl3_1 = _table_fields("P005", 10)   # P0050001 .. P0050010

# Full header of each segment: identification columns followed by its tables.
pl_1_cols = [*first_cols, *suffix_pl1_1, *suffix_pl1_2]
pl_2_cols = [*first_cols, *suffix_pl2_1, *suffix_pl2_2, *suffix_pl2_3]
pl_3_cols = [*first_cols, *suffix_pl3_1]

# Replace the positional column names with the field names above.
pl_1_df = pl_1_df.toDF(*pl_1_cols)
pl_2_df = pl_2_df.toDF(*pl_2_cols)
pl_3_df = pl_3_df.toDF(*pl_3_cols)
geo_df = geo_df.toDF(*geo_cols)

In [None]:
# Field-name workbook published by the Census Bureau; columns A and B of each
# definitions sheet are used as FIELD / DATA_DICTIONARY_REFERENCE.
field_names_url = "https://www2.census.gov/programs-surveys/decennial/rdo/about/2020-census-program/Phase3/SupportMaterials/2020_PLSummaryFile_FieldNames.xlsx"
definition_sheets = [
    "2020 P.L. Segment 1 Definitions",
    "2020 P.L. Segment 2 Definitions",
    "2020 P.L. Segment 3 Definitions",
    "2020 P.L. Geoheader Definitions",
]

# Passing a list as sheet_name returns a dict {sheet name: DataFrame}, so the
# workbook is downloaded once instead of once per sheet.
definition_frames = pd.read_excel(field_names_url, sheet_name = definition_sheets, usecols = 'A,B')
for definition_frame in definition_frames.values():
    definition_frame.columns = ['FIELD', 'DATA_DICTIONARY_REFERENCE']

headers_census_bureau_pl_1 = definition_frames[definition_sheets[0]]
headers_census_bureau_pl_2 = definition_frames[definition_sheets[1]]
headers_census_bureau_pl_3 = definition_frames[definition_sheets[2]]
headers_census_bureau_geo = definition_frames[definition_sheets[3]]
headers = [
    headers_census_bureau_pl_1,
    headers_census_bureau_pl_2,
    headers_census_bureau_pl_3,
    headers_census_bureau_geo,
]

# Stack all sheets, then discard rows with a missing value and exact duplicates.
all_headers = pd.concat(headers, ignore_index = True)
all_headers = all_headers.dropna()
all_headers = all_headers.drop_duplicates()

# Strip every character that is neither a word character nor whitespace.
# str() guards against cells that were parsed as numbers rather than text.
p = re.compile(r'[^\w\s]+')
all_headers['FIELD'] = [p.sub('', str(x)) for x in all_headers['FIELD'].tolist()]
all_headers['DATA_DICTIONARY_REFERENCE'] = [p.sub('', str(x)) for x in all_headers['DATA_DICTIONARY_REFERENCE'].tolist()]

In [None]:
# CIFSN is dropped from every frame before joining so it does not appear
# once per input in the joined result.
geo_df = geo_df.drop('CIFSN')
pl_1_df = pl_1_df.drop('CIFSN')
pl_2_df = pl_2_df.drop('CIFSN')
pl_3_df = pl_3_df.drop('CIFSN')

# Columns that identify a logical record across the geo header and the segments.
join_keys = ["STUSAB", "CHARITER", "FILEID", "LOGRECNO"]

# merge dfs together: geo + segment 1, then segments 2 and 3 on top of that.
merged_df = geo_df.join(pl_1_df, on = join_keys, how = 'inner')
merged_df_all = merged_df.join(pl_2_df, on = join_keys, how = 'inner')
# Chain from merged_df_all (not merged_df) so the segment-2 columns are kept.
merged_df_all = merged_df_all.join(pl_3_df, on = join_keys, how = 'inner')

# filter for geovar
merged_df_all = merged_df_all.filter(merged_df_all.GEOVAR == '00')

# count number of rows
merged_df_all.count()

# Tract-level extract. It is built from the geo + segment-1 join, which holds
# every column selected below.
# NOTE(review): this frame does not carry the GEOVAR == '00' filter applied to
# merged_df_all - confirm that is intended.
# length() is the string-length function; presumably tract GEOIDs are
# 20 characters long - TODO confirm.
census_tract_df = merged_df.filter(length(col("GEOID")) == 20)
census_tract_df = census_tract_df.filter("TRACT IS NOT NULL AND BLOCK IS NULL AND BLKGRP IS NULL")
columns = ['GEOID', 'STATE', 'AREALAND', 'HU100', 'P0010001', 'P0010003',  'P0010004', 'P0010005', 'P0010006',  'P0010007', 'P0010008', 'P0010009',  'P0020002', 'P0020003']
census_tract_df = census_tract_df.select(*columns)