In [1]:
import pandas as pd
from google.cloud import bigquery
bqclient = bigquery.Client()

In [2]:
table_path = 'bigquery-public-data.epa_historical_air_quality.co_daily_summary'
rows = bqclient.list_rows(table_path, max_results=10)
dataframe = rows.to_dataframe(create_bqstorage_client=True,)
dataframe.head()

Unnamed: 0,state_code,county_code,site_num,parameter_code,poc,latitude,longitude,datum,parameter_name,sample_duration,...,aqi,method_code,method_name,local_site_name,address,state_name,county_name,city_name,cbsa_name,date_of_last_change
0,6,99,5,42101,1,37.642165,-120.994212,NAD83,Carbon monoxide,8-HR RUN AVG END HOUR,...,32,,-,Modesto-14th Street,"814 14TH ST., MODESTO",California,Stanislaus,Modesto,"Modesto, CA",2021-11-08
1,32,510,4,42101,1,39.173056,-119.759167,WGS84,Carbon monoxide,8-HR RUN AVG END HOUR,...,25,,-,Carson City-NW Corner of Equipment Yard,875 EAST LONG STREET,Nevada,Carson City,Carson City,"Carson City, NV",2021-11-08
2,53,77,14,42101,1,46.602627,-120.505343,WGS84,Carbon monoxide,8-HR RUN AVG END HOUR,...,25,,-,"JIM AND JENNY'S TATTOO 210 E YAKIMA AVE, YAKIM...","210 E YAKIMA AVE, YAKIMA WA",Washington,Yakima,Yakima,"Yakima, WA",2021-11-08
3,2,20,17,42101,1,61.195004,-149.904983,WGS84,Carbon monoxide,8-HR RUN AVG END HOUR,...,40,,-,,SPENARD & BENSON/2902 SPENARD RD,Alaska,Anchorage,Anchorage,"Anchorage, AK",2021-11-09
4,2,20,37,42101,1,61.193337,-149.868868,WGS84,Carbon monoxide,8-HR RUN AVG END HOUR,...,31,,-,,SEWARD HWY & BENSON/3002 NEW SEWARD HWY,Alaska,Anchorage,Anchorage,"Anchorage, AK",2021-11-09


### Query required data from BigQuery and push aggregated data into MongoDB Atlas

In [62]:
# as the EPA dataset is very huge, we can only read parts of it at a time
# state names API - return list of all state names
offset = 0
offset_interval = 80000
row_count = 8614411
state_names = [] 
while offset < row_count:
    query_string = """SELECT DISTINCT state_name FROM `bigquery-public-data.epa_historical_air_quality.co_daily_summary` 
    LIMIT 80000 OFFSET """ + str(offset)
    names = (bqclient.query(query_string).result().to_dataframe(create_bqstorage_client=True,))
    #if !names.empty:
    state_names.append(names)
    offset += offset_interval

df = pd.concat(state_names, ignore_index=True)
state_names_list = df['state_name'].unique()
state_names_list

