In [6]:
import polars as pl
import os
from dagster import EnvVar
from pyiceberg.catalog import load_catalog
from dagster_project.schemas import (
    cit0day_partition_spec,
    cit0day_sort_order,
    cit0day_polars_schema,
    cit0day_schema,
)
from dagster_aws.s3 import S3Resource

# Nessie-backed Iceberg REST catalog configuration, read from the environment.
name = "default"
warehouse = EnvVar("NESSIE_WAREHOUSE").get_value()
branch = EnvVar("NESSIE_BRANCH").get_value()
uri = EnvVar("NESSIE_URI").get_value()
py_io_impl = "pyiceberg.io.pyarrow.PyArrowFileIO"
s3_endpoint = EnvVar(
    "DESTINATION__FILESYSTEM__CREDENTIALS__AWS_S3_ENDPOINT"
).get_value()
s3_access_key_id = EnvVar(
    "DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID"
).get_value()
s3_secret_access_key = EnvVar(
    "DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY"
).get_value()
catalog_type = "rest"

# Fail fast: an unset variable would otherwise be formatted into the catalog
# URI below as the literal string "None" (e.g. "None/None").
_missing_catalog_vars = [
    var_name
    for var_name, var_value in {
        "NESSIE_WAREHOUSE": warehouse,
        "NESSIE_BRANCH": branch,
        "NESSIE_URI": uri,
    }.items()
    if not var_value
]
if _missing_catalog_vars:
    raise RuntimeError(
        "Missing required environment variables: "
        + ", ".join(_missing_catalog_vars)
    )

# Property keys contain '-' and '.', so they must be passed via dict unpacking.
# The Nessie branch is selected by appending it to the REST URI.
catalog = load_catalog(
    **{
        "warehouse": warehouse,
        "uri": f"{uri}/{branch}",
        "py-io-impl": py_io_impl,
        "s3.endpoint": s3_endpoint,
        "s3.access-key-id": s3_access_key_id,
        "s3.secret-access-key": s3_secret_access_key,
        "type": catalog_type,
    },
)

print(branch)

feat-push-data-to-elastic


In [7]:
# Show the tables currently registered under the "staging" namespace.
catalog.list_tables(namespace="staging")

[('staging', 'cit0day_password_files')]

In [10]:
# Reset the staging table so the load below starts from an empty table.
# WARNING: destructive - removes "staging.cit0day_password_files" from the catalog.
# drop_table() returns None, so its result is deliberately not assigned (the
# previous `table = ...` left `table` bound to None). A missing table is
# tolerated so the notebook can be re-run against a fresh catalog.
from pyiceberg.exceptions import NoSuchTableError

try:
    catalog.drop_table("staging.cit0day_password_files")
except NoSuchTableError:
    print("Table 'staging.cit0day_password_files' does not exist; nothing to drop.")

In [11]:
# Read the raw parquet export from the source MinIO bucket, convert it to
# Arrow for pyiceberg, and make sure the Iceberg staging table exists.
nas_minio = S3Resource(
    endpoint_url=os.getenv("SOURCES__FILESYSTEM__CREDENTIALS__AWS_S3_ENDPOINT"),
    aws_access_key_id=os.getenv("SOURCES__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv(
        "SOURCES__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY"
    ),
)

client = nas_minio.get_client()
obj = client.get_object(
    Bucket="raw",
    Key="parquets/12a71a2e-8faf-4273-a25f-301856dbd939.parquet",
)
df = pl.read_parquet(obj["Body"])
print(f"Polars shape {df.shape}")

# pyiceberg appends Arrow tables, so keep an Arrow copy of the frame.
pa_df = df.to_arrow()
print(f"Pyarrow shape: {pa_df.shape}")

# Last expression: the notebook renders the created (or existing) table.
catalog.create_table_if_not_exists(
    "staging.cit0day_password_files",
    sort_order=cit0day_sort_order,
    partition_spec=cit0day_partition_spec,
    schema=cit0day_schema,
)

Polars shape (7695950, 8)
Pyarrow shape: (7695950, 8)


cit0day_password_files(
  1: email: optional string,
  2: username: optional string,
  3: email_domain: optional string,
  4: data: optional string,
  5: bucket: optional string,
  6: prefix: optional string,
  7: category: optional string,
  8: date: optional date
),
partition by: [category],
sort order: [1 ASC NULLS FIRST],
snapshot: null

In [12]:
# Append the Arrow copy of the source data to the Iceberg staging table.
# NOTE: re-running this cell appends the same rows again.
table = catalog.load_table(identifier="staging.cit0day_password_files")
res = table.append(df=pa_df)

# Query the Data

In [13]:
# Reload the table handle so the scan below sees the latest committed state.
table = catalog.load_table(identifier="staging.cit0day_password_files")

In [15]:
# Expose a scan of the Iceberg table to DuckDB as `cit0day_password_files`
# and count its rows (compare against the source frame's row count).
con = table.scan().to_duckdb(table_name="cit0day_password_files")
count_query = """
            select 
            count(*) from cit0day_password_files
            --where category='Web Hosting'
        """
arrow_result = con.execute(count_query).fetch_arrow_table()
duck_df = pl.DataFrame(arrow_result)
print(duck_df)

shape: (1, 1)
┌──────────────┐
│ count_star() │
│ ---          │
│ i64          │
╞══════════════╡
│ 7695950      │
└──────────────┘


In [24]:
from elasticsearch import Elasticsearch, helpers
import os 
# Connect to Elasticsearch using API-key authentication.
# The host defaults to the previously hard-coded address but can now be
# overridden with ELASTIC_HOST; the API key and index name come from the env.
es_host = os.getenv("ELASTIC_HOST", "https://elastic.local.reinthal.cc")
api_key = os.getenv("ELASTIC_API_KEY")
index = os.getenv("ELASTIC_PASSWORDS_INDEX")

# Instantiate the Elasticsearch client
es = Elasticsearch(
    [es_host],
    api_key=api_key
)

# Optional: check connectivity; ping()'s result is used as a boolean here.
if es.ping():
    print("Elasticsearch cluster is up!")
else:
    print("Elasticsearch cluster is down!")

Elasticsearch cluster is up!


In [2]:
# Create the passwords index with an explicit mapping, unless it already exists.
index_name = os.getenv("ELASTIC_PASSWORDS_INDEX")
if not index_name:
    # Fail with a clear message instead of passing None as the index name.
    raise RuntimeError(
        "ELASTIC_PASSWORDS_INDEX is not set; cannot create the passwords index."
    )

# Field mappings mirror the columns of the Iceberg staging table.
mapping = {
    "mappings": {
        "properties": {
            "email": {
                "type": "text",  # or "keyword" if you want exact matches
            },
            "username": {
                "type": "text",
            },
            "email_domain": {
                "type": "text",  # or "keyword" if you want exact matches
            },
            "data": {
                "type": "text",
            },
            "bucket": {
                "type": "text",  # or "keyword" if you want exact matches
            },
            "prefix": {
                "type": "text",  # or "keyword" if you want exact matches
            },
            "category": {
                "type": "keyword",  # keyword will be better for exact matches
            },
            "date": {
                "type": "date",
            }
        }
    }
}

# Create the index only if missing, so re-running this cell is safe.
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body=mapping)
    print(f"Index '{index_name}' created successfully!")
else:
    print(f"Index '{index_name}' already exists.")

Index 'passwords-dev' created successfully!


In [3]:
import polars as pl

In [5]:
# Scratch check of the polars date expression builder; its result is not used below.
pl.date(2020, 11, 4)

In [20]:
# One Python dict per row (column name -> value), used as ES document sources.
# Note: this materialises every row in memory at once.
docs = df.rows(named=True)

In [21]:
# Number of documents to index; should match the row count of `df`.
n_docs = len(docs)
n_docs

7695950

In [22]:
# Prepare one bulk action per document for insertion (no _id specified, so
# presumably Elasticsearch assigns ids and a re-run inserts duplicates).
# Target the index created from ELASTIC_PASSWORDS_INDEX with the explicit
# mapping; the previous hard-coded "cit0day_index" sent the documents to a
# different index that never received that mapping.
if not index_name:
    raise RuntimeError("ELASTIC_PASSWORDS_INDEX is not set; no target index for bulk load.")

actions = [
    {
        "_index": index_name,
        "_source": doc,
    }
    for doc in docs
]

In [25]:
# Send all prepared actions through the elasticsearch-py bulk helper;
# `resp` keeps whatever the helper returns.
resp = helpers.bulk(client=es, actions=actions)