In [None]:
# Load environment variables from a local .env file (if present) before any
# later cell reads configuration from the environment.
from dotenv import load_dotenv
load_dotenv()

In [None]:
# Make a local Spark installation importable, then import PySpark.
# NOTE(review): `absolute_import` is a no-op on Python 3; kept only for legacy compatibility.
from __future__ import absolute_import
import os
import findspark
# findspark.init() must run before the pyspark imports below so that pyspark
# can be found on sys.path.
findspark.init()
import pyspark  # noqa
from pyspark.sql import SparkSession  # noqa

In [None]:
# The repository root is taken to be the parent of the current working
# directory (presumably the notebook is launched one level below the root).
cur_dir = os.path.dirname(os.path.realpath("./"))
external_libs_dir = os.path.join(cur_dir, "signal", "lib", "external")
# `spark.jars` expects a comma-separated list of jar paths, so only real .jar
# files are passed (README files, checksums, etc. are skipped). Sorting keeps
# the classpath order deterministic across machines and runs.
external_libs_jars = sorted(
    os.path.join(external_libs_dir, f)
    for f in os.listdir(external_libs_dir)
    if f.endswith(".jar")
)
# Scratch locations so the Spark warehouse and the metastore live under /tmp.
spark_warehouse_dir, meta_dir = "/tmp/spark-warehouse", "/tmp/spark-meta"

