#### Objective:- Randomly assign race to individuals using computed race migration counts

In [3]:
import cudf, cupy as cp
import os,random
import pandas as pd, numpy as np

#### Race assignment

In [2]:
races = cudf.read_csv('data/mapped_data_with_race.csv',usecols=['ID20','P_delta','R1_diff','R2_diff','R3_diff','R4_diff','R5_diff','R6_diff','R7_diff'])
races.head()

Unnamed: 0,ID20,P_delta,R1_diff,R2_diff,R3_diff,R4_diff,R5_diff,R6_diff,R7_diff
0,10010201001000,-10,-16,2,0,0,0,0,4
1,10010201001001,4,-10,9,0,2,0,1,2
2,10010201001002,-23,-21,-2,0,0,0,0,0
3,10010201001003,4,5,-2,0,0,0,0,1
4,10010201001005,-8,-7,0,0,0,0,0,-1


In [3]:
races = races.to_pandas()

In [4]:
races['R1_diff'] =races['R1_diff'].apply(lambda x : [1]*abs(x) if x>0 else ([-1]*abs(x) if x<0 else []) )
races['R2_diff'] =races['R2_diff'].apply(lambda x : [2]*abs(x) if x>0 else ([-2]*abs(x) if x<0 else []) )
races['R3_diff'] =races['R3_diff'].apply(lambda x : [3]*abs(x) if x>0 else ([-3]*abs(x) if x<0 else []) )
races['R4_diff'] =races['R4_diff'].apply(lambda x : [4]*abs(x) if x>0 else ([-4]*abs(x) if x<0 else []) )
races['R5_diff'] =races['R5_diff'].apply(lambda x : [5]*abs(x) if x>0 else ([-5]*abs(x) if x<0 else []) )
races['R6_diff'] =races['R6_diff'].apply(lambda x : [6]*abs(x) if x>0 else ([-6]*abs(x) if x<0 else []) )
races['R7_diff'] =races['R7_diff'].apply(lambda x : [7]*abs(x) if x>0 else ([-7]*abs(x) if x<0 else []) )
races['pop'] = races['R1_diff']+races['R2_diff']+races['R3_diff']+races['R4_diff']+races['R5_diff']+races['R6_diff']+races['R7_diff']
races['pop'] = races['pop'].apply(lambda x: random.sample(x,len(x))) # shuffle races
races.head()

