In [1]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

from google.cloud import bigquery

# **BigQuery**

In [2]:
# client object
client = bigquery.Client()

# version
bigquery.__version__

Using Kaggle's public dataset BigQuery integration.


'2.2.0'

In [3]:
def about_dataset(dataset_id, project_id="bigquery-public-data"):
    """
    Description of any dataset in BigQuery.
    
    dataset_id > typr(str)
    project_id > type(str)
    """
    # client object
    client = bigquery.Client()
    # refrence for dataset
    dataset_ref = client.dataset(dataset_id,project= project_id)
    # Api request - fetch the dataset:
    dataset = client.get_dataset(dataset_ref)
    # about dataset
    print(dataset.description)
    

def list_table_id(dataset_id, project_id="bigquery-public-data"):
    """
    Return list of table_id in dataset.
    
    dataset_id > typr(str)
    project_id > type(str)
    """
    # client object
    client = bigquery.Client()
    # refrence for dataset
    dataset_ref = client.dataset(dataset_id,project= project_id)
    # Api request - fetch the dataset:
    dataset = client.get_dataset(dataset_ref)
    tables = client.list_tables(dataset)
    print(f"table_name_id in {dataset_id} dataset\n")
    for table in tables:
        print(f"table_name_id : {table.table_id}")

def about_table(table_id,dataset_id, project_id="bigquery-public-data"):
    """
    Description of table of dataset.
    
    dataset_id > typr(str)
    project_id > type(str)
    table_id   >   type(str)
    """
    # client object
    client = bigquery.Client()
    # refrence for dataset
    dataset_ref = client.dataset(dataset_id,project= project_id)
    # Api request - fetch the dataset:
    dataset = client.get_dataset(dataset_ref)
    
    table_ref = dataset_ref.table(table_id)
    # API -reguest - fetch the table :
    table = client.get_table(table_ref)
    # about table :
    print(table.description)

def about_column_oftable(table_id,dataset_id, project_id="bigquery-public-data"):
    """
    Description about column(field) of any table of dataset.
    Each `SchemaField` tells us about all tha column(field) name,type,mode & description in table.
    
    dataset_id > typr(str)
    project_id > type(str)
    table_id   > type(str)
    """
    # client object
    client = bigquery.Client()
    # refrence for dataset
    dataset_ref = client.dataset(dataset_id,project= project_id)
    # Api request - fetch the dataset:
    dataset = client.get_dataset(dataset_ref)
    
    table_ref = dataset_ref.table(table_id)
    # API -reguest - fetch the table :
    table = client.get_table(table_ref)
    # about column of table :
    return table.schema
    

def fetch_table(table_id,dataset_id, project_id="bigquery-public-data"):
    """
    fetch tables of dataset
    
    dataset_id > typr(str)
    project_id > type(str)
    table_id   > type(str)
    """
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id, project = project_id)
    dataset = client.get_dataset(dataset_ref)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)
    return table

In [4]:
about_dataset("openaq")

Using Kaggle's public dataset BigQuery integration.
OpenAQ is an open-source project to surface live, real-time air quality data from around the world. Their “mission is to enable previously impossible science, impact policy and empower the public to fight air pollution.” The data includes air quality measurements from 5490 locations in 47 countries.

Scientists, researchers, developers, and citizens can use this data to understand the quality of air near them currently. The dataset only includes the most current measurement available for the location (no historical data). 

Dataset Source: openaq.org

Category: Science

Use: This dataset is publicly available for anyone to use under the following terms provided by the Dataset Source — https://openaq.org/#/about?_k=s3aspo — and is provided "AS IS" without any warranty, express or implied, from Google. Google disclaims all liability for any damages, direct or indirect, resulting from the use of the dataset. 

Update Frequency: Hourly


In [5]:
list_table_id("openaq")

Using Kaggle's public dataset BigQuery integration.
table_name_id in openaq dataset

table_name_id : global_air_quality


In [6]:
about_table("global_air_quality","openaq")

Using Kaggle's public dataset BigQuery integration.
None


In [7]:
about_column_oftable("global_air_quality","openaq")

Using Kaggle's public dataset BigQuery integration.