In [None]:
# Local Spark session shared by every example in this notebook.
spark = (
    SparkSession.builder.master("local")
    .appName("seeknal-test")
    # workaround to avoid snappy library issue
    .config("spark.sql.parquet.compression.codec", "uncompressed")
    .config("spark.driver.memory", "1G")
    # make spark-warehouse temporary
    .config("spark.sql.warehouse.dir", spark_warehouse_dir)
    # make metastore temporary (Derby stores its files under derby.system.home)
    .config(
        "spark.driver.extraJavaOptions", "-Dderby.system.home={}".format(meta_dir)
    )
    # Resolve spark.jars.packages over HTTPS only; a plain-HTTP repository lets
    # downloaded artifacts be tampered with in transit.
    .config(
        "spark.jars.repositories",
        "https://packages.confluent.io/maven/,https://repo1.maven.org/maven2/",
    )
    # Local jars discovered under signal/lib/external.
    .config("spark.jars", ",".join(external_libs_jars))
    # NOTE(review): hadoop-* 3.2.2 may not match the Hadoop version bundled with
    # the Spark 3.5 runtime; confirm these versions are compatible.
    .config(
        "spark.jars.packages",
        ",".join(
            [
                "org.apache.spark:spark-avro_2.12:3.5.3",
                "za.co.absa:abris_2.12:6.4.0",
                "com.lihaoyi:os-lib_2.12:0.8.1",
                "org.apache.kafka:kafka-clients:3.8.0",
                "io.delta:delta-spark_2.12:3.2.1",
                "org.apache.iceberg:iceberg-aws-bundle:1.6.1",
                "org.apache.hadoop:hadoop-aws:3.2.2",
                "org.apache.hadoop:hadoop-client:3.2.2",
                "org.apache.hadoop:hadoop-client-runtime:3.2.2",
            ]
        ),
    )
    # Enable Delta Lake SQL support and use Delta for the session catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
from datetime import datetime, timedelta

from seeknal.entity import Entity
from seeknal.featurestore.feature_group import (
    FeatureGroup,
    Materialization,
    OfflineMaterialization,
    OfflineStore,
    OfflineStoreEnum,
    FeatureStoreFileOutput,
    OnlineStore,
    OnlineStoreEnum,
    HistoricalFeatures,
    FeatureLookup,
    FillNull,
    GetLatestTimeStrategy,
    OnlineFeatures,
)

from seeknal.flow import *
from seeknal.featurestore.featurestore import Feature
from seeknal.common_artifact import Source, Rule, Common, Dataset
from seeknal.project import Project
from seeknal.workspace import Workspace
from seeknal.tasks.sparkengine import aggregators as G
from seeknal.tasks.sparkengine import transformers as T
from seeknal.tasks.sparkengine.transformers.spark_engine_transformers import (
    JoinTablesByExpr,
    JoinType,
    TableJoinDef,
)
from pyspark import SparkContext
from pyspark.sql import DataFrame
import pandas as pd

In [None]:
from datetime import datetime, timedelta

from src.seeknal.entity import Entity
from src.seeknal.featurestore.feature_group import (
    FeatureGroup,
    Materialization,
    OfflineMaterialization,
    OfflineStore,
    OfflineStoreEnum,
    FeatureStoreFileOutput,
    OnlineStore,
    OnlineStoreEnum,
    HistoricalFeatures,
    FeatureLookup,
    FillNull,
    GetLatestTimeStrategy,
    OnlineFeatures,
)

from src.seeknal.flow import *
from src.seeknal.featurestore.featurestore import Feature
from src.seeknal.common_artifact import Source, Rule, Common, Dataset
from src.seeknal.project import Project
from src.seeknal.workspace import Workspace
from src.seeknal.tasks.sparkengine import aggregators as G
from src.seeknal.tasks.sparkengine import transformers as T
from src.seeknal.tasks.sparkengine.transformers.spark_engine_transformers import (
    JoinTablesByExpr,
    JoinType,
    TableJoinDef,
)
from pyspark import SparkContext
from pyspark.sql import DataFrame
import pandas as pd

# 1. Project and Workspace

To start working with seeknal, we need to set up a project and a workspace.

In [None]:
# Every artifact created below is attached to this project.
project = Project(description="test project", name="test_project")
project.get_or_create()

In [None]:
# Create the workspace on first run; later runs reuse the existing one.
Workspace(
    name="test_workspace",
    description="test workspace",
).get_or_create()

# Display whichever workspace is now active.
Workspace.current()

## 1.1 Add Common Artifacts

In [None]:
# Register the `subscriber` entity (joined on msisdn) and a source over the
# `user_stay` table that is tied to that entity.
entity = Entity(name="subscriber", join_keys=["msisdn"])
entity.get_or_create()

source = Source(
    "my_user_stay",
    Dataset(params={"dateCol": "date_id"}, table="user_stay"),
    entity=entity,
    description="user stay",
)
source.get_or_create()

# Show every registered source.
Source.list()

In [None]:
# Register a simple key/value rule, then show all rules.
Rule(description="foo bar", name="foo", value="bar").get_or_create()
Rule.list()

In [None]:
# Register a few common artifacts and print them as the shared "common" YAML
# (the same document later fed to SparkEngineTask.add_common_yaml()).
Rule(name="foo", value="bar", description="foo bar").get_or_create()
Rule(name="blah", value=["foo", "bar"]).get_or_create()
# Use the `table=` keyword, as every other Dataset(...) in this notebook does.
Source(
    "my_user_stay",
    Dataset(table="user_stay", params={"dateCol": "date_id"}),
    description="user stay",
).get_or_create()
print(Common.as_yaml())

# 2. Tasks

Seeknal provides rich tasks that can be used to automate data science workflows.

## 2.1 Spark Engine Task

In [None]:
# Small daily feature table: ids "1" and "2" with two float features per day.
columns = "day:string, feature1:float, feature2:float, id:string"
vals = [
    ("20190620", 1.0, 1.0, "1"),
    ("20190610", -1.0, -1.0, "1"),
    ("20190602", 50.0, 50.0, "1"),
    ("20190601", 0.0, 0.0, "1"),
    ("20190520", 22.2, 22.2, "1"),
    ("20190510", 2.0, 2.0, "1"),
    ("20190501", 2.1, 2.1, "1"),
    ("20190620", 1.0, 1.0, "2"),
    ("20190710", None, None, "2"),
    ("20190602", 50.0, 50.0, "2"),
    ("20190601", 0.0, 0.0, "2"),
    ("20190520", 22.2, 22.2, "2"),
    ("20190510", 2.0, 2.0, "2"),
    ("20190501", 2.1, 2.1, "2"),
]

daily_features_1 = spark.createDataFrame(vals, columns)
# saveAsTable's default save mode ("errorifexists") raises once `test_df`
# exists, so overwrite to keep this cell re-runnable.
daily_features_1.write.mode("overwrite").saveAsTable("test_df")

In [None]:
# One SQL stage declared in YAML; the input table name is substituted below.
yaml_str = """
pipeline:
  input:
    table: {table}
  stages:
    - className: tech.mta.seeknal.transformers.SQL
      params:
        statement: >-
          SELECT day, CONCAT(id, "-append") as id, feature1, feature2 FROM __THIS__
""".format(table="test_df")

res = (
    SparkEngineTask()
    .add_yaml(yaml_str)
    .transform(spark)
)
res.show()

In [None]:
# Register `test_df` as source `my_test_df` so YAML pipelines can reference it
# by id.
# NOTE(review): dateCol is "date_id", but test_df has a `day` column and no
# `date_id`; confirm the intended date column.
Source(
    "my_test_df",
    Dataset(params={"dateCol": "date_id"}, table="test_df"),
    description="just a dummy",
).get_or_create()

In [None]:
# The pipeline input is given by id (`my_test_df`); the id is resolved through
# the common YAML that is passed to the task before the pipeline YAML.
yaml_str = """
    pipeline:
      input:
        id: my_test_df
      stages:
        - className: tech.mta.seeknal.transformers.SQL
          params:
            statement: >-
              SELECT day, CONCAT(id, "-append") as id, feature1, feature2 FROM __THIS__
    """

res = (
    SparkEngineTask()
    .add_common_yaml(Common.as_yaml())
    .add_yaml(yaml_str)
    .transform(spark)
)
res.show()


In [None]:
# The YAML declares only stages; the input is an in-memory DataFrame supplied
# through add_input().
df = spark.table("test_df")
yaml_str = """
pipeline:
  stages:
    - className: tech.mta.seeknal.transformers.SQL
      params:
        statement: >-
          SELECT day, CONCAT(id, "-append") as id, feature1, feature2 FROM __THIS__
"""
res = (
    SparkEngineTask()
    .add_yaml(yaml_str)
    .add_input(dataframe=df)
    .transform(spark)
)
res.show()

In [None]:
# Date-reformatting stage: yyyyMMdd `day` string -> yyyy-MM-dd `new_date`.
format_date = T.Transformer(
    T.ClassName.ADD_DATE,
    inputCol="day",
    outputCol="new_date",
    inputDateFormat="yyyyMMdd",
    outputDateFormat="yyyy-MM-dd",
)

# Build the pipeline in Python rather than YAML: the SQL stage runs first, then
# the date stage above.
sql_stage_statement = None  # placeholder removed below
del sql_stage_statement
res = (
    SparkEngineTask()
    .add_input(dataframe=df)
    .add_common_yaml(Common.as_yaml())
    .add_stage(
        transformer=T.SQL(
            statement="SELECT day, CONCAT(id, '-append') as id, feature1, feature2 FROM __THIS__"
        )
    )
    .add_stage(transformer=format_date)
    .transform(spark)
)
res.show()

In [None]:
# Aggregate test_df per id and attach a parquet file output to the task.
# (A SQL "-append" transform used to run here, but its result was immediately
# overwritten without being used, so that dead Spark job has been dropped.)
df = spark.table("test_df")

# Sum feature1 with a named accumulator function and feature2 with a raw SQL
# expression, to show both aggregator styles.
aggr = G.Aggregator(
    group_by_cols=["id"],
    aggregators=[
        G.FunctionAggregator(
            inputCol="feature1", outputCol="feature1_sum", accumulatorFunction="sum"
        ),
        G.ExpressionAggregator(outputCol="feature2_sum", expression="sum(feature2)"),
    ],
)
res = (
    SparkEngineTask(name="create aggregates")
    .add_input(dataframe=df)
    .add_stage(aggregator=aggr)
    .add_output(
        source="file",
        params={"path": "/tmp/seeknal_result/test_output", "format": "parquet"},
    )
    # NOTE(review): materialize=False presumably skips writing the output
    # declared above; confirm against SparkEngineTask.transform.
    .transform(spark, materialize=False)
)
res.show()

In [None]:
# Small spine keyed by msisdn, LEFT-joined (alias "b") onto test_df on
# a.id = b.msisdn; "a" presumably aliases the task input.
spine_columns = "msisdn:string, app_day:string, feature3:float"
spine_vals = [
    ("1", "20190510", 1.0),
    ("2", "20190715", 2.0),
]
spine_df = spark.createDataFrame(spine_vals, spine_columns)

tables = [
    TableJoinDef(
        table=spine_df,
        alias="b",
        joinType=JoinType.LEFT,
        joinExpression="a.id = b.msisdn",
    )
]
join = JoinTablesByExpr(select_stm="a.*, b.feature3, b.app_day", tables=tables)
res = (
    SparkEngineTask()
    .add_input(dataframe=spark.table("test_df"))
    .add_stage(transformer=join)
    .transform(spark)
)
res.show()

In [None]:
# Build one preprocessing task, then branch it twice with copy(). Each branch
# adds its own extra stage; copy() presumably leaves `preprocess` untouched.
df = spark.table("test_df")
add_month = T.Transformer(
    T.ClassName.ADD_DATE,
    inputCol="day",
    outputCol="month",
    inputDateFormat="yyyyMMdd",
    outputDateFormat="yyyy-MM-01",
)
preprocess = SparkEngineTask().add_input(dataframe=df).add_stage(
    transformer=T.SQL(
        statement="SELECT day, CONCAT(id, '-append') as id, feature1, feature2 FROM __THIS__"
    )
)

# Branch 1: map `day` to the first of its month (format yyyy-MM-01).
res_one = preprocess.copy().add_stage(transformer=add_month).transform(spark)
# Branch 2: append one more SQL stage instead.
res_two = (
    preprocess.copy()
    .add_sql("SELECT *, CONCAT(id, '-append-again') as id_b FROM __THIS__")
    .transform(spark)
)

res_one.show()
res_two.show()

In [None]:
# update_stage(): build a one-stage task, then swap the stage at index 0 for a
# different SQL statement before running it.
df = spark.table("test_df")
preprocess = SparkEngineTask().add_input(dataframe=df).add_stage(
    transformer=T.SQL(
        statement="SELECT day, CONCAT(id, '-append') as id, feature1, feature2 FROM __THIS__"
    )
)

preprocess.update_stage(
    0,
    transformer=T.SQL(statement="SELECT *, CONCAT(id, '-append-again') as id_b FROM __THIS__"),
)

res = preprocess.transform(spark)
res.show()

In [None]:
# insert_stage(): put a new SQL stage at index 0, presumably ahead of the
# existing one, then print the resulting pipeline YAML before running it.
df = spark.table("test_df")
preprocess = SparkEngineTask().add_input(dataframe=df).add_stage(
    transformer=T.SQL(
        statement="SELECT day, CONCAT(id, '-append') as id, feature1, feature2 FROM __THIS__"
    )
)
preprocess.insert_stage(
    0,
    transformer=T.SQL(statement="SELECT *, CONCAT(id, '-append-again') as id_b FROM __THIS__"),
)
preprocess.print_yaml()

res = preprocess.transform(spark)
res.show()

In [None]:
# Run the pipeline restricted to a date window on the `day` column (window
# inclusivity is whatever SparkEngineTask.transform implements; confirm).
# Use the `datetime` class imported at the top of the notebook. The former
# `import datetime` rebound that name to the *module*, which broke any later
# cell calling `datetime(...)` unless it happened to re-import the class.
df = spark.table("test_df")

res = (
    SparkEngineTask()
    .add_input(dataframe=df)
    .set_date_col("day")
    .add_stage(
        transformer=T.SQL(
            statement="SELECT day, CONCAT(id, '-append') as id, feature1, feature2 FROM __THIS__"
        )
    )
    .transform(
        start_date=datetime(2019, 5, 1),
        end_date=datetime(2019, 5, 10),
    )
)
res.show()

In [None]:
from datetime import datetime

# Ask the task which dates (from the `day` column) are available after
# 2019-05-10, and print the result.
res = (
    SparkEngineTask()
    .add_input(dataframe=df)
    .set_date_col("day")
    .get_date_available(after_date=datetime(2019, 5, 10))
)
print(res)

## 2.2 DuckDB Task

In [None]:
# Run two chained DuckDB SQL stages over a table read from a parquet file.
# NOTE(review): `pq` (presumably `pyarrow.parquet`) is not imported in any
# visible cell; unless it leaks in via `from seeknal.flow import *`, this cell
# raises NameError on a fresh kernel. Confirm and import it explicitly.
# NOTE(review): this path uses `src/tests/data/...`, while the next cell uses
# `seeknal/tests/data/...`; one of them is probably stale.
arrow_df = pq.read_table("src/tests/data/poi_sample.parquet")
my_duckdb = (
    DuckDBTask()
    .add_input(dataframe=arrow_df)
    .add_sql("SELECT poi_name, lat, long FROM __THIS__")
    .add_sql("SELECT poi_name, lat FROM __THIS__")
)
# Dump the task's attributes for inspection before running it.
print(my_duckdb.__dict__)
res = my_duckdb.transform()

print(res)

In [None]:
# Same two SQL stages, but DuckDB reads the parquet part-files itself through a
# glob path instead of being handed an in-memory table.
my_duckdb = DuckDBTask().add_input(
    path="seeknal/tests/data/poi_sample.parquet/*.parquet"
)
my_duckdb = my_duckdb.add_sql("SELECT poi_name, lat, long FROM __THIS__")
my_duckdb = my_duckdb.add_sql("SELECT poi_name, lat FROM __THIS__")
print(my_duckdb.__dict__)
res = my_duckdb.transform()

print(res)

# 3. Flow

In [None]:
from seeknal.flow import (
    Flow,
    FlowInput,
    FlowOutput,
    FlowInputEnum,
    FlowOutputEnum,
    run_flow,
)

In [None]:

columns = "msisdn:string, lat:double, lon:double, start_time:string, end_time:string, count_hours:double, radius:double, movement_type:string, day:string"
# The same four stay records (id1 appears twice) are repeated for each of three
# days; only the trailing `day` value differs between repetitions.
# NOTE(review): start/end times stay on 2019-01-01 even for later `day`
# values; confirm this is intended sample data.
vals = [
    (*stay, day)
    for day in ("20190101", "20190102", "20190105")
    for stay in (
        ("id1", 3.1165, 101.5663, "2019-01-01 06:00:00", "2019-01-01 07:00:00", 1.0, 1395.04, "stay"),
        ("id2", 3.812033, 103.324633, "2019-01-01 06:00:00", "2019-01-01 07:00:00", 1.0, 841.36, "stay"),
        ("id3", 3.0637, 101.47016, "2019-01-01 06:00:00", "2019-01-01 07:00:00", 1.0, 1387.35, "stay"),
        ("id1", 3.1186, 101.6639, "2019-01-01 07:00:00", "2019-01-01 08:00:00", 1.0, 1234.22, "stay"),
    )
]
user_stay = spark.createDataFrame(vals, columns)
# Persist as the `user_stay` table (replacing any earlier run) by passing the
# DataFrame through a SparkEngineTask that has no stages.
(
    SparkEngineTask()
    .add_input(dataframe=user_stay)
    .transform()
    .write.mode("overwrite")
    .saveAsTable("user_stay")
)


In [None]:
project = Project(
    name="test_project",
    description="test project",
)
# Attach the project (it is created on first run) before defining flows.
project.get_or_create()

In [None]:
# Flow: read the `user_stay` Hive table, pass it through a Spark SQL task, then
# project a subset of columns with a DuckDB task.
flow_input = FlowInput(kind=FlowInputEnum.HIVE_TABLE, value="user_stay")
flow_output = FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)

