# Citibike Project - Normalizing the Datasets

Normalizing the bike share datsets from `03_normalizing_datasets.ipynb` to prepare for SQL database creation.

## Imports

In [1]:
import pandas as pd
import numpy as np

import polars as pl

import pyarrow as pa
import pyarrow.parquet as pq

import feather

from matplotlib import pyplot as plt

import os

## Files that we'll be working on: 
Two .CSVs, `group1` is ~10Gb and `group2` is ~20Gb

In [2]:
group1_location='/Users/sra/files/projects/citibike_project/combined/group1_combined/group1.csv'
group2_location='/Users/sra/files/projects/citibike_project/combined/group2_combined/group2.csv'

### Let's explore group1 first:

In [3]:
q=(
    pl.scan_csv(group1_location,ignore_errors=True,try_parse_dates=True)
)

group1_pl=q.collect()

In [4]:
group1_pl.head()

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
str,str,datetime[μs],datetime[μs],str,f64,str,f64,f64,f64,f64,f64,str
"""26A3DC47FE0EA3…","""docked_bike""",2021-05-13 12:48:08,2021-05-13 13:07:37,"""Broadway & W 2…",6173.08,"""E 2 St & Avenu…",5515.02,40.742868,-73.989186,40.722174,-73.983688,"""member"""
"""A99F2E1D627B08…","""docked_bike""",2021-05-16 08:30:13,2021-05-16 08:45:47,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member"""
"""43E79A45997B73…","""docked_bike""",2021-05-01 08:38:14,2021-05-01 08:54:27,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member"""
"""8B3CC649F4F588…","""docked_bike""",2021-05-09 08:12:31,2021-05-09 08:27:05,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member"""
"""7101C98F057486…","""docked_bike""",2021-05-27 07:52:27,2021-05-27 08:09:01,"""E 123 St & Lex…",7636.05,"""1 Ave & E 78 S…",7020.09,40.802926,-73.9379,40.771404,-73.953517,"""member"""


In [5]:
group1_pl.groupby('start_station_name').count()\
.sort('count',descending=True)\
.head()

start_station_name,count
str,u32
"""W 21 St & 6 Av…",247908
"""West St & Cham…",219580
"""1 Ave & E 68 S…",208483
"""6 Ave & W 33 S…",197135
"""Broadway & W 2…",194687


There are some junk stations in the dataset:

In [6]:
q=(
    group1_pl.lazy()
    .groupby('start_station_name')
    .agg(pl.count('start_station_name').alias('count'))
    .filter(
        (pl.col('count')>=10)
    )
    .sort('count',descending=True)
    # .limit(5)
)

group1_pl_count=q.collect()
print(group1_pl_count)

shape: (2_027, 2)
┌──────────────────────────┬────────┐
│ start_station_name       ┆ count  │
│ ---                      ┆ ---    │
│ str                      ┆ u32    │
╞══════════════════════════╪════════╡
│ W 21 St & 6 Ave          ┆ 247908 │
│ West St & Chambers St    ┆ 219580 │
│ 1 Ave & E 68 St          ┆ 208483 │
│ 6 Ave & W 33 St          ┆ 197135 │
│ …                        ┆ …      │
│ Apache                   ┆ 15     │
│ Prototype Lab            ┆ 15     │
│ Rogers Pl & E 165 St_old ┆ 11     │
│ 4455.10                  ┆ 11     │
└──────────────────────────┴────────┘


However, these junk stations are such a small part of the dataset that I think it's ok to include.

In [7]:
q=(
    group1_pl.lazy()
    .groupby('start_station_id')
    .agg(pl.count('start_station_id').alias('count'))
    # .filter(
        # (pl.col('count')>=1)
    # )
    .sort('count',descending=True)
    # .limit(5)
)

group1_pl_count=q.collect()
print(group1_pl_count)

