In [2]:
%load_ext autoreload
%autoreload 2

import polars as pl
from data_api import get_raw, get_raw_dask, parse_datetime
import pyarrow.dataset as ds
from pyarrow.fs import GcsFileSystem

# GCS filesystem handle and bucket root shared by the cells below.
gcs = GcsFileSystem()
gcs_nline = "gs://nline-public-data"

# Default query window, parsed with the project helper.
start = parse_datetime("2023-10-01 00:00")
end = parse_datetime("2023-10-01 02:00")


Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [None]:
import pyarrow.parquet as pq

# Open the 2023 GridWatch export via pyarrow.parquet.ParquetDataset,
# passing the [start, end) window as the `filters` argument.
source = "nline-public-data/ghana/gridwatch_data/2023_period/"
time_filter = [
    ("time", ">=", start),
    ("time", "<", end),
]
dataset = pq.ParquetDataset(source, filesystem=gcs, filters=time_filter)

# Dump the dataset object's attributes for inspection.
print(vars(dataset))

In [6]:
import pyarrow.dataset as ds
from pyarrow.fs import GcsFileSystem
import pandas as pd

gcs = GcsFileSystem()
source = "nline-public-data/ghana/gridwatch_data/2023_period/"

# Half-open query window [start_time, end_time).
start_time = pd.Timestamp("2023-01-01")
end_time = pd.Timestamp("2023-01-02")

# Discover the parquet files and attach the time predicate; row data is
# only scanned when `to_table()` is called.
dataset = ds.dataset(source, filesystem=gcs, format="parquet")
filtered_dataset = dataset.filter(
    (ds.field("time") >= start_time)
    & (ds.field("time") < end_time)
)

# Materialise the matching rows and convert them to polars.
table = filtered_dataset.to_table()
df = pl.from_arrow(table)


In [None]:
# Read the time-filtered ParquetDataset (source / gcs / time_filter from the
# ParquetDataset cell). It is rebuilt here rather than calling
# `dataset.read()` because the pyarrow.dataset cell rebinds `dataset` to a
# `ds.dataset(...)` object, which has no `.read()` method.
df = pq.ParquetDataset(source, filesystem=gcs, filters=time_filter).read()

In [None]:
# Row/column count of whatever `df` currently holds (its type depends on
# which loading cell ran last).
df.shape

In [None]:
# Display whatever `df` currently holds.
df

In [None]:
import pyarrow.parquet as pq

# pyarrow.dataset does not expand shell-style globs such as "*.parquet";
# point it at the directory and let it discover the files. The filesystem
# is inferred from the gs:// URI.
source = "gs://nline-public-data/ghana/gridwatch_data/2023_period/"

dataset = ds.dataset(source, format="parquet")
# One fragment per discovered parquet file.
fragments = list(dataset.get_fragments())

In [None]:
# With an explicit filesystem, pass a bucket-relative directory path
# (no gs:// scheme, no glob); pyarrow.dataset does not expand "*.parquet".
gcs = GcsFileSystem()
source = "nline-public-data/ghana/gridwatch_data/2023_period/"

dataset = ds.dataset(source, filesystem=gcs, format="parquet")

In [None]:
from google.cloud import storage

# List the objects under the 2023 export prefix.
# `client.bucket()` only builds a local handle. Unlike `get_bucket()`, it
# does not fetch bucket metadata, which requires storage.buckets.get, a
# permission readers of a public bucket may lack. Listing needs only
# storage.objects.list.
client = storage.Client()
bucket = client.bucket("nline-public-data")
blobs = bucket.list_blobs(prefix="ghana/gridwatch_data/2023_period/")
for blob in blobs:
    print(blob.name)

In [None]:
# Pass explicit arguments: in top-to-bottom order `bucket_name` is not yet
# defined here, and `start_time`/`end_time` still hold the pd.Timestamp
# window from the pyarrow.dataset cell.
df = get_raw_dask("ghana/gridwatch_data/full_period/", "2019-10-01 00:00", "2019-10-01 01:00")

In [None]:
# Query one hour of the full-period data via data_api.get_raw. The result is
# `.collect()`ed in the next cell, so it is presumably lazy; confirm.
df = get_raw("ghana/gridwatch_data/full_period/", "2019-10-01 00:00", "2019-10-01 01:00")

In [None]:
# Materialise the lazy query from the previous cell.
df.collect()

In [None]:
from pyarrow.dataset import dataset
import gcsfs
import polars as pl
import pandas as pd
import polars as pl
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyarrow.fs import GcsFileSystem