task_two = SparkEngineTask().add_sql("SELECT * FROM __THIS__")
task_three = DuckDBTask().add_sql(
    "SELECT msisdn, lat, lon, movement_type, day FROM __THIS__"
)
flow = Flow(
    name="my_flow",
    input=flow_input,
    tasks=[task_two, task_three],
    # Use the explicitly declared Spark-DataFrame output; it was previously
    # built but never passed (a default FlowOutput() was used instead).
    output=flow_output,
)

flow.get_or_create()


In [None]:
# Run the in-memory `flow` object directly and print what it returns.
flow_result = None
print(flow.run())
del flow_result

In [None]:
# Re-load the flow from seeknal by name only, then run the loaded copy.
flowtwo = Flow(name="my_flow")
flowtwo = flowtwo.get_or_create()
print(flowtwo.run())

In [None]:
# Show the flow's dictionary representation (the form Flow.from_dict() accepts).
flow.as_dict()

In [None]:
# Round-trip the flow through its dict form and run the reconstructed copy.
my_flow = Flow.from_dict(
    flow.as_dict()
)
my_flow.run()

# 4. Feature Group 

In [None]:
# Load the sample `feateng_comm_day` parquet data and register it as a table.
# saveAsTable's default mode raises if `comm_day` already exists, so overwrite
# to keep this cell re-runnable.
comm_day = spark.read.format("parquet").load("seeknal/tests/data/feateng_comm_day")
comm_day.write.mode("overwrite").saveAsTable("comm_day")

