In [22]:
%cd /home/jovyan/notebooks/
from IPython.core.display import HTML
HTML("""
<link rel="stylesheet" type="text/css" href="//fonts.googleapis.com/css?family=Bangers" />
<style>
h1 {font-family: Bangers !important; font-size: 150px !important}
h2 {font-family: Bangers !important; font-size: 120px !important}
h3 {font-family: Bangers !important; font-size: 100px !important}
h4 {font-family: Bangers !important; font-size: 80px !important}
h5 {font-family: Bangers !important; font-size: 50px !important}
h6 {font-family: Bangers !important; font-size: 30px !important}
.txt7 {font-family: Bangers !important; font-size: 70px !important}
.txt6 {font-family: Bangers !important; font-size: 60px !important}
.txt5 {font-family: Bangers !important; font-size: 50px !important}
.txt4 {font-family: Bangers !important; font-size: 40px !important}
.txt3 {font-family: Bangers !important; font-size: 30px !important}
.cap {font-family: Bangers !important; font-size: 30px !important}
.controls
{
    display: none !important;
}

.slide-number
{
    display: none !important;
}
.inline-block {
   display: inline-block;
}
</style>
""")

/home/jovyan/notebooks


In [None]:
!pip install confluent-kafka psycopg2-binary

#### Postgres:

``` bash
docker run -it --name postgres --rm --network=app_default \
 -e POSTGRES_PASSWORD=postgres \
 -p 5432:5432 postgres:12.11 -c wal_level=logical
```

#### Zookeeper & Kafka:

``` bash
docker run -it --rm --name zookeeper --network app_default \
 -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 \
 confluentinc/cp-zookeeper:7.2.0
```

``` bash
docker run -it --rm --name kafka --network app_default -p 9092:9092 \
 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092 \
 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
 -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
 confluentinc/cp-kafka:7.2.0
```

#### Kafka Connect:

```bash
docker run -it --name connect --rm --network=app_default -p 8083:8083 \
        -e GROUP_ID=1 \
        -e CONFIG_STORAGE_TOPIC=my-connect-configs \
        -e OFFSET_STORAGE_TOPIC=my-connect-offsets \
        -e BOOTSTRAP_SERVERS=kafka:9092 \
        -e CONNECT_TOPIC_CREATION_ENABLE=true \
        -v $(pwd)/kafka-connect-iceberg-sink/kafka-connect-iceberg-sink-0.1.3-shaded.jar:/kafka/connect/kafka-connect-iceberg-sink/kafka-connect-iceberg-sink-0.1.3-shaded.jar \
        debezium/connect:2.0
```

#### Minio:

```bash
docker run --rm -it -p 9000:9000 \
 -p 9001:9001 --name minio \
 -v $(pwd)/minio-data:/data \
 --network app_default \
 minio/minio server /data --console-address ":9001"
```

#### Create Kafka topics:

In [None]:
from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic


# Admin client talking to the broker started in the "Zookeeper & Kafka" step.
admin_client = AdminClient({
    "bootstrap.servers": "kafka:9092"
})

# One topic per captured Postgres table, each with 1 partition and
# replication factor 1.
topic_list = [
    NewTopic("postgres.public.mystats_fv1", 1, 1),
    NewTopic("postgres.public.mystats_fv2", 1, 1),
]

# create_topics() is asynchronous: it returns a {topic_name: future} dict and
# only future.result() reveals whether the broker accepted the request.
# Waiting here surfaces failures in this cell instead of further downstream.
for topic_name, future in admin_client.create_topics(topic_list).items():
    try:
        future.result()
        print(f"Created topic {topic_name}")
    except KafkaException as exc:
        # Re-running the notebook must not fail just because the topic is
        # already there; every other error is re-raised.
        if exc.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
            print(f"Topic {topic_name} already exists")
        else:
            raise

# Kafka Connect configuration 

#### Postgres connector configuration:

In [None]:
import requests
import json

# Debezium source connector: streams changes of the two mystats tables from
# Postgres (via the pgoutput logical decoding plugin) into Kafka.
data = {
  "name": "postgres-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "postgres", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "postgres", 
    "database.server.name": "postgres",
    # Debezium 2.x (the debezium/connect:2.0 image used above) replaced
    # database.server.name with the mandatory topic.prefix. Both are set so the
    # topics stay "postgres.public.<table>" regardless of the Debezium version.
    "topic.prefix": "postgres",
    "slot.name": "debezium",
    "plugin.name": "pgoutput",
    "table.include.list": "public.mystats_fv1,public.mystats_fv2"
  }
}

headers = { "Content-Type": "application/json" }
url="http://connect:8083/connectors"

