## Write data for Snowflake workflow

In [1]:
# https://towardsdatascience.com/exploring-bike-share-data-3e3b2f28760c


In [18]:
import urllib
import zipfile
import pandas as pd
import dask.dataframe as dd
import seaborn as sns
import os
import coiled

from dotenv import dotenv_values
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from dask.distributed import wait, LocalCluster, Client
from dask_snowflake import read_snowflake, to_snowflake
from distributed.client import _get_global_client

In [96]:
connection_kwargs = {
    k.replace("SNOWFLAKE_", "").lower(): v
    for k, v in dotenv_values(".env").items()
}

## Create table if not exists

In [6]:
table_name = "citibike_tripdata"

In [5]:
engine = create_engine(URL(**connection_kwargs))

In [5]:
engine.execute(f"drop table if exists {table_name}");

  engine.execute(f"drop table if exists {table_name}");


In [7]:
engine.execute(f"""create table if not exists {table_name} (
    ride_id varchar not null unique,
    rideable_type varchar not null,
    started_at timestamp not null,
    ended_at timestamp not null,
    start_station_name varchar not null,
    start_station_id smallint not null,
    end_station_name varchar not null,
    end_station_id smallint not null,
    start_lat number,
    start_lng number,
    end_lat number,
    end_lng number,
    is_member boolean not null
)""")

  engine.execute(f"""create table if not exists {table_name} (


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x28d6c9110>

## Create cluster and client

In [20]:
%%time

cluster = coiled.Cluster(
    name="irina-write-to-snowflake",
    n_workers=5,
    backend_options={"region": "us-east-2"}
)

Output()

Output()

CPU times: user 6.27 s, sys: 458 ms, total: 6.73 s
Wall time: 1min 31s


In [7]:
# cluster = LocalCluster()

In [21]:
client = cluster.get_client()

In [22]:
client

0,1
Connection method: Cluster object,Cluster type: coiled.ClusterBeta
Dashboard: https://cluster-kvfws.dask.host?token=_AsqzLow92fhdZqi,

0,1
Dashboard: https://cluster-kvfws.dask.host?token=_AsqzLow92fhdZqi,Workers: 4
Total threads: 16,Total memory: 59.74 GiB

0,1
Comm: tls://10.0.26.250:8786,Workers: 4
Dashboard: http://10.0.26.250:8787/status,Total threads: 16
Started: Just now,Total memory: 59.74 GiB

0,1
Comm: tls://10.0.29.126:35681,Total threads: 4
Dashboard: http://10.0.29.126:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.29.126:42953,
Local directory: /scratch/dask-worker-space/worker-451w1co9,Local directory: /scratch/dask-worker-space/worker-451w1co9

0,1
Comm: tls://10.0.26.143:39199,Total threads: 4
Dashboard: http://10.0.26.143:8787/status,Memory: 14.94 GiB
Nanny: tls://10.0.26.143:45505,
Local directory: /scratch/dask-worker-space/worker-008_252n,Local directory: /scratch/dask-worker-space/worker-008_252n

0,1
Comm: tls://10.0.19.87:35749,Total threads: 4
Dashboard: http://10.0.19.87:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.19.87:41891,
Local directory: /scratch/dask-worker-space/worker-ryvfl797,Local directory: /scratch/dask-worker-space/worker-ryvfl797

0,1
Comm: tls://10.0.25.172:44353,Total threads: 4
Dashboard: http://10.0.25.172:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.25.172:45359,
Local directory: /scratch/dask-worker-space/worker-kls4vyvd,Local directory: /scratch/dask-worker-space/worker-kls4vyvd


## Read the data into dataframe

In [23]:
def safe_int(x):
    try:
        return int(float(x))
    except:
        return -1

In [24]:
start_date = "2019-01-01"
end_date = "2021-12-31"

In [26]:
csv_paths = []
for ts in pd.date_range(start=start_date, end=end_date, freq="MS"):
    csv_paths.append(f"s3://tripdata/{ts.year}{ts.month:02}-*-*.zip")
    csv_paths.append(f"s3://tripdata/{ts.year}{ts.month:02}-*-*.csv.zip")
csv_paths

['s3://tripdata/201901-*-*.zip',
 's3://tripdata/201901-*-*.csv.zip',
 's3://tripdata/201902-*-*.zip',
 's3://tripdata/201902-*-*.csv.zip',
 's3://tripdata/201903-*-*.zip',
 's3://tripdata/201903-*-*.csv.zip',
 's3://tripdata/201904-*-*.zip',
 's3://tripdata/201904-*-*.csv.zip',
 's3://tripdata/201905-*-*.zip',
 's3://tripdata/201905-*-*.csv.zip',
 's3://tripdata/201906-*-*.zip',
 's3://tripdata/201906-*-*.csv.zip',
 's3://tripdata/201907-*-*.zip',
 's3://tripdata/201907-*-*.csv.zip',
 's3://tripdata/201908-*-*.zip',
 's3://tripdata/201908-*-*.csv.zip',
 's3://tripdata/201909-*-*.zip',
 's3://tripdata/201909-*-*.csv.zip',
 's3://tripdata/201910-*-*.zip',
 's3://tripdata/201910-*-*.csv.zip',
 's3://tripdata/201911-*-*.zip',
 's3://tripdata/201911-*-*.csv.zip',
 's3://tripdata/201912-*-*.zip',
 's3://tripdata/201912-*-*.csv.zip',
 's3://tripdata/202001-*-*.zip',
 's3://tripdata/202001-*-*.csv.zip',
 's3://tripdata/202002-*-*.zip',
 's3://tripdata/202002-*-*.csv.zip',
 's3://tripdata/2020

In [81]:
ddf = dd.read_csv(
    csv_paths,
    compression="zip",
    blocksize=None,
    converters={
        "start_station_id": safe_int, 
        "end_station_id": safe_int, 
        "start station id": safe_int, 
        "end station id": safe_int
    }
)
ddf

Unnamed: 0_level_0,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
npartitions=72,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
,int64,object,object,int64,object,float64,float64,int64,object,float64,float64,int64,object,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [31]:
ddf.head(3)

Unnamed: 0,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
0,320,2019-01-01 00:01:47.4010,2019-01-01 00:07:07.5810,3160,Central Park West & W 76 St,40.778968,-73.973747,3283,W 89 St & Columbus Ave,40.788221,-73.970416,15839,Subscriber,1971,1
1,316,2019-01-01 00:04:43.7360,2019-01-01 00:10:00.6080,519,Pershing Square North,40.751873,-73.977706,518,E 39 St & 2 Ave,40.747804,-73.973442,32723,Subscriber,1964,1
2,591,2019-01-01 00:06:03.9970,2019-01-01 00:15:55.4380,3171,Amsterdam Ave & W 82 St,40.785247,-73.976673,3154,E 77 St & 3 Ave,40.773142,-73.958562,27451,Subscriber,1987,1


In [82]:
ddf.dtypes

tripduration                 int64
starttime                   object
stoptime                    object
start station id             int64
start station name          object
start station latitude     float64
start station longitude    float64
end station id               int64
end station name            object
end station latitude       float64
end station longitude      float64
bikeid                       int64
usertype                    object
birth year                   int64
gender                       int64
dtype: object

In [None]:
"""
ride_id                object
rideable_type          object
started_at             object
ended_at               object
start_station_name     object
start_station_id        int64
end_station_name       object
end_station_id          int64
start_lat             float64
start_lng             float64
end_lat               float64
end_lng               float64
is_member                bool
"""

In [83]:
ddf = ddf.rename(columns={
    "start station id": "start_station_id",
    "start station name": "start_station_name",
    "start station latitude": "start_lat",
    "start station longitude": "start_lng",
    "end station id": "end_station_id",
    "end station name": "end_station_name",
    "end station latitude": "end_lat",
    "end station longitude": "end_lng",
    "starttime": "started_at",
    "stoptime": "ended_at"
})
ddf.dtypes

tripduration            int64
started_at             object
ended_at               object
start_station_id        int64
start_station_name     object
start_lat             float64
start_lng             float64
end_station_id          int64
end_station_name       object
end_lat               float64
end_lng               float64
bikeid                  int64
usertype               object
birth year              int64
gender                  int64
dtype: object

In [84]:
ddf["ride_id"] = (
    ddf["bikeid"].astype(str) + 
    ddf["usertype"] + 
    ddf["birth year"].astype(str) + 
    ddf["gender"].astype(str) + 
    ddf["tripduration"].astype(str)
)

In [85]:
ddf["is_member"] = ddf["usertype"] == "Subscriber"

In [86]:
ddf = ddf.drop(columns=["bikeid", "usertype", "birth year", "gender", "tripduration"])

In [87]:
ddf.dtypes

started_at             object
ended_at               object
start_station_id        int64
start_station_name     object
start_lat             float64
start_lng             float64
end_station_id          int64
end_station_name       object
end_lat               float64
end_lng               float64
ride_id                object
is_member                bool
dtype: object

## Filter out invalid station ids

In [89]:
ddf2 = ddf[(ddf["start_station_id"] != -1) & (ddf["end_station_id"] != -1)].reset_index(drop=True)

## Create boolean is_member and drop member_casual

In [13]:
#ddf2["is_member"] = ddf2.member_casual == "member"

In [14]:
#ddf2 = ddf2.drop(columns="member_casual")

In [57]:
ddf2 = ddf2.repartition(partition_size="100Mb")

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['ended_at', 'member_casual', 'ride_id', 'rideable_type']
  Missing: ['bikeid', 'birth year', 'gender', 'stoptime', 'tripduration', 'usertype']

In [90]:
ddf2.npartitions

72

In [39]:
ddf2.index.head(3)

RangeIndex(start=0, stop=3, step=1)

In [91]:
ddf2.dtypes

started_at             object
ended_at               object
start_station_id        int64
start_station_name     object
start_lat             float64
start_lng             float64
end_station_id          int64
end_station_name       object
end_lat               float64
end_lng               float64
ride_id                object
is_member                bool
dtype: object

## Store to snowflake

In [92]:
wait(ddf2)

DoneAndNotDoneFutures(done=set(), not_done=set())

In [93]:
to_snowflake(ddf2, name=table_name, connection_kwargs=connection_kwargs)

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['member_casual', 'ride_id', 'rideable_type']
  Missing: ['bikeid', 'birth year', 'gender', 'tripduration', 'usertype']

## Cleanup

In [80]:
client.restart()

0,1
Connection method: Cluster object,Cluster type: coiled.ClusterBeta
Dashboard: https://cluster-kvfws.dask.host?token=_AsqzLow92fhdZqi,

0,1
Dashboard: https://cluster-kvfws.dask.host?token=_AsqzLow92fhdZqi,Workers: 15
Total threads: 60,Total memory: 223.98 GiB

0,1
Comm: tls://10.0.26.250:8786,Workers: 15
Dashboard: http://10.0.26.250:8787/status,Total threads: 60
Started: 36 minutes ago,Total memory: 223.98 GiB

0,1
Comm: tls://10.0.28.135:34433,Total threads: 4
Dashboard: http://10.0.28.135:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.28.135:39091,
Local directory: /scratch/dask-worker-space/worker-vogy6vgs,Local directory: /scratch/dask-worker-space/worker-vogy6vgs

0,1
Comm: tls://10.0.19.43:42717,Total threads: 4
Dashboard: http://10.0.19.43:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.19.43:37361,
Local directory: /scratch/dask-worker-space/worker-mmub66h0,Local directory: /scratch/dask-worker-space/worker-mmub66h0

0,1
Comm: tls://10.0.31.173:39403,Total threads: 4
Dashboard: http://10.0.31.173:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.31.173:42647,
Local directory: /scratch/dask-worker-space/worker-qz_2pni8,Local directory: /scratch/dask-worker-space/worker-qz_2pni8

0,1
Comm: tls://10.0.19.171:33127,Total threads: 4
Dashboard: http://10.0.19.171:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.19.171:35839,
Local directory: /scratch/dask-worker-space/worker-_b4ztx7s,Local directory: /scratch/dask-worker-space/worker-_b4ztx7s

0,1
Comm: tls://10.0.23.239:41431,Total threads: 4
Dashboard: http://10.0.23.239:8787/status,Memory: 14.94 GiB
Nanny: tls://10.0.23.239:46349,
Local directory: /scratch/dask-worker-space/worker-8dz2dlpt,Local directory: /scratch/dask-worker-space/worker-8dz2dlpt

0,1
Comm: tls://10.0.24.52:36455,Total threads: 4
Dashboard: http://10.0.24.52:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.24.52:45537,
Local directory: /scratch/dask-worker-space/worker-_m_wk5bz,Local directory: /scratch/dask-worker-space/worker-_m_wk5bz

0,1
Comm: tls://10.0.17.31:38613,Total threads: 4
Dashboard: http://10.0.17.31:8787/status,Memory: 14.95 GiB
Nanny: tls://10.0.17.31:38681,
Local directory: /scratch/dask-worker-space/worker-tagylinp,Local directory: /scratch/dask-worker-space/worker-tagylinp

0,1
Comm: tls://10.0.18.80:43613,Total threads: 4
Dashboard: http://10.0.18.80:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.18.80:42469,
Local directory: /scratch/dask-worker-space/worker-81_i2r9k,Local directory: /scratch/dask-worker-space/worker-81_i2r9k

0,1
Comm: tls://10.0.19.110:42367,Total threads: 4
Dashboard: http://10.0.19.110:8787/status,Memory: 14.92 GiB
Nanny: tls://10.0.19.110:41799,
Local directory: /scratch/dask-worker-space/worker-refj1_01,Local directory: /scratch/dask-worker-space/worker-refj1_01

0,1
Comm: tls://10.0.28.239:37103,Total threads: 4
Dashboard: http://10.0.28.239:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.28.239:39885,
Local directory: /scratch/dask-worker-space/worker-c7r9ada8,Local directory: /scratch/dask-worker-space/worker-c7r9ada8

0,1
Comm: tls://10.0.29.153:34091,Total threads: 4
Dashboard: http://10.0.29.153:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.29.153:42963,
Local directory: /scratch/dask-worker-space/worker-t5tu1b1n,Local directory: /scratch/dask-worker-space/worker-t5tu1b1n

0,1
Comm: tls://10.0.18.120:41365,Total threads: 4
Dashboard: http://10.0.18.120:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.18.120:44235,
Local directory: /scratch/dask-worker-space/worker-qkij6nkd,Local directory: /scratch/dask-worker-space/worker-qkij6nkd

0,1
Comm: tls://10.0.19.105:41787,Total threads: 4
Dashboard: http://10.0.19.105:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.19.105:36039,
Local directory: /scratch/dask-worker-space/worker-alevdd2_,Local directory: /scratch/dask-worker-space/worker-alevdd2_

0,1
Comm: tls://10.0.19.205:34125,Total threads: 4
Dashboard: http://10.0.19.205:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.19.205:40999,
Local directory: /scratch/dask-worker-space/worker-ppvf94yv,Local directory: /scratch/dask-worker-space/worker-ppvf94yv

0,1
Comm: tls://10.0.19.127:41119,Total threads: 4
Dashboard: http://10.0.19.127:8787/status,Memory: 14.93 GiB
Nanny: tls://10.0.19.127:38923,
Local directory: /scratch/dask-worker-space/worker-2f45iutj,Local directory: /scratch/dask-worker-space/worker-2f45iutj


In [63]:
cluster.scale(15)

In [94]:
client.close()

In [95]:
cluster.close()