In [None]:
# Preview the registered comm_day table.
spark.read.table("comm_day").show()

In [None]:
# Drop what appears to be a previously materialized feature-group table. The
# IF EXISTS clause makes this a no-op (instead of an error) when the table is
# absent, e.g. on a fresh metastore.
spark.sql("DROP TABLE IF EXISTS seeknal.fg_1__1")

In [None]:
# NOTE(review): bare expression left over from interactive inspection; it only
# displays the FeatureStoreFileOutput class and affects no later cell.
FeatureStoreFileOutput

In [None]:
# Materialization config: `day` is the event-time column, and offline storage
# is a FILE store at s3a://warehouse/feature_store written in overwrite mode
# with no TTL.
materialization = Materialization(
    event_time_col="day",
    offline_materialization=OfflineMaterialization(
        store=OfflineStore(
            kind=OfflineStoreEnum.FILE,
            name="object_storage",
            value=FeatureStoreFileOutput(path="s3a://warehouse/feature_store"),
        ),
        mode="overwrite",
        ttl=None,
    ),
    offline=True,
)

In [None]:
# Feature group `comm_day_three`: features come from a task-less flow over the
# `comm_day` Hive table and use the `materialization` config.
flow_input = FlowInput(kind=FlowInputEnum.HIVE_TABLE, value="comm_day")
my_flow = Flow(
    name="my_flow_for_fg",
    input=flow_input,
    tasks=None,
    output=FlowOutput(),
)