[              state_name
0               Illinois
1          Massachusetts
2              Minnesota
3                Montana
4               Nebraska
5          New Hampshire
6             New Mexico
7               New York
8               Oklahoma
9                 Oregon
10          Pennsylvania
11          Rhode Island
12                 Texas
13                  Utah
14             Wisconsin
15                Alaska
16            California
17              Colorado
18           Connecticut
19             Tennessee
20               Wyoming
21              Kentucky
22               Arizona
23                 Maine
24        North Carolina
25        South Carolina
26              Michigan
27              Missouri
28                  Iowa
29               Alabama
30     Country Of Mexico
31                Nevada
32                  Ohio
33               Florida
34            Washington
35              Maryland
36               Indiana
37               Georgia
38            New Jersey

In [53]:
# county names API - return list counties in specified states
offset = 0
offset_interval = 80000
row_count = 8614411
job_config = bigquery.QueryJobConfig(use_legacy_sql=True)
county_names = [] 
while offset < row_count:
    query_string = """SELECT county_name, state_name FROM `bigquery-public-data.epa_historical_air_quality.co_daily_summary` 
    WHERE state_name='California' OR state_name='Nevada' GROUP BY county_name, state_name LIMIT 80000 OFFSET """ + str(offset)
    names = (bqclient.query(query_string).result().to_dataframe(create_bqstorage_client=True,))
    #if !names.empty:
    county_names.append(names)
    offset += offset_interval

county_df = pd.concat(county_names, ignore_index=True)
county_df

Unnamed: 0,county_name,state_name
0,Kern,California
1,Santa Clara,California
2,San Luis Obispo,California
3,Stanislaus,California
4,Mariposa,California
5,Washoe,Nevada
6,Fresno,California
7,San Diego,California
8,Santa Barbara,California
9,Clark,Nevada


In [55]:
from pymongo import MongoClient

def connectToMongoCollection():
    connect_uri = "mongodb+srv://owner-user:carbonscore@cluster0.tqbki.mongodb.net/carbonScore?retryWrites=true&w=majority"
    mongo_client = MongoClient(connect_uri)
    global db
    db = mongo_client["carbonScore"]

def insert_states_list():
    state_df = state_names_list.tolist()
    mongo_dict_list = []
    states_collection = db["states"]
    for item in state_df:
        temp_dict = {"state": item}
        mongo_dict_list.append(temp_dict)
    states_collection.insert_many(mongo_dict_list)

def insert_county_list():
    county_list = county_df.values.tolist()
    mongo_dict_list = []
    counties_collection = db['counties']
    for item in county_list:
        temp_dict = {"county": item[0], "state": item[1]}
        mongo_dict_list.append(temp_dict)
    counties_collection.insert_many(mongo_dict_list)
    

In [56]:
connectToMongoCollection()
insert_states_list()
insert_county_list()

In [2]:
# fetch data one year at a time - as the dataset is very huge, we can only read parts of it at a time
# get aggregate CO levels for states CA and Nevada
offset = 0
offset_interval = 80000
row_count = 8614411
job_config = bigquery.QueryJobConfig(use_legacy_sql=True)
state_info = [] 
while offset < row_count:
    query_string = """SELECT * FROM [bigquery-public-data:epa_historical_air_quality.co_daily_summary] 
    WHERE YEAR(date_local) = 2015 LIMIT 80000 OFFSET """ + str(offset) 
    names = (bqclient.query(query_string, job_config=job_config).result().to_dataframe(create_bqstorage_client=True,))
    #if !names.empty:
    state_info.append(names)
    offset += offset_interval

state_df = pd.concat(state_info, ignore_index=True)

In [11]:
# aggregate CO mean by month
state_df['month'] = state_df['date_local'].apply(lambda x : x.month)
state_agg_df = state_df.groupby(['state_name','month']).agg({'arithmetic_mean': ['mean']}).reset_index()
state_agg_df.index = state_agg_df['month']
state_agg_df

Unnamed: 0_level_0,state_name,month,arithmetic_mean
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,mean
month,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2
1,California,1,0.513453
2,California,2,0.403894
3,California,3,0.339843
4,California,4,0.280516
5,California,5,0.240546
6,California,6,0.243935
7,California,7,0.214861
8,California,8,0.265384
9,California,9,0.280746
10,California,10,0.335891


In [9]:
# get aggregate CO levels for states CA and Nevada
offset = 0
offset_interval = 80000
row_count = 8614411
job_config = bigquery.QueryJobConfig(use_legacy_sql=True)
state_info = [] 
while offset < row_count:
    query_string = """SELECT * FROM [bigquery-public-data:epa_historical_air_quality.co_daily_summary] 
    WHERE YEAR(date_local) = 2015 AND  (state_name = 'California' OR state_name = 'Nevada') LIMIT 80000 OFFSET """ + str(offset) 
    names = (bqclient.query(query_string, job_config=job_config).result().to_dataframe(create_bqstorage_client=True,))
    #if !names.empty:
    state_info.append(names)
    offset += offset_interval

state_df = pd.concat(state_info, ignore_index=True)

In [12]:
county_agg_df = state_df.groupby(['state_name','county_name','month']).agg({'arithmetic_mean': ['mean']}).reset_index()
# dropping counties that dont have mean level for all 12 months - Sutter and Madera
county_agg_df = county_agg_df.drop(county_agg_df.index[county_agg_df['county_name'] == 'Sutter'])
county_agg_df = county_agg_df.drop(county_agg_df.index[county_agg_df['county_name'] == 'Madera'])
county_agg_df.index = county_agg_df['month']
county_agg_df

Unnamed: 0_level_0,state_name,county_name,month,arithmetic_mean
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,mean
month,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
1,California,Alameda,1,0.721533
2,California,Alameda,2,0.530977
3,California,Alameda,3,0.429491
4,California,Alameda,4,0.347129
5,California,Alameda,5,0.304991
...,...,...,...,...
8,Nevada,Washoe,8,0.198395
9,Nevada,Washoe,9,0.214044
10,Nevada,Washoe,10,0.259591
11,Nevada,Washoe,11,0.402479


In [13]:
county_agg_df.values.tolist()

[['California', 'Alameda', 1, 0.7215331774193541],
 ['California', 'Alameda', 2, 0.5309766309523812],
 ['California', 'Alameda', 3, 0.42949125806451605],
 ['California', 'Alameda', 4, 0.34712926666666655],
 ['California', 'Alameda', 5, 0.3049908548387095],
 ['California', 'Alameda', 6, 0.3109790666666665],
 ['California', 'Alameda', 7, 0.2669968655913979],
 ['California', 'Alameda', 8, 0.319388564516129],
 ['California', 'Alameda', 9, 0.3812216000000001],
 ['California', 'Alameda', 10, 0.46145535483870986],
 ['California', 'Alameda', 11, 0.5312062611111111],
 ['California', 'Alameda', 12, 0.542443838709677],
 ['California', 'Butte', 1, 0.4679052741935483],
 ['California', 'Butte', 2, 0.2614309642857144],
 ['California', 'Butte', 3, 0.20508741935483865],
 ['California', 'Butte', 4, 0.17615656666666663],
 ['California', 'Butte', 5, 0.1564555],
 ['California', 'Butte', 6, 0.1470844333333333],
 ['California', 'Butte', 7, 0.13543640322580636],
 ['California', 'Butte', 8, 0.17368332258064517

In [14]:
from pymongo import MongoClient

def connectToMongoCollection():
    connect_uri = "mongodb+srv://owner-user:carbonscore@cluster0.tqbki.mongodb.net/carbonScore?retryWrites=true&w=majority"
    mongo_client = MongoClient(connect_uri)
    global db
    db = mongo_client["carbonScore"]

def state_insert_CO_level(year):
    state_list = state_agg_df.values.tolist()
    mongo_dict_list = []
    states_collection = db["statewise_CO_yearly"]
    state_dict = {}
    for state in ['California', 'Nevada']:
        state_dict[state] = []
    for state,month,mean_val in state_list:
        state_dict[state].append(mean_val)
    for key, value in state_dict.items():
        temp_dict = {"year": year, "state": key, "CO_monthly": value}
        mongo_dict_list.append(temp_dict)
    states_collection.insert_many(mongo_dict_list)

def county_insert_CO_level(year):
    county_list = county_agg_df.values.tolist()
    mongo_dict_list = []
    counties_collection = db['countywise_CO_yearly']
    county_dict = {}
    for county in county_agg_df['county_name'].unique():
        county_dict[county] = []
    for state,county,month,mean_val in county_list:
        county_dict[county].append(mean_val)
    for key, value in county_dict.items():
        temp_dict = {"year": year, "county": key, "CO_monthly": value}
        mongo_dict_list.append(temp_dict)
    counties_collection.insert_many(mongo_dict_list)

In [15]:
connectToMongoCollection()
#state_insert_CO_level(2014)

In [16]:
#county_insert_CO_level(2015)

In [6]:
state_year_agg = state_df.groupby(['state_name']).agg({'arithmetic_mean':'mean'}).reset_index()
state_year_agg

Unnamed: 0,state_name,arithmetic_mean
0,Alabama,0.38313
1,Alaska,0.474776
2,Arizona,0.38722
3,Arkansas,0.306667
4,California,0.32969
5,Colorado,0.383858
6,Connecticut,0.284545
7,Delaware,0.310372
8,District Of Columbia,0.415555
9,Florida,0.259293


In [17]:
county_year_agg = state_df.groupby(['state_name','county_name']).agg({'arithmetic_mean':'mean'}).reset_index()
county_year_agg

Unnamed: 0,state_name,county_name,arithmetic_mean
0,California,Alameda,0.428544
1,California,Butte,0.236497
2,California,Contra Costa,0.383072
3,California,Fresno,0.264828
4,California,Humboldt,0.307227
5,California,Imperial,0.226503
6,California,Kern,0.22603
7,California,Los Angeles,0.355882
8,California,Madera,0.188704
9,California,Marin,0.374296


In [18]:
city_year_agg = state_df.groupby(['city_name','county_name']).agg({'arithmetic_mean':'mean'}).reset_index()
city_year_agg

Unnamed: 0,city_name,county_name,arithmetic_mean
0,Anaheim,Orange,0.670749
1,Arden-Arcade,Sacramento,0.276853
2,Arvin,Kern,0.163691
3,Azusa,Los Angeles,0.352053
4,Bakersfield,Kern,0.286319
...,...,...,...
66,Upland,San Bernardino,0.455029
67,Vallejo,Solano,0.399336
68,Vandenberg Air Force Base,Santa Barbara,0.187136
69,Victorville,San Bernardino,0.338536


In [19]:
from pymongo import MongoClient

def connectToMongoCollection():
    connect_uri = "mongodb+srv://owner-user:carbonscore@cluster0.tqbki.mongodb.net/carbonScore?retryWrites=true&w=majority"
    mongo_client = MongoClient(connect_uri)
    global db
    db = mongo_client["carbonScore"]

def generate_score_state_year(year):
    state_agg_list = state_year_agg.values.tolist()
    mongo_dict_list = []
    states_collection = db["state_CO_year_agg"]
    for item in state_agg_list:
        temp_dict = {"state": item[0], 'mean': item[1], 'year': year}
        mongo_dict_list.append(temp_dict)
    states_collection.insert_many(mongo_dict_list)

def generate_score_county_year(year):
    county_agg_list = county_year_agg.values.tolist()
    mongo_dict_list = []
    county_collection = db["county_CO_year_agg"]
    for item in county_agg_list:
        temp_dict = {"county": item[1], 'mean': item[2], 'year': year}
        mongo_dict_list.append(temp_dict)
    county_collection.insert_many(mongo_dict_list)

def generate_score_city_year(year):
    city_agg_list = city_year_agg.values.tolist()
    mongo_dict_list = []
    city_collection = db["city_CO_year_agg"]
    for item in city_agg_list:
        temp_dict = {"county": item[1], "city": item[0], 'mean': item[2], 'year': year}
        mongo_dict_list.append(temp_dict)
    city_collection.insert_many(mongo_dict_list)


In [20]:
connectToMongoCollection()
generate_score_county_year(2015)
generate_score_city_year(2015)
#generate_score_state_year(2015)