# Load data in BigQuery

We use dlt to load data in a convenient way

In [32]:
import pandas as pd


def download_taxi_data(url, service):
    
    


    if service == 'green':
    # native date parsing
        parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']
        taxi_dtypes = {
                'VendorID': pd.Int64Dtype(),
                'passenger_count': pd.Int64Dtype(),
                'trip_distance': float,
                'RatecodeID':pd.Int64Dtype(),
                'store_and_fwd_flag':str,
                'PULocationID':pd.Int64Dtype(),
                'DOLocationID':pd.Int64Dtype(),
                'payment_type': pd.Int64Dtype(),
                'fare_amount': float,
                'extra':float,
                'mta_tax':float,
                'tip_amount':float,
                'tolls_amount':float,
                'improvement_surcharge':float,
                'total_amount':float,
                'congestion_surcharge':float,
                'trip_type': pd.Int64Dtype()
            }
    elif service == 'yellow':
        parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
        taxi_dtypes = {
                'VendorID': pd.Int64Dtype(),
                'passenger_count': pd.Int64Dtype(),
                'trip_distance': float,
                'RatecodeID':pd.Int64Dtype(),
                'store_and_fwd_flag':str,
                'PULocationID':pd.Int64Dtype(),
                'DOLocationID':pd.Int64Dtype(),
                'payment_type': pd.Int64Dtype(),
                'fare_amount': float,
                'extra':float,
                'mta_tax':float,
                'tip_amount':float,
                'tolls_amount':float,
                'improvement_surcharge':float,
                'total_amount':float,
                'congestion_surcharge':float,
                'trip_type': pd.Int64Dtype()
            }
    else:
        parse_dates = ['pickup_datetime', 'dropOff_datetime']
        				
        taxi_dtypes = {
                    'dispatching_base_num': str,
                    'SR_Flag': float,
                    'Affiliated_base_number':str,
                    'PUlocationID':pd.Int64Dtype(),
                    'DOlocationID':pd.Int64Dtype(),
                }

    
    if service == 'fhv':
        Years = ['2019']
    else:
        Years = ['2019','2020']
        
    for year in Years:
        for i in range(12):

                    # sets the month part of the file_name string
            month = '0'+str(i+1)
            month = month[-2:]

            # csv file_name
            file_name = f"{service}_tripdata_{year}-{month}.csv.gz"

            # download it using requests via a pandas df
            request_url = f"{url}{service}/{file_name}"

            print(f'got file {request_url}')
            # load file with pd.concat
            df = pd.read_csv(
            request_url, sep=',', compression='gzip', dtype=taxi_dtypes,  parse_dates=parse_dates
            ) #
            print("Rename columns in Camel Case to Snake Case")
            df.columns = (df.columns
                  .str.replace('(?<=[a-z])(?=[A-Z])', '_', regex=True)
                  .str.lower()
                )

            yield df


In [2]:
!gcloud auth application-default login

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2F&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login&state=CQrNCaWa7J3TKFh9LBbOLRcunzsWcW&access_type=offline&code_challenge=pZOtKQFMAUVoskbcylLEGOe594NBTgqpHYNg2hmkFik&code_challenge_method=S256


Credentials saved to file: [/Users/elvist/.config/gcloud/application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).

Quota project "dte-course" was added to ADC which can be used by Google client libraries for billing and quota. Note that some services may still bill the project owning the resource.


In [34]:
!python3.11 -m pip install google-cloud-bigquery-storage

Collecting google-cloud-bigquery-storage
  Obtaining dependency information for google-cloud-bigquery-storage from https://files.pythonhosted.org/packages/75/93/a4192dd34b42ab31c8411810db896deca31c48f845807a733602ac38d849/google_cloud_bigquery_storage-2.24.0-py2.py3-none-any.whl.metadata
  Downloading google_cloud_bigquery_storage-2.24.0-py2.py3-none-any.whl.metadata (5.6 kB)
Collecting proto-plus<2.0.0dev,>=1.22.0 (from google-cloud-bigquery-storage)
  Obtaining dependency information for proto-plus<2.0.0dev,>=1.22.0 from https://files.pythonhosted.org/packages/ad/41/7361075f3a31dcd05a6a38cfd807a6eecbfb6dbfe420d922cd400fc03ac1/proto_plus-1.23.0-py3-none-any.whl.metadata
  Downloading proto_plus-1.23.0-py3-none-any.whl.metadata (2.2 kB)
Collecting grpcio-status<2.0.dev0,>=1.33.2 (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0dev,>=1.34.0->google-cloud-bigquery-storage)
  Obtaining dependency information for grpc

In [35]:
import os
import dlt

os.environ['GOOGLE_CLOUD_PROJECT'] = 'dte-course'

init_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/'

services = ['green','yellow']

# Define your pipeline
pipeline = dlt.pipeline(
    #pipeline_name='pipeline',
    destination='bigquery',
    dataset_name='dbt_etogban'
)
for service in services:
    # Run the pipeline
    load_info = pipeline.run(download_taxi_data(init_url, service),
                              table_name=f"{service}_tripdata",
                              write_disposition="replace")
    print(f"{service} data has been loaded successfully")

got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz
Rename columns in Camel Case to Snake Case
got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-02.csv.gz
Rename columns in Camel Case to Snake Case
got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-03.csv.gz
Rename columns in Camel Case to Snake Case
got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-04.csv.gz
Rename columns in Camel Case to Snake Case
got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-05.csv.gz
Rename columns in Camel Case to Snake Case
got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-06.csv.gz
Rename columns in Camel Case to Snake Case
got file https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tri