# A timeout keeps the cell from hanging forever when Kafka Connect is not up.
response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30)
if response.status_code == 409:
    # Kafka Connect answers 409 when the connector name is already registered
    # (or while a rebalance is in progress); tolerate it so the cell can be re-run.
    print(f"Kafka Connect returned 409 for {data['name']!r}: connector already exists or a rebalance is in progress")
else:
    # Any other non-2xx status means the connector was NOT created - fail loudly.
    response.raise_for_status()
response.json()

#### Iceberg sink configuration:

In [None]:
import requests
import json
# Iceberg sink connector: consumes the two Debezium topics and writes them as
# parquet-backed Iceberg tables (namespace "mytable_dbz", table prefix
# "debeziumcdc_") into the s3a://mybucket warehouse served by MinIO.
# NOTE(review): the MinIO access/secret keys are hard-coded; move them to
# environment variables or a secret store outside of a local demo.
data = {
  "name": "iceberg-sink",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics": "postgres.public.mystats_fv1,postgres.public.mystats_fv2",
    "upsert": False,
    "upsert.keep-deletes": True,
    "table.auto-create": True,
    "table.write-format": "parquet",
    "table.namespace": "mytable_dbz",
    "table.prefix": "debeziumcdc_",
    "iceberg.warehouse": "s3a://mybucket",
    "iceberg.fs.defaultFS": "s3a://mybucket", 
    "iceberg.catalog-name": "mycatalog", 
    "iceberg.catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog", 
    "iceberg.fs.s3a.path.style.access": True,
    "iceberg.fs.s3a.endpoint": "http://minio:9000",
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "iceberg.fs.s3a.access.key": "minioadmin",
    "iceberg.fs.s3a.secret.key": "minioadmin",
  }
}

headers = { "Content-Type": "application/json" }
url="http://connect:8083/connectors"

# A timeout keeps the cell from hanging forever when Kafka Connect is not up.
response = requests.post(url, headers=headers, data=json.dumps(data), timeout=30)
if response.status_code == 409:
    # Kafka Connect answers 409 when the connector name is already registered
    # (or while a rebalance is in progress); tolerate it so the cell can be re-run.
    print(f"Kafka Connect returned 409 for {data['name']!r}: connector already exists or a rebalance is in progress")
else:
    # Any other non-2xx status means the connector was NOT created - fail loudly.
    response.raise_for_status()
response.json()

### Postgres data change simulation

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from sklearn.datasets import make_hastie_10_2
import warnings
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
# Hide DeprecationWarning messages for the rest of the session.
warnings.filterwarnings(action="ignore", category=DeprecationWarning)

# Connection settings of the Postgres container started above.
DATABASE_HOST = "postgres"
DATABASE_USER = "postgres"
DATABASE_PASSWORD = "postgres"
DATABASE_NAME = "postgres"

In [None]:
def generate_entities(size):
    """Return the integers ``0 .. size-1`` as a NumPy array in random order.

    Sampling ``size`` values out of ``size`` without replacement means every
    id appears exactly once.
    """
    shuffled_ids = np.random.choice(size, replace=False, size=size)
    return shuffled_ids

def generate_data(entities, year=2021, month=10, day=1) -> pd.DataFrame:
    """Build one day's worth of synthetic feature rows, one row per entity.

    Args:
        entities: sequence of entity ids; its length is the number of rows.
        year, month, day: calendar day the random event timestamps fall on.

    Returns:
        DataFrame with columns ``f0``..``f9`` and ``y`` (from
        ``make_hastie_10_2``), ``entity_id``, ``datetime`` (random second of
        the given day between 00:00 and 22:00 UTC, upper bound exclusive) and
        ``created`` (the current ``datetime.now()``).
    """
    n_samples = len(entities)
    feature_names = [f"f{i}" for i in range(10)]

    # random_state is fixed, so the feature/label values are the same on every
    # call with the same number of entities; only the timestamps vary.
    X, y = make_hastie_10_2(n_samples=n_samples, random_state=0)
    df = pd.DataFrame(X, columns=feature_names)
    df["y"] = y
    df["entity_id"] = entities

    # Epoch-second bounds of the sampling window on the requested day.
    window_start = datetime(year, month, day, 0, tzinfo=timezone.utc).timestamp()
    window_end = datetime(year, month, day, 22, tzinfo=timezone.utc).timestamp()
    epoch_seconds = np.random.randint(window_start, window_end, size=n_samples)
    df["datetime"] = pd.to_datetime(epoch_seconds, unit="s")

    df["created"] = pd.to_datetime(datetime.now())
    return df