my_fg_one = FeatureGroup(
    name="comm_day_three",
    entity=Entity(name="msisdn", join_keys=["msisdn"]).get_or_create(),
    materialization=materialization,
).set_flow(my_flow)
my_fg_one.set_features()
my_fg_one.get_or_create()

In [None]:
from datetime import datetime

# Write `my_fg_one`'s features starting at 2019-03-05.
my_fg_one.write(feature_start_time=datetime(2019, 3, 5))

In [None]:
# Feature group `comm_day`: same task-less flow over `comm_day`, but with a
# default Materialization keyed on the `day` event-time column.
flow_input = FlowInput(kind=FlowInputEnum.HIVE_TABLE, value="comm_day")
my_flow = Flow(
    name="my_flow_for_fg",
    input=flow_input,
    tasks=None,
    output=FlowOutput(),
)

my_fg_one = FeatureGroup(
    name="comm_day",
    entity=Entity(name="msisdn", join_keys=["msisdn"]).get_or_create(),
    materialization=Materialization(event_time_col="day"),
).set_flow(my_flow)
my_fg_one.set_features()
my_fg_one.get_or_create()

In [None]:
# Display the `my_fg_one` FeatureGroup object.
my_fg_one

In [None]:
# Feature group `comm_day_two`, built directly from a DataFrame instead of a flow.
input_df = spark.read.table("comm_day")
msisdn_entity_fg_two = None
del msisdn_entity_fg_two
my_fg_two = FeatureGroup(
    name="comm_day_two",
    entity=Entity(name="msisdn", join_keys=["msisdn"]).get_or_create(),
    materialization=Materialization(event_time_col="day"),
)
my_fg_two = my_fg_two.set_dataframe(dataframe=input_df)

