In [None]:
%%bash
# Start a throwaway local ClickHouse server for this notebook.
# -d keeps the cell from blocking the kernel; stop it with `docker stop clickhouse-playground`
# (--rm then removes the container). Credentials and the storage root can be overridden
# through environment variables instead of being edited in place.
CH_STORAGE="${CH_STORAGE:-$HOME/Downloads/docker-volume-mapping/playground/clickhouse-storage}"
docker run --rm -d --name clickhouse-playground \
  -e CLICKHOUSE_USER="${CLICKHOUSE_USER:-luntaixia}" \
  -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \
  -e CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-luntaixia}" \
  -p 9000:9000 -p 8123:8123 \
  -v "$CH_STORAGE/logs:/var/log/clickhouse-server" \
  -v "$CH_STORAGE/data:/var/lib/clickhouse" \
  -v "$CH_STORAGE/configs/users.d:/etc/clickhouse-server/users.d" \
  --ulimit nofile=262144:262144 \
  clickhouse/clickhouse-server

In [11]:
import ibis

# Interactive mode: expressions are executed eagerly and their results rendered.
ibis.options.interactive = True

In [12]:
import os

# Local docker ClickHouse. Credentials come from the environment and fall back to the
# local dev defaults, so they do not have to be edited into the notebook.
con = ibis.clickhouse.connect(
    user=os.environ.get("CLICKHOUSE_USER", "luntaixia"),
    password=os.environ.get("CLICKHOUSE_PASSWORD", "luntaixia"),
    host="localhost",
)

In [5]:
# Public ClickHouse playground server. NOTE: this rebinds `con`, replacing the local connection.
PLAY_URL = "clickhouse://play:clickhouse@play.clickhouse.com:443?secure=True"
con = ibis.connect(PLAY_URL)

In [6]:
actors = con.table(name="actors")
actors

In [10]:
con.list_tables(database='b', like='b')

[]

In [17]:
# force=True -> no-op when the database already exists
con.create_database(name='default', force=True)

In [157]:
con.get_schema(table_name="test")

ibis.Schema {
  name     !string
  snap_dt  timestamp('Europe/London', 0)
  history  !array<!int32>
}

In [168]:
# Column dtypes of the CUST parquet snapshot.
# NOTE(review): `df` is only created in a later cell (read from S3), so this needs that cell run first.
df.dtypes

NAME                      object
GENDER                    object
PHONE                     object
EMAIL                     object
BLOOD_GRP                 object
JOB                       object
ADDRESS                   object
CUST_ID                    int64
OFFICE                    object
ORG                       object
TITLE                     object
SALARY                     int64
BONUS                      int64
BIRTH_DT     datetime64[ns, UTC]
SINCE_DT     datetime64[ns, UTC]
SNAP_DT      datetime64[ns, UTC]
dtype: object

In [197]:
import pandas as pd
from ibis import schema  # previously only imported in later cells -> NameError on a fresh kernel

# NOTE: relies on `s3a` (S3Accessor) created in the S3 cell further down; run that first.
df: pd.DataFrame = s3a.read_parquet("/fake/data/FEATURES/CUST/CUST_2024-01-01.parquet")
df['NAME'] = df['NAME'].astype('string')

# numpy kind 'O' (object) columns are mapped to ibis String; every other dtype is
# converted by ibis directly.
# https://numpy.org/doc/stable/reference/generated/numpy.dtype.kind.html
schema_ = [
    (col, ibis.expr.datatypes.String() if dtype.kind == 'O' else ibis.dtype(dtype))
    for col, dtype in zip(df.columns, df.dtypes)
]
schema(schema_)

ibis.Schema {
  NAME       string
  GENDER     string
  PHONE      string
  EMAIL      string
  BLOOD_GRP  string
  JOB        string
  ADDRESS    string
  CUST_ID    int64
  OFFICE     string
  ORG        string
  TITLE      string
  SALARY     int64
  BONUS      int64
  BIRTH_DT   timestamp('UTC')
  SINCE_DT   timestamp('UTC')
  SNAP_DT    timestamp('UTC')
}

In [182]:
# dtypes of every column that is not a plain Python-object column
dtypes = df.select_dtypes(exclude=['object']).dtypes