Unnamed: 0,ID20,P_delta,R1_diff,R2_diff,R3_diff,R4_diff,R5_diff,R6_diff,R7_diff,pop
0,10010201001000,-10,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -...","[2, 2]",[],[],[],[],"[7, 7, 7, 7]","[-1, -1, -1, -1, -1, -1, -1, 7, 7, 2, -1, -1, ..."
1,10010201001001,4,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]","[2, 2, 2, 2, 2, 2, 2, 2, 2]",[],"[4, 4]",[],[6],"[7, 7]","[-1, -1, -1, 7, -1, -1, 6, 7, 2, -1, -1, 2, 2,..."
2,10010201001002,-23,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -...","[-2, -2]",[],[],[],[],[],"[-1, -1, -2, -1, -1, -1, -1, -1, -1, -1, -1, -..."
3,10010201001003,4,"[1, 1, 1, 1, 1]","[-2, -2]",[],[],[],[],[7],"[1, 1, 7, 1, 1, -2, 1, -2]"
4,10010201001005,-8,"[-1, -1, -1, -1, -1, -1, -1]",[],[],[],[],[],[-7],"[-1, -1, -1, -7, -1, -1, -1, -1]"


In [5]:
gpu_races = cudf.from_pandas(races[['ID20','pop']])
del(races)

In [6]:
gpu_races = gpu_races.explode('pop').reset_index(drop=True)
gpu_races.head()

Unnamed: 0,ID20,pop
0,10010201001000,-1
1,10010201001000,-1
2,10010201001000,-1
3,10010201001000,-1
4,10010201001000,-1


In [7]:
# gpu_races.to_pandas().to_csv('data/full_races_assigned.csv')

#### Concat races

In [2]:
races = pd.read_csv('data/full_races_assigned.csv').drop('Unnamed: 0',axis=1)
races = cudf.from_pandas(races)
races.head()

Unnamed: 0,ID20,pop
0,10010201001000,-1
1,10010201001000,-1
2,10010201001000,-1
3,10010201001000,-1
4,10010201001000,-1


In [3]:
population = pd.read_csv('data/final_data_with_race.csv').drop('Unnamed: 0',axis=1)
population = cudf.from_pandas(population)
population.head()

Unnamed: 0,ID20,x,y
0,10010201001000,-86.48059,32.469173
1,10010201001000,-86.47814,32.470337
2,10010201001000,-86.478485,32.47149
3,10010201001000,-86.479645,32.469475
4,10010201001000,-86.47991,32.47194


In [4]:
population_with_race = cudf.concat([population,races['pop']],axis=1)
population_with_race.head()

Unnamed: 0,ID20,x,y,pop
0,10010201001000,-86.48059,32.469173,-1
1,10010201001000,-86.47814,32.470337,-1
2,10010201001000,-86.478485,32.47149,-1
3,10010201001000,-86.479645,32.469475,-1
4,10010201001000,-86.47991,32.47194,-1


In [5]:
temp = population_with_race.to_pandas()

In [7]:
temp.to_csv('data/population_race_concatenated.csv')

#### Prepare final dataset

In [4]:
pop = dask_cudf.read_csv('data/population_race_concatenated.csv',usecols=['ID20','x','y','pop'],dtype={'ID20':'int64','x':'float32','y':'float32','pop':'int32'})
# pop = cudf.from_pandas(pop)
pop.head()

Unnamed: 0,ID20,x,y,pop
0,10010201001000,-86.480598,32.469173,-1
1,10010201001000,-86.478142,32.470341,-1
2,10010201001000,-86.478485,32.471493,-1
3,10010201001000,-86.479652,32.469475,-1
4,10010201001000,-86.479912,32.471939,-1


In [5]:
df = pd.read_csv('data/blocks_with_attr.csv',encoding='unicode_escape',usecols=['ID20','STATE','COUNTY','P_delta'],dtype={'ID20':'int64','STATE':'int32','COUNTY':'str','P_delta':'int32'})
# df = cudf.from_pandas(df)
df.head()

Unnamed: 0,ID20,STATE,COUNTY,P_delta
0,10010201001000,1,Autauga County,-10
1,10010201001001,1,Autauga County,4
2,10010201001002,1,Autauga County,-23
3,10010201001003,1,Autauga County,4
4,10010201001005,1,Autauga County,-8


In [7]:
df.COUNTY.replace({r'[^\x00-\x7F]+':''},regex=True,inplace=True)
df.COUNTY.replace({r'([A-Z][a-z]+)([A-Z]+)':r'\1'},regex=True,inplace=True)

In [8]:
df = cudf.from_pandas(df)

In [9]:
df = dask_cudf.from_cudf(df,npartitions=33)

In [10]:
len(df)

6194258

In [6]:
# Split dataset to manage OOM
# concat_data1 = pd.DataFrame()
# concat_data2 = pd.DataFrame()
# concat_data3 = pd.DataFrame()
# concat_data4 = pd.DataFrame()
# concat_data5 = pd.DataFrame()
# concat_data12 = None
# concat_data34 = None
# concat_data12345 = None

# def prepare_final_data(i,pop,df):
#     global concat_data1
#     global concat_data2
#     global concat_data3
#     global concat_data4
#     global concat_data5
#     global concat_data12
#     global concat_data34
#     global concat_data12345

#     pop = cudf.from_pandas(pop)
#     df = cudf.from_pandas(df)                      
#     merged_data = pop.merge(df,on='ID20',how='left').sort_values('ID20')
#     # print(merged_data.head())
#     del(pop,df)
#     if i <= 12:
#         concat_data1 = pd.concat([concat_data1,merged_data.to_pandas()])
#     elif i <= 24:
#         concat_data2 = pd.concat([concat_data2,merged_data.to_pandas()])
#         if i== 24:
#             concat_data12 = pd.concat([concat_data1,concat_data2])
#             concat_data12.to_csv('data/concat_data12.csv')
#             del(concat_data1,concat_data2)    
#     elif i <= 36:
#         concat_data3 = pd.concat([concat_data3,merged_data.to_pandas()])
#     elif i <= 47:
#         concat_data4 = pd.concat([concat_data4,merged_data.to_pandas()])
#         if i == 47:
#             concat_data34 = pd.concat([concat_data3,concat_data4])
#             concat_data34.to_csv('data/concat_data34.csv')
#             del(concat_data3,concat_data4)
#     else: 
#         concat_data5 = pd.concat([concat_data5,merged_data.to_pandas()]) 
#         if i == 72:
#             concat_data5.to_csv('data/concat_data5.csv')
#             concat12345 = pd.concat([concat_data34,concat_data5])
#             del(concat_data12,concat_data34,concat_data5)

# states = {25:"MA",26:"MI",27:"MN",
#           28:"MS",29:"MO",30:"MT",31:"NE",32:"NV",33:"NH",34:"NJ",35:"NM",36:"NY",37:"NC",38:"ND",39:"OH",
#           40:"OK",41:"OR",42:"PA",44:"RI",45:"SC",46:"SD",47:"TN",48:"TX",49:"UT",50:"VT",51:"VA",53:"WA",
#           54:"WV",55:"WI",56:"WY",72:"PR"}

# for i in states.keys():
#     print(i)
#     l1 = int(str(i)+'0'*13)
#     l2 = int(str(i+1)+'0'*13)
#     pop1 = pop[(pop.ID20>=l1) & (pop.ID20<=l2)]
#     df1 = df[df.STATE==i]
#     prepare_final_data(i,pop1,df1)

In [11]:
dataset = pop.merge(df,on='ID20',how='left')
dataset.head()

Unnamed: 0,ID20,x,y,pop,STATE,COUNTY,P_delta
0,10730129081005,-86.79953,33.390617,-6,1,Jefferson County,-16
1,10730141062002,-87.081055,33.341869,1,1,Jefferson County,49
2,10550108002031,-85.94001,34.133358,1,1,Etowah County,26
3,10810418022002,-85.227203,32.743073,7,1,Lee County,-20
4,10730120031020,-86.7976,33.596527,6,1,Jefferson County,24


In [19]:
dataset['P_net'] = dataset['P_delta'].apply(lambda x: -1 if x < 0 else ( 1 if x>0 else 0))
dataset = dataset.reset_index(drop=True)
dataset = dataset.rename(columns ={'pop':'race','ID20':'blockid','STATE':'state','P_delta':'block_diff','COUNTY':'county','P_net':'block_net'})

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('P_delta', 'int64'))



In [20]:
print(len(dataset))
dataset.head()

182532663


Unnamed: 0,blockid,x,y,race,state,county,block_diff,block_net
0,10330207031015,-87.647072,34.736504,-1,1,Colbert County,-42,-1
1,10030116011047,-87.660904,30.471989,1,1,Baldwin County,122,1
2,10439654023028,-86.754951,34.006104,1,1,Cullman County,28,1
3,10510305002045,-85.966774,32.588291,2,1,Elmore County,11,1
4,10150003001064,-85.829102,33.66732,-1,1,Calhoun County,-16,-1


In [21]:
# dataset.to_parquet('data/census_migration_dataset.parquet')