my_fg_two.set_features()
my_fg_two.get_or_create()

In [None]:
from datetime import datetime

# `my_fg_one` now refers to the `comm_day` group; write it from 2019-03-05.
my_fg_one.write(feature_start_time=datetime(2019, 3, 5))

In [None]:
# Re-load `comm_day_two` by name, re-attach its source DataFrame, and write
# features from 2019-03-05.
my_fg = FeatureGroup(name="comm_day_two").get_or_create()
(
    my_fg.set_dataframe(dataframe=spark.read.table("comm_day"))
    .write(feature_start_time=datetime(2019, 3, 5))
)

## 4.1 Historical Features

In [None]:
# Historical features of `comm_day_two` from 2019-03-05 onward, with a
# FillNull rule of "0.0" for double-typed values.
my_fg = FeatureGroup(name="comm_day_two")
fs = FeatureLookup(source=my_fg)
fillnull = FillNull(dataType="double", value="0.0")
hist = HistoricalFeatures(fill_nulls=[fillnull], lookups=[fs])
df = hist.to_dataframe(feature_start_time=datetime(2019, 3, 5))
df.show()

In [None]:
# Historical features of `comm_day_three` from 2019-03-05 onward.
# NOTE(review): `my_fg_two` names the same feature group as `my_fg`, and
# `fs_two` (restricted to `comm_count_call_in`) is never passed to
# HistoricalFeatures; only `fs` is used. Confirm whether fs_two was meant to
# be included, or was meant to point at a different feature group.
my_fg = FeatureGroup(name="comm_day_three")
my_fg_two = FeatureGroup(name="comm_day_three")
fs = FeatureLookup(source=my_fg)
fs_two = FeatureLookup(source=my_fg_two, features=["comm_count_call_in"])
fillnull = FillNull(dataType="double", value="0.0")
hist = HistoricalFeatures(fill_nulls=[fillnull], lookups=[fs])
df = hist.to_dataframe(feature_start_time=datetime(2019, 3, 5))
df.show()

