In [1]:
from dask.distributed import Client, LocalCluster
from dask_cuda import LocalCUDACluster
import cudf
import os
import urllib.request
import dask_cudf

In [2]:
# Start a local Dask cluster via dask_cuda's LocalCUDACluster and attach a client to it.
# NOTE(review): threads_per_worker=50 is unusually high; the recorded output of this
# cell reports 4 workers / 200 cores — confirm this level of threading is intended.
cluster = LocalCUDACluster(threads_per_worker=50)
client = Client(cluster)
# Bare expression: renders the client summary (scheduler / dashboard addresses) as the cell output.
client

0,1
Client  Scheduler: tcp://127.0.0.1:35605  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 200  Memory: 1.08 TB


In [3]:
def select_features(USE_STR_FEATURES_FLAG=True):
    """
    Select the dataset columns to load and the dtype each one is parsed as.

    The numeric features are always included; they cover time period, airline
    identifiers, origin/destination market ids, departure/arrival times and
    delays, airtime and distance. The string features (airline codes,
    origin/destination codes and city names, and the 'Cancelled' column, which
    is declared as 'str' here) are added only when requested.

    Args:
        USE_STR_FEATURES_FLAG (bool): whether to include the string features
            in addition to the numeric ones (default: True).

    Returns:
        tuple: ``(cols, dtypes)`` where ``cols`` is the list of selected column
        names (numeric columns first, then string columns) and ``dtypes`` maps
        each selected column name to its dtype string.
    """
    non_str_dtypes = {
        'Flight_Number_Reporting_Airline': 'float32',
        'Year': 'float32',
        'Quarter': 'float32',
        'Month': 'float32',
        'DayOfWeek': 'float32',
        'DOT_ID_Reporting_Airline': 'float32',
        'OriginCityMarketID': 'float32',
        'DestCityMarketID': 'float32',
        'DepTime': 'float32',
        'DepDelay': 'float32',
        'DepDel15': 'int',
        'ArrTime': 'float32',
        'ArrDelay': 'float32',
        'ArrDel15': 'int',
        'CRSDepTime': 'float32',
        'CRSArrTime': 'float32',
        'AirTime': 'float32',
        'Distance': 'float32',
    }
    str_dtypes = {
        'Reporting_Airline': 'str',
        'IATA_CODE_Reporting_Airline': 'str',
        'Origin': 'str',
        'OriginCityName': 'str',
        'Dest': 'str',
        'DestCityName': 'str',
        'Cancelled': 'str',
    }

    # A column declared in both groups would appear twice in `cols` and have its
    # numeric dtype silently overwritten by 'str' in `dtypes`; fail loudly instead.
    overlap = set(non_str_dtypes) & set(str_dtypes)
    assert not overlap, f"columns declared in both dtype groups: {sorted(overlap)}"

    # Numeric columns always come first so the column order is stable
    # regardless of the flag.
    cols = list(non_str_dtypes)
    dtypes = dict(non_str_dtypes)

    if USE_STR_FEATURES_FLAG:
        cols += list(str_dtypes)
        dtypes.update(str_dtypes)

    return cols, dtypes

In [4]:
# Use the default selection, which includes the string features (USE_STR_FEATURES_FLAG=True).
cols, dtypes = select_features()

In [5]:
# Download the 2019_6 on-time performance archive from transtats.bts.gov into the working directory.
# NOTE(review): --no-check-certificate turns off TLS certificate verification, so the
# download is not authenticated — confirm this is acceptable or install the proper CA bundle.
!wget --no-check-certificate https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_6.zip

--2021-03-13 00:23:54--  https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_6.zip
Resolving transtats.bts.gov (transtats.bts.gov)... 204.68.194.70
Connecting to transtats.bts.gov (transtats.bts.gov)|204.68.194.70|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 33597362 (32M) [application/x-zip-compressed]
Saving to: ‘On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_6.zip’


2021-03-13 00:24:20 (1.25 MB/s) - ‘On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_6.zip’ saved [33597362/33597362]



In [6]:
!unzip -o *.zip

Archive:  On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_6.zip
  inflating: On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2019_6.csv  
  inflating: readme.html             


In [7]:
# Show the working directory that the relative file names in this notebook resolve against.
!pwd

/workspace/ml


In [8]:
# List the working directory to confirm the archive and the extracted CSV are present.
!ls

 benchmark.py
 dask-worker-space
 data
 data_loader.ipynb
 Dockerfile-ml
 hostfs
 images
 latest_ml_100
 ML_100.ipynb
'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2019_6.csv'
 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_6.zip
 readme.html
 teach_ML.py


In [9]:
# Name of the CSV extracted from the downloaded archive (relative to the working directory).
csvfile = "On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2019_6.csv"

# Lazily load only the selected columns, parsed with the selected dtypes.
# NOTE(review): chunksize=None presumably reads the file as a single partition —
# confirm against the installed dask_cudf version.
data = dask_cudf.read_csv(
    csvfile,
    chunksize=None,
    usecols=cols,
    dtype=dtypes,
)

In [10]:
%%time
# Force the distributed dask_cudf dataframe into a single cudf.DataFrame (on a single GPU)
# prior to writing it to csv; %%time reports how long the materialization takes.
cudf_dataframe = data.compute()

CPU times: user 173 ms, sys: 169 ms, total: 342 ms
Wall time: 3.04 s


In [23]:
# Directory the trimmed CSV is written to.
# NOTE(review): hard-coded absolute path; it sits under the /workspace/ml directory shown by
# the `!pwd` cell, so it will need changing when the notebook runs in another environment.
data_destination='/workspace/ml/data'

In [24]:
!mkdir -p data_destination

In [25]:
# Full path of the output CSV inside data_destination.
csv_path =  f'{data_destination}/2019_airlines.csv'

In [26]:
# Display the resolved output path as a sanity check.
csv_path

'/workspace/ml/data/2019_airlines.csv'

In [27]:
# Write the materialized dataframe to csv_path using to_csv's default options.
# NOTE(review): the read-back cell passes index_col=0, which suggests the index is
# written as the first column — confirm, or pass index=False if it is not wanted.
cudf_dataframe.to_csv( csv_path )

In [28]:
# Read the written file back from disk, treating the first column as the index.
# NOTE(review): `test` is not used again in the visible cells; presumably a round-trip
# sanity check that the CSV is readable.
test = cudf.read_csv(csv_path, index_col=0)

In [29]:
# Report human-readable sizes (-h) of every file (-a) under data_destination,
# keeping only the lines whose path contains "csv".
!du -ha {data_destination} | grep csv

98M	/workspace/ml/data/2019_airlines.csv
50M	/workspace/ml/data/2019_airlines_full.csv.tgz.ad
8.2M	/workspace/ml/data/airports.csv
516K	/workspace/ml/data/2019_airlines_full.csv.tgz.ae
51M	/workspace/ml/data/2019_airlines_full.csv.tgz.ab
44K	/workspace/ml/data/carriers.csv
51M	/workspace/ml/data/2019_airlines_full.csv.tgz.aa
51M	/workspace/ml/data/2019_airlines_full.csv.tgz.ac


In [30]:
# Preview the first rows of the in-memory dataframe.
cudf_dataframe.head()

Unnamed: 0,Year,Quarter,Month,DayOfWeek,Reporting_Airline,DOT_ID_Reporting_Airline,IATA_CODE_Reporting_Airline,Flight_Number_Reporting_Airline,OriginCityMarketID,Origin,...,DepTime,DepDelay,DepDel15,CRSArrTime,ArrTime,ArrDelay,ArrDel15,Cancelled,AirTime,Distance
0,2019.0,2.0,6.0,6.0,F9,20436.0,F9,1682.0,34492.0,RDU,...,,,,956.0,,,,1.0,,612.0
1,2019.0,2.0,6.0,6.0,F9,20436.0,F9,1683.0,30721.0,BOS,...,,,,1252.0,,,,1.0,,612.0
2,2019.0,2.0,6.0,6.0,F9,20436.0,F9,734.0,30643.0,BKG,...,,,,2318.0,,,,1.0,,471.0
3,2019.0,2.0,6.0,6.0,F9,20436.0,F9,2000.0,32211.0,LAS,...,21.0,3.0,0.0,715.0,650.0,-25.0,0.0,0.0,191.0,1747.0
4,2019.0,2.0,6.0,6.0,F9,20436.0,F9,2000.0,30397.0,ATL,...,753.0,-7.0,0.0,1021.0,959.0,-22.0,0.0,0.0,110.0,795.0