shape: (1_927, 2)
┌──────────────────┬────────┐
│ start_station_id ┆ count  │
│ ---              ┆ ---    │
│ f64              ┆ u32    │
╞══════════════════╪════════╡
│ 6140.05          ┆ 247908 │
│ 5329.03          ┆ 219580 │
│ 6822.09          ┆ 208483 │
│ 6364.07          ┆ 197135 │
│ …                ┆ …      │
│ 5548.01          ┆ 44     │
│ 3704.01          ┆ 33     │
│ 4014.01          ┆ 30     │
│ 8419.03          ┆ 2      │
└──────────────────┴────────┘


## Normalize `group1`

### Ride table:

#### Normalize the `rideable_type` and `member_casual` categories:

In [11]:
group1_pl_intermediate=\
group1_pl.with_columns(
    pl.when(pl.col('member_casual') == "'member'")
    .then(pl.lit(0))
    .otherwise(pl.lit(1))
    .alias('ridertype')
)

group1_pl_intermediate.head()

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,ridertype
str,str,datetime[μs],datetime[μs],str,f64,str,f64,f64,f64,f64,f64,str,i32
"""26A3DC47FE0EA3…","""docked_bike""",2021-05-13 12:48:08,2021-05-13 13:07:37,"""Broadway & W 2…",6173.08,"""E 2 St & Avenu…",5515.02,40.742868,-73.989186,40.722174,-73.983688,"""member""",1
"""A99F2E1D627B08…","""docked_bike""",2021-05-16 08:30:13,2021-05-16 08:45:47,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member""",1
"""43E79A45997B73…","""docked_bike""",2021-05-01 08:38:14,2021-05-01 08:54:27,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member""",1
"""8B3CC649F4F588…","""docked_bike""",2021-05-09 08:12:31,2021-05-09 08:27:05,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member""",1
"""7101C98F057486…","""docked_bike""",2021-05-27 07:52:27,2021-05-27 08:09:01,"""E 123 St & Lex…",7636.05,"""1 Ave & E 78 S…",7020.09,40.802926,-73.9379,40.771404,-73.953517,"""member""",1


In [12]:
group1_pl_intermediate=\
group1_pl_intermediate.with_columns(
        pl.when(pl.col('rideable_type') == "'classic_bike'")
        .then(pl.lit(0))
        .when(pl.col('rideable_type') == "'electric_bike'")
        .then(pl.lit(1))
        .otherwise(pl.lit(2))
        .alias('biketype')
)

group1_pl_intermediate.head()

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,ridertype,biketype
str,str,datetime[μs],datetime[μs],str,f64,str,f64,f64,f64,f64,f64,str,i32,i32
"""26A3DC47FE0EA3…","""docked_bike""",2021-05-13 12:48:08,2021-05-13 13:07:37,"""Broadway & W 2…",6173.08,"""E 2 St & Avenu…",5515.02,40.742868,-73.989186,40.722174,-73.983688,"""member""",1,2
"""A99F2E1D627B08…","""docked_bike""",2021-05-16 08:30:13,2021-05-16 08:45:47,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member""",1,2
"""43E79A45997B73…","""docked_bike""",2021-05-01 08:38:14,2021-05-01 08:54:27,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member""",1,2
"""8B3CC649F4F588…","""docked_bike""",2021-05-09 08:12:31,2021-05-09 08:27:05,"""46 Ave & 5 St""",6286.02,"""34th Ave & Ver…",6873.01,40.74731,-73.95451,40.765354,-73.939863,"""member""",1,2
"""7101C98F057486…","""docked_bike""",2021-05-27 07:52:27,2021-05-27 08:09:01,"""E 123 St & Lex…",7636.05,"""1 Ave & E 78 S…",7020.09,40.802926,-73.9379,40.771404,-73.953517,"""member""",1,2


In [13]:
group1_pl_ridenorm=group1_pl_intermediate.select(pl.col('*').exclude(\
'rideable_type',
'member_casual',
'start_station_name',
'end_station_name',
'start_lat',
'end_lat',
'start_lng',
'end_lng'))

group1_pl_ridenorm.head()

ride_id,started_at,ended_at,start_station_id,end_station_id,ridertype,biketype
str,datetime[μs],datetime[μs],f64,f64,i32,i32
"""26A3DC47FE0EA3…",2021-05-13 12:48:08,2021-05-13 13:07:37,6173.08,5515.02,1,2
"""A99F2E1D627B08…",2021-05-16 08:30:13,2021-05-16 08:45:47,6286.02,6873.01,1,2
"""43E79A45997B73…",2021-05-01 08:38:14,2021-05-01 08:54:27,6286.02,6873.01,1,2
"""8B3CC649F4F588…",2021-05-09 08:12:31,2021-05-09 08:27:05,6286.02,6873.01,1,2
"""7101C98F057486…",2021-05-27 07:52:27,2021-05-27 08:09:01,7636.05,7020.09,1,2


