In [12]:
import polars as pl
import pandas as pd
import json

from typing import Any

In [13]:
# Shared Polars SQL context: the cells below register each loaded frame on it
# under a table name so the data can be queried with SQL via `ctx.execute`.
ctx = pl.SQLContext()

In [14]:
# Load every accounts CSV matched by the glob. Dtype overrides are keyed by
# column name and grouped by target dtype: string columns first, then the
# single datetime column.
df = pl.read_csv(
    "../Tables/accounts/*.csv",
    dtypes={
        **dict.fromkeys(
            (
                "account_id",
                "customer_id",
                "status",
                "account_branch",
                "account_check_digit",
                "account_number",
            ),
            pl.Utf8,
        ),
        "created_at": pl.Datetime,
    },
)

# Make the frame queryable as SQL table "accounts". Kept as the cell's last
# expression, so the returned context is what the cell displays.
ctx.register("accounts", df)


<SQLContext [tables:1] at 0x7f3c7e46e750>

In [15]:
# Load every transfer_ins CSV matched by the glob. Dtype overrides are keyed by
# column name and grouped by target dtype (strings, datetimes, float amount).
df = pl.read_csv(
    "../Tables/transfer_ins/*.csv",
    dtypes={
        **dict.fromkeys(("id", "account_id", "status"), pl.Utf8),
        **dict.fromkeys(
            ("transaction_requested_at", "transaction_completed_at"),
            pl.Datetime,
        ),
        "amount": pl.Float64,
    },
)

# Make the frame queryable as SQL table "transfer_ins". Kept as the cell's last
# expression, so the returned context is what the cell displays.
ctx.register("transfer_ins", df)

<SQLContext [tables:2] at 0x7f3c7e46e750>

In [16]:
# Load every transfer_outs CSV matched by the glob. Dtype overrides are keyed
# by column name and grouped by target dtype (strings, datetimes, float amount).
df = pl.read_csv(
    "../Tables/transfer_outs/*.csv",
    dtypes={
        **dict.fromkeys(("id", "account_id", "status"), pl.Utf8),
        **dict.fromkeys(
            ("transaction_requested_at", "transaction_completed_at"),
            pl.Datetime,
        ),
        "amount": pl.Float64,
    },
)

# Make the frame queryable as SQL table "transfer_outs". Kept as the cell's
# last expression, so the returned context is what the cell displays.
ctx.register("transfer_outs", df)

<SQLContext [tables:3] at 0x7f3c7e46e750>

In [17]:
# Load every pix_movements CSV matched by the glob. Dtype overrides are keyed
# by column name and grouped by target dtype (strings, datetimes, float amount).
df = pl.read_csv(
    "../Tables/pix_movements/*.csv",
    dtypes={
        **dict.fromkeys(("id", "account_id", "in_or_out", "status"), pl.Utf8),
        **dict.fromkeys(("pix_requested_at", "pix_completed_at"), pl.Datetime),
        "pix_amount": pl.Float64,
    },
)

# Make the frame queryable as SQL table "pix_movements". Kept as the cell's
# last expression, so the returned context is what the cell displays.
ctx.register("pix_movements", df)

<SQLContext [tables:4] at 0x7f3c7e46e750>

In [18]:
def parse_investments(investments: list[dict[str, Any]]):
    """Lazily flatten nested investment records into one dict per transaction.

    Each input record must provide an ``"account_id"`` and a ``"transactions"``
    iterable of mappings. Every yielded dict starts with the parent record's
    ``account_id`` followed by the transaction's own fields; if a transaction
    carries its own ``account_id`` key, that value replaces the parent's.

    Records with no transactions contribute no rows.
    """
    for record in investments:
        yield from (
            {"account_id": record["account_id"], **txn}
            for txn in record["transactions"]
        )

# Read the nested investments JSON and flatten it to one dict per transaction
# with `parse_investments`. The list is built inside the `with` block so
# `investments` only ever refers to the flattened rows, never the raw payload.
with open("../Tables/investments/investments_json.txt") as f:
    investments = list(parse_investments(json.load(f)))

# Persist the flattened rows as CSV for the Polars loader in the next cell.
# index=False: by default pandas also writes the DataFrame's row index as an
# extra unnamed leading column, which is not part of the data.
pd.DataFrame(investments).to_csv(
    "../Tables/investments/investments_transformed.csv",
    index=False,
)

In [26]:
# Load the flattened investments CSV. Dtype overrides are keyed by column name
# and grouped by target dtype; note that the requested/completed columns are
# deliberately read as strings here, while only the `_timestamp` column is
# parsed as a datetime.
df = pl.read_csv(
    "../Tables/investments/investments_transformed.csv",
    dtypes={
        **dict.fromkeys(
            (
                "transaction_id",
                "account_id",
                "type",
                "investment_requested_at",
                "investment_completed_at",
                "status",
            ),
            pl.Utf8,
        ),
        "amount": pl.Float64,
        "investment_completed_at_timestamp": pl.Datetime,
    },
)

# Make the frame queryable as SQL table "investments". Kept as the cell's last
# expression, so the returned context is what the cell displays.
ctx.register("investments", df)

<SQLContext [tables:6] at 0x7f3c7e46e750>

In [39]:
# Convert the string-typed request/completion columns of `investments` to
# timestamps and show them next to the already-parsed
# `investment_completed_at_timestamp` column for comparison.
# Each string value has '000' appended before being cast to timestamp.
# NOTE(review): presumably the raw values are epoch numbers stored as text and
# the suffix rescales them to the unit the cast expects — confirm against the
# source data.
# Rows are ordered by investment_requested_at descending; `.collect()`
# materializes the query result so the cell can display it.
ctx.execute(
    """
    SELECT
        cast((investment_requested_at || '000') as timestamp) as investment_requested_at,
        cast((investment_completed_at || '000') as timestamp) as investment_completed_at,
        investment_completed_at_timestamp
    FROM investments
    order by investment_requested_at desc
    """
).collect()

investment_requested_at,investment_completed_at,investment_completed_at_timestamp
datetime[ms],datetime[ms],datetime[μs]
2020-12-31 23:57:21,2020-12-31 23:57:34,2020-12-31 23:57:34
2020-12-31 23:50:17,2020-12-31 23:50:34,2020-12-31 23:50:34
2020-12-31 23:47:57,2020-12-31 23:48:05,2020-12-31 23:48:05
2020-12-31 23:33:50,2020-12-31 23:33:51,2020-12-31 23:33:51
2020-12-31 23:04:49,2020-12-31 23:04:51,2020-12-31 23:04:51
2020-12-31 22:32:58,2020-12-31 22:32:59,2020-12-31 22:32:59
2020-12-31 21:40:28,2020-12-31 21:40:46,2020-12-31 21:40:46
2020-12-31 21:10:56,,
2020-12-31 20:59:22,2020-12-31 20:59:33,2020-12-31 20:59:33
2020-12-31 20:40:55,2020-12-31 20:41:02,2020-12-31 20:41:02
