In [1]:
import os
import boto3
import duckdb
import pandas as pd
os.chdir('../..')
display(f"Current root: {os.getcwd()}")

from etl.utils import s3_client

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None) # Adjust width to prevent line wrapping

'Current root: /home/wwh9345/WSL_Documents/GitHub/social-data-lakehouse-starter'

In [4]:
account_id = os.getenv('ACCOUNT_ID')
bronze_access_key_id = os.getenv('BRONZE_ACCESS_KEY_ID')
bronze_secret_access_key = os.getenv("BRONZE_SECRET_ACCESS_KEY")
endpoint = os.getenv('S3_ENDPOINT_URL')
bucket_name = os.getenv('BUCKET_BRONZE')
r2_prefix = f"r2://{bucket_name}"

# con = duckdb.connect()
con = duckdb.connect('dev.duckdb')  # persist secrets

create_secret = f"""
INSTALL httpfs; LOAD httpfs;

CREATE OR REPLACE PERSISTENT SECRET r2_bronze_profile (
  TYPE r2,
  PROVIDER credential_chain,
  CHAIN config,
  PROFILE 'default',
  ACCOUNT_ID '{account_id}',
  SCOPE 'r2://{bucket_name}',
  REGION 'auto'
);
"""

con.execute(create_secret).df()

Unnamed: 0,Success
0,True


In [8]:
# check existing progress / tables in db file 
con.execute("SHOW TABLES").df()

Unnamed: 0,name


In [7]:
# # show secrets
# con.execute("""FROM duckdb_secrets()""").df()

row = con.execute(
    "SELECT name FROM which_secret(?, 'r2')",
    [f"{r2_prefix}/"]).fetchone()

if row is None:
    raise RuntimeError(
        f"No DuckDB secret resolved for {bucket_name}. \
        Re-check creation of persistent R2 secret."
    )

# enable caches to speed up metadata calls
con.execute("PRAGMA enable_object_cache;")

# enable progress bar during loads
con.execute("PRAGMA enable_progress_bar;")

<duckdb.duckdb.DuckDBPyConnection at 0x7f36dec27db0>

In [None]:
# r2 = s3_client('bronze')
# list_objs = r2.list_objects(Bucket=bucket_name)
# [list_objs.get('Contents','')[i]['Key'] for i in range(len(list_objs.get('Contents', [])))]


['RC_2024_01to12_Showerthoughts_comments.zst',
 'RC_2024_01to12_explainlikeimfive_comments.zst',
 'test_pq_(direct_ZST-arrow_conv).parquet',
 'year=2008/month=01/rs-2008-01.zst',
 'year=2008/month=02/rs-2008-02.zst',
 'year=2008/month=03/rs-2008-03.zst',
 'year=2008/month=04/rs-2008-04.zst',
 'year=2008/month=05/rs-2008-05.zst',
 'year=2008/month=06/rs-2008-06.zst',
 'year=2008/month=07/rs-2008-07.zst',
 'year=2008/month=08/rs-2008-08.zst',
 'year=2008/month=09/rs-2008-09.zst',
 'year=2008/month=10/rs-2008-10.zst',
 'year=2008/month=11/rs-2008-11.zst',
 'year=2008/month=12/rs-2008-12.zst',
 'year=2009/month=01/rc-2009-01.zst',
 'year=2009/month=01/rs-2009-01.zst',
 'year=2009/month=02/rc-2009-02.zst',
 'year=2009/month=02/rs-2009-02.zst',
 'year=2009/month=03/rc-2009-03.zst',
 'year=2009/month=03/rs-2009-03.zst',
 'year=2009/month=04/rc-2009-04.zst',
 'year=2009/month=04/rs-2009-04.zst',
 'year=2009/month=05/rc-2009-05.zst',
 'year=2009/month=05/rs-2009-05.zst',
 'year=2009/month=06/rc

In [36]:
con.execute(
    """
    DESCRIBE SELECT * FROM read_parquet(?);
    """, [f"{r2_prefix}/*.parquet"]).df()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,likes,VARCHAR,YES,,,
1,created,DOUBLE,YES,,,
2,approved_at_utc,VARCHAR,YES,,,
3,updated_on,BIGINT,YES,,,
4,mod_reason_by,VARCHAR,YES,,,
...,...,...,...,...,...,...
74,parent_id,VARCHAR,YES,,,
75,author_flair_type,VARCHAR,YES,,,
76,author,VARCHAR,YES,,,
77,treatment_tags,VARCHAR,YES,,,


In [47]:
# create 'raw' table
query = f"""
CREATE TABLE raw_comments AS
SELECT * FROM read_json_auto(?, compression='zstd');
"""

yyyy = '2008'
mm = '01'
df  = con.execute(query, [f"{r2_prefix}/year={yyyy}/month={mm}/rs-*.zst"]).df()
df

IOException: IO Error: Frame requires too much memory for decoding

In [38]:
df  = con.execute("""DESCRIBE RAW""").df()
df

Unnamed: 0,column_name,column_type,null,key,default,extra
0,likes,VARCHAR,YES,,,
1,created,DOUBLE,YES,,,
2,approved_at_utc,VARCHAR,YES,,,
3,updated_on,BIGINT,YES,,,
4,mod_reason_by,VARCHAR,YES,,,
...,...,...,...,...,...,...
74,parent_id,VARCHAR,YES,,,
75,author_flair_type,VARCHAR,YES,,,
76,author,VARCHAR,YES,,,
77,treatment_tags,VARCHAR,YES,,,


In [17]:
query = """
-- SUMMARIZE (SELECT * FROM raw);
SUMMARIZE (SELECT * FROM raw);
"""
df_overview = con.execute(query).df()
df_overview

Unnamed: 0,column_name,column_type,min,max,approx_unique,avg,std,q25,q50,q75,count,null_percentage
0,likes,VARCHAR,,,0,,,,,,300000,100.00
1,created,DOUBLE,1709251281.0,1715847992.0,297557,1712468637.31445,1879650.1978350743,1710871933.3357859,1712483512.4365,1714066896.5658803,300000,0.00
2,approved_at_utc,VARCHAR,,,0,,,,,,300000,100.00
3,updated_on,BIGINT,1709251311,1715934181,305148,1712469530.8698225,1879579.5339280388,1710871939,1712484628,1714066520,300000,0.00
4,mod_reason_by,VARCHAR,,,0,,,,,,300000,100.00
...,...,...,...,...,...,...,...,...,...,...,...,...
74,parent_id,VARCHAR,t1_c3dk28b,t3_tzejsj,133680,,,,,,300000,0.00
75,author_flair_type,VARCHAR,richtext,text,2,,,,,,300000,5.99
76,author,VARCHAR,---Corona_Virus---,zzzxxx0110,70107,,,,,,300000,0.00
77,treatment_tags,VARCHAR,[],[],1,,,,,,300000,0.00


In [None]:
con = duckdb.connect('dev.duckdb')  # persist secrets
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute(
    f"""CREATE OR REPLACE SECRET r2_credentials (
    TYPE r2, 
    KEY_ID '{bronze_access_key_id}', 
    SECRET '{bronze_secret_access_key}', 
    ACCOUNT_ID '{account_id}', 
    ENDPOINT '{endpoint}', 
    REGION 'auto', 
    URL_STYLE 'path'
    );
    """
    )

con.execute(
    f"""
    SELECT COUNT(*) FROM read_parquet('r2://{bucket_name}/*.parquet')
    """
    ).fetchall()


IOException: IO Error: Could not establish connection error for HTTP GET to '//b56d9b394bfd8d8dfdb99fd809fd6091.r2.cloudflarestorage.com/reddit-bronze/?encoding-type=url&list-type=2&prefix='

In [None]:
query =
"""
SELECT COUNT(*) FROM read_parquet('s3://your-bucket-name/*.parquet');
"""

con.execute(query)
# print(query.fetchall()) 

    # '''
    # SELECT * FROM read_zst('abfs://inventory/*.zst')
    # '''

# """SELECT * FROM read_parquet('r2://r2-bucket-name/file');"""
# """SELECT * FROM read_json_auto('my_data.json.zst', compression='zstd');"""

SyntaxError: invalid syntax (1531959628.py, line 1)

In [15]:
print("count the number of records in the bucket")
query = con.execute(
  f"""
  SELECT * FROM read_parquet(
  '{endpoint}/{bucket_name}/test_pq_(direct_ZST-arrow_conv).parquet'
  );
  """
)
# print(query.fetchall()) 

    # '''
    # SELECT * FROM read_zst('abfs://inventory/*.zst')
    # '''

# """SELECT * FROM read_parquet('r2://r2-bucket-name/file');"""
# """SELECT * FROM read_json_auto('my_data.json.zst', compression='zstd');"""

count the number of records in the bucket


HTTPException: HTTP Error: HTTP GET error on 'https://b56d9b394bfd8d8dfdb99fd809fd6091.r2.cloudflarestorage.com/reddit-bronze/test_pq_(direct_ZST-arrow_conv).parquet' (HTTP 400)

In [None]:
import re
m = re.search(r'(\d{4})[-_]?(\d{2})', f)
m.group(1), m.group(2)


NameError: name 'f' is not defined

In [20]:
con.execute("""
  SELECT subreddit, count(*) 
  FROM 's3://reddit-bronze/*/*/*.zst'
  GROUP BY subreddit
  LIMIT 100
""").df()

  # ORDER BY 2 DESC
  # -- FROM 'r2://reddit-bronze/year=2008/month=01/rs-2008-01.zst'

CatalogException: Catalog Error: Table with name s3://reddit-bronze/*/*/*.zst does not exist!
Did you mean "pg_description"?

LINE 3:   FROM 's3://reddit-bronze/*/*/*.zst'
               ^

In [None]:
# Connect to a DuckDB database (in-memory or persistent)
con = duckdb.connect(database=':memory:') # or 'my_database.duckdb' for persistent

con.execute("""
CREATE SECRET ( 
    TYPE r2, 
    KEY_ID 'AKIAIOSFODNN7EXAMPLE', 
    SECRET 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', 
    ACCOUNT_ID 'your-33-character-hexadecimal-account-ID' 
);
""")

# You can optionally make the secret persistent so it's loaded automatically on subsequent connections
# con.execute("""
# CREATE OR REPLACE SECRET my_r2_secret PERSISTENT (
#     TYPE r2,
#     KEY_ID 'AKIAIOSFODNN7EXAMPLE',
#     SECRET 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
#     ACCOUNT_ID 'your-33-character-hexadecimal-account-ID'
# );
# """)

# After creating the secret, you can query data from R2, for example:
# try:
#     data = con.execute("SELECT * FROM read_parquet('r2://your-r2-bucket-name/your-file.parquet');").fetchdf()
#     print(data)
# except Exception as e:
#     print(f"Error querying R2: {e}")

# Close the connection
con.close()

In [None]:
### NEED TO SETTLE DEPENDENCY CONFLICT ISSUE

# import s3fs
# fs = s3fs.S3FileSystem(
#         key=os.getenv('BRONZE_ACCESS_KEY_ID'),
#         secret=os.getenv('BRONZE_SECRET_ACCESS_KEY'),
#         client_kwargs={
#             'endpoint_url': os.getenv('S3_ENDPOINT_URL')
#         }
#     )
# fs

ModuleNotFoundError: No module named 'botocore.context'

In [None]:
fs

### sqlite example

In [None]:
import sqlite3
import pandas as pd

def create_connection(db_path):
    """
    Creates a connection to the SQLite database specified by the db_path.
    :param db_path: The path to the SQLite database file.
    :return: The connection object or None if an error occurs.
    """
    try:
        conn = sqlite3.connect(db_path)
        return conn
    except sqlite3.Error as e:
        print(e)
    return None

def execute_query(conn, query):
    """
    Executes the given SQL query using the provided connection.
    :param conn: The connection object to the SQLite database.
    :param query: The SQL query string to execute.
    :return: The cursor object after executing the query.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        return cursor
    except sqlite3.Error as e:
        print(e)
    return None

def load_results_as_dataframe(cursor):
    """
    Loads the results from a cursor object into a pandas DataFrame.
    :param cursor: The cursor object containing the query results.
    :return: A pandas DataFrame containing the query results.
    """
    columns = [description[0] for description in cursor.description]
    return pd.DataFrame(cursor.fetchall(), columns=columns)

def close_connection(conn):
    """
    Closes the connection to the SQLite database.
    :param conn: The connection object to the SQLite database.
    """
    conn.close()    