### Station table:

In [9]:
group1_pl_stationnorm=group1_pl.select(pl.col('*').exclude('ride_id',
                                         'rideable_type',
                                         'started_at',
                                         'ended_at',
                                         'member_casual'))

print(group1_pl_stationnorm)

shape: (55_800_085, 8)
┌────────────┬────────────┬────────────┬────────────┬───────────┬─────────┬───────────┬────────────┐
│ start_stat ┆ start_stat ┆ end_statio ┆ end_statio ┆ start_lat ┆ start_l ┆ end_lat   ┆ end_lng    │
│ ion_name   ┆ ion_id     ┆ n_name     ┆ n_id       ┆ ---       ┆ ng      ┆ ---       ┆ ---        │
│ ---        ┆ ---        ┆ ---        ┆ ---        ┆ f64       ┆ ---     ┆ f64       ┆ f64        │
│ str        ┆ f64        ┆ str        ┆ f64        ┆           ┆ f64     ┆           ┆            │
╞════════════╪════════════╪════════════╪════════════╪═══════════╪═════════╪═══════════╪════════════╡
│ Broadway & ┆ 6173.08    ┆ E 2 St &   ┆ 5515.02    ┆ 40.742868 ┆ -73.989 ┆ 40.722174 ┆ -73.983688 │
│ W 25 St    ┆            ┆ Avenue B   ┆            ┆           ┆ 186     ┆           ┆            │
│ 46 Ave & 5 ┆ 6286.02    ┆ 34th Ave & ┆ 6873.01    ┆ 40.74731  ┆ -73.954 ┆ 40.765354 ┆ -73.939863 │
│ St         ┆            ┆ Vernon     ┆            ┆           ┆ 51

### Biketype table:

#### First, I'll need to see how many types of rideable types are there:

In [44]:
q=(
    group1_pl.lazy()
    .groupby('rideable_type')
    .agg(pl.count('rideable_type').alias('rideable_type_count'))
    .sort('rideable_type_count',descending=True)
)

group1_pl_rideable_type=q.collect()

q=(
    group1_pl_rideable_type.lazy()
    .with_columns([pl.col("rideable_type_count").sum().alias('sum')])
)

group1_pl_rideable_type=q.collect()

print(group1_pl_rideable_type)

shape: (3, 3)
┌───────────────┬─────────────────────┬──────────┐
│ rideable_type ┆ rideable_type_count ┆ sum      │
│ ---           ┆ ---                 ┆ ---      │
│ str           ┆ u32                 ┆ u32      │
╞═══════════════╪═════════════════════╪══════════╡
│ classic_bike  ┆ 34010263            ┆ 55800085 │
│ electric_bike ┆ 13092408            ┆ 55800085 │
│ docked_bike   ┆ 8697414             ┆ 55800085 │
└───────────────┴─────────────────────┴──────────┘


There are about three times as many classic bike trips used than electric bikes used. The total amount of trips equals the total rows of the dataset.

#### _Switch to an environment that has the `mysql-connector` module installed and imported_

In [36]:
import mysql.connector
import os