[SchemaField('location', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('country', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('pollutant', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('value', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('timestamp', 'TIMESTAMP', 'NULLABLE', None, (), None),
 SchemaField('unit', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('source_name', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('latitude', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('longitude', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('averaged_over_in_hours', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('location_geom', 'GEOGRAPHY', 'NULLABLE', None, (), None)]

In [8]:
table = fetch_table("global_air_quality","openaq")

# first 5 rows in "full" table:
df = client.list_rows(table,max_results=500).to_dataframe()

Using Kaggle's public dataset BigQuery integration.


In [9]:
df.head()

Unnamed: 0,location,city,country,pollutant,value,timestamp,unit,source_name,latitude,longitude,averaged_over_in_hours,location_geom
0,"Borówiec, ul. Drapałka",Borówiec,PL,bc,0.85217,2022-04-28 07:00:00+00:00,µg/m³,GIOS,1.0,52.276794,17.074114,POINT(52.276794 1)
1,"Kraków, ul. Bulwarowa",Kraków,PL,bc,0.91284,2022-04-27 23:00:00+00:00,µg/m³,GIOS,1.0,50.069308,20.053492,POINT(50.069308 1)
2,"Płock, ul. Reja",Płock,PL,bc,1.41,2022-03-30 04:00:00+00:00,µg/m³,GIOS,1.0,52.550938,19.709791,POINT(52.550938 1)
3,"Elbląg, ul. Bażyńskiego",Elbląg,PL,bc,0.33607,2022-05-03 13:00:00+00:00,µg/m³,GIOS,1.0,54.167847,19.410942,POINT(54.167847 1)
4,"Piastów, ul. Pułaskiego",Piastów,PL,bc,0.51,2022-05-11 05:00:00+00:00,µg/m³,GIOS,1.0,52.191728,20.837489,POINT(52.191728 1)


In [10]:
client.list_rows(table, max_results=5,
                 selected_fields=table.schema[:1]).to_dataframe()

Unnamed: 0,location
0,"Borówiec, ul. Drapałka"
1,"Kraków, ul. Bulwarowa"
2,"Płock, ul. Reja"
3,"Elbląg, ul. Bażyńskiego"
4,"Piastów, ul. Pułaskiego"


In [11]:
df.city.value_counts()

Kraków                  21
Łódź                    18
Rzeszów                 17
Płock                   16
Zielonka                14
                        ..
Czerwionka-Leszczyny     3
Złoty Potok              3
Nakło nad Notecią        3
Kalisz                   3
Kościerzyna              3
Name: city, Length: 64, dtype: int64

In [12]:
# save file
#with open("openaq_gaq.csv","w") as file:
   # file.write(df.To_csv())

# **SQL (Structured Query Language)**
## **1) Select, From & Where**
**The foundational compontents for all SQL queries**

In [13]:
query0 = """
        SELECT city
        FROM `bigquery-public-data.openaq.global_air_quality`
        WHERE country = 'US'
        """

query0_job = client.query(query0)
# API request -fetch data
query0_df = query0_job.to_dataframe()

In [14]:
query0_df.head(2)

Unnamed: 0,city
0,HOWARD
1,HOWARD


In [15]:
# save file
with open("query0.csv","w") as file:
    file.write(query0_df.to_csv())

In [16]:
query1 = """
        SELECT city
        FROM `bigquery-public-data.openaq.global_air_quality`
        WHERE country = 'IN'
        """
query1_job = client.query(query1)
# API request -fetch data
query1_df = query1_job.to_dataframe()

In [17]:
query1_df.head(2)

Unnamed: 0,city
0,Tirupati
1,Indore


In [18]:
# save file
with open("query1.csv","w") as file:
    file.write(query1_df.to_csv())

In [19]:
query2 = """ 
          SELECT city, country, value
          FROM `bigquery-public-data.openaq.global_air_quality`
          WHERE country = "IN"
          
          """
query2_job = client.query(query2)
# API request -fetch data
query2_df = query2_job.to_dataframe()

In [20]:
query2_df.head(2)

Unnamed: 0,city,country,value
0,Ambala,IN,490.0
1,Srinagar,IN,0.0


In [21]:
# save file
with open("query2.csv","w") as file:
    file.write(query2_df.to_csv())

In [22]:
query3 = """
          SELECT *
          FROM `bigquery-public-data.openaq.global_air_quality`
          WHERE country = "IN"
          """
query3_job = client.query(query3)
# API request -fetch data
query3_df = query3_job.to_dataframe()

In [23]:
query3_df.head(2)

Unnamed: 0,location,city,country,pollutant,value,timestamp,unit,source_name,latitude,longitude,averaged_over_in_hours,location_geom
0,"Patti Mehar, Ambala - HSPCB",Ambala,IN,co,490.0,2022-05-15 01:45:00+00:00,µg/m³,caaqm,0.25,30.379589,76.778328,POINT(30.379589 0.25)
1,"Rajbagh, Srinagar - JKSPCB",Srinagar,IN,co,0.0,2022-05-04 15:15:00+00:00,µg/m³,caaqm,0.25,34.066206,74.81982,POINT(34.066206 0.25)


In [24]:
# save file
with open("query3.csv","w") as file:
    file.write(query3_df.to_csv())

In [25]:
list_table_id("hacker_news")

Using Kaggle's public dataset BigQuery integration.
table_name_id in hacker_news dataset

table_name_id : comments
table_name_id : full
table_name_id : full_201510
table_name_id : stories


In [26]:
about_column_oftable("full","hacker_news")

Using Kaggle's public dataset BigQuery integration.


[SchemaField('title', 'STRING', 'NULLABLE', 'Story title', (), None),
 SchemaField('url', 'STRING', 'NULLABLE', 'Story url', (), None),
 SchemaField('text', 'STRING', 'NULLABLE', 'Story or comment text', (), None),
 SchemaField('dead', 'BOOLEAN', 'NULLABLE', 'Is dead?', (), None),
 SchemaField('by', 'STRING', 'NULLABLE', "The username of the item's author.", (), None),
 SchemaField('score', 'INTEGER', 'NULLABLE', 'Story score', (), None),
 SchemaField('time', 'INTEGER', 'NULLABLE', 'Unix time', (), None),
 SchemaField('timestamp', 'TIMESTAMP', 'NULLABLE', 'Timestamp for the unix time', (), None),
 SchemaField('type', 'STRING', 'NULLABLE', 'Type of details (comment, comment_ranking, poll, story, job, pollopt)', (), None),
 SchemaField('id', 'INTEGER', 'NULLABLE', "The item's unique id.", (), None),
 SchemaField('parent', 'INTEGER', 'NULLABLE', 'Parent comment ID', (), None),
 SchemaField('descendants', 'INTEGER', 'NULLABLE', 'Number of story or poll descendants', (), None),
 SchemaField

In [27]:
# `hacker_news` is a very large dataset
query4 = """
         SELECT title, score
         FROM `bigquery-public-data.hacker_news.full`
          WHERE type = "job"
         """
# create a `QueryJobConfig` object to estimates size of query `(dry_run=True)` without running it:
dry_run_config = bigquery.QueryJobConfig(dry_run=True)
query4_dry_run_config_job = client.query(query4,job_config=dry_run_config)

In [28]:
print(f"query will process {query4_dry_run_config_job.total_bytes_processed/1e+6} MB ")

query will process 548.784085 MB 


In [29]:
# limit how much data you are willing to scan: example(1GB)
one_GB = 1000*1000*1000
limit_config = bigquery.QueryJobConfig(maximum_bytes_billed=one_GB)
query4_limit_config_job = client.query(query4,job_config=limit_config)
# API- request - fetch data
query4_df = query4_limit_config_job.to_dataframe()

In [30]:
query4_df.head(2)

Unnamed: 0,title,score
0,Yoshi Engineering is hiring,1.0
1,Recent YC Grad looking for a data dev to help ...,1.0


In [31]:
with open("query4.csv","w") as file:
    file.write(query4_df.to_csv())

## **2) Group By, Having & Count**
**Get more interesting insights directly from your SQL queries**

In [32]:
# count - Total id's in full table
query5 = """
         SELECT COUNT(id)
         FROM `bigquery-public-data.hacker_news.full`
        
         """
query5_job = client.query(query5)
query5_df = query5_job.to_dataframe()

In [33]:
query5_df

Unnamed: 0,f0_
0,33272432


In [34]:
about_column_oftable("global_air_quality","openaq")

Using Kaggle's public dataset BigQuery integration.


[SchemaField('location', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('country', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('pollutant', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('value', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('timestamp', 'TIMESTAMP', 'NULLABLE', None, (), None),
 SchemaField('unit', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('source_name', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('latitude', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('longitude', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('averaged_over_in_hours', 'FLOAT', 'NULLABLE', None, (), None),
 SchemaField('location_geom', 'GEOGRAPHY', 'NULLABLE', None, (), None)]

In [35]:
# GROUP BY - How many cities in each country.
query6 = """
         SELECT country,COUNT(city)
         FROM `bigquery-public-data.openaq.global_air_quality`
         GROUP BY country
         """
query6_job = client.query(query6)
query6_df = query6_job.to_dataframe()

In [36]:
query6_df.head(2)

Unnamed: 0,country,f0_
0,PL,341984
1,US,1421351


In [37]:
query6_df[query6_df.country == "IN"]

Unnamed: 0,country,f0_
42,IN,770712


In [38]:
# HAVING - its used in combination with GROUP BY to 
# ignore groups that don't meet certain criteria.
query7 = """
         SELECT country,COUNT(city)
         FROM `bigquery-public-data.openaq.global_air_quality`
         GROUP BY country
         HAVING COUNT(city)>50
         """
query7_job = client.query(query7)
query7_df = query7_job.to_dataframe()

In [39]:
query7_df.tail(2)

Unnamed: 0,country,f0_
97,KZ,102
98,CS,59


In [40]:
# Aliasing and improved readability and faster
# `COUNT(1)` - faster than previous(use if unsure what to put inside`COUNT()`)
# `AS NumCity` - alias `f0_` column to `NumCity`
query8 = """
         SELECT country,COUNT(1) AS NumCity
         FROM `bigquery-public-data.openaq.global_air_quality`
         GROUP BY country
         HAVING COUNT(1)>50
         """
query8_job = client.query(query8)
query8_df = query8_job.to_dataframe()

In [41]:
query8_df.head()

Unnamed: 0,country,NumCity
0,PL,341984
1,US,1421351
2,ME,13466
3,AD,2920
4,CN,13681


## **3) Order By**
**Order your results to focus on the most important data for your use case.**

In [42]:
about_dataset("nhtsa_traffic_fatalities")

Using Kaggle's public dataset BigQuery integration.
None


In [43]:
list_table_id("nhtsa_traffic_fatalities")

Using Kaggle's public dataset BigQuery integration.
table_name_id in nhtsa_traffic_fatalities dataset

table_name_id :  accident_2015
table_name_id :  accident_2016
table_name_id :  accident_2017
table_name_id :  accident_2018
table_name_id :  accident_2019
table_name_id :  accident_2020
table_name_id :  cevent_2015
table_name_id :  cevent_2016
table_name_id :  cevent_2017
table_name_id :  cevent_2018
table_name_id :  cevent_2019
table_name_id :  cevent_2020
table_name_id :  damage_2015
table_name_id :  damage_2016
table_name_id :  damage_2017
table_name_id :  damage_2018
table_name_id :  damage_2019
table_name_id :  damage_2020
table_name_id :  distract_2015
table_name_id :  distract_2016
table_name_id :  distract_2017
table_name_id :  distract_2018
table_name_id :  distract_2019
table_name_id :  distract_2020
table_name_id :  drimpair_2015
table_name_id :  drimpair_2016
table_name_id :  drimpair_2017
table_name_id :  drimpair_2018
table_name_id :  drimpair_2019
table_name_id :  drimp

In [44]:
about_table("accident_2015","nhtsa_traffic_fatalities")

Using Kaggle's public dataset BigQuery integration.
This data file contains information about crash characteristics
and environmental conditions at the time of the crash. There is one record per crash.


In [45]:
table_accident_2015 = fetch_table("accident_2015","nhtsa_traffic_fatalities")
table_accident_2015_df= client.list_rows(table_accident_2015,max_results=50).to_dataframe()

Using Kaggle's public dataset BigQuery integration.


In [46]:
table_accident_2015_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 70 columns):
 #   Column                                                       Non-Null Count  Dtype              
---  ------                                                       --------------  -----              
 0   state_number                                                 50 non-null     int64              
 1   state_name                                                   50 non-null     object             
 2   consecutive_number                                           50 non-null     int64              
 3   number_of_vehicle_forms_submitted_all                        50 non-null     int64              
 4   number_of_motor_vehicles_in_transport_mvit                   50 non-null     int64              
 5   number_of_parked_working_vehicles                            50 non-null     int64              
 6   number_of_forms_submitted_for_persons_not_in_motor_vehicles  50 non-null    

In [47]:
#about_table_column("accident_2015","nhtsa_traffic_fatalities")

In [48]:
# total_Accident city wise
query9 = """
         SELECT city,COUNT(consecutive_number) AS Total_accidents
         FROM `bigquery-public-data.nhtsa_traffic_fatalities.accident_2015`
         GROUP BY city
         HAVING Total_accidents>100
         ORDER BY Total_accidents
         #limit 5
         """
# limit peocess data to be 1GB
limit_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**9)
query9_job = client.query(query9,job_config=limit_config)
# API request - fetch data
query9_job_df = query9_job.to_dataframe()

In [49]:
query9_job_df.head()

Unnamed: 0,city,Total_accidents
0,3260,101
1,1620,106
2,2090,107
3,330,128
4,1260,131


In [50]:
# total_accident month wise
query10 = """
         SELECT COUNT(consecutive_number) AS Total_accidents,
         EXTRACT(MONTH  from timestamp_of_crash) AS month_of_year2015
         FROM `bigquery-public-data.nhtsa_traffic_fatalities.accident_2015`
         GROUP BY month_of_year2015
         ORDER BY Total_accidents DESC
         """
# limit peocess data to be 1GB
limit_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**9)
query10_job = client.query(query10,job_config=limit_config)
# API request - fetch data
query10_job_df = query10_job.to_dataframe()

In [51]:
query10_job_df

Unnamed: 0,Total_accidents,month_of_year2015
0,3070,10
1,3049,8
2,3021,7
3,2904,9
4,2869,5
5,2861,12
6,2790,6
7,2780,11
8,2439,4
9,2401,3


# **4) As & With**
**Organize your query for better readability. This becomes especially important for complex queries.**

In [52]:
about_dataset("crypto_bitcoin")

Using Kaggle's public dataset BigQuery integration.



In [53]:
list_table_id("crypto_bitcoin")

Using Kaggle's public dataset BigQuery integration.
table_name_id in crypto_bitcoin dataset

table_name_id : blocks
table_name_id : inputs
table_name_id : outputs
table_name_id : transactions


In [54]:
about_table("transactions","crypto_bitcoin")

Using Kaggle's public dataset BigQuery integration.
All transactions.
Data is exported using https://github.com/blockchain-etl/bitcoin-etl



In [55]:
#about_column_oftable("transactions","crypto_bitcoin")

In [56]:
table = fetch_table("transactions","crypto_bitcoin")
table_df = client.list_rows(table,max_results=500).to_dataframe()

Using Kaggle's public dataset BigQuery integration.


In [57]:
table_df.head(2)

Unnamed: 0,hash,size,virtual_size,version,lock_time,block_hash,block_number,block_timestamp,block_timestamp_month,input_count,output_count,input_value,output_value,is_coinbase,fee,inputs,outputs
0,a16f3ce4dd5deb92d98ef5cf8afeaf0775ebca408f708b...,275,275,1,0,00000000dc55860c8a29c58d45209318fa9e9dc2c1833a...,181,2009-01-12 06:02:13+00:00,2009-01-01,1,2,4000000000.0,4000000000.0,False,0.0,"[{'index': 0, 'spent_transaction_hash': 'f4184...","[{'index': 0, 'script_asm': '04b5abd412d4341b4..."
1,591e91f809d716912ca1d4a9295e70c3e78bab077683f7...,275,275,1,0,0000000054487811fc4ff7a95be738aa5ad9320c394c48...,182,2009-01-12 06:12:16+00:00,2009-01-01,1,2,3000000000.0,3000000000.0,False,0.0,"[{'index': 0, 'spent_transaction_hash': 'a16f3...","[{'index': 0, 'script_asm': '0401518fa1d1e1e3e..."


## **5) Joining Data**
**Combine data sources. Critical for almost all real-world data problems**