# Data exploration and validation

In this exercise we will cover how to use Polars and Pandera to explore, tidy, and validate the data.

## Task 0 - Create a virtual environment

### 🔄 Task

Create a new virtual environment using uv.

### 🧑‍💻 Code

Run the following in the terminal:

```bash
uv venv
source .venv/bin/activate
which python
uv pip sync requirements.txt
```

## Task 1 - load data from database

### 🔄 Task

- Use `polars` to read the data from the database into a Polars dataframe.

### 🧑‍💻 Code

In [None]:
import os
from pathlib import Path

import polars as pl
from posit.connect import Client
from dotenv import load_dotenv

In [None]:
# Get the database credentials
if Path(".env").exists():
    print("loading .env")
    load_dotenv(override=True)

uri = os.environ["DATABASE_URI_PYTHON"]

In [None]:
# Get your username
with Client() as client:
    username = client.me.username

username

Get the vessel verbose data set.

In [None]:
vessel_verbose = pl.read_database_uri(
    query=f"SELECT * FROM {username}_vessel_verbose_raw;",
    uri=uri,
    engine="adbc"
)

vessel_verbose

Get the vessel history data set.

In [None]:
vessel_history = pl.read_database_uri(
    query=f"SELECT * FROM {username}_vessel_history_raw;",
    uri=uri,
    engine="adbc"
)

vessel_history

Get the terminal locations data set.

In [None]:
terminal_locations = pl.read_database_uri(
    query=f"SELECT * FROM {username}_terminal_locations_raw;",
    uri=uri,
    engine="adbc"
)

terminal_locations

Get the terminal weather data set.

In [None]:
terminal_weather = pl.read_database_uri(
    query=f"SELECT * FROM {username}_terminal_weather_raw;",
    uri=uri,
    engine="adbc"
)

terminal_weather

## Task 2 - explore the data

### 🔄 Task

Begin exploring the data. You will want to understand:

- What columns exist in the data?
- How do the data sets relate to one another?
- What is the type of each column (e.g. string, number, category, date)?
- Which columns could be useful for the model?
- What steps will I need to perform to clean the data?

**Tips**

- Use VS Code's built-in data viewer to explore the data.
- If you are more comfortable with Pandas, you can convert the polars dataframe into a pandas dataframe (e.g. `df.to_pandas()`).
- The polars user guide has great docs on how to use polars: https://docs.pola.rs.

🚨 We are not performing feature engineering at this stage. But it is a good time to start thinking about what features you can create from the data.

### 🧑‍💻 Code

#### vessel_history

In [None]:
vessel_history.head(3)

The dates and times are not formatted correctly. We can fix this when we tidy the data.

#### vessel_verbose

In [None]:
vessel_verbose.head(2)

How many different vessels are in the data?

In [None]:
vessel_verbose.select(pl.col("VesselID"), pl.col("VesselName"))

In [None]:
# Verify that each VesselID is unique.
vessel_verbose.get_column("VesselID").n_unique()

In [None]:
vessel_verbose.get_column("VesselID").n_unique() == vessel_verbose.shape[0]

What are all of the numerical columns?

In [None]:
vessel_verbose.select(pl.selectors.numeric()).head()

Some of the date-based columns are stored as integers or floats (e.g. `YearBuilt`). During data tidying we could convert them into a proper date type.


What are all of the string columns?

In [None]:
vessel_verbose.select(pl.selectors.string()).head()

- It looks like some missing values are represented with an empty string `""` while others have a `null` value. We may want to make this consistent when we tidy the data.
- Some string columns are measurements that should be converted into numeric types.

How much data is missing?

In [None]:
(
    vessel_verbose.null_count()
    .transpose(include_header=True)
    .rename({"column": "Column Name", "column_0": "Missing Rows"})
    .with_columns(
        ((pl.col("Missing Rows") / vessel_verbose.shape[0]) * 100)
        .round(1)
        .alias("% Missing")
    )
    .sort("Missing Rows", descending=True)
)

#### terminal_locations

In the interest of time, we will not explore the `terminal_locations` data set. But you should explore it in the same way as the other data sets.

In [None]:
terminal_locations.head()

#### terminal_weather

In the interest of time, we will not explore the `terminal_weather` data set. But you should explore it in the same way as the other data sets.

In [None]:
terminal_weather.head()

## Task 3 - Tidy the Data

### 🔄 Task

Now that you have a basic understanding of the data, the next step is to tidy the data.

### 🧑‍💻 Code

#### terminal_locations

In [None]:
terminal_locations.head()

Clean the string values and keep only the desired columns.

In [None]:
terminal_locations_clean = terminal_locations.select(
    pl.col("TerminalName").str.to_lowercase().str.strip_chars(),
    pl.col("TerminalAbbrev").str.to_uppercase().str.strip_chars(),
    pl.col("Latitude"),
    pl.col("Longitude"),
)

terminal_locations_clean

#### terminal_weather

In [None]:
terminal_weather.head()

Tidy strings.

In [None]:
terminal_weather_clean = terminal_weather.with_columns(
    pl.col("timezone").str.to_lowercase().str.strip_chars(),
    pl.col("timezone_abbreviation").str.to_lowercase().str.strip_chars(),
    pl.col("terminal_name").str.to_lowercase().str.strip_chars(),
)

terminal_weather_clean.head()

Tidy datetime.

In [None]:
# Is all of the data GMT?
terminal_weather_clean.get_column("timezone").value_counts()

In [None]:
terminal_weather_clean = terminal_weather_clean.with_columns(
    pl.col("time").str.to_datetime(time_zone="GMT")
)

terminal_weather_clean.head()

#### vessel_verbose

In [None]:
vessel_verbose.head(3)

Convert the length measurements into a numeric value. The logic is somewhat involved, so we will capture it in a function.

In [None]:
def convert_measurement_string_to_inches(series: pl.Series) -> pl.Series:
    """
    Convert a measurement string such as 78' 8" into a whole number of inches.
    """
    feet = series.str.extract(r"(\d+)'").cast(pl.Int64)
    inches = series.str.extract(r'(\d+)"').cast(pl.Int64).fill_null(0)
    total_inches = feet * 12 + inches
    return total_inches

In [None]:
convert_measurement_string_to_inches(pl.Series(['''78' 8"''']))

In [None]:
convert_measurement_string_to_inches(pl.Series(["""64'""", '''100' 11"''']))

In [None]:
vessel_verbose_clean = vessel_verbose.with_columns(
    pl.col("Beam", "Length", "Draft")
    .map_batches(convert_measurement_string_to_inches)
    .name.suffix("Inches"),
).select(pl.col("*").exclude(["Beam", "Length", "Draft"]))

In [None]:
vessel_verbose_clean.head()

Fix the year columns.

In [None]:
vessel_verbose_clean = vessel_verbose_clean.with_columns(
    pl.col("YearBuilt").cast(pl.String).str.to_date("%Y"),
    pl.col("YearRebuilt").cast(pl.Int64).cast(pl.String).str.to_date("%Y"),
)

In [None]:
vessel_verbose_clean.select("YearBuilt", "YearRebuilt")

Handle missing values for strings.

In [None]:
vessel_verbose_clean = vessel_verbose_clean.with_columns(
    pl.col(pl.String).replace(" ", None),
)

In [None]:
vessel_verbose_clean.head(2)

Normalize all of the string columns so that they are consistent.

In [None]:
vessel_verbose_clean = vessel_verbose_clean.with_columns(
    (
        pl.col("VesselName", "VesselAbbrev", "ClassName", "CityBuilt", "PropulsionInfo")
        .str.to_lowercase()
        .str.strip_chars()
    )
)

In [None]:
(
    vessel_verbose_clean
    .select("VesselName", "VesselAbbrev", "ClassName", "CityBuilt", "PropulsionInfo")
    .head()
)

#### vessel_history

In [None]:
vessel_history.head(2)

Convert the datetimes from strings to Polars datetime objects. The logic is fairly complex, so we will abstract it into a function that we can apply to all of the required columns.

In [None]:
def convert_string_to_datetime(series: pl.Series) -> pl.Series:
    """
    Convert the datetime format from wadot into a datetime format that polars
    can understand.

    >>> convert_string_to_datetime(pl.Series(['/Date(1714547700000-0700)/']))
    shape: (1,)
    Series: '' [datetime[μs, UTC]]
    [
        2024-05-01 07:15:00 UTC
    ]
    """
    # Extract the unix timestamp. The source value is in milliseconds, but the
    # "%s" format used below expects the number of seconds since
    # 1970-01-01 00:00 UTC, so divide by 1_000.
    unix_timestamp = (
        (series.str.extract(r"/Date\((\d{13})[-+]").cast(pl.Int64) / 1_000)
        .cast(pl.Int64)
        .cast(pl.String)
    )
    # Extract the timezone.
    timezone = series.str.extract(r"([-+]\d{4})")
    # Create a new series that has the timestamp and timezone.
    clean_timestamp = unix_timestamp + timezone
    # Convert into a datetime.
    datetime_series = clean_timestamp.str.to_datetime("%s%z")
    return datetime_series

In [None]:
convert_string_to_datetime(pl.Series(["/Date(1714547700000-0700)/"]))
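To see what the raw value encodes, the sketch below decodes the same example string with only the Python standard library. It assumes, consistent with the docstring above, that the 13 digits are milliseconds since the Unix epoch and that the trailing offset does not shift the instant.

```python
import re
from datetime import datetime, timezone

raw = "/Date(1714547700000-0700)/"

# 13 digits of milliseconds since the Unix epoch, then a UTC offset.
match = re.fullmatch(r"/Date\((\d{13})([-+]\d{4})\)/", raw)
milliseconds, offset = int(match.group(1)), match.group(2)

# The epoch value identifies the instant in UTC; the offset is kept only as
# extra information about the source system's local clock.
instant_utc = datetime.fromtimestamp(milliseconds / 1_000, tz=timezone.utc)
print(instant_utc.isoformat(), offset)
```

The result is 07:15 UTC on 2024-05-01, matching the Polars output shown in the docstring.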

In [None]:
vessel_history_clean = vessel_history.with_columns(
    (
        pl.col("ScheduledDepart", "ActualDepart", "EstArrival", "Date").map_batches(
            convert_string_to_datetime
        )
    )
)

In [None]:
vessel_history_clean.head()

Normalize all of the string columns so that they are consistent.

In [None]:
vessel_history_clean = vessel_history_clean.with_columns(
    pl.col("Vessel", "Departing", "Arriving").str.to_lowercase().str.strip_chars()
)

In [None]:
vessel_history_clean.select("Vessel", "Departing", "Arriving").head()

Many rows have no "Arriving" terminal or "EstArrival" date.

In [None]:
(
    vessel_history_clean.filter(
        pl.col("Arriving").is_null() | pl.col("EstArrival").is_null()
    )
)

We will assume this means these ferries were cancelled, and drop these rows.

In [None]:
vessel_history_clean = vessel_history_clean.filter(
    pl.col("Arriving").is_not_null(),
    pl.col("EstArrival").is_not_null()
)

In [None]:
vessel_history_clean

Correct the names of the "Departing" and "Arriving" terminals so that they match the values in the `terminal_locations` data set.

In [None]:
terminal_name_mapping = {
    "anacortes": "anacortes",
    "bainbridge": "bainbridge island",
    "bremerton": "bremerton",
    "clinton": "clinton",
    "colman": "seattle",
    "edmonds": "edmonds",
    "fauntleroy": "fauntleroy",
    "friday harbor": "friday harbor",
    "keystone": "coupeville",
    "kingston": "kingston",
    "lopez": "lopez island",
    "mukilteo": "mukilteo",
    "orcas": "orcas island",
    "port townsend": "port townsend",
    "pt. defiance": "point defiance",
    "shaw": "shaw island",
    "sidney b. c.": "sidney b.c.",
    "southworth": "southworth",
    "tahlequah": "tahlequah",
    "vashon": "vashon island",
}

In [None]:
terminal_name_mapping_df = pl.DataFrame(
    {
        "OldName": terminal_name_mapping.keys(),
        "CorrectName": terminal_name_mapping.values(),
    }
)

terminal_name_mapping_df

Tidy the vessel history data so that the terminals are named consistently across all data sets.

In [None]:
vessel_history_clean = (
    vessel_history_clean.join(
        terminal_name_mapping_df,
        left_on="Departing",
        right_on="OldName",
        how="left",
        validate="m:1",
        coalesce=True
    )
    .rename({"CorrectName": "DepartingCorrected"})
    .join(
        terminal_name_mapping_df,
        left_on="Arriving",
        right_on="OldName",
        how="left",
        validate="m:1",
        coalesce=True
    )
    .rename({"CorrectName": "ArrivingCorrected"})
    .drop(["Departing", "Arriving"])
    .rename({"DepartingCorrected": "Departing", "ArrivingCorrected": "Arriving"})
    .select(
        [
            "VesselId",
            "Vessel",
            "Departing",
            "Arriving",
            "ScheduledDepart",
            "ActualDepart",
            "EstArrival",
            "Date",
        ]
    )
)

In [None]:
vessel_history_clean.head()

Tidy the vessel history data so that the relationship between vessel history and vessel verbose is correct.

Joins based on `VesselId` are not complete: some rows in the vessel history have no matching `VesselID` in the vessel verbose data.

In [None]:
(
    vessel_history_clean.join(
        vessel_verbose_clean,
        left_on="VesselId",
        right_on="VesselID",
        how="left",
        validate="m:1",
        coalesce=True
    )
    # Filter to show all of the rows where there was no matching Vessel ID in the
    # vessel_verbose_clean DataFrame.
    .filter(pl.col("VesselAbbrev").is_null())
)

However, joins based on `Vessel` and `VesselName` are complete.

In [None]:
(
    vessel_history_clean.join(
        vessel_verbose_clean,
        left_on="Vessel",
        right_on="VesselName",
        how="left",
        validate="m:1",
        coalesce=True
    )
    # Uncomment the filter below to show all of the rows where there was no
    # matching VesselName in the vessel_verbose_clean DataFrame. If no rows are
    # returned, then the join was successful.
    # .filter(pl.col("VesselAbbrev").is_null())
)

Therefore we should drop the `VesselId` from the data since it is not correct or useful.

In [None]:
vessel_history_clean = vessel_history_clean.drop("VesselId")

In [None]:
vessel_history_clean.head()

## Task 4 - Validate the Data

### 🔄 Task

In the previous activity we tidied the dataset. For some projects, this may be enough. However, for this project we plan to refresh the data on a regular basis. We would like to gain additional comfort that the data we are using is correct. Data validation can help prove that our data tidying was correct, and find any potential issues if the upstream data changes.

[Pandera](https://pandera.readthedocs.io/en/stable/) is a Python library for validating dataframes. It started with Pandas and also supports Polars, which is what we use here. There are two steps:

1. Define a schema for your data. For example:
   - Define the type for each column
   - Confirm if null values are allowed
   - Define custom checks
2. Run your data through the schema validator.

You will find these links useful when defining your schema:

- Polars data validation guide: https://pandera.readthedocs.io/en/stable/polars.html#usage
- Polars data types: https://pandera.readthedocs.io/en/stable/reference/dtypes.html#polars-dtypes
- `pa.Field` API: https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.dataframe.model_components.Field.html#pandera.api.dataframe.model_components.Field
- List of built in checks you can use with `pa.Field`: https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.checks.Check.html#pandera.api.checks.Check

Before working on our real data, let's play around with a "toy" example. Take a few minutes and play around with the example below:

- Can you run the code as is?
- Try changing some of the values in the DataFrame so that the schema validation fails.
- Try updating the schema so that it passes again.

In [None]:
import pandera.polars as pa

In [None]:
# data to validate
df = pl.DataFrame({
    "column1": [1, 11, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -5.2],
    "column3": ["value_1", "value_2", "value_3", "value_2", "value_1"],
})

df

In [None]:
class ToySchema(pa.DataFrameModel):
    column1: int = pa.Field(le=11)
    column2: float = pa.Field(lt=1.2)
    column3: str = pa.Field(str_startswith="value_")

    @pa.check("column3")
    def has_one_underscore(cls, data: pa.PolarsData) -> pl.LazyFrame:
        list_is_len_2 = (
            data
            .lazyframe
            .select(
                pl.col(data.key).str.split("_").list.len() == 2
            )
        )
        # print(list_is_len_2.collect())
        return list_is_len_2

In [None]:
ToySchema.validate(df)

### 🧑‍💻 Code

#### vessel_history

Start by validating the `vessel_history` data set. As a reminder, here is what the data looks like:

In [None]:
vessel_history_clean.head(3)

The class below defines the schema and checks for the `vessel_history` data set.

- Each column is a class attribute. At a minimum, we define the column type (e.g. int, str, datetime, etc.)
- For some columns, we use `pa.Field` to add more checks. For example, the `Date` column must use the UTC time zone and must fall on or after 1 March 2024 (midnight in the `America/Vancouver` time zone).
- We can define additional and more complex column and dataframe level checks by defining class methods.

In [None]:
from pandera.engines.polars_engine import DateTime, Date, Int64

In [None]:
class VesselHistorySchema(pa.DataFrameModel):
    Vessel: str
    Departing: str
    Arriving: str
    ScheduledDepart: DateTime = pa.Field(dtype_kwargs={"time_zone": "UTC"})
    ActualDepart: DateTime = pa.Field(dtype_kwargs={"time_zone": "UTC"})
    EstArrival: DateTime = pa.Field(dtype_kwargs={"time_zone": "UTC"})
    Date: DateTime = pa.Field(
        dtype_kwargs={"time_zone": "UTC"},
        ge=pl.datetime(2024, 3, 1, time_zone="America/Vancouver").dt.convert_time_zone(
            "UTC"
        ),
    )

    @pa.dataframe_check
    def year_of_date_matches_scheduled_depart(cls, df: pa.PolarsData) -> pl.LazyFrame:
        """
        Verify that the year of the Date column matches the year of the
        ScheduledDepart column.
        """
        return df.lazyframe.select(
            pl.col("Date").dt.year().eq(pl.col("ScheduledDepart").dt.year())
        )

    @pa.dataframe_check(raise_warning=True)
    def estimated_arrival_is_after_scheduled_depart(
        cls, df: pa.PolarsData
    ) -> pl.LazyFrame:
        """
        Verify that the EstArrival date time is always after the ScheduledDepart
        date time.

        Note this check is expected to fail, therefore raise_warning=True is
        used. In the future we should go back and understand why this check
        fails.
        """
        return df.lazyframe.select(pl.col("EstArrival").ge(pl.col("ScheduledDepart")))

    @pa.check("Vessel")
    def vessel_in_vessel_verbose_data_set(cls, data: pa.PolarsData) -> pl.LazyFrame:
        """
        Verify that all of the vessels in the vessel history data set also exist
        in the vessel verbose data set.
        """
        vessel_names = vessel_verbose_clean.get_column("VesselName").to_list()
        return data.lazyframe.select(pl.col(data.key).is_in(vessel_names))

    @pa.check("Departing")
    def departing_terminal_in_terminal_data(cls, data: pa.PolarsData) -> pl.LazyFrame:
        """
        Verify that all of the departing terminals in the vessel history data
        set also exist in the terminal locations data set.
        """
        terminals = terminal_locations_clean.get_column("TerminalName").to_list()
        return data.lazyframe.select(pl.col(data.key).is_in(terminals))

    @pa.check("Arriving")
    def arriving_terminal_in_terminal_data(cls, data: pa.PolarsData) -> pl.LazyFrame:
        """
        Verify that all of the arriving terminals in the vessel history data
        set also exist in the terminal locations data set.
        """
        terminals = terminal_locations_clean.get_column("TerminalName").to_list()
        return data.lazyframe.select(pl.col(data.key).is_in(terminals))

To validate the data, run the dataframe through the `pa.DataFrameModel.validate` method.

In [None]:
VesselHistorySchema.validate(vessel_history_clean)

- Are there any more checks that you would add?
- How should we handle the data that fails a check that raises a warning instead of an error?
- Try changing some of the validations so that they fail. Are you able to use the failure message to identify the bad data?

#### vessel_verbose

In the interest of time, we will "skim" over the validation of the `vessel_verbose` data set. The class below defines the schema and checks for the `vessel_verbose` data set.

*💁 Note: time permitting walk the learners through using multiple cursors and split editors in VS Code and how they can be used to quickly create the code for the DataFrame model.*

In [None]:
# Get all the columns into a format we can copy.
vessel_verbose_clean.columns

In [None]:
# Get each column as a single row, and show the first two examples. This is easy
# to read when I am trying to suss out the correct column type and checks.
pl.Config.set_tbl_rows(50)
vessel_verbose_clean.head(2).transpose(include_header=True)

In [None]:
pl.Config.set_tbl_rows(10)
vessel_verbose_clean.head(2)

In [None]:
class VesselVerboseSchema(pa.DataFrameModel):
    VesselID: int
    VesselSubjectID: int
    VesselName: str = pa.Field(unique=True)
    VesselAbbrev: str
    ClassID: int
    ClassName: str
    ClassSubjectID: int
    DrawingImg: str
    PublicDisplayName: str
    SilhouetteImg: str
    SortSeq: int
    Status: int
    OwnedByWSF: bool
    CarDeckRestroom: bool
    CarDeckShelter: bool
    Elevator: bool
    ADAAccessible: bool
    MainCabinGalley: bool
    MainCabinRestroom: bool
    PublicWifi: bool
    ADAInfo: str
    AdditionalInfo: str = pa.Field(nullable=True)
    VesselNameDesc: str
    VesselHistory: str = pa.Field(nullable=True)
    CityBuilt: str
    SpeedInKnots: int
    EngineCount: int
    Horsepower: int
    MaxPassengerCount: int
    PassengerOnly: bool
    FastFerry: bool
    PropulsionInfo: str
    TallDeckClearance: int
    RegDeckSpace: int
    TallDeckSpace: int
    Tonnage: int
    Displacement: int
    YearBuilt: Date
    YearRebuilt: Date = pa.Field(nullable=True)
    SolasCertified: bool
    MaxPassengerCountForInternational: int = pa.Field(nullable=True)
    BeamInches: int
    LengthInches: int
    DraftInches: int = pa.Field(nullable=True)

    @pa.check("DrawingImg")
    def validate_urls(cls, data: pa.PolarsData) -> pl.LazyFrame:
        return data.lazyframe.select(pl.col(data.key).str.starts_with("https://"))

In [None]:
VesselVerboseSchema.validate(vessel_verbose_clean)

#### terminal_locations

In the interest of time, we will "skim" over the validation of this data.

In [None]:
class TerminalLocationsSchema(pa.DataFrameModel):
    TerminalName: str = pa.Field(unique=True)
    TerminalAbbrev: str
    Latitude: float = pa.Field(ge=-90.0, le=90.0)
    Longitude: float = pa.Field(ge=-180.0, le=180.0)

In [None]:
TerminalLocationsSchema.validate(terminal_locations_clean)

#### terminal_weather

In the interest of time, we will "skim" over the validation of this data.

In [None]:
terminal_weather_clean.head()

In [None]:
class TerminalWeatherSchema(pa.DataFrameModel):
    latitude: float = pa.Field(ge=-90.0, le=90.0)
    longitude: float = pa.Field(ge=-180.0, le=180.0)
    generationtime_ms: float
    utc_offset_seconds: int
    timezone: str = pa.Field(eq="gmt")
    timezone_abbreviation: str = pa.Field(eq="gmt")
    elevation: float
    time: DateTime = pa.Field(dtype_kwargs={"time_zone": "GMT"}, nullable=True)
    weather_code: int
    temperature_2m: float
    precipitation: float
    cloud_cover: int = pa.Field(ge=0, le=100)
    wind_speed_10m: float
    wind_direction_10m: int = pa.Field(ge=0, le=360)
    wind_gusts_10m: float
    terminal_name: str


In [None]:
TerminalWeatherSchema.validate(terminal_weather_clean)

## Task 5 - Write Data to the database

### 🔄 Task

Write the clean data back to the database.

### 🧑‍💻 Code

Establish a connection to the database.

In [None]:
# Get the database credentials
if Path(".env").exists():
    print("loading .env")
    load_dotenv()

uri = os.environ["DATABASE_URI_PYTHON"]

#### vessel_history

In [None]:
# Write data to the database
vessel_history_clean.write_database(
    table_name=f"{username}_vessel_history_clean",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

In [None]:
# Test that you can read the data
pl.read_database_uri(
    query=f"SELECT * FROM {username}_vessel_history_clean LIMIT 5;",
    uri=uri,
    engine="adbc"
)

#### vessel_verbose

In [None]:
# Write data to the database
vessel_verbose_clean.write_database(
    table_name=f"{username}_vessel_verbose_clean",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

In [None]:
# Test that you can read the data
pl.read_database_uri(
    query=f"SELECT * FROM {username}_vessel_verbose_clean LIMIT 5;",
    uri=uri,
    engine="adbc"
)

#### terminal_locations

In [None]:
# Write data to the database
terminal_locations_clean.write_database(
    table_name=f"{username}_terminal_locations_clean",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

In [None]:
# Test that you can read the data
pl.read_database_uri(
    query=f"SELECT * FROM {username}_terminal_locations_clean LIMIT 5;",
    uri=uri,
    engine="adbc"
)

#### terminal_weather

In [None]:
# Write data to the database
terminal_weather_clean.write_database(
    table_name=f"{username}_terminal_weather_clean",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

In [None]:
# Test that you can read the data
pl.read_database_uri(
    query=f"SELECT * FROM {username}_terminal_weather_clean LIMIT 5;",
    uri=uri,
    engine="adbc"
)

## Task 6 - Set up email with Posit Connect and Quarto

### 🔄 Task

Posit Connect has support for sending emails with Quarto: https://docs.posit.co/connect/user/quarto/#email-customization.

Generate an email to update all the email recipients on the status of the new data.

**Tips**

Run the following in the terminal to preview the email:

```bash
quarto render notebook.ipynb --execute --output-dir tmp
```

Then open `tmp/email-preview/index.html` to preview the email.

### 🧑‍💻 Code

Define the variable data.

In [None]:
import datetime

todays_date = datetime.datetime.now().strftime("%Y-%m-%d")
todays_date

Create your email template.

Seattle Ferry Data Validation Report for `{python} todays_date`

**Update**

The Seattle Ferry data has been updated and validated. The following data sets are available for your use:

- Terminal Locations: `{python} f"{username}_terminal_locations_clean"`
- Vessel Verbose: `{python} f"{username}_vessel_verbose_clean"`
- Vessel History: `{python} f"{username}_vessel_history_clean"`

**Terminal Locations**

- `{python} f"{terminal_locations_clean.shape[0]:,}"` rows
- `{python} f"{terminal_locations_clean.shape[1]:,}"` columns
- Terminals: `{python} ", ".join(terminal_locations_clean['TerminalName'].unique())`

**Vessel Verbose**

- `{python} f"{vessel_verbose_clean.shape[0]:,}"` rows
- `{python} f"{vessel_verbose_clean.shape[1]:,}"` columns
- Vessels: `{python} ", ".join(vessel_verbose_clean['VesselName'].unique())`

**Vessel History**

- `{python} f"{vessel_history_clean.shape[0]:,}"` rows
- `{python} f"{vessel_history_clean.shape[1]:,}"` columns

In [None]:
# | echo: false
import matplotlib.pyplot as plt
import matplotlib as mpl

ax = (
    vessel_history_clean.with_columns(
        pl.col("Date").dt.date().dt.month_start().alias("Month"),
    )
    .group_by("Month")
    .agg(pl.col("Vessel").count().alias("Trips"))
    .sort("Month")
    .to_pandas()
    .plot(
        x="Month",
        y="Trips",
        title="Trips by Month",
    )
)

ax.yaxis.set_major_formatter(mpl.ticker.StrMethodFormatter("{x:,.0f}"))

plt.show()

## Task 7 - publish notebook as Quarto document to Posit Connect

### 🔄 Task

Deploy the notebook to Posit Connect as a Quarto document.

### 🧑‍💻 Code

**Posit Publisher**

Deploy using the Posit Publisher VS Code extension. Check that you have the required environment variables:

```bash
source .env
echo $CONNECT_SERVER
echo $CONNECT_API_KEY
```

**rsconnect CLI**

Run the following to deploy the notebook to Connect:

```bash
# Check that you have the required environment variables set
source .env
echo $CONNECT_SERVER
echo $CONNECT_API_KEY
echo $DATABASE_URI_PYTHON

# Publish the notebook
rsconnect deploy quarto --title "Seattle Ferries #2 - Data exploration and validation" -E DATABASE_URI_PYTHON notebook.ipynb
```

After the deployment is successful:

- Share the notebook with the person beside you.
- Schedule the notebook to run once every week.

In [None]:
print("Notebook complete ✅")