In [None]:
# Display the `my_fg` FeatureGroup handle (`comm_day_three` at this point).
my_fg

In [None]:
# Look up `comm_day` features for each row of a small spine. `app_date` is
# passed as the spine's date column and `label` is carried into the output.
my_fg = FeatureGroup(name="comm_day")
fs = FeatureLookup(source=my_fg)

hist = HistoricalFeatures(lookups=[fs])
spine_dummy_data = pd.DataFrame(
    {
        "msisdn": ["011ezY2Kjs", "01ViZtJZCj"],
        "app_date": ["2019-03-19", "2019-03-10"],
        "label": [1, 0],
    }
)
df = hist.using_spine(
    spine=spine_dummy_data,
    date_col="app_date",
    keep_cols=["label"],
).to_dataframe()
df.show()

In [None]:
# Latest feature values across two feature groups, fetched with the
# GetLatestTimeStrategy.REQUIRE_ANY strategy.
my_fg, my_fg_two = FeatureGroup(name="comm_day"), FeatureGroup(name="comm_day_two")
fs, fs_two = FeatureLookup(source=my_fg), FeatureLookup(source=my_fg_two)

hist = HistoricalFeatures(lookups=[fs, fs_two])
df = hist.using_latest(fetch_strategy=GetLatestTimeStrategy.REQUIRE_ANY).to_dataframe()
df.show()

## 4.2 Serve features to Online Store

In [None]:
# Serve the latest `comm_day_three` features, then fetch them for a single
# msisdn key.
my_fg = FeatureGroup(name="comm_day_three")
fs = FeatureLookup(source=my_fg)

user_one = Entity(name="msisdn").get_or_create().set_key_values("05X5wBWKN3")
fillnull = FillNull(value="0.0", dataType="double")
hist = HistoricalFeatures(lookups=[fs], fill_nulls=[fillnull]).using_latest().serve()
abc = hist.get_features(keys=[user_one])
print(abc)

In [None]:
# Serve `comm_day_two` into a file-backed online store under /tmp/online_store
# with a one-minute TTL, then fetch features for one msisdn.
my_fg = FeatureGroup(name="comm_day_two")
fs = FeatureLookup(source=my_fg)

user_one = Entity(name="msisdn").get_or_create().set_key_values("05X5wBWKN3")
fillnull = FillNull(value="0.0", dataType="double")
hist = HistoricalFeatures(lookups=[fs], fill_nulls=[fillnull])
online_store = OnlineStore(value=FeatureStoreFileOutput(path="/tmp/online_store"))
online_table = hist.using_latest().serve(target=online_store, ttl=timedelta(minutes=1))
print(online_table.get_features(keys=[user_one]))

In [None]:
# Build online features straight from a DataFrame (with `day` renamed to
# `event_time`), then look one msisdn up using a plain dict key.
abc = spark.read.table("comm_day").withColumnRenamed("day", "event_time")
online_table = OnlineFeatures(
    dataframe=abc,
    lookup_key=Entity(name="msisdn").get_or_create(),
)
abc = online_table.get_features(keys=[{"msisdn": "05X5wBWKN3"}])
print(abc)