Skip to content

Commit

Permalink
added upload sections
Browse files Browse the repository at this point in the history
  • Loading branch information
nthieme committed Dec 17, 2018
1 parent 024622b commit d3470bf
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 68 deletions.
46 changes: 23 additions & 23 deletions mapbox_pipeline_wrapper.R
Expand Up @@ -19,7 +19,7 @@ library(jsonlite)

setwd("C:/Users/nickt/Desktop/USB_folder")
### setwd("Set this to the directory")
load("MLab_data_census_tract")
load("MLab_data_census_tract1")
load("D_477_2017_jun_prov")
load("D_477_2016_dec_prov")
load("D_477_2016_jun_prov")
Expand All @@ -31,6 +31,7 @@ load("totalpop_sf_tract")
load("legislative_mlab")
source("pipeline_functions.R")


##################################
#Prepare census county-level data#
##################################
Expand Down Expand Up @@ -67,7 +68,9 @@ names(D_477)[3]<-"GEOID"
D_joined<-left_join(D, totalpop_sf, by = "tract")

# Collapse the joined tract-level rows to one row per (5-character GEOID
# prefix, date_range): median download speed, median upload speed, and the
# total number of tests.
D_joined_summed <- D_joined %>%
  mutate(GEOID = str_sub(GEOID, 1, 5)) %>%
  group_by(GEOID, date_range) %>%
  summarise(
    mlab_speed = median(med_speed, na.rm = TRUE),
    # The upload median has to be taken over the upload column; taking it over
    # med_speed just duplicates the download figure under a second name.
    # NOTE(review): assumes the upload column is called upload_med_speed, the
    # alias used in query_agg_ip -- confirm against the loaded table.
    mlab_up_speed = median(upload_med_speed, na.rm = TRUE),
    counts = sum(count_ip, na.rm = TRUE)
  )

rm(D)

Expand Down Expand Up @@ -95,6 +98,7 @@ D_plot_f<-D_plot_f%>%mutate(speed_diff_perc = speed_477/mlab_speed)

