In [1]:
import os
import zipfile
import bz2
import requests
from typing import Tuple
from google.cloud import storage
from datetime import timedelta
from typing import List, Tuple
from prefect import flow, task
from prefect.tasks import task_input_hash, exponential_backoff

In [2]:
def download_data(url: str, output_dir: str) -> str:

    output_dir = os.path.abspath(output_dir)

    os.makedirs(output_dir, exist_ok=True)
    file_path = os.path.join(output_dir, os.path.basename(url))

    response = requests.get(url)
    with open(file_path, 'wb') as f:
        f.write(response.content)

    return file_path

def extract_data(file_path: str, output_dir: str) -> Tuple[str, str]:

    output_dir = os.path.abspath(output_dir)

    os.makedirs(output_dir, exist_ok=True)
    with zipfile.ZipFile(file_path, 'r') as zip_ref:
        zip_ref.extractall(output_dir)

    extracted_files = []
    for root, _, files in os.walk(output_dir):
        for file in files:
            extracted_files.append(os.path.join(root, file))

    return tuple(extracted_files)

In [14]:
extract_data(download_data('https://github.com/DataTalksClub/nyc-tlc-data/archive/refs/tags/yellow.zip', 'data'), 'data')

('/workspaces/DE_airline_on_time/data/yellow.zip',
 '/workspaces/DE_airline_on_time/data/nyc-tlc-data-yellow/README.md')

In [84]:
extracted_files = []
for root, _, files in os.walk('data'):
    for file in files:
        extracted_files.append(os.path.join(root, file))

print(tuple(extracted_files))
extracted_files = tuple(extracted_files)

('data/1987.csv.gz', 'data/1988.csv.gz', 'data/1989.csv.gz', 'data/1990.csv.gz', 'data/1991.csv.gz', 'data/1992.csv.gz', 'data/1993.csv.gz', 'data/1994.csv.gz', 'data/1995.csv.gz', 'data/1996.csv.gz', 'data/1997.csv.gz', 'data/1998.csv.gz', 'data/1999.csv.gz', 'data/2000.csv.gz', 'data/2001.csv.gz', 'data/2002.csv.gz', 'data/2003.csv.gz', 'data/2004.csv.gz', 'data/2005.csv.gz', 'data/2006.csv.gz', 'data/2007.csv.gz', 'data/2008.csv.gz', 'data/airports.csv', 'data/carriers.csv', 'data/plane-data.csv', 'data/variable-descriptions.csv')


In [88]:
filename = os.path.join('data2', os.path.basename('data/1987.csv.gz'))
print(filename)
print(os.path.basename('data/1987.csv.gz'))

data2/1987.csv.gz
1987.csv.gz


In [79]:
import pandas as pd
def compress_files(extracted_files: Tuple[str, ...], output_dir: str) -> Tuple[str, str]:
    """
    This function compresses extracted CSV files in .bz2 format into .gz format and saves them in the specified output
    directory. If the CSV file is encoded with 'latin_1', it will first be decoded before being compressed. Additionally,
    any occurrences of the characters '-', 'ä', 'æ', and 'â' in the 'TailNum' column will be stripped of whitespace.

    Parameters:
    extracted_files (Tuple[str, ...]): A tuple of file paths to CSV files in .bz2 format to be compressed.
    output_dir (str): The directory in which the compressed files will be saved.

    Returns:
    Tuple[str, str]: A tuple of file paths to the compressed files in the output directory.

    """
    for file_path in extracted_files:
        # Check if the file is in .bz2 format
        if file_path.endswith('.bz2'):
            compressed_file_path = file_path[:-4] + '.gz'
            try:
                # Try to read the file using the default encoding ('utf-8')
                df = pd.read_csv(file_path)
                # Compress the file and save it in the output directory
                df.to_csv(compressed_file_path, index=False, compression='gzip')
                # Remove the original file
                os.remove(file_path)
            except UnicodeDecodeError:
                # If the default encoding fails, try reading the file using 'latin_1' encoding
                df = pd.read_csv(file_path, encoding='latin_1')
                df['TailNum'] = df['TailNum'].str.strip('-äæâ')
                # Compress the file and save it in the output directory
                df.to_csv(compressed_file_path, index=False, compression='gzip')
                # Remove the original file
                os.remove(file_path)
    
    # Collect all file paths in the output directory
    all_files = []
    for root, _, files in os.walk(output_dir):
        for file in files:
            all_files.append(os.path.join(root, file))
    
    # Return the file paths as a tuple
    return tuple(all_files)