In [44]:
flow_control='/Users/sra/files/projects/citibike_project/combined/group1_combined/flow_control/biketype'
flag=False

if not os.path.exists(flow_control):
    os.mkdir(flow_control)
    flag=True

if flag:
    # Establish a connection to the MySQL database
    conn = mysql.connector.connect(
        host='localhost',
        user='root',
        password='rootroot',
        database='citibike_project'
    )

    # Create a cursor object to execute SQL queries
    cursor = conn.cursor()

    # Create the table in the MySQL database with the desired schema
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS biketype (
            id TINYINT,
            type VARCHAR(255)
        )
    '''
    cursor.execute(create_table_query)

    # Define the data as a list of tuples or dictionaries
    data = [
        ('0', 'classic_bike'),
        ('1', 'electric_bike'),
        ('2', 'docked_bike')
    ]

    # Insert the data into the table
    insert_query = '''
        INSERT INTO biketype (id, type)
        VALUES (%s, %s)
    '''

    cursor.executemany(insert_query, data)

    # Commit the changes and close the connection
    conn.commit()
    conn.close()

### Ridertype Table:

#### First, I'll need to see how many types of member-casual groups there are:

In [41]:
q=(
    group1_pl.lazy()
    .groupby('member_casual')
    .agg(pl.count('member_casual').alias('member_casual_count'))
    .sort('member_casual_count',descending=True)
)

group1_pl_rider_type=q.collect()

print(group1_pl_rider_type)

shape: (2, 2)
┌───────────────┬─────────────────────┐
│ member_casual ┆ member_casual_count │
│ ---           ┆ ---                 │
│ str           ┆ u32                 │
╞═══════════════╪═════════════════════╡
│ member        ┆ 42560681            │
│ casual        ┆ 13239404            │
└───────────────┴─────────────────────┘


In [43]:
flow_control='/Users/sra/files/projects/citibike_project/combined/group1_combined/flow_control/ridertype'
flag=False

if not os.path.exists(flow_control):
    os.mkdir(flow_control)
    flag=True

if flag:
    # Establish a connection to the MySQL database
    conn = mysql.connector.connect(
        host='localhost',
        user='root',
        password='rootroot',
        database='citibike_project'
    )

    # Create a cursor object to execute SQL queries
    cursor = conn.cursor()

    # Create the table in the MySQL database with the desired schema
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS ridertype (
            id TINYINT,
            type VARCHAR(255)
        )
    '''
    cursor.execute(create_table_query)

    # Define the data as a list of tuples or dictionaries
    data = [
        ('0', 'member'),
        ('1', 'casual')
    ]

    # Insert the data into the table
    insert_query = '''
        INSERT INTO ridertype (id, type)
        VALUES (%s, %s)
    '''

    cursor.executemany(insert_query, data)

    # Commit the changes and close the connection
    conn.commit()
    conn.close()

Save the following tables. 
* `group1_pl_ridenorm`
* `group1_pl_stationnorm`

They are currently in a `polars` data type:

In [14]:
type(group1_pl_ridenorm)

polars.dataframe.frame.DataFrame

## Convert the `polars` tables into a form that will make it easy to convert to `MySQL`.

This is because the size of the tables warrants compression.

We will convert the `polars` dataframe to a `parquet` dataframe before sending to `MySQL`:

In [19]:
path_to_group1_pl_ridenorm_parquet=\
'/Users/sra/files/projects/citibike_project/combined/group1_combined/group1_pl_ridenorm.parquet'

if not os.path.exists(path_to_group1_pl_ridenorm_parquet):
    group1_pl_ridenorm.write_parquet(path_to_group1_pl_ridenorm_parquet)

In [20]:
path_to_group1_pl_stationnorm_parquet=\
'/Users/sra/files/projects/citibike_project/combined/group1_combined/group1_pl_stationnorm.parquet'

if not os.path.exists(path_to_group1_pl_stationnorm_parquet):
    group1_pl_stationnorm.write_parquet(path_to_group1_pl_stationnorm_parquet)