[('CUST_ID', dtype('int64')),
 ('SALARY', dtype('int64')),
 ('BONUS', dtype('int64')),
 ('BIRTH_DT', datetime64[ns, UTC]),
 ('SINCE_DT', datetime64[ns, UTC]),
 ('SNAP_DT', datetime64[ns, UTC])]

In [185]:
Schema.from_pandas(list(dtypes.items()))

ibis.Schema {
  CUST_ID   int64
  SALARY    int64
  BONUS     int64
  BIRTH_DT  timestamp('UTC')
  SINCE_DT  timestamp('UTC')
  SNAP_DT   timestamp('UTC')
}

In [3]:
import os
import sys

# Make the project root (parent of the notebook's working directory) importable.
# Guarded so re-running the cell does not keep appending the same entry.
project_root = os.path.dirname(os.getcwd())
if project_root not in sys.path:
    sys.path.append(project_root)

In [136]:
from __future__ import annotations
from typing import List, Tuple, Dict
import logging
from datetime import date
import ibis
from ibis import BaseBackend, Schema
from clickhouse_connect.driver.tools import insert_file
import pandas as pd
import pyspark
from CommonTools.SnapStructure.structure import SnapshotDataManagerBase
from CommonTools.sparker import SparkConnector
from ProviderTools.clickhouse.dbapi import ClickHouse
from CommonTools.utils import dt2str, str2dt