entities = generate_entities(100)

# Build the connection URL from the DATABASE_* settings defined above instead
# of repeating the literal values.
alchemyEngine = create_engine(
    f"postgresql+psycopg2://{DATABASE_USER}:{DATABASE_PASSWORD}@{DATABASE_HOST}/{DATABASE_NAME}",
    pool_recycle=3600,
)

# The context manager returns the connection to the pool even if a write
# fails; the original left the connection open for the life of the kernel.
with alchemyEngine.connect() as dbConnection:
    # Days 1..14 of January: each iteration writes a fresh snapshot of both
    # feature tables so that Debezium has changes to capture.
    for d in range(1, 15):
        data = generate_data(entities, month=1, day=d)
        fv1 = data[["entity_id", "datetime", "f0", "f1", "f2", "f3", "f4"]]
        fv2 = data[["entity_id", "datetime", "f5", "f6", "f7", "f8", "f9", "y"]]
        # if_exists='replace' drops and recreates the table on every iteration,
        # so Postgres itself only ever holds the latest day.
        # NOTE(review): confirm 'replace' (rather than 'append') is the
        # intended way to simulate changes for the CDC pipeline.
        fv1.to_sql('mystats_fv1', dbConnection, if_exists='replace')
        fv2.to_sql('mystats_fv2', dbConnection, if_exists='replace')

# Release the pooled connections held by the engine.
alchemyEngine.dispose()
    

In [23]:
%cd /home/jovyan/notebooks/
!rm -r feature_store
!mkdir feature_store
%cd /home/jovyan/notebooks/feature_store

/home/jovyan/notebooks
/home/jovyan/notebooks/feature_store


## Feast configuration

In [24]:
%%writefile ./feature_store.yaml
# Feast repository configuration for this demo.
project: example_feature_repo
# Registry file path (relative).
registry: data/registry.db
provider: local
# Offline store: Yummy with the Spark backend; the keys under `config` are
# Spark configuration properties.
offline_store:
  type: yummy.YummyOfflineStore
  backend: spark
  config:
    spark.master: "local[*]"
    spark.ui.enabled: "false"
    spark.eventLog.enabled: "false"
    spark.sql.session.timeZone: "UTC"
    # Iceberg catalog named "local": Hadoop-type catalog with its warehouse in
    # the s3a://mybucket bucket.
    spark.sql.extensions: "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    spark.sql.catalog.local: "org.apache.iceberg.spark.SparkCatalog"
    spark.sql.catalog.local.type: "hadoop"
    spark.sql.catalog.local.warehouse: "s3a://mybucket"
    # S3A filesystem settings pointing at the MinIO container (plain HTTP,
    # path-style access).
    # NOTE(review): access/secret keys are hard-coded demo values.
    spark.hadoop.fs.s3a.endpoint: "http://minio:9000"
    spark.hadoop.fs.s3a.access.key: "minioadmin"
    spark.hadoop.fs.s3a.secret.key: "minioadmin"
    spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
    spark.hadoop.fs.s3a.path.style.access: "true"
    spark.hadoop.fs.s3a.connection.ssl.enabled: "false"
# Online store file path (no explicit type is set here).
online_store:
  path: data/online_store.db

Writing ./feature_store.yaml


In [25]:
%%writefile ./features.py
from datetime import datetime, timezone, timedelta
from google.protobuf.duration_pb2 import Duration
from feast import Entity, Feature, FeatureView, ValueType
from yummy import IcebergDataSource

# Join key shared by both feature views.
entity = Entity(
    name="entity_id",
    value_type=ValueType.INT64,
    description="entity id",
)


def _mystats_feature_view(view_name, feature_names):
    """Build the FeatureView for one of the ``mystats`` Iceberg tables.

    The view and the Iceberg table (inside ``local.mytable_dbz``) share the
    same name, every feature is a FLOAT, the TTL is 20 days and
    ``__source_ts`` is used as the event timestamp column.
    """
    return FeatureView(
        name=view_name,
        entities=["entity_id"],
        ttl=Duration(seconds=3600*24*20),
        features=[
            Feature(name=column, dtype=ValueType.FLOAT)
            for column in feature_names
        ],
        online=True,
        input=IcebergDataSource(
            path="local.mytable_dbz." + view_name,
            event_timestamp_column="__source_ts",
        ),
        tags={},
    )


fv1 = _mystats_feature_view(
    "debeziumcdc_postgres_public_mystats_fv1",
    ["f0", "f1", "f2", "f3", "f4"],
)

fv2 = _mystats_feature_view(
    "debeziumcdc_postgres_public_mystats_fv2",
    ["f5", "f6", "f7", "f8", "f9", "y"],
)

