# Assignment: NY Taxi Fare Prediction with DASK

* Note: see the [NY Taxi Usecase Notebook](https://colab.research.google.com/github/keuperj/DataEngineering22/blob/main/week_8/UseCase_NY_Taxi.ipynb) for a description of a non-parallel solution

* NY Taxi Fare Prediction Task + Data: https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/overview

In [1]:
#install DASK
!pip install distributed "dask[complete]" dask-ml graphviz  --upgrade

Collecting distributed
  Downloading distributed-2023.12.0-py3-none-any.whl (997 kB)
Collecting dask[complete]
  Downloading dask-2023.12.0-py3-none-any.whl (1.2 MB)
Collecting dask-ml
  Downloading dask_ml-2023.3.24-py3-none-any.whl (148 kB)
Collecting pyarrow-hotfix (from dask[complete])
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl (7.9 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
Collecting dask-glm>=0.2.

In [2]:
#in colab, we need to clone the data from the repo
!git clone https://github.com/keuperj/DATA.git
path='DATA'

Cloning into 'DATA'...
remote: Enumerating objects: 126, done.
remote: Counting objects: 100% (39/39), done.
remote: Compressing objects: 100% (28/28), done.
remote: Total 126 (delta 11), reused 39 (delta 11), pack-reused 87
Receiving objects: 100% (126/126), 185.56 MiB | 1.98 MiB/s, done.
Resolving deltas: 100% (32/32), done.
Updating files: 100% (86/86), done.


In [3]:
# read the data into a pandas DataFrame
import pandas as pd
data = pd.read_csv("DATA/NY_taxi_train_small.csv")
y = data['fare_amount']                 # target: the fare to predict
X = data.drop(['fare_amount'], axis=1)  # features: all remaining columns


In [4]:
X.head()

| Unnamed: 0 | key | pickup_datetime | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | passenger_count |
|---|---|---|---|---|---|---|---|
| 0 | 2014-03-12 17:04:30.0000001 | 2014-03-12 17:04:30 UTC | -73.956721 | 40.767081 | -73.98908 | 40.772745 | 1 |
| 1 | 2009-04-17 21:59:00.00000044 | 2009-04-17 21:59:00 UTC | -73.870913 | 40.773722 | -73.996285 | 40.716018 | 2 |
| 2 | 2009-10-06 13:42:00.00000015 | 2009-10-06 13:42:00 UTC | -73.976258 | 40.75141 | -73.984795 | 40.751305 | 3 |
| 3 | 2012-05-02 21:38:39.0000004 | 2012-05-02 21:38:39 UTC | -73.97794 | 40.752586 | -73.976525 | 40.667005 | 1 |
| 4 | 2011-04-21 18:11:13.0000001 | 2011-04-21 18:11:13 UTC | -73.98839 | 40.723152 | -73.984367 | 40.736301 | 1 |


In [5]:
y.head()

0    11.0
1    23.7
2     5.7
3    28.1
4     5.7
Name: fare_amount, dtype: float64

## Assignment:
Use *DASK DataFrames* and *DASK-ML* to predict the taxi fares:
* split the data into train and test sets
* build a pre-processing, feature extraction, and prediction pipeline

### Use
* local DASK cluster
* [DASK DataFrames](https://examples.dask.org/dataframe.html)
* [DASK-ML](https://ml.dask.org/)

### Hints:
* start with a very simple but working predictor, then improve the solution with better pre-processing and features
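
A possible first set of features, sketched under assumptions: the trip distance (haversine formula) plus the pickup hour and weekday. The function names `haversine_km` and `add_features` and the new column names are hypothetical, and the timestamp format is assumed from the `pickup_datetime` values shown in `X.head()`. The function works on a plain pandas frame, so it could presumably be applied per partition with `ddf.map_partitions(add_features)`.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    lat1, lon1, lat2, lon2 = (
        np.radians(np.asarray(v, dtype=float)) for v in (lat1, lon1, lat2, lon2)
    )
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * np.arcsin(np.sqrt(a))

def add_features(df):
    """Return a copy of df with hour, weekday and distance_km columns added."""
    out = df.copy()
    # Assumed timestamp layout, e.g. "2014-03-12 17:04:30 UTC".
    ts = pd.to_datetime(out["pickup_datetime"],
                        format="%Y-%m-%d %H:%M:%S UTC", utc=True)
    out["hour"] = ts.dt.hour
    out["weekday"] = ts.dt.dayofweek  # Monday == 0
    out["distance_km"] = haversine_km(
        out["pickup_latitude"], out["pickup_longitude"],
        out["dropoff_latitude"], out["dropoff_longitude"],
    )
    return out

# First two rows of X.head() from above, used as sample input.
sample = pd.DataFrame({
    "pickup_datetime": ["2014-03-12 17:04:30 UTC", "2009-04-17 21:59:00 UTC"],
    "pickup_longitude": [-73.956721, -73.870913],
    "pickup_latitude": [40.767081, 40.773722],
    "dropoff_longitude": [-73.98908, -73.996285],
    "dropoff_latitude": [40.772745, 40.716018],
    "passenger_count": [1, 2],
})
features = add_features(sample)
print(features[["hour", "weekday", "distance_km"]])
```

Keeping the feature logic in an ordinary pandas function makes it easy to check on a few rows before running it on the cluster.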

In [6]:
# Cluster setup
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:34831
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33567'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44029'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44795', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44795
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:32926
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:39171', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker co

In [7]:
#get client info
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Dashboard: http://127.0.0.1:8787/status
Workers: 2
Total threads: 2
Total memory: 12.68 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:34831
Workers: 2
Dashboard: http://127.0.0.1:8787/status
Total threads: 2
Started: Just now
Total memory: 12.68 GiB

Comm: tcp://127.0.0.1:44795
Total threads: 1
Dashboard: http://127.0.0.1:38963/status
Memory: 6.34 GiB
Nanny: tcp://127.0.0.1:33567
Local directory: /tmp/dask-scratch-space/worker-6rrs0ayj

Comm: tcp://127.0.0.1:39171
Total threads: 1
Dashboard: http://127.0.0.1:36861/status
Memory: 6.34 GiB
Nanny: tcp://127.0.0.1:44029
Local directory: /tmp/dask-scratch-space/worker-kjww3b5o


In [8]:
import dask.dataframe as dd
import dask.array as da



### Export DASK Dashboard to public URL

In [1]:
from google.colab.output import eval_js
print(eval_js( "google.colab.kernel.proxyPort(" + str(8787) + ")" ))

https://hsher8ujc0j-496ff2e9c6d22116-8787-colab.googleusercontent.com/