class SnapshotDataManagerCHSQL(SnapshotDataManagerBase):
    """Snapshot data manager that stores each (schema, table) as one ClickHouse table.

    All snapshots share the table ``schema.table``. Rows are distinguished by the
    ``snap_dt_key`` column, which :meth:`init_table` also uses as the partition key.
    Call :meth:`setup` once before creating instances.
    """
    @classmethod
    def setup(cls, ch_conf: ClickHouse, spark_connector: SparkConnector = None):
        """Bind the ClickHouse connection (and optionally a spark connector) for all instances.

        :param ch_conf: ClickHouse server/login config (ip, port, username, password)
        :param spark_connector: optional spark connector; used in this class by load()/loads()
        """
        super(SnapshotDataManagerCHSQL, cls).setup(spark_connector = spark_connector)
        cls._ch_conf = ch_conf
        cls._ops = ibis.clickhouse.connect(
            user=ch_conf.username,
            password=ch_conf.password,
            host=ch_conf.ip,
            port=ch_conf.port,
        )

    def __init__(self, schema:str, table:str, snap_dt_key: str):
        """database management interface

        :param schema: schema (ClickHouse database)
        :param table:  table under each schema
        :param snap_dt_key: snap date column name for all tables
        """
        super().__init__(schema = schema, table = table)
        self.snap_dt_key = snap_dt_key

    def _table(self):
        """ibis table expression for ``schema.table``."""
        return self._ops.table(
            name = self.table,
            database = self.schema
        )

    def is_exist(self) -> bool:
        """Whether ``schema.table`` exists."""
        return self.table in self._ops.list_tables(like = self.table, database = self.schema)

    def init_table(self, col_schemas: Schema, primary_keys:List[str] = None, overwrite:bool = False, **settings):
        """initialize/create table in the underlying data warehouse system

        Creates the database if missing, then a MergeTree table ordered by ``primary_keys``
        and partitioned by ``snap_dt_key``.

        :param Schema col_schemas: ibis column schema
        :param List[str] primary_keys: primary keys (used as the MergeTree ORDER BY), defaults to None
        :param bool overwrite: whether to drop table if exists, defaults to False
        :param settings: extra ClickHouse table settings passed to create_table
        """
        if self.is_exist():
            if overwrite:
                self.drop()
            else:
                logging.warning(f"{self.schema}.{self.table} already exists, will do nothing." 
                                "set overwrite to True if you wish to reset table")
                return

        # create schema (force=True -> IF NOT EXISTS)
        self._ops.create_database(
            name = self.schema,
            force = True
        )
        # create table
        self._ops.create_table(
            name = self.table,
            schema = col_schemas,
            database = self.schema,
            engine = "MergeTree",
            order_by = primary_keys,
            partition_by = [self.snap_dt_key],
            settings = settings
        )

    def get_schema(self) -> Schema:
        """Check dtypes of the given schema/dataset

        :return: ibis schema of ``schema.table``
        """
        return self._ops.get_schema(
            table_name = self.table,
            database = self.schema
        )

    def count(self, snap_dt: date) -> int:
        """Number of rows stored for one snapshot date."""
        table = self._table()
        num = table.count(
            where = (table[self.snap_dt_key] == snap_dt)
        ).to_pandas() # scalar result
        return int(num)

    def save_qry(self, query: str, snap_dt: date, overwrite: bool = False):
        """save the query into the table using "insert into schema.table select ... " clause

        :param query: the sql query for table to be inserted
        :param snap_dt: the snap date partition to be inserted, use to check existence
        :param overwrite: whether to overwrite the table if exists

        :return:
        """
        # first detect whether snap date already exists
        clear = self.pre_save_check(snap_dt = snap_dt, overwrite = overwrite)
        if clear is False:
            return

        # ClickHouse maps INSERT ... SELECT columns by position, so name the target
        # columns explicitly in the query's own output order. Backticks keep unusual
        # column names valid identifiers.
        qry_cols = self._ops.sql(query).columns
        col_list = ','.join(f"`{col}`" for col in qry_cols)
        sql = f"""
        INSERT INTO {self.schema}.{self.table} ({col_list})
        {query}
        """
        self._ops.con.command(sql)

        logging.info(f"Successfully saved to {self.schema}.{self.table}@{snap_dt}")

    def _save(self, df: pd.DataFrame, snap_dt: date, **kws):
        """The pure logic to save pandas dataframe to the system, without handling existing record problem

        :param pd.DataFrame df: rows to insert; expected to already carry the snap date column
        :param date snap_dt: snapshot date of ``df`` (not used for the insert itself)
        :param kws: extra keyword arguments forwarded to the ibis ``insert`` call
        """
        self._ops.insert(
            name = self.table,
            obj = df,
            database = self.schema,
            **kws
        )

    def ingest_from_file(self, file_path:str, snap_dt: date, header:bool = True, overwrite:bool = True):
        """Bulk load a local parquet/csv file straight into ``schema.table``.

        NOTE(review): the file is assumed to contain only rows of ``snap_dt``; this is not verified.

        :param file_path: path ending with ``parquet`` or ``csv`` (case-insensitive)
        :param snap_dt: snapshot date the file belongs to; used for the overwrite/existence check
        :param header: for csv, whether the first line holds column names
        :param overwrite: delete existing rows of ``snap_dt`` first; otherwise skip if any exist
        :raises TypeError: unsupported file extension
        """
        if overwrite:
            self.delete(snap_dt = snap_dt)
        else:
            num_records = self.count(snap_dt)
            if num_records > 0:
                logging.warning(f"{num_records} records found for {self.schema}.{self.table}@{snap_dt}, will do nothing")
                return

        # extract file format
        suffix = file_path.lower()
        if suffix.endswith("parquet"):
            fmt = "Parquet"
        elif suffix.endswith("csv"):
            fmt = "CSVWithNames" if header else "CSV"
        else:
            raise TypeError("File not supported")

        insert_file(
            client = self._ops.con,
            table = self.table,
            database = self.schema,
            file_path = file_path,
            fmt = fmt,
        )
        logging.info(f"Successfully ingested file {file_path} to {self.schema}.{self.table}")

    def get_existing_snap_dts(self) -> List[date]:
        """Sorted list of distinct snapshot dates present in the table."""
        existing_snaps = (
            self._table()
            .select(self.snap_dt_key)
            .distinct()
            .order_by(self.snap_dt_key)
            .to_pandas()
        )
        if len(existing_snaps) == 0:
            return []
        return [
            str2dt(ts.date())
            for ts in pd.to_datetime(existing_snaps[self.snap_dt_key]).dt.to_pydatetime()
        ]

    def read(self, snap_dt: date, **kws) -> pd.DataFrame:
        """Read as pandas dataframe (one snapshot date) data

        :param snap_dt: snap_dt to load
        :param kws: optional ``columns`` (list of column names) to project
        :return:
        """
        table = self._table()
        df = table.filter(table[self.snap_dt_key] == snap_dt)
        if 'columns' in kws:
            df = df.select(*kws['columns'])
        return df.to_pandas()

    def reads(self, snap_dts: List[date], **kws) -> pd.DataFrame:
        """reads as pandas dataframe (vertically concat of several given snap dates data)

        :param snap_dts: list of snap dates to read
        :param kws: optional ``columns`` (list of column names) to project
        :return:
        """
        table = self._table()
        df = table.filter(table[self.snap_dt_key].isin(snap_dts))
        if 'columns' in kws:
            df = df.select(*kws['columns'])
        return df.to_pandas()

    @staticmethod
    def _columns_clause(kws: dict) -> str:
        """SELECT list built from an optional ``columns`` keyword, ``*`` otherwise."""
        if 'columns' in kws:
            return ','.join(kws['columns'])
        return '*'

    def _query_spark(self, sql: str) -> pyspark.sql.DataFrame:
        """Run ``sql`` against ClickHouse via the bound spark connector and register a temp view.

        :raises ValueError: no spark connector has been bound via setup()
        """
        # previously the ValueError was constructed but never raised, silently returning None
        if getattr(self, "sc", None) is None:
            raise ValueError("No Spark Connector Specified, please call .setup() to bind a spark connector")
        # setup() stores the config as `_ch_conf`; `ch_conf` never existed
        df = self.sc.query_db(self._ch_conf, sql)
        df.createOrReplaceTempView(f"{self.table}")
        return df

    def load(self, snap_dt: date, **kws) -> pyspark.sql.DataFrame:
        """Read as spark dataframe (one snapshot date) data, and can also access from sc temporary view

        :param snap_dt: snap_dt to load
        :param kws: optional ``columns`` (list of column names) to project
        :return:
        :raises ValueError: no spark connector bound
        """
        cols = self._columns_clause(kws)
        sql = f"""
        select {cols} from {self.schema}.{self.table} where {self.snap_dt_key} = '{snap_dt}'
        """
        return self._query_spark(sql)

    def loads(self, snap_dts: List[date], **kws) -> pyspark.sql.DataFrame:
        """reads as pyspark dataframe (vertically concat of several given snap dates data)

        :param snap_dts: list of snap dates to read
        :param kws: optional ``columns`` (list of column names) to project
        :return:
        :raises ValueError: no spark connector bound
        """
        cols = self._columns_clause(kws)
        snap_dt_range = ",".join(f"'{dt}'" for dt in snap_dts)
        # standard `IN (...)` list rather than an array literal
        sql = f"""
        select {cols} from {self.schema}.{self.table} where {self.snap_dt_key} in ({snap_dt_range})
        """
        return self._query_spark(sql)

    def delete(self, snap_dt: date):
        """Delete a snap shot dataframe

        Runs ``ALTER TABLE ... DELETE`` and waits for the mutation to finish, so that
        count/read calls made right afterwards no longer see the deleted rows.

        :param snap_dt: which snap date to delete
        :return:
        """
        sql = f"""
        ALTER TABLE {self.schema}.{self.table} DELETE
        WHERE {self.snap_dt_key} = %(snap_dt)s
        """
        self._ops.con.command(
            cmd = sql,
            parameters = {'snap_dt': snap_dt},
            settings = {'mutations_sync': 1},
        )

    def drop(self):
        """drop the whole table (no error if it does not exist)

        :return:
        """
        self._ops.drop_table(
            name = self.table,
            database = self.schema,
            force = True
        )

    def duplicate(self, dst_schema: str, dst_table: str) -> SnapshotDataManagerCHSQL:
        """Copy every row of this table into ``dst_schema.dst_table``.

        NOTE: the destination table is not created here. It must already exist with a
        compatible column order, because ``insert ... select *`` maps columns by position.

        :return: a manager bound to the destination table (same snap_dt_key)
        """
        sql = """
        insert into {dst_schema:Identifier}.{dst_table:Identifier} 
        select * from {src_schema:Identifier}.{src_table:Identifier}"""
        args = dict(
            dst_schema = dst_schema, 
            dst_table = dst_table, 
            src_schema = self.schema, 
            src_table = self.table
        )
        self._ops.con.command(cmd = sql, parameters = args)
        return SnapshotDataManagerCHSQL(
            schema = dst_schema,
            table = dst_table,
            snap_dt_key = self.snap_dt_key
        )

    def disk_space(self, snap_dt, unit='MB') -> float:
        """On-disk size of one snapshot partition (sum over its active parts).

        :param snap_dt: snapshot date, matched against ``system.parts.partition``
        :param unit: one of {B, KB, MB, GB} (case-insensitive)
        :raises ValueError: unknown unit (previously fell back to bytes silently)
        """
        scales = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}
        unit_key = str(unit).upper()
        if unit_key not in scales:
            raise ValueError(f"unit must be one of {list(scales)}, got {unit!r}")

        sql = """
        select 
            sum(bytes_on_disk) as bytes_on_disk
        from system.parts
        where 
            active
            and database = %(schema)s
            and table = %(table)s
            and partition = %(snap_dt)s
        """
        args = dict(schema = self.schema, table = self.table, snap_dt = snap_dt)
        row = self._ops.con.query(query=sql, parameters=args).first_item
        return row['bytes_on_disk'] / (1024 ** scales[unit_key])