In [45]:
df = pd.read_csv('data/2001.csv.bz2', encoding='latin_1')
df['TailNum'] = df['TailNum'].str.strip('-äæâ')

In [None]:
df2 = pd.read_csv('data/2001.csv.bz2', encoding='latin_1')
df2['TailNum'] = df2['TailNum'].str.strip('-äæâ')

In [72]:
df2.columns

Index(['Unnamed: 0', 'Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime',
       'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum',
       'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay',
       'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut',
       'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay',
       'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay'],
      dtype='object')

In [51]:
df2['TailNum'] = df2['TailNum'].value_counts()

äNKNOæ    176811
-N823A      3982
-N819A      3850
-N810A      3755
-N916D      3631
           ...  
N000A1         1
N661äâ         1
N662äâ         1
N7BäA1         1
N668äâ         1
Name: TailNum, Length: 4561, dtype: int64

In [16]:
import pandas as pd
df3 = pd.read_csv('data/2008.csv.bz2')

In [17]:
df3.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2389217 entries, 0 to 2389216
Data columns (total 29 columns):
 #   Column             Dtype  
---  ------             -----  
 0   Year               int64  
 1   Month              int64  
 2   DayofMonth         int64  
 3   DayOfWeek          int64  
 4   DepTime            float64
 5   CRSDepTime         int64  
 6   ArrTime            float64
 7   CRSArrTime         int64  
 8   UniqueCarrier      object 
 9   FlightNum          int64  
 10  TailNum            object 
 11  ActualElapsedTime  float64
 12  CRSElapsedTime     float64
 13  AirTime            float64
 14  ArrDelay           float64
 15  DepDelay           float64
 16  Origin             object 
 17  Dest               object 
 18  Distance           int64  
 19  TaxiIn             float64
 20  TaxiOut            float64
 21  Cancelled          int64  
 22  CancellationCode   object 
 23  Diverted           int64  
 24  CarrierDelay       float64
 25  WeatherDelay      

In [18]:
df3['DepTime'].value_counts()

600.0    5733
555.0    5659
700.0    5454
655.0    5004
556.0    4799
         ... 
246.0       1
322.0       1
347.0       1
312.0       1
307.0       1
Name: DepTime, Length: 1434, dtype: int64

In [31]:
import numpy as np
def convert_to_time(value):
    try:
        return pd.to_datetime(int(value), format='%H%M').time()
    except (ValueError, TypeError):
        return pd.NaT


In [32]:
df3['time_col'] = df3['DepTime'].apply(convert_to_time)

KeyboardInterrupt: 

In [30]:
df3['time_col'].value_counts()

Series([], Name: time_col, dtype: int64)

In [13]:
df3[df3['DepTime']==7]

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
77380,2008,1,27,7,7.0,2030,207.0,2230,WN,273,...,3.0,10.0,0,,0,0.0,0.0,0.0,0.0,217.0
143637,2008,1,21,1,7.0,2220,115.0,2341,YV,2681,...,12.0,11.0,0,,0,94.0,0.0,0.0,0.0,0.0
202502,2008,1,21,1,7.0,2000,203.0,2142,OO,6703,...,5.0,35.0,0,,0,0.0,261.0,0.0,0.0,0.0
221040,2008,1,2,3,7.0,2240,837.0,654,UA,98,...,13.0,16.0,0,,0,0.0,0.0,20.0,0.0,83.0
233624,2008,1,4,5,7.0,2215,143.0,4,UA,522,...,4.0,17.0,0,,0,90.0,0.0,0.0,0.0,9.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2361525,2008,4,13,7,7.0,15,524.0,540,CO,1740,...,4.0,16.0,0,,0,,,,,
2363453,2008,4,14,1,7.0,15,520.0,542,CO,1740,...,5.0,7.0,0,,0,,,,,
2373382,2008,4,4,5,7.0,2125,132.0,2244,DL,1441,...,6.0,22.0,0,,0,0.0,0.0,6.0,0.0,162.0
2379046,2008,4,9,3,7.0,2140,133.0,2324,DL,870,...,6.0,12.0,0,,0,0.0,0.0,0.0,0.0,129.0