Writing ./features.py


In [26]:
# Apply the definitions from the current feature repository (entity, feature
# views and the online-store tables, as listed in the output below).
!feast apply

Created entity [1m[32mentity_id[0m
Created feature view [1m[32mdebeziumcdc_postgres_public_mystats_fv2[0m
Created feature view [1m[32mdebeziumcdc_postgres_public_mystats_fv1[0m

Created sqlite table [1m[32mexample_feature_repo_debeziumcdc_postgres_public_mystats_fv2[0m
Created sqlite table [1m[32mexample_feature_repo_debeziumcdc_postgres_public_mystats_fv1[0m



In [27]:
%cd /home/jovyan/notebooks/feature_store
import pandas as pd
import numpy as np
from datetime import datetime, timezone, timedelta
from feast import FeatureStore

def generate_entities(size: int):
    """Return the integers ``0 .. size-1`` as a NumPy array in random order.

    Sampling ``size`` values out of ``size`` without replacement means every
    id appears exactly once.
    """
    shuffled_ids = np.random.choice(size, replace=False, size=size)
    return shuffled_ids

def entity_df(size:int = 10):
    """Build the entity dataframe for a historical-features request.

    Returns a DataFrame with ``size`` rows: a shuffled ``entity_id`` column
    (ids ``0 .. size-1``) and an ``event_timestamp`` column holding the
    current ``datetime.now()`` for every row.
    """
    frame = pd.DataFrame(data=generate_entities(size), columns=['entity_id'])
    frame["event_timestamp"] = datetime.now()
    return frame

# Feature references in "<feature view>:<feature>" form: f0-f4 come from fv1,
# f5-f9 from fv2.
fv1_refs = [f"debeziumcdc_postgres_public_mystats_fv1:f{i}" for i in range(5)]
fv2_refs = [f"debeziumcdc_postgres_public_mystats_fv2:f{i}" for i in range(5, 10)]

entity_df = entity_df()

# Join the requested features onto the entity rows; the resulting DataFrame is
# the cell's last expression so that it is displayed.
retrieval_job = FeatureStore(".").get_historical_features(
    features=fv1_refs + fv2_refs,
    entity_df=entity_df,
    full_feature_names=True,
)
retrieval_job.to_df()

/home/jovyan/notebooks/feature_store
I will use spark backend


Unnamed: 0,entity_id,debeziumcdc_postgres_public_mystats_fv2__f5,debeziumcdc_postgres_public_mystats_fv2__f6,debeziumcdc_postgres_public_mystats_fv2__f7,debeziumcdc_postgres_public_mystats_fv2__f8,debeziumcdc_postgres_public_mystats_fv2__f9,debeziumcdc_postgres_public_mystats_fv1__f0,debeziumcdc_postgres_public_mystats_fv1__f1,debeziumcdc_postgres_public_mystats_fv1__f2,debeziumcdc_postgres_public_mystats_fv1__f3,debeziumcdc_postgres_public_mystats_fv1__f4,event_timestamp
0,0,-0.1151074684872267,0.4574156062209908,-0.9646120137337284,-0.7826291558275251,-0.1103892990268877,-0.3952289826543543,-1.159420516399913,-0.0859307669716127,0.1942929380457716,0.8758327615873309,2022-07-24 12:59:14.537634
1,1,0.3792355335355867,-0.4700328827008748,-0.2167314705755386,-0.9301565025243212,-0.1785890920873291,0.5433118913875197,0.4390429576720425,-0.2195410283312132,-1.0840366206719343,0.3517801106813583,2022-07-24 12:59:14.537634
2,2,0.355481792743769,0.8145198224878664,0.0589255891816299,-0.1850536710093415,-0.8076484876163557,-0.5986539369229861,-1.1158969859603944,0.7666631816450861,0.3562928174722889,-1.7685384506770307,2022-07-24 12:59:14.537634
3,3,2.132153410570444,0.9364457258311158,-0.0350951768696703,1.265077838088766,0.211497012731878,-1.4449401990733717,-1.2105429941233516,-0.7886692545093662,1.0946383747120914,0.2348215259487319,2022-07-24 12:59:14.537634
4,4,1.8227236001279596,-0.5215796779933731,-1.184686590411552,0.9606933984606596,1.3290628465396823,-1.461732688261408,-0.6834397667886818,0.367544896022269,0.1903115575939397,-0.85172919725359,2022-07-24 12:59:14.537634
5,5,0.681594518281627,-0.8034096641738411,-0.6895497777502005,-0.4555325035173431,0.0174791590250567,0.920858823780819,0.3187276529430212,0.8568306119026912,-0.6510255933001469,-1.034242841784465,2022-07-24 12:59:14.537634
6,6,0.0632619942003317,0.1565065379653756,0.2321810362002757,-0.5973160689653627,-0.237921729736007,-0.955945000492777,-0.3459817756993864,-0.4635959746460942,0.4814814737734622,-1.5407970144446248,2022-07-24 12:59:14.537634
7,7,0.405204080322888,0.4980524046828567,-0.0261922373442504,-1.6882300277714322,-0.1124659825595527,1.4111720638896117,0.7858038268311726,-0.0574695184653946,-0.3912170521740162,0.9409176145751134,2022-07-24 12:59:14.537634
8,8,-2.655619092974933,1.5133280825732052,0.553132064207584,-0.0457039606602348,0.2205076557571733,0.7714059486768455,1.0294388287827672,-0.9087632459590532,-0.4243176209779015,0.8625960113284511,2022-07-24 12:59:14.537634
9,9,-0.4438360931551978,1.0781973037142378,-2.5591846663440965,1.181378601288286,-0.6319037580051673,0.2134800489101689,-1.2085736537332212,-0.2420198298702195,1.5182611703557054,-0.3846454231425177,2022-07-24 12:59:14.537634


In [None]:
%%writefile ./test.py
import pandas as pd
import numpy as np
from datetime import datetime, timezone, timedelta
from feast import FeatureStore

def generate_entities(size: int):
    """Return the integers ``0 .. size-1`` as a NumPy array in random order.

    Sampling ``size`` values out of ``size`` without replacement means every
    id appears exactly once.
    """
    shuffled_ids = np.random.choice(size, replace=False, size=size)
    return shuffled_ids

def entity_df(size:int = 10):
    """Build the entity dataframe for a historical-features request.

    Returns a DataFrame with ``size`` rows: a shuffled ``entity_id`` column
    (ids ``0 .. size-1``) and an ``event_timestamp`` column fixed to
    2022-07-24 23:59:42 UTC for every row.
    """
    frame = pd.DataFrame(data=generate_entities(size), columns=['entity_id'])
    frame["event_timestamp"] = datetime(2022, 7, 24, 23, 59, 42, tzinfo=timezone.utc)
    return frame

entity_df = entity_df()

# Feature references in "<feature view>:<feature>" form: f0-f4 come from fv1,
# f5-f9 from fv2.
requested_features = (
    [f"debeziumcdc_postgres_public_mystats_fv1:f{i}" for i in range(5)]
    + [f"debeziumcdc_postgres_public_mystats_fv2:f{i}" for i in range(5, 10)]
)

feature_store = FeatureStore(".")
retrieval_job = feature_store.get_historical_features(
    features=requested_features,
    entity_df=entity_df,
    full_feature_names=True,
)
feature_vector = retrieval_job.to_df()

# Plain print: this file runs as a script, not in a notebook cell.
print(feature_vector)

In [None]:
%%writefile ./test.sh
#!/bin/bash
# Run test.py through spark-submit with the Iceberg Spark runtime and the
# Hadoop S3A / AWS SDK jars on the classpath, using the conda Python for both
# the driver and the workers.
#
# spark.sql.extensions and spark.sql.catalog.spark_catalog used to be passed
# twice (first with Delta values, then with Iceberg values). spark-submit
# keeps only the last value given for a key, so the Delta pair was dead
# configuration and has been removed; the effective settings are unchanged.
#
# NOTE(review): the "local" catalog warehouse here is $PWD/warehouse while
# feature_store.yaml points the same catalog at s3a://mybucket - confirm which
# one is meant to take effect.

PYSPARK_PYTHON=/opt/conda/bin/python3 PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python3 FEAST_USAGE=False IS_TEST=True spark-submit \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2,io.delta:delta-core_2.12:1.1.0,org.apache.hadoop:hadoop-aws:3.3.1,software.amazon.awssdk:s3:2.17.131 \
    --conf "spark.driver.memory=5g" \
    --conf "spark.executor.memory=5g" \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" \
    --conf "spark.sql.catalog.spark_catalog.type=hive" \
    --conf "spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.local.type=hadoop" \
    --conf "spark.sql.catalog.local.warehouse=$PWD/warehouse" \
    --conf "spark.hadoop.fs.s3a.access.key=minioadmin" \
    --conf "spark.hadoop.fs.s3a.secret.key=minioadmin" \
    --conf "spark.hadoop.fs.s3a.endpoint=http://minio:9000" \
    ./test.py

In [None]:
# Run the spark-submit wrapper script, which in turn executes ./test.py.
!bash ./test.sh