# Full-period GridWatch parquet files; polars expands the glob and talks to
# GCS itself, so no gcsfs filesystem object is needed.
source = "gs://nline-public-data/ghana/gridwatch_data/full_period/*.parquet"

# One-hour query window [start, end).
start = parse_datetime("2019-10-01 00:00")
end = parse_datetime("2019-10-01 01:00")

# Build a lazy scan with the time predicate attached, then materialise it.
ldf = pl.scan_parquet(source).filter(
    (pl.col("time") >= start) & (pl.col("time") < end)
)
ldf.collect()

In [None]:

def get_raw(bucket_name, start_time, end_time):
    """Lazily scan a hive-partitioned parquet dataset restricted to a time window.

    Args:
        bucket_name: Object prefix inside the ``gcs_nline`` bucket, e.g.
            ``"ghana/gridwatch_data/full_period/"``.
        start_time: Inclusive lower bound, in any form ``parse_datetime`` accepts.
        end_time: Exclusive upper bound, in any form ``parse_datetime`` accepts.

    Returns:
        A polars ``LazyFrame`` keeping only rows with
        ``start_time <= time < end_time``; call ``.collect()`` to materialise it.
    """
    source = f"{gcs_nline}/{bucket_name}"

    start = parse_datetime(start_time)
    end = parse_datetime(end_time)

    # The filesystem is inferred from the gs:// URI in `source`.
    dataset = ds.dataset(source, partitioning="hive")
    lazy = pl.scan_pyarrow_dataset(dataset)

    # Return the filtered frame so the time window is actually applied.
    return lazy.filter((pl.col("time") >= start) & (pl.col("time") < end))

# Function to write filtered data to disk
def write_filtered_data(df, output_path):
    """Write ``df`` to a parquet file at ``output_path`` via ``sink_parquet``.

    ``df`` is presumably a polars LazyFrame such as ``get_raw`` returns;
    ``sink_parquet`` evaluates the query while writing. Returns None.
    """
    sink = df.sink_parquet
    sink(output_path)

# Usage example: one hour of the full-period data.
output_path = "filtered_data.parquet"
bucket_name = "ghana/gridwatch_data/full_period/"
start_time, end_time = "2019-10-01 00:00", "2019-10-01 01:00"

df = get_raw(bucket_name, start_time, end_time)
# To persist the result: write_filtered_data(df, output_path)


In [None]:
# `head` on a polars LazyFrame is itself lazy and would only display the
# query plan; collect it to see the first 10 rows.
df.head(10).collect()

In [None]:
# Materialise the lazy query returned by get_raw in the usage cell.
df.collect()

In [None]:
import dask.dataframe as dd
from datetime import datetime

def get_raw_dask(bucket_name, start_time, end_time):
    """Read parquet files under a bucket prefix with dask, restricted to a time window.

    Args:
        bucket_name: Object prefix inside the ``gcs_nline`` bucket, e.g.
            ``"ghana/gridwatch_data/full_period/"``. Leading/trailing slashes
            are ignored.
        start_time: Inclusive lower bound, in any form ``parse_datetime`` accepts.
        end_time: Exclusive upper bound, in any form ``parse_datetime`` accepts.

    Returns:
        A lazy dask DataFrame of rows with ``start_time <= time < end_time``;
        call ``.compute()`` to get a pandas DataFrame.
    """
    # Build the glob from `bucket_name` instead of a hard-coded path (which
    # silently ignored the argument). Stripping slashes avoids
    # "prefix//*.parquet" when the prefix ends with "/".
    source = f"{gcs_nline}/{bucket_name.strip('/')}/*.parquet"

    start = parse_datetime(start_time)
    end = parse_datetime(end_time)

    df = dd.read_parquet(source, engine='pyarrow')
    return df[(df['time'] >= start) & (df['time'] < end)]

def write_filtered_data_dask(df, output_path):
    """Write the dask DataFrame ``df`` as parquet to ``output_path``.

    Delegates to ``df.to_parquet``; the call's return value is discarded and
    this function returns None.
    """
    writer = df.to_parquet
    writer(output_path)

# Usage example for the dask variant: one hour of the full-period data.
output_path = "filtered_data_dask.parquet"
bucket_name = "ghana/gridwatch_data/full_period/"
start_time, end_time = "2019-10-01 00:00", "2019-10-01 01:00"

df = get_raw_dask(bucket_name, start_time, end_time)

In [None]:
# Materialise the dask query into an in-memory pandas DataFrame.
pandas_df = df.compute()