# Libraries

In [1]:
import requests
import pandas as pd
import os
import io
from google.cloud import storage
from google.cloud import bigquery
import glob

# Downloading data

In [2]:
# Taxi dataset processed by this notebook; selects both the download URL and the local folder.
data = "yellow" # green, yellow, fhv
# Local folder holding the monthly archives of the selected dataset.
# NOTE(review): absolute, machine-specific path — adjust when running on another machine.
file_directory = (f'D:/other/project/data-engineering/homeworks/homework-dbt/data/{data}')
# glob.glob() returns matches in arbitrary order; sorting makes the processing order
# deterministic (the YYYY-MM suffix sorts chronologically as plain text).
files = sorted(glob.glob(f'{file_directory}/{data}_tripdata_*.csv.gz'))

In [None]:
# Download every monthly archive of the selected dataset for the listed years.
#link for yellow taxi : https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz
#link for green taxi : https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz
#link for fhv taxi : https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz

month_list = ['01','02','03','04','05','06',"07","08","09","10","11","12"]
annual_list = ['2019','2020']

# The target folder must exist before the first archive is opened for writing.
os.makedirs(file_directory, exist_ok=True)

# URLs that could not be fetched; reported at the end instead of claiming success.
failed_downloads = []

for j in annual_list:

    for i in month_list:

        url = f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{data}/{data}_tripdata_{j}-{i}.csv.gz'
        file_path = (f'{file_directory}/{data}_tripdata_{j}-{i}.csv.gz')
        # Write to a temporary name first so an interrupted download never leaves a
        # truncated file that matches the "*.csv.gz" pattern used by the merge step.
        partial_path = f'{file_path}.part'

        try:
            # stream=True avoids holding a whole archive in memory; the timeout keeps
            # a stalled connection from hanging the notebook indefinitely.
            with requests.get(url, stream=True, timeout=60) as response:
                if response.status_code != 200:
                    print(f'unable to download {url} (HTTP {response.status_code})')
                    failed_downloads.append(url)
                    continue

                with open(partial_path, 'wb') as file:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        file.write(chunk)
        except requests.RequestException as error:
            print(f'unable to download {url} ({error})')
            failed_downloads.append(url)
            if os.path.exists(partial_path):
                os.remove(partial_path)
            continue

        # Only a completely written archive gets its final name.
        os.replace(partial_path, file_path)


if failed_downloads:
    print(f"{len(failed_downloads)} file(s) could not be downloaded:")
    for failed_url in failed_downloads:
        print(f"  {failed_url}")
else:
    print("All the monthly data are downloaded successfully")

# Merge all data

In [None]:
# Merge every downloaded monthly archive into one CSV file.

# Re-scan the download folder here: the earlier `files` list is built before the
# download cell runs, so on a fresh kernel it would still be empty at this point.
files = sorted(glob.glob(f'{file_directory}/{data}_tripdata_*.csv.gz'))
if not files:
    # Fail with an actionable message instead of pandas' "No objects to concatenate".
    raise FileNotFoundError(
        f"No {data}_tripdata_*.csv.gz files found in {file_directory}; run the download cell first."
    )

batch_size = 1  # number of archives read per batch
batch_list = []
for i in range(0,len(files),batch_size):

    batch_files = files[i:i+batch_size]
    print(f"Processing batch: {i // batch_size + 1}, Files: {batch_files}")
    # store_and_fwd_flag is forced to str so that every file yields the same dtype for it.
    process_file = [pd.read_csv(file,compression='gzip',dtype={'store_and_fwd_flag': 'str'}) for file in batch_files]
    merge_df = pd.concat(process_file,ignore_index=True)
    batch_list.append(merge_df)

# Combine all batches and persist the result next to the monthly archives.
final_df = pd.concat(batch_list,ignore_index=True)
final_df.to_csv(f"{file_directory}/{data}_tripdata.csv",index=False)
print("Concatenation completed successfully!")