In [137]:
import os

# Credentials from the environment, falling back to the local dev container defaults.
ch_conf = ClickHouse()
ch_conf.bindServer('localhost', 8123)
ch_conf.login(
    os.environ.get('CLICKHOUSE_USER', 'luntaixia'),
    os.environ.get('CLICKHOUSE_PASSWORD', 'luntaixia'),
)
SnapshotDataManagerCHSQL.setup(ch_conf=ch_conf)

ch = SnapshotDataManagerCHSQL(
    schema = 'RAW',
    table = 'CUST',
    snap_dt_key = 'SNAP_DT'
)

In [117]:
from ibis import schema
from ibis.expr.datatypes import String, Timestamp, Array, Int32, Int64, Date, Decimal, Float32

# RAW.CUST layout: CUST_ID, SNAP_DT, NAME and GENDER are NOT NULL, the rest nullable.
cust_columns = {
    'CUST_ID': Int64(nullable=False),
    'SNAP_DT': Date(nullable=False),
    'NAME': String(nullable=False),
    'GENDER': String(nullable=False),
    'BIRTH_DT': Date(nullable=True),
    'PHONE': String(nullable=True),
    'EMAIL': String(nullable=True),
    'BLOOD_GRP': String(nullable=True),
    'JOB': String(nullable=True),
    'OFFICE': String(nullable=True),
    'ADDRESS': String(nullable=True),
    'ORG': String(nullable=True),
    'TITLE': String(nullable=True),
    'SINCE_DT': Date(nullable=True),
    'SALARY': Float32(nullable=True),
    'BONUS': Float32(nullable=True),
}
schema_ = schema(cust_columns)

