In [None]:
from google.cloud import storage
import funcy as fn
import pandas as pd
import altair as alt
import plotly.express as px
import json
import re
from typing import Dict
from pydantic import BaseSettings, Field
from collectionish import AttyDict

In [None]:
class GoogleConfig(BaseSettings):
    """Google Cloud settings, each populated from an environment variable.

    Every field is declared with ``...`` (required), so creating an instance
    fails validation when any of the variables below is not set.
    """

    # Read from GOOGLE_APPLICATION_CREDENTIALS.
    credentials: str = Field(..., env="GOOGLE_APPLICATION_CREDENTIALS")
    # Read from GOOGLE_PROJECT.
    project: str = Field(..., env="GOOGLE_PROJECT")
    # Read from GOOGLE_BUCKET.
    bucket: str = Field(..., env="GOOGLE_BUCKET")
    # Read from GOOGLE_FOLDER.
    folder: str = Field(..., env="GOOGLE_FOLDER")


class NotebookConfig(BaseSettings):
    """Top-level settings object for this notebook."""

    # Annotated explicitly and built through a factory so the environment is
    # read when NotebookConfig() is instantiated, not once while the class
    # body is being executed.
    google: GoogleConfig = Field(default_factory=GoogleConfig)


config = NotebookConfig()

In [None]:
class Storage:
    """Small wrapper around a Google Cloud Storage client bound to one bucket.

    Args:
        project: Project id passed to ``storage.Client``.
        bucket: Name of the bucket handed back by the ``bucket`` property.
    """

    def __init__(self, project: str, bucket: str):
        self.project = project
        self.bucket_name = bucket
        # The client must live on the instance: the ``bucket`` property uses
        # it long after ``__init__`` has returned.
        self.client = storage.Client(project=project)
        # Filled in lazily on the first access to ``bucket``.
        self._bucket = None

    @property
    def bucket(self):
        """Return the bucket handle, looking it up once and reusing it afterwards."""
        if self._bucket is None:
            self._bucket = self.client.get_bucket(self.bucket_name)
        return self._bucket
    
    
# Single storage handle shared by the cells below, built from the configured
# project and bucket.
gcs = Storage(config.google.project, config.google.bucket)

In [None]:
# Names of all objects listed under the configured folder prefix.
files = [
    blob.name
    for blob in gcs.bucket.list_blobs(prefix=config.google.folder)
]
files[0]

In [None]:
# Fetch a blob handle for every object name collected above.
blobs = [gcs.bucket.get_blob(name) for name in files]
blobs[0]

In [None]:
# Download every blob's raw payload in this cell and keep the results in a
# list. A lazy iterator here would be consumed by the first cell that reads it,
# so re-running that cell would silently see no data at all.
data = fn.lflatten(fn.map(lambda blob: blob.download_as_string(), blobs))

In [None]:
# Captures everything from the first "{" to the end of the text. DOTALL lets
# the capture run across line breaks, which is needed because payloads are
# decoded to real text below.
RECORD_EXPR = re.compile(r"({.+)", re.DOTALL)


def _payload_lines(payload):
    """Return the non-blank lines of one payload, starting at its first "{".

    Bytes are decoded as UTF-8 instead of being passed through ``str()``:
    ``str(b"...")`` yields the ``b'...'`` repr, in which quotes, backslashes
    and non-ASCII bytes are re-escaped, so otherwise valid JSON lines stop
    parsing. A payload without any "{" contributes no lines.
    """
    if isinstance(payload, (bytes, bytearray)):
        # Undecodable bytes are replaced rather than raised on, so one bad
        # byte does not abort the whole load.
        text = payload.decode("utf-8", errors="replace")
    else:
        text = str(payload)
    match = RECORD_EXPR.search(text)
    if match is None:
        return []
    # Split on "\n" only; a trailing "\r" is plain whitespace to json.loads.
    return [line for line in match.group(1).split("\n") if line.strip()]


# Materialised as a list so the parsing cell can be re-run safely.
records = fn.lflatten(fn.map(_payload_lines, data))

@fn.ignore(json.JSONDecodeError)
def parse_or_ignore_jsonlines(row: str) -> AttyDict:
    """Decode a single JSON-lines record into an ``AttyDict``.

    Anything that is not a ``str`` is handed back untouched. When the string
    is not valid JSON, the ``fn.ignore`` decorator swallows the
    ``json.JSONDecodeError`` and the call yields ``None`` instead.
    """
    if not isinstance(row, str):
        return row
    decoded = json.loads(row)
    return AttyDict(decoded)

In [None]:
# Parse every candidate line; lines that are not valid JSON come back as None.
parsed = [parse_or_ignore_jsonlines(record) for record in records]
len(parsed)

In [None]:
# Keep only the entries that parsed successfully (i.e. are not None).
flights = [entry for entry in parsed if entry is not None]
flights[0]

In [None]:
def make_row(row):
    """Flatten one nested flight record into a single-level ``AttyDict``.

    Reads the ``flight``, ``departure``, ``arrival`` and ``airline``
    sub-records of ``row`` together with its top-level ``flight_date`` and
    ``flight_status`` values, and returns them under flat column names.
    """
    flight = row.flight
    departure = row.departure
    arrival = row.arrival
    airline = row.airline

    flat = {
        "id": flight.number,
        "date": row.flight_date,
        "status": row.flight_status,
        "airport_departed_code": departure.iata,
        "airport_departed": departure.airport,
        "scheduled_departure": departure.scheduled,
        "actual_departure": departure.actual,
        "airport_arrival_code": arrival.iata,
        "airport_arrived": arrival.airport,
        "scheduled_arrival": arrival.scheduled,
        "actual_arrival": arrival.actual,
        # The delay column is taken from the departure sub-record.
        "delay": departure.delay,
        "airline": airline.name,
    }
    return AttyDict(flat)

# One DataFrame row per flattened flight record.
df = pd.DataFrame([make_row(flight) for flight in flights])
df.head()

In [None]:
# Rows whose departure airport code is LHR.
df.loc[df["airport_departed_code"] == "LHR"]

In [None]:
# Count distinct flight ids per departure airport, smallest count first.
departures = (
    df
    .drop_duplicates(subset="id")
    .groupby("airport_departed", as_index=False)["id"]
    .count()
    .rename(columns={"id": "count"})
    .sort_values(by="count")
)
departures.head()

In [None]:
# Horizontal bar chart of departures per airport.
# Altair orders a nominal axis alphabetically by default and does not follow
# the row order of the DataFrame, so the ranking is requested on the encoding
# itself: sort="-x" places the airport with the highest count at the top.
alt.Chart(departures).mark_bar().encode(
    x=alt.X("count:Q", title="Departures"),
    y=alt.Y("airport_departed", sort="-x", title="Departure airport"),
    tooltip=departures.columns.tolist(),
).properties(
    height=700,
    title="Departures per airport",
)