Processing batch: 1, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-01.csv.gz']
Processing batch: 2, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-02.csv.gz']
Processing batch: 3, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-03.csv.gz']
Processing batch: 4, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-04.csv.gz']
Processing batch: 5, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-05.csv.gz']
Processing batch: 6, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-06.csv.gz']
Processing batch: 7, Files: ['D:/other/project/data-engineering/homeworks/homework-dbt/data/yellow\\yellow_tripdata_2019-07.csv.gz']
Processing batch: 8, Files: ['D:/other/project/data-engineering/homew

# Authentication to GCP

In [None]:
# Tell the Google Cloud clients created below which service-account key file to use.
# setdefault() keeps a GOOGLE_APPLICATION_CREDENTIALS value that is already present in
# the environment, so the machine-specific path below acts only as a fallback.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "D:/other/project/data-engineering/homeworks/homework-3/credential.json",
)
# Show which key file is in effect (this prints the path only, never the key contents).
print(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

# Upload to bucket

In [None]:
# Upload the merged CSV of the selected dataset to a Cloud Storage bucket.
storage_client = storage.Client()
bucket_name = 'ny-taxi-project-448909-hw-dbt'
# The bucket is assumed to exist already. One-off setup, if it does not:
#   storage_client.create_bucket(bucket_name)
bucket = storage_client.bucket(bucket_name)

# Object name inside the bucket.
file_name = (f'{data}_tripdata.csv')
# NOTE: despite its name, `directory` is the full local path of the CSV file to upload.
directory = (f'{file_directory}/{data}_tripdata.csv')

blob = bucket.blob(file_name)
blob.upload_from_filename(directory)
print(f'{file_name} uploaded to {bucket_name}')

# Create a dataset

In [None]:
# Create the BigQuery dataset HW_3 in the client's default project.
bigquery_client = bigquery.Client()
dataset_id = f'{bigquery_client.project}.HW_3'
dataset = bigquery.Dataset(dataset_id)
dataset.location = 'US'
# exists_ok=True makes this cell safe to re-run: without it a second run raises
# a Conflict error because the dataset already exists.
dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
print(f'The {dataset_id} dataset is ready (created or already existed)')

In [None]:
# Byte offset to inspect in the cleaned FHV file.
# NOTE(review): presumably taken from a load error message — confirm the source of this value.
problem_offset = 1876070839

# Open in binary mode: a text-mode file only supports seek() to offsets previously
# returned by tell(), and an arbitrary byte offset can land inside a multi-byte UTF-8
# sequence, which would make readline() raise UnicodeDecodeError.
with open("data/fhv_tripdata_2019_cleaned.csv", "rb") as f:
    f.seek(problem_offset)
    # Decode leniently so an invalid byte is displayed as U+FFFD instead of aborting.
    print(f.readline().decode("utf-8", errors="replace"))  # Print the problematic line


In [None]:
import csv

input_file = "data/fhv_tripdata_2019_cleaned.csv"
output_file = "data/fhv_tripdata_2019_fixed.csv"


def _format_csv_row(row):
    """Serialize one parsed CSV row, writing empty fields as an explicit "" pair.

    The row is formatted by hand because csv.writer cannot emit a quoted empty
    field selectively: handing it the two-character string '""' makes it escape
    the quotes and write six quote characters, i.e. a field whose *value* is two
    quotes rather than an empty string.

    Args:
        row: list of field strings as produced by csv.reader.

    Returns:
        The CSV line for the row, terminated with "\\r\\n" (csv.writer's default).
    """
    formatted = []
    for field in row:
        if field == '':
            # Explicit empty string.
            formatted.append('""')
        elif any(ch in field for ch in ',"\r\n'):
            # Minimal quoting: wrap fields containing special characters and
            # double any embedded quote characters.
            formatted.append('"' + field.replace('"', '""') + '"')
        else:
            formatted.append(field)
    return ','.join(formatted) + '\r\n'


# newline='' on both files leaves line-ending handling to the csv module / the
# formatter above, so newlines embedded in quoted fields survive unchanged.
with open(input_file, "r", encoding="utf-8", newline='') as infile, open(output_file, "w", encoding="utf-8", newline='') as outfile:
    for row in csv.reader(infile):
        outfile.write(_format_csv_row(row))

print("CSV file cleaned and saved.")