# CUST_ID + SNAP_DT become the table's ORDER BY key.
ch.init_table(col_schemas=schema_, primary_keys=['CUST_ID', 'SNAP_DT'])

In [118]:
import os
from ProviderTools.aws.s3 import S3Accessor

# Object-store credentials/endpoint from the environment, falling back to the local dev values.
s3a = S3Accessor(
    aws_access_key_id=os.environ.get('S3_ACCESS_KEY_ID', 'luntaixiaadmin'),
    aws_secret_access_key=os.environ.get('S3_SECRET_ACCESS_KEY', 'luntaixiaadmin'),
    endpoint_url=os.environ.get('S3_ENDPOINT_URL', 'http://tnas-3991.local:9000'),
)
s3a.enter_bucket('general-clf-pipeline-project')

df = s3a.read_parquet("/fake/data/FEATURES/CUST/CUST_2024-01-01.parquet")
ch.save(df, snap_dt=date(2024, 1, 1))

In [119]:
snap_dt = date(2024, 1, 2)
df = s3a.read_parquet(f"/fake/data/FEATURES/CUST/CUST_{snap_dt:%Y-%m-%d}.parquet")
ch.save(df, snap_dt=snap_dt)

In [135]:
cust_raw = con.table(name='CUST', database='RAW')
existing_snaps = cust_raw.select('SNAP_DT').distinct().order_by('SNAP_DT').to_pandas()
pd.to_datetime(existing_snaps['SNAP_DT']).dt.to_pydatetime()