In [8]:
df3.head

AttributeError: 'DataFrame' object has no attribute 'to_datetime'

In [None]:
import pandas as pd

df = pd.read_csv('data/1987.csv.bz2')

df.head()

df.to_csv('data/1987.csv.gz', compression = "gzip")

In [13]:
def extract_individual_files(extracted_files: Tuple[str, ...]) -> Tuple[str, ...]:
    new_extracted_files = []
    for file_path in extracted_files:
        if file_path.endswith('.bz2'):
            with open(file_path, 'rb') as f:
                content = f.read()
            decompressed_content = bz2.decompress(content)
            new_file_path = file_path[:-4]  # remove .bz2 extension
            with open(new_file_path, 'wb') as f:
                f.write(decompressed_content)
            new_extracted_files.append(new_file_path)
        else:
            new_extracted_files.append(file_path)
    print((new_extracted_files))
    return (new_extracted_files)


In [16]:
extracted_files = ('data/1987.csv.bz2', 'data/1988.csv.bz2', 'data/1989.csv.bz2', 'data/1990.csv.bz2', 'data/1991.csv.bz2', 'data/1992.csv.bz2', 'data/1993.csv.bz2', 'data/1994.csv.bz2', 'data/1995.csv.bz2', 'data/1996.csv.bz2', 'data/1997.csv.bz2', 'data/1998.csv.bz2', 'data/1999.csv.bz2', 'data/2000.csv.bz2', 'data/2001.csv.bz2', 'data/2002.csv.bz2', 'data/2003.csv.bz2', 'data/2004.csv.bz2', 'data/2005.csv.bz2', 'data/2006.csv.bz2', 'data/2007.csv.bz2', 'data/2008.csv.bz2', 'data/airports.csv', 'data/carriers.csv', 'data/plane-data.csv', 'data/variable-descriptions.csv')
individual_files = extract_individual_files(extracted_files)
# compress_file(individual_files)

['data/1987.csv', 'data/1988.csv', 'data/1989.csv', 'data/1990.csv', 'data/1991.csv', 'data/1992.csv', 'data/1993.csv', 'data/1994.csv', 'data/1995.csv', 'data/1996.csv', 'data/1997.csv', 'data/1998.csv', 'data/1999.csv', 'data/2000.csv', 'data/2001.csv', 'data/2002.csv', 'data/2003.csv', 'data/2004.csv', 'data/2005.csv', 'data/2006.csv', 'data/2007.csv', 'data/2008.csv', 'data/airports.csv', 'data/carriers.csv', 'data/plane-data.csv', 'data/variable-descriptions.csv']


In [27]:
compress_file(['data/1987.csv', 'data/1988.csv', 'data/1989.csv', 'data/1990.csv', 'data/1991.csv', 'data/1992.csv', 'data/1993.csv', 'data/1994.csv', 'data/1995.csv', 'data/1996.csv', 'data/1997.csv', 'data/1998.csv', 'data/1999.csv', 'data/2000.csv', 'data/2001.csv', 'data/2002.csv', 'data/2003.csv', 'data/2004.csv', 'data/2005.csv', 'data/2006.csv', 'data/2007.csv', 'data/2008.csv', 'data/airports.csv', 'data/carriers.csv', 'data/plane-data.csv', 'data/variable-descriptions.csv'])

KeyboardInterrupt: 

In [28]:
import pandas as pd