D_data_county<-data.frame(county = D_plot_f$GEOID ,
speed_mlab=D_plot_f$mlab_speed ,
speed_up=D_plot_f$mlab_up_speed,
speed_477=D_plot_f$speed_477,
speed_diff=D_plot_f$diff,
speed_diff_perc=D_plot_f$speed_diff_perc,
Expand Down Expand Up @@ -123,39 +127,33 @@ m_lab_final_leg_dis<-m_lab_477_final_leg_sf%>%group_by(GEOID.y, FUNCSTAT,date_ra
summarise(
speed_477=median(speed_477),
### Calculate average speeds, weighted by test counts
speed_mlab=sum(na.omit(mlab_speed *counts))/(sum(na.omit(counts))),
speed_mlab=sum(mlab_speed *counts, na.rm = TRUE)/(sum(counts, na.rm = TRUE)),
speed_up_mlab = sum(mlab_up_speed *counts, na.rm = TRUE)/(sum(counts, na.rm = TRUE)),
counts_avg = median(counts, na.rm=TRUE)

)%>%mutate(
speed_diff= speed_477-speed_mlab
)

simplepolys <- ms_simplify(m_lab_final_leg_dis)%>%st_as_sf

D_lower<-data.frame(geom = simplepolys$geom, house=simplepolys$FUNCSTAT,
house_num = simplepolys$GEOID)%>%
filter(as.character(house)=="lower")%>%st_as_sf()

D_upper<-data.frame(geom = simplepolys$geom, house=simplepolys$FUNCSTAT,
house_num = simplepolys$GEOID)%>%
filter(house=="upper")%>%st_as_sf()

### This section does the legislative data by actual spatial joins, which makes the mlab
### data accurate but makes comparisons between 477 and mlab not quite even.

load("MLab_data_state_house")
load("MLab_data_state_senate")
load("house_counts")
load("senate_counts")
load("MLab_data_state_house_2")
load("MLab_data_state_senate_2")
load("house_counts_2")
load("senate_counts_2")

D_house<-data.frame(D_state_house, house=rep("lower", nrow(D_state_house)))%>%
group_by(GEOID, house,date_range)%>%summarise(
med_speed =median(med_speed, rm.na=TRUE)
med_speed =median(med_speed, na.rm=TRUE),
med_up_speed =median(med_up_speed, na.rm=TRUE)
)

day_range<-D_state_house%>%select(day, date_range)%>%distinct

D_house_counts<-left_join(day_range, house_count)%>%group_by(GEOID,date_range)%>%
summarise(counts = sum(count_ip, rm.na=TRUE))
summarise(counts = sum(count_ip, na.rm=TRUE))

D_house_joined <- left_join(D_house, D_house_counts)

Expand All @@ -164,20 +162,22 @@ house_df<-left_join(D_house_joined, house_shape)

D_senate<-data.frame(D_state_senate, house=rep("upper", nrow(D_state_senate)))%>%
group_by(GEOID, house,date_range)%>%summarise(
med_speed =median(med_speed, rm.na=TRUE)
med_speed =median(med_speed, na.rm = TRUE),
med_up_speed =median(med_up_speed, na.rm=TRUE)
)

day_range<-D_state_senate%>%select(day, date_range)%>%distinct

D_senate_counts<-left_join(day_range, senate_count)%>%group_by(GEOID,date_range)%>%
summarise(counts = sum(count_ip, rm.na=TRUE))
summarise(counts = sum(count_ip, na.rm=TRUE))

D_senate_joined <- left_join(D_senate, D_senate_counts)

# Stack the senate and house summaries, then attach them to
# m_lab_final_leg_dis by district id, date range and chamber, and add the
# ratio of the 477 speed to the M-Lab speed.
legis_df <- bind_rows(D_senate_joined, D_house_joined)
legis_comp_df <- left_join(
  m_lab_final_leg_dis, legis_df,
  by = c("GEOID.y" = "GEOID", "date_range", "FUNCSTAT" = "house")
) %>%
  mutate(speed_diff_perc = speed_477 / speed_mlab)

# TODO(review): the line below is an orphaned fragment (it starts mid-call and
# has an unmatched closing parenthesis), so it cannot be parsed and would stop
# the whole script from being sourced. It looks like the tail of an
# ifelse(is.na(...), ..., ...) fallback, but its assignment target and the
# origin of joined_ests are not visible here -- restore the full statement or
# delete it.
# na(joined_ests$med_speed), joined_ests$speed_up_mlab, joined_ests$med_speed)

#########################
#Output the Mapbox jsons#
Expand All @@ -194,10 +194,10 @@ st_write(D_new, "full_speed_data_census_base_use.geojson")

### This section makes up the data layer.

st_geometry(legis_comp_df) <- "geom"
D_data<-data.frame(house_num=legis_comp_df$GEOID.y,
house=legis_comp_df$FUNCSTAT,
speed_mlab=legis_comp_df$med_speed,
speed_mlab=legis_comp_df$med_speed,
speed_up=legis_comp_df$med_up_speed,
speed_477=legis_comp_df$speed_477,
speed_diff=legis_comp_df$speed_diff,
speed_diff_perc=legis_comp_df$speed_diff_perc,
Expand All @@ -222,6 +222,6 @@ D_data<-data.frame(house_num=legis_comp_df$GEOID.y,
data_layer_json<-toJSON(json_list)

writeLines(data_layer_json,
con=paste(c("mapbox_leg_county_counts.json"),collapse = "")
con=paste(c("mapbox_leg_county_counts_new_agg_up.json"),collapse = "")
)

62 changes: 44 additions & 18 deletions pipeline_functions.R
Expand Up @@ -3,7 +3,7 @@
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#

project <- "mlab-sandbox"

library(tidyverse)
###A helper function for getting dataflow strings into the right format
stringify = function(x){
z=apply(x, 1, paste, collapse = ", ")
Expand All @@ -16,25 +16,51 @@ to_dataflow_string = function(dataflow_sf, file_name){
require(sf)
require(jsonlite)
library(tools)
ln_list<-nrow(dataflow_sf)
dataflow_list <-vector(mode = "list",length=ln_list)

dataflow_list <-vector(mode = "list",length=nrow(dataflow_sf))

for(i in 1:length(dataflow_list)){
val <- tryCatch(stringify(dataflow_sf[i,]$geometry[[1]][[1]][[1]]), error=function(e){return("e")})
if(val=="e"){
val<-stringify(dataflow_sf[i,]$geometry[[1]][[1]])
for(i in 1:ln_list){
num_polys<-length(dataflow_sf[i,]$geometry[[1]])
if(num_polys==1){
val <- tryCatch(stringify(dataflow_sf[i,]$geometry[[1]][[1]][[1]]),
error=function(e){return("e")})

if(length(val)==1){
val<-stringify(dataflow_sf[i,]$geometry[[1]][[1]])
}

dataflow_list[[i]]<-list("district"=dataflow_sf[i,]$GEOID,
"Values"=val,
"state"=str_c(" ",toTitleCase(dataflow_sf[i,]$state))
)
}else{
for(j in 1:num_polys){
val <- tryCatch(stringify(dataflow_sf[i,]$geometry[[1]][[j]][[1]]),
error=function(e){return("e")})

if(length(val)==1){
val<-stringify(dataflow_sf[i,]$geometry[[1]][[j]])
}

curr_list<-list("district"=dataflow_sf[i,]$GEOID,
"Values"=val,
"state"=str_c(" ",toTitleCase(dataflow_sf[i,]$state))
)

dataflow_list<-append(dataflow_list, list(curr_list), i-1)
}
}
dataflow_list[[i]]<-list("district"=dataflow_sf[i,]$GEOID,
"Values"=val,
"state"=str_c(" ",toTitleCase(dataflow_sf[i,]$state))
)
}
lengths<-lapply(dataflow_list, length)
dataflow_list_short<-dataflow_list[which(lengths==3)]

dataflow_json_lower<-toJSON(dataflow_list, auto_unbox = TRUE)
dataflow_json_lower<-toJSON(dataflow_list_short, auto_unbox = TRUE)
x <- fromJSON(dataflow_json_lower) # convert to data.frame
stream_out(x, file(paste(c(file_name, ".json"), collapse= "")))
}



###process 477 data
process_477 <-function(D){
D$med_dl<-as.numeric(D$med_dl)
Expand Down Expand Up @@ -70,12 +96,12 @@ load_477_data<-function(query,table){
load_time_chunks<-function(query){

start_loc<-str_locate(query,"GROUP")
dec_14<-"WHERE day BETWEEN '2014-07-01' AND '2014-12-01'"
jun_15<-"WHERE day BETWEEN '2014-12-01' AND '2015-07-01'"
dec_15<-"WHERE day BETWEEN '2015-07-01' AND '2015-12-01'"
jun_16<-"WHERE day BETWEEN '2015-12-01' AND '2016-07-01'"
dec_16<-"WHERE day BETWEEN '2016-07-01' AND '2016-12-01'"
jun_17<-"WHERE day BETWEEN '2016-12-01' AND '2017-07-01'"
dec_14<-"WHERE day BETWEEN '2014-06-30' AND '2014-12-31'"
jun_15<-"WHERE day BETWEEN '2014-12-31' AND '2015-06-30'"
dec_15<-"WHERE day BETWEEN '2015-06-30' AND '2015-12-31'"
jun_16<-"WHERE day BETWEEN '2015-12-31' AND '2016-06-30'"
dec_16<-"WHERE day BETWEEN '2016-06-30' AND '2016-12-31'"
jun_17<-"WHERE day BETWEEN '2016-12-31' AND '2017-06-30'"

query_list <- list(dec_14=dec_14, jun_15=jun_15,
dec_15=dec_15, jun_16=jun_16, dec_16=dec_16,jun_17=jun_17)
Expand Down
37 changes: 18 additions & 19 deletions post_dataflow_dataset_import.R
Expand Up @@ -13,15 +13,17 @@ load("legislative_mlab")
##################
#BigQuery queries#
##################
#`thieme.D_deserts_final_minrtt_state`

query_agg_ip<-"#standardSQL
SELECT day, APPROX_QUANTILES(min_rtt, 1000)[OFFSET(500)] as min_rtt,
SELECT day,
APPROX_QUANTILES(med_rtt, 1000)[OFFSET(500)] as med_rtt,
APPROX_QUANTILES(med_speed, 1000)[OFFSET(500)] as med_speed,
APPROX_QUANTILES(upload_med_speed, 1000)[OFFSET(500)] as upload_med_speed,
SUM(count_ip) as tract_test_counts, tract
FROM
`thieme.D_deserts_final_minrtt_state`
`thieme.dataflow_county_final_copy`
GROUP BY
day, tract"
Expand All @@ -31,10 +33,11 @@ query_house<-"#standardSQL
SELECT day,
APPROX_QUANTILES(med_rtt, 1000)[OFFSET(500)] as med_rtt,
APPROX_QUANTILES(med_speed, 1000)[OFFSET(500)] as med_speed,
APPROX_QUANTILES(upload_med_speed, 1000)[OFFSET(500)] as med_up_speed,
SUM(count_ip) as tract_test_counts,
client_lat, client_lon, tract
FROM
`thieme.D_joined_state_house`
`thieme.dataflow_lower_test_final`
GROUP BY
client_lat, client_lon, day, tract"
Expand All @@ -44,10 +47,11 @@ query_senate<-"#standardSQL
SELECT day,
APPROX_QUANTILES(med_rtt, 1000)[OFFSET(500)] as med_rtt,
APPROX_QUANTILES(med_speed, 1000)[OFFSET(500)] as med_speed,
APPROX_QUANTILES(upload_med_speed, 1000)[OFFSET(500)] as med_up_speed,
SUM(count_ip) as tract_test_counts,
client_lat, client_lon, tract
FROM
`thieme.D_joined_state_senate`
`thieme.dataflow_upper_test_final`
GROUP BY
Expand Down Expand Up @@ -93,11 +97,12 @@ GROUP BY
tract, day"

#`thieme.dataflow_upper_final_int`
senate_count_query <- "#standardSQL
SELECT DATE_TRUNC(partition_date, MONTH) AS day, COUNT(connection_spec.client_ip) as count_ip, tract
FROM
`thieme.state_senate_ndt`
`thieme.dataflow_upper_final_int`
GROUP BY
Expand All @@ -107,7 +112,7 @@ house_count_query <- "#standardSQL
SELECT DATE_TRUNC(partition_date, MONTH) AS day, COUNT(connection_spec.client_ip) as count_ip, tract
FROM
`thieme.state_house_ndt`
`thieme.dataflow_lower_test_int_final`
GROUP BY
Expand All @@ -130,29 +135,23 @@ D_477_2014_dec_prov <- load_477_data(query_477_prov, "thieme.477_dec_2014")

# Pull the per-district speed tables and the per-district test counts, then
# key both on GEOID so they can be joined on district and day.
#
# The district column is renamed by name ("tract", the alias used in the
# queries) instead of by position: adding a column to a SELECT list shifts
# every later index, and a positional rename then silently hits the wrong
# column.
D_state_house <- load_time_chunks(query_house)
house_count <- query_exec(house_count_query, project = project,
                          use_legacy_sql = FALSE, max_pages = Inf)
names(D_state_house)[names(D_state_house) == "tract"] <- "GEOID"
names(house_count)[names(house_count) == "tract"] <- "GEOID"

# na.omit() drops every row that still has a missing value after the join.
D_house_sum <- left_join(D_state_house, house_count,
                         by = c("GEOID", "day")) %>%
  na.omit()

D_state_senate <- load_time_chunks(query_senate)
senate_count <- query_exec(senate_count_query, project = project,
                           use_legacy_sql = FALSE, max_pages = Inf)
names(D_state_senate)[names(D_state_senate) == "tract"] <- "GEOID"
names(senate_count)[names(senate_count) == "tract"] <- "GEOID"

D_senate_sum <- left_join(D_state_senate, senate_count,
                          by = c("GEOID", "day")) %>%
  na.omit()



save(D, file="MLab_data_census_tract")
save(D, file="MLab_data_census_tract1")
save(D_477_2017_jun_prov, file="D_477_2017_jun_prov")
save(D_477_2016_dec_prov, file="D_477_2016_dec_prov")
save(D_477_2016_jun_prov, file="D_477_2016_jun_prov")
save(D_477_2015_dec_prov, file="D_477_2015_dec_prov")
save(D_477_2015_jun_prov, file="D_477_2015_jun_prov")
save(D_477_2014_dec_prov, file="D_477_2014_dec_prov")

save(D_state_house, file="MLab_data_state_house")
save(house_count, file="house_counts")
save(D_state_senate, file="MLab_data_state_senate")
save(senate_count, file="senate_counts")
save(D_state_house, file="MLab_data_state_house_2")
save(house_count, file="house_counts_2")
save(D_state_senate, file="MLab_data_state_senate_2")
save(senate_count, file="senate_counts_2")
18 changes: 10 additions & 8 deletions pre_dataflow_dataset_import.R
Expand Up @@ -122,24 +122,26 @@ save(df_final, file = "legislative_mlab")
data(state.fips)

state_cross<-state.fips[,c(1,6)]%>%
mutate(state=polyname%>%str_split(":")%>%lapply(function(x)return(x[1]))%>%unlist)%>%
select(fips, state)%>%distinct
mutate(state=polyname%>%str_split(":")%>%lapply(function(x)return(x[1]))%>%unlist)

state_cross<-state_cross[,c(1,3)]%>%distinct

state_cross$fips<-ifelse(nchar(state_cross$fips)==1, str_c("0", state_cross$fips), state_cross$fips)
names(state_cross)[1]<-"STATEFP"
missing_fi<-c("02", "15")
missing_name<-c("alaska","hawaii")
missing_data <- data.frame("STATEFP"=missing_fi, "state"=missing_name)
state_cross<-rbind(state_cross, missing_data)
df_final_lower <- df_final%>%filter(FUNCSTAT=="lower")%>%select(GEOID, geometry,STATEFP)
df_final_upper <- df_final%>%filter(FUNCSTAT=="upper")%>%select(GEOID, geometry,STATEFP)

dataflow_lower_df<-left_join(df_final_lower,state_cross)
dataflow_upper_df<-left_join(df_final_upper,state_cross)

df_final_lower <- df_final%>%filter(FUNCSTAT=="lower")
df_final_upper <- df_final%>%filter(FUNCSTAT=="upper")

dataflow_lower_df<-left_join(df_final_lower,state_cross)%>%select(GEOID, geometry, state)
dataflow_upper_df<-left_join(df_final_upper,state_cross)%>%select(GEOID, geometry, state)
# Write the lower- and upper-chamber district tables out through
# to_dataflow_string(), one output file per chamber.
# NOTE(review): to_dataflow_string() appends ".json" to file_name itself, so
# passing a name that already ends in ".json" presumably yields files named
# "*.json.json" -- confirm whether downstream steps rely on that name before
# changing it.
to_dataflow_string(dataflow_lower_df, "dataflow_mapbox1_lower_combine.json")
to_dataflow_string(dataflow_upper_df, "dataflow_mapbox1_upper_combine.json")

to_dataflow_string(dataflow_lower_df, "dataflow_mapbox1_lower.json")
to_dataflow_string(dataflow_upper_df, "dataflow_mapbox1_upper.json")

##########
#ZCTA I/O#
Expand Down

0 comments on commit d3470bf

Please sign in to comment.