array([datetime.datetime(2024, 1, 1, 0, 0),
       datetime.datetime(2024, 1, 2, 0, 0)], dtype=object)

In [147]:
ch.disk_space(snap_dt=date(2024, 1, 2), unit='MB')

0.878631591796875

In [144]:
ch.reads(snap_dts=[date(2024, 1, day) for day in (1, 2)])

Unnamed: 0,CUST_ID,SNAP_DT,NAME,GENDER,BIRTH_DT,PHONE,EMAIL,BLOOD_GRP,JOB,OFFICE,ADDRESS,ORG,TITLE,SINCE_DT,SALARY,BONUS
0,3487385,2024-01-02,Brittany Diaz,F,1980-01-16,713-14-3771,candice51@gmail.com,B+,Industrial buyer,Seattle,Unit 9731 Box 2082\nDPO AP 28599,Product,VP,2013-01-07,210000.0,40500.0
1,3783703,2024-01-02,Candice Russell,F,1990-08-06,210-90-9630,anthonyberger@yahoo.com,A-,"Merchandiser, retail",New York,"6510 Daniel Avenue Apt. 792\nAcostaburgh, TN 4...",Sales,Associate,2011-08-01,79000.0,13500.0
2,14173852,2024-01-02,William Olson,M,1988-04-25,308-46-8467,ibenton@hotmail.com,B+,"Engineer, manufacturing systems",Chicago,"26257 Lauren Parkway Apt. 289\nKathrynmouth, M...",Devops,Engineer,2010-04-20,116000.0,22000.0
3,15122648,2024-01-02,Mr. Timothy Anderson,M,1994-06-29,516-36-8579,yjohnson@gmail.com,A+,Control and instrumentation engineer,Austin,"3351 Christopher Meadow\nPort Scott, OK 40845",Devops,Senior Engineer,2021-06-22,129000.0,24000.0
4,18329736,2024-01-02,Shelly Davis,F,1977-06-24,505-30-9519,rodneyowen@yahoo.com,B+,"Civil engineer, contracting",Austin,"5995 Bonnie Lodge Apt. 893\nVelazquezmouth, MA...",Product,VP,2007-06-17,241000.0,45500.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10044,28136838373,2024-01-02,Julie Davis,F,1993-05-08,738-05-1691,tiffanyestes@gmail.com,A-,"Nurse, learning disability",Seattle,"1226 Norma Walks Suite 388\nBrianaborough, WA ...",Internal Tools,Engineer,2019-05-02,98000.0,18500.0
10045,28138297045,2024-01-02,Alexandria Kelly,F,1992-06-15,223-94-3662,adriennekeith@gmail.com,AB-,Management consultant,New York,"872 Miller Stravenue\nJeffreyshire, AK 62718",Sales,Associate,2014-06-10,63000.0,12000.0
10046,28138477309,2024-01-02,Terry Nelson,M,1989-09-01,039-01-9130,ssanchez@hotmail.com,B+,Herbalist,Seattle,USCGC Williams\nFPO AA 14015,Product,Manager,2016-08-25,150000.0,26000.0
10047,28139432962,2024-01-02,Caleb Williams,M,1980-04-04,690-80-0398,ashleymorris@gmail.com,B-,"Embryologist, clinical",Chicago,"3516 Tracy Rapids\nNew Ronaldmouth, MD 15728",Devops,Senior Engineer,2017-03-26,130000.0,20500.0


In [138]:
# Distinct snapshot dates currently stored in RAW.CUST, in ascending order.
ch.get_existing_snap_dts()

[datetime.date(2024, 1, 1), datetime.date(2024, 1, 2)]

In [79]:
from datetime import date

table = con.table('CUST', database = 'default')
is_first_snap = table['SNAP_DT'] == date(2024, 1, 1)
table.count(where=is_first_snap).to_pandas()

10000

In [98]:
first_snap_only = table['SNAP_DT'].isin([date(2024, 1, 1)])
table.filter(first_snap_only).to_pyarrow()

pyarrow.Table
CUST_ID: int64 not null
SNAP_DT: date64[ms] not null
NAME: string not null
GENDER: string not null
BIRTH_DT: date64[ms]
PHONE: string
EMAIL: string
BLOOD_GRP: string
JOB: string
OFFICE: string
ADDRESS: string
ORG: string
TITLE: string
SINCE_DT: date64[ms]
SALARY: float
BONUS: float
----
CUST_ID: [[3487385,3783703,14173852,15122648,18329736,...,28136838373,28138297045,28138477309,28139432962,28143079368]]
SNAP_DT: [[2024-01-01,2024-01-01,2024-01-01,2024-01-01,2024-01-01,...,2024-01-01,2024-01-01,2024-01-01,2024-01-01,2024-01-01]]
NAME: [["Brittany Diaz","Candice Russell","William Olson","Mr. Timothy Anderson","Shelly Davis",...,"Julie Davis","Alexandria Kelly","Terry Nelson","Caleb Williams","Brian Berry"]]
GENDER: [["F","F","M","M","F",...,"F","F","M","M","M"]]
BIRTH_DT: [[1980-01-16,1990-08-06,1988-04-25,1994-06-29,1977-06-24,...,1993-05-08,1992-06-15,1989-09-01,1980-04-04,2000-12-25]]
PHONE: [["713-14-3771","210-90-9630","308-46-8467","516-36-8579","505-30-9519",...,"73

In [96]:
snap_dt_col = table.select('SNAP_DT')
snap_dt_col.distinct().order_by('SNAP_DT').to_pandas()

Unnamed: 0,SNAP_DT
0,2024-01-01
1,2024-01-02


In [89]:
snap_count_sql = "select count(*) as t, SNAP_DT from default.CUST group by SNAP_DT"
con.sql(snap_count_sql).columns

['t', 'SNAP_DT']

In [85]:
snap_counts = con.sql("select SNAP_DT, count(*) from default.CUST group by SNAP_DT")
con.create_view('test_v', obj=snap_counts, database='default', overwrite=True)

In [5]:
from typing import List, Tuple, Dict
import ibis
from ibis import BaseBackend, Schema
from ProviderTools.clickhouse.dbapi import ClickHouse
from ibis.backends.clickhouse import Backend


class ClickhouseCRUD(Backend):
    """Small CRUD convenience layer over an ibis ClickHouse connection.

    Call ``get_conn(ch_conf)`` once to open the connection. Every other method then
    reuses that cached connection.

    NOTE(review): this subclasses ibis' ClickHouse ``Backend`` but shadows several of its
    methods (drop_table, create_table, truncate_table, drop_view) with different signatures.
    All real work is delegated to the separate connection returned by ``get_conn``.
    """

    def get_conn(self, ch_conf: ClickHouse = None) -> BaseBackend:
        """Return the cached ibis ClickHouse connection, (re)connecting when needed.

        A new connection is opened on the first call, or when a different ``ch_conf``
        is passed. Calls without arguments reuse the cached connection.

        :param ch_conf: connection config; required on the first call
        :raises ValueError: no connection exists yet and no config was given
        """
        conn = getattr(self, "_conn", None)
        if ch_conf is not None and (conn is None or ch_conf is not getattr(self, "_conn_conf", None)):
            conn = ibis.clickhouse.connect(
                user=ch_conf.username,
                password=ch_conf.password,
                host=ch_conf.ip,
                port=ch_conf.port,
            )
            self._conn = conn
            self._conn_conf = ch_conf
        if conn is None:
            raise ValueError("Not connected yet, call get_conn(ch_conf) first")
        return conn

    def list_schema(self, like: str = None) -> List[str]:
        """List databases, optionally filtered by the ``like`` pattern."""
        return self.get_conn().list_databases(like = like)

    def list_table(self, schema: str = None, like: str = None) -> List[str]:
        """List tables in ``schema``, optionally filtered by the ``like`` pattern."""
        return self.get_conn().list_tables(like = like, database = schema)

    def drop_schema(self, schema: str):
        """Drop a database (errors if it does not exist)."""
        self.get_conn().drop_database(name=schema, force=False)

    def drop_table(self, schema: str, table: str):
        """Drop ``schema.table`` (errors if it does not exist)."""
        self.get_conn().drop_table(name=table, database=schema, force=False)

    def truncate_table(self, schema: str, table: str) -> None:
        """Remove all rows from ``schema.table``."""
        self.get_conn().truncate_table(name=table, database=schema)

    def drop_view(self, schema: str, view: str):
        """Drop view ``schema.view`` (errors if it does not exist)."""
        self.get_conn().drop_view(name=view, database=schema, force=False)

    def is_exist(self, schema: str, table: str) -> bool:
        """Whether ``schema.table`` exists."""
        return table in self.list_table(schema = schema, like = table)

    def create_schema(self, schema: str):
        """Create a database (errors if it already exists)."""
        self.get_conn().create_database(name = schema, force = False)

    def create_table(self, schema: str, table: str, col_schemas: Schema,
                     engine: str = 'MergeTree', partition_keys: List[str] = None, 
                     primary_keys: List[str] = None, **extra_settings):
        """Create ``schema.table`` with the given ibis schema and ClickHouse engine options.

        :param partition_keys: PARTITION BY columns
        :param primary_keys: ORDER BY columns
        :param extra_settings: forwarded as ClickHouse table settings
        """
        # https://clickhouse.com/docs/en/guides/creating-tables
        # primary key is not actual primary key, it is order by key
        self.get_conn().create_table(
            name = table,
            schema = col_schemas,
            database = schema,
            engine = engine,
            order_by = primary_keys,
            partition_by = partition_keys,
            settings = extra_settings
        )

In [9]:
import os

# Credentials from the environment, falling back to the local dev container defaults.
ch_conf = ClickHouse()
ch_conf.bindServer('localhost', 8123)
ch_conf.login(
    os.environ.get('CLICKHOUSE_USER', 'luntaixia'),
    os.environ.get('CLICKHOUSE_PASSWORD', 'luntaixia'),
)

c = ClickhouseCRUD()
c.get_conn(ch_conf)

<ibis.backends.clickhouse.Backend at 0x7fbf5dcf24d0>

In [158]:
import pandas as pd
import numpy as np
from typing import List, Tuple, Dict
from datetime import date, datetime
from ibis import schema
from ibis.expr.datatypes import String, Timestamp, Array, Int32

test_schema = schema({
    'name': String(nullable=False),
    'snap_dt': Timestamp(timezone="Europe/London", nullable=True),
    'history': Array(value_type=Int32(nullable=False), nullable=False),
})
con.create_table(
    name='test',
    schema=test_schema,
    database='default',
    engine='MergeTree',
    order_by=['snap_dt'],
    partition_by=['snap_dt'],
    # snap_dt is nullable yet used as the sort/partition key, hence allow_nullable_key
    settings=dict(allow_nullable_key=True),
)

In [156]:
con.drop_table('test')

In [75]:
import pandas as pd
import numpy as np
from typing import List, Tuple, Dict
from datetime import date, datetime

In [80]:
nullable_int8 = ibis.dtype(np.dtype('int8'), nullable=True)
nullable_int16 = ibis.dtype(pd.Int16Dtype(), nullable=True)
str_list = ibis.dtype(List[str], nullable=False)
py_date = ibis.dtype(date, nullable=False)
ibis.dtype(datetime, nullable=False).to_pandas()

dtype('<M8[ns]')

In [102]:
int32_pandas = ibis.expr.datatypes.Int32(nullable=True).to_pandas()
ibis.dtype(int32_pandas)

Int32(nullable=True)

In [107]:
str_array = ibis.expr.datatypes.Array(nullable=False, value_type=str)
str_array.to_pyarrow()

ListType(list<item: string>)

In [159]:
from ibis import schema
from ibis.expr.datatypes import String, Timestamp, Array, Int32

schema({
    'name': String(nullable=False),
    'snap_dt': Timestamp(timezone='utc', nullable=True),
    'history': Array(value_type=Int32(nullable=False), nullable=False),
})

ibis.Schema {
  name     !string
  snap_dt  timestamp('utc')
  history  !array<!int32>
}