### Ray Data with Ray AI Runtime (AIR)
<img src="https://images.ctfassets.net/xjan103pcp94/6Dus0MGXn2d7LxspFtlL2Y/a0adc3d04cd940c35518d2da8a6dd1bc/blog-ray-datasets-1.png" width="100%" loading="lazy">

For this tutorial we rely on `ray[air]`, which can be installed with `pip install -U "ray[air]"`.

In [1]:
import ray

# Shut down any Ray instance left running in this kernel (e.g. from an
# earlier run of the notebook) so that ray.init() below starts a fresh one.
# ray.is_initialized is a function and must be *called*: the bare attribute
# is a function object, which is always truthy.
if ray.is_initialized():
    ray.shutdown()

# Start a new local Ray instance; the returned context is rendered as the
# Python/Ray version + dashboard summary shown in the cell output.
ray.init()

  from .autonotebook import tqdm as notebook_tqdm
2023-05-19 17:02:45,940	INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8267 


0,1
Python version:,3.9.6
Ray version:,2.4.0
Dashboard:,http://127.0.0.1:8267


### Load Dataset

In [2]:
# Location of the downsampled January-2009 taxi trip data (a single Parquet file).
TAXI_PARQUET_PATH = (
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_01_data.parquet"
)

# Read the Parquet file into a Ray Dataset.
par_dataset = ray.data.read_parquet([TAXI_PARQUET_PATH])

[2m[36m(_get_read_tasks pid=65062)[0m   pq_ds.pieces, **prefetch_remote_args
[2m[36m(_get_read_tasks pid=65062)[0m   num_files = len(self._pq_ds.pieces)
[2m[36m(_get_read_tasks pid=65062)[0m   self._pq_ds.pieces[idx]
[2m[36m(_get_read_tasks pid=65062)[0m   np.array_split(self._pq_ds.pieces, parallelism),


### Splitting Datasets

In [None]:
# Fraction of rows held out for validation, and number of blocks each split
# is repartitioned into for distributed processing.
TEST_FRACTION = 0.3
NUM_BLOCKS = 5

# Split into training / validation sets.
# NOTE(review): no shuffle/seed is passed, so per the Ray docs the split follows
# row order rather than being random — pass shuffle=True, seed=... if a random
# split is intended.
train_split, valid_split = par_dataset.train_test_split(test_size=TEST_FRACTION)

# Repartition each split into NUM_BLOCKS blocks. New names are used instead of
# rebinding the split results, so each stage stays inspectable.
training_set = train_split.repartition(num_blocks=NUM_BLOCKS)
valid_set = valid_split.repartition(num_blocks=NUM_BLOCKS)

                                                              

### Get Metadata

In [25]:
# Display the training set's schema: one "column: type" line per column.
# Fetching the schema executes the pending repartition (see the log below).
training_set.schema()

2023-05-19 12:15:25,612	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
2023-05-19 12:15:25,613	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)

[A
Running: 0.0/8.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory 0:   0%|          | 0/5 [00:00<?, ?it/s]
                                                                                                                           
[A

vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2524

In [27]:
# Row counts of the training and validation splits. Each count() call
# triggers computation of the corresponding dataset.
n_train = training_set.count()
n_valid = valid_set.count()

# A single f-string avoids print()'s default " " separator, which previously
# produced a stray space before the comma ("1128493 , and ...").
print(f"Number of samples in training set: {n_train}, and validation set: {n_valid}")

Number of samples in training set: 1128493 , and validation set: 282124


In [28]:
# Preview the training split: print its first 10 rows, one dict per row.
training_set.show(limit=10)

{'vendor_id': 'VTS', 'pickup_at': datetime.datetime(2009, 1, 21, 14, 58), 'dropoff_at': datetime.datetime(2009, 1, 21, 15, 3), 'passenger_count': 1, 'trip_distance': 0.5299999713897705, 'pickup_longitude': -73.99270629882812, 'pickup_latitude': 40.7529411315918, 'rate_code_id': None, 'store_and_fwd_flag': None, 'dropoff_longitude': -73.98814392089844, 'dropoff_latitude': 40.75956344604492, 'payment_type': 'CASH', 'fare_amount': 4.5, 'extra': 0.0, 'mta_tax': None, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'total_amount': 4.5}
{'vendor_id': 'VTS', 'pickup_at': datetime.datetime(2009, 1, 15, 18, 52), 'dropoff_at': datetime.datetime(2009, 1, 15, 18, 54), 'passenger_count': 1, 'trip_distance': 0.052000001072883606, 'pickup_longitude': -73.9472885131836, 'pickup_latitude': 40.775917053222656, 'rate_code_id': None, 'store_and_fwd_flag': None, 'dropoff_longitude': -73.9524917602539, 'dropoff_latitude': 40.77061080932617, 'payment_type': 'CASH', 'fare_amount': 3.700000047683716, 'extra': 1.0, 'mt

### Preprocessing Operations

In [50]:
from ray.data.preprocessors import StandardScaler

# Numeric columns to standardize; all other columns pass through unchanged.
SCALED_COLUMNS = ["trip_distance", "fare_amount", "tip_amount", "tolls_amount", "total_amount"]

preprocessor = StandardScaler(columns=SCALED_COLUMNS)

# fit_transform() = fit() (compute per-column mean and std on the training set)
# followed by transform() (scale each column by those statistics). Fitting on
# the training split only keeps validation statistics out of the estimates;
# reuse the fitted preprocessor via preprocessor.transform(valid_set).
transformed_data = preprocessor.fit_transform(training_set)

transformed_data.show(10)

2023-05-19 12:53:54,294	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
2023-05-19 12:53:54,296	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)

[A
[A

[A[A

[A[A


[A[A[A


                                                                                                                 
[A

[A[A


[A[A[A2023-05-19 12:53:54,441	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[StandardScaler]
2023-05-19 12:53:54,441	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
            

{'vendor_id': 'VTS', 'pickup_at': Timestamp('2009-01-21 14:58:00'), 'dropoff_at': Timestamp('2009-01-21 15:03:00'), 'passenger_count': 1, 'trip_distance': -0.6873525977134705, 'pickup_longitude': -73.99270629882812, 'pickup_latitude': 40.7529411315918, 'rate_code_id': None, 'store_and_fwd_flag': None, 'dropoff_longitude': -73.98814392089844, 'dropoff_latitude': 40.75956344604492, 'payment_type': 'CASH', 'fare_amount': -0.6938865184783936, 'extra': 0.0, 'mta_tax': nan, 'tip_amount': -0.36115217208862305, 'tolls_amount': -0.15814156830310822, 'total_amount': -0.7043600082397461}
{'vendor_id': 'VTS', 'pickup_at': Timestamp('2009-01-15 18:52:00'), 'dropoff_at': Timestamp('2009-01-15 18:54:00'), 'passenger_count': 1, 'trip_distance': -0.8492912650108337, 'pickup_longitude': -73.9472885131836, 'pickup_latitude': 40.775917053222656, 'rate_code_id': None, 'store_and_fwd_flag': None, 'dropoff_longitude': -73.9524917602539, 'dropoff_latitude': 40.77061080932617, 'payment_type': 'CASH', 'fare_amo

