### Polars Version

In [1]:
import polars as pl
import hashlib

# Hash the string form of any value with SHA-256
def text_to_sha256(text):
    """Return the SHA-256 hex digest of ``str(text)`` encoded as UTF-8.

    Non-string inputs (for example integers) are converted with ``str()``
    first, so ``1`` and ``'1'`` produce the same digest.
    """
    hasher = hashlib.sha256()
    hasher.update(str(text).encode('utf-8'))
    return hasher.hexdigest()

# Sample frame whose 'Hash_Id' values will be hashed
df = pl.DataFrame(
    {
        'Hash_Id': [1, 2, 3],
        'Cleaned_Claim': ['A', 'B', 'C'],
    }
)

# Run the Python hashing function over every 'Hash_Id' value, then attach
# the resulting string Series as a new column next to the original ones.
hashed_ids = df['Hash_Id'].map_elements(text_to_sha256, return_dtype=pl.Utf8)
df_hashed = df.with_columns(hashed_ids.alias('Hash_Id_hashed'))

# Show the frame with the extra digest column
print("DataFrame with Hashed Unique IDs:")
print(df_hashed)

DataFrame with Hashed Unique IDs:
shape: (3, 3)
┌─────────┬───────────────┬─────────────────────────────────┐
│ Hash_Id ┆ Cleaned_Claim ┆ Hash_Id_hashed                  │
│ ---     ┆ ---           ┆ ---                             │
│ i64     ┆ str           ┆ str                             │
╞═════════╪═══════════════╪═════════════════════════════════╡
│ 1       ┆ A             ┆ 6b86b273ff34fce19d6b804eff5a3f… │
│ 2       ┆ B             ┆ d4735e3a265e16eee03f59718b9b5d… │
│ 3       ┆ C             ┆ 4e07408562bedb8b60ce05c1decfe3… │
└─────────┴───────────────┴─────────────────────────────────┘


In [6]:
import polars as pl

# Current snapshot of the table
df = pl.DataFrame(
    {
        'Hash_Id': [1, 2, 3, 5],
        'Cleaned_Claim': ['Claim A', 'Claim B', 'Claim C', 'Claim E'],
        'FTC_Label': ['Label A', 'Label B', 'Label C', 'Label E'],
    }
)

# Incoming batch: one unchanged key, two changed keys and two brand-new keys
df_new = pl.DataFrame(
    {
        'Hash_Id': [1, 2, 3, 4, 6],
        'Cleaned_Claim': ['Claim A', 'Claim B_updated', 'Claim C_updated', 'Claim D_new', 'Claim F_new'],
        'FTC_Label': ['Label A', 'Label B_updated', 'Label C_updated', 'Label D_new', 'Label F_new'],
    }
)

# Full join on the key; every column coming from df_new is suffixed with '_new'
merged_df = df.join(
    df_new,
    on='Hash_Id',
    how='full',
    suffix='_new',
)

# df = merged_df.with_columns(
#     pl.when(pl.col("Hash_Id_new").is_null())
#     .then(pl.col("Hash_Id"))
#     .otherwise(pl.col("Hash_Id_new"))
#     .alias('Hash_Id'),

#     pl.when(pl.col("Cleaned_Claim_new").is_null())
#     .then(pl.col("Cleaned_Claim"))
#     .otherwise(pl.col("Cleaned_Claim_new"))
#     .alias('Cleaned_Claim')
# ).select(['Hash_Id', 'Cleaned_Claim'])

# For each column keep the value from the existing frame and fall back to the
# '_new' value only where the existing one is null (rows present only in df_new).
result_df = merged_df.select(
    [
        pl.coalesce(pl.col(column), pl.col(f'{column}_new')).alias(column)
        for column in ('Hash_Id', 'Cleaned_Claim', 'FTC_Label')
    ]
)


# Show the merged frame
print("Merged DataFrame:")
print(result_df)

# updated_rows_filter = (merged_df['Cleaned_Claim'] != merged_df['Cleaned_Claim_new']) & merged_df['Cleaned_Claim_new'].is_not_null()
# updated_rows = updated_rows_filter.sum()
# print("Number of updated rows:", updated_rows)
# inserted_rows = df_new.select('Hash_Id').join(df, on='Hash_Id', how='anti').shape[0]
# print(f"Number of inserted rows: {inserted_rows}")

Merged DataFrame:
shape: (6, 3)
┌─────────┬───────────────┬─────────────┐
│ Hash_Id ┆ Cleaned_Claim ┆ FTC_Label   │
│ ---     ┆ ---           ┆ ---         │
│ i64     ┆ str           ┆ str         │
╞═════════╪═══════════════╪═════════════╡
│ 1       ┆ Claim A       ┆ Label A     │
│ 2       ┆ Claim B       ┆ Label B     │
│ 3       ┆ Claim C       ┆ Label C     │
│ 4       ┆ Claim D_new   ┆ Label D_new │
│ 6       ┆ Claim F_new   ┆ Label F_new │
│ 5       ┆ Claim E       ┆ Label E     │
└─────────┴───────────────┴─────────────┘


In [None]:
# pl.__version__

'1.14.0'

In [9]:
import polars as pl

# Two frames holding the same rows in a different order
df1 = pl.DataFrame({"A": [1, 2, 3], "B": ["a", "b", "c"]})
df2 = pl.DataFrame({"A": [3, 1, 2], "B": ["c", "a", "b"]})

# Sorting on every column puts both frames into the same canonical row order
sorted_df1 = df1.sort(df1.columns)
sorted_df2 = df2.sort(df2.columns)

# Once ordered identically, a plain frame comparison answers the question
are_equal = sorted_df1.equals(sorted_df2)

# Report the outcome
print("The DataFrames are equal up to row permutations:", are_equal)

# pl.testing.assert_frame_equal(sorted_df1, sorted_df2)

The DataFrames are equal up to row permutations: True


### Pandas and Polars Upsert

In [None]:
import pandas as pd

def upsert_by_index(current, incoming):
    """Upsert ``incoming`` into ``current``, matching rows on the index.

    Rows whose index label exists in both frames take the non-null values of
    ``incoming``; rows that exist only in ``incoming`` are appended.
    ``current`` itself is left untouched.

    Parameters
    ----------
    current : pandas.DataFrame
        Existing data, indexed by the key.
    incoming : pandas.DataFrame
        New data, indexed by the same key.

    Returns
    -------
    tuple
        ``(combined, updated_count, inserted_count)`` where ``combined`` is the
        upserted frame (still indexed by the key), ``updated_count`` is the
        number of existing rows whose values actually changed, and
        ``inserted_count`` is the number of keys that were not in ``current``.
    """
    # Work on a copy so re-running the cell starts from the same input
    merged = current.copy()
    merged.update(incoming)

    # A row counts as updated only if some value differs after the update.
    # Cells that are missing both before and after are treated as unchanged.
    unchanged = (merged == current) | (merged.isna() & current.isna())
    updated_count = int((~unchanged).any(axis=1).sum())

    # combine_first keeps merged's values and adds rows found only in incoming
    combined = merged.combine_first(incoming)
    inserted_count = int(combined.index.difference(current.index).size)
    return combined, updated_count, inserted_count


# Initialize the DataFrames, keyed by 'Hash_Id'
df = pd.DataFrame({
    'Hash_Id': [1, 2, 3],
    'Cleaned_Claim': ['A', 'B', 'C']
}).set_index('Hash_Id')

df_new = pd.DataFrame({
    'Hash_Id': [2, 3, 4],
    'Cleaned_Claim': ['B_updated', 'C_updated', 'D']
}).set_index('Hash_Id')

# Upsert and collect the counts
df_combined, updated_rows, inserted_rows = upsert_by_index(df, df_new)

# Make 'Hash_Id' a regular column again
df_combined = df_combined.reset_index()

# Print results
print(f"Updated rows: {updated_rows}")
print(f"Inserted rows: {inserted_rows}")
print("Final DataFrame:")
print(df_combined)

In [None]:
import pandas as pd

# Existing table and incoming batch
df = pd.DataFrame({'Hash_Id': [1, 2, 3], 'Cleaned_Claim': ['A', 'B', 'C']})
df_new = pd.DataFrame({'Hash_Id': [2, 3, 4], 'Cleaned_Claim': ['B_updated', 'C_updated', 'D']})

# Key both frames by 'Hash_Id' so rows are matched on it
df = df.set_index('Hash_Id')
df_new = df_new.set_index('Hash_Id')

# Snapshot of the pre-update state, used below to find the inserted keys
original_df = df.copy()

# Overwrite the rows of df whose key also appears in df_new
df.update(df_new)

# Counting of updated rows is currently disabled:
# updated_rows = (df.loc[df_new.index.intersection(original_df.index)] != original_df).any(axis=1).sum()

# Add the rows that exist only in df_new
df_combined = df.combine_first(df_new)

# Keys present after the upsert but absent before it are the inserted rows
inserted_rows = len(df_combined.index.difference(original_df.index))

# Turn 'Hash_Id' back into an ordinary column
df_combined = df_combined.reset_index()

# Print results
# print(f"Updated rows: {updated_rows}")
print(f"Inserted rows: {inserted_rows}")
print("Final DataFrame:")
print(df_combined)

In [None]:
import pandas as pd

# Existing table and incoming batch
df = pd.DataFrame({'Hash_Id': [1, 2, 3], 'Cleaned_Claim': ['A', 'B', 'C']})
df_new = pd.DataFrame({'Hash_Id': [2, 3, 4], 'Cleaned_Claim': ['B_updated', 'C_updated', 'D']})

# Key both frames by 'Hash_Id' so the upsert matches rows on it
df = df.set_index('Hash_Id')
df_new = df_new.set_index('Hash_Id')

# Pre-update snapshot to compare against afterwards
original_df = df.copy()

# Overwrite the rows of df whose key also appears in df_new
df.update(df_new)

# A row is "updated" when at least one of its values differs from the snapshot
updated_rows = df.ne(original_df).any(axis=1).sum()

# Add the rows that exist only in df_new, then restore 'Hash_Id' as a column
df = df.combine_first(df_new).reset_index()

# Report the number of updated rows
print("Number of updated rows:", updated_rows)

# Show the final frame
print("Upserted DataFrame:")
print(df)

In [None]:
import polars as pl

# Initialize sample DataFrames
df = pl.DataFrame({
    'Hash_Id': [1, 2, 3],
    'Cleaned_Claim': ['A', 'B', 'C']
})

df_new = pl.DataFrame({
    'Hash_Id': [2, 3, 4],
    'Cleaned_Claim': ['B_updated', 'C_updated', 'D']
})

# Full join on 'Hash_Id'. coalesce=True merges the two key columns into one;
# without it the right-hand key lands in 'Hash_Id_new' and 'Hash_Id' is null
# for rows that exist only in df_new, so inserted rows would lose their key.
joined_df = df.join(df_new, on='Hash_Id', how='full', suffix='_new', coalesce=True)

# Updated rows: a new value exists and differs from the current one.
# Rows present on one side only compare as null and are not counted by sum().
updated_rows_filter = (joined_df['Cleaned_Claim'] != joined_df['Cleaned_Claim_new']) & joined_df['Cleaned_Claim_new'].is_not_null()
updated_rows = updated_rows_filter.sum()

# Prefer the incoming value and fall back to the existing one
df_updated = joined_df.with_columns(
    pl.coalesce('Cleaned_Claim_new', 'Cleaned_Claim').alias('final_Cleaned_Claim')
)

# Keep the key plus the resolved value under its original column name
df_upserted = df_updated.select(['Hash_Id', 'final_Cleaned_Claim']).rename({'final_Cleaned_Claim': 'Cleaned_Claim'})

# Inserted rows are the keys of df_new that do not appear in the original df
inserted_rows = df_new.select('Hash_Id').join(df, on='Hash_Id', how='anti').shape[0]

# Print the counts and the final DataFrame
print(f"Number of updated rows: {updated_rows}")
print(f"Number of inserted rows: {inserted_rows}")
print("Upserted DataFrame:")
print(df_upserted)

In [None]:
import polars as pl

# Initialize sample DataFrames
df = pl.DataFrame({
    'Hash_Id': [1, 2, 3],
    'Cleaned_Claim': ['A', 'B', 'C']
})

df_new = pl.DataFrame({
    'Hash_Id': [2, 3, 4],
    'Cleaned_Claim': ['B_updated', 'C_updated', 'D']
})

# Full join on 'Hash_Id'. coalesce=True merges the two key columns into one;
# without it 'Hash_Id' is null for rows that exist only in df_new.
joined_df = df.join(df_new, on='Hash_Id', how='full', suffix='_new', coalesce=True)

# Prefer the incoming value and fall back to the existing one
df_updated = joined_df.with_columns(
    pl.coalesce('Cleaned_Claim_new', 'Cleaned_Claim').alias('final_Cleaned_Claim')
)

# Keep the key plus the resolved value under its original column name.
# No null-key filter is applied here: with the uncoalesced join such a filter
# discarded exactly the newly inserted rows.
df_upserted = df_updated.select(['Hash_Id', 'final_Cleaned_Claim']).rename({'final_Cleaned_Claim': 'Cleaned_Claim'})

# Updated rows: a new value exists and differs from the current one.
# Rows present on one side only compare as null and are not counted by sum().
updated_rows_filter = (joined_df['Cleaned_Claim'] != joined_df['Cleaned_Claim_new']) & joined_df['Cleaned_Claim_new'].is_not_null()
updated_rows = updated_rows_filter.sum()

# Inserted rows are the keys of df_new that do not appear in the original df
inserted_rows = df_new.select('Hash_Id').join(df, on='Hash_Id', how='anti').shape[0]

# Print the counts and the final DataFrame
print(f"Number of updated rows: {updated_rows}")
print(f"Number of inserted rows: {inserted_rows}")
print("Upserted DataFrame:")
print(df_upserted)

In [None]:
import polars as pl

# Initialize sample DataFrames
df = pl.DataFrame({
    'Hash_Id': [1, 2, 3],
    'Cleaned_Claim': ['A', 'B', 'C']
})

df_new = pl.DataFrame({
    'Hash_Id': [2, 3, 4],
    'Cleaned_Claim': ['B_updated', 'C_updated', 'D']
})

# Perform upsert operation in a more concise way.
# how='full' replaces the deprecated 'outer' spelling, and coalesce=True merges
# the two key columns so rows found only in df_new keep their 'Hash_Id'
# instead of getting a null key.
df_upserted = (
    df.join(
        df_new,
        on='Hash_Id',
        how='full',
        coalesce=True
    ).with_columns(
        # Incoming value wins; '_right' is the join's default suffix
        pl.coalesce('Cleaned_Claim_right', 'Cleaned_Claim').alias('Cleaned_Claim')
    ).select(['Hash_Id', 'Cleaned_Claim'])
)

# Updated rows: keys present in both frames whose value differs
updated_rows = (
    df.join(df_new, on='Hash_Id')
    .filter(pl.col('Cleaned_Claim') != pl.col('Cleaned_Claim_right'))
    .shape[0]
)

# Inserted rows: keys of df_new that are absent from df
inserted_rows = df_new.join(df, on='Hash_Id', how='anti').shape[0]

# Print results
print(f"Number of updated rows: {updated_rows}")
print(f"Number of inserted rows: {inserted_rows}")
print("Upserted DataFrame:")
print(df_upserted)

In [None]:
import polars as pl

# Initialize sample DataFrames
df = pl.DataFrame({
    'Hash_Id': [1, 2, 3],
    'Cleaned_Claim': ['A', 'B', 'C']
})

df_new = pl.DataFrame({
    'Hash_Id': [2, 3, 4],
    'Cleaned_Claim': ['B_updated', 'C_updated', 'D']
})

# Perform upsert operation using how='full'.
# coalesce=True merges the two key columns so rows found only in df_new keep
# their 'Hash_Id' instead of getting a null key.
df_upserted = (
    df.join(
        df_new,
        on='Hash_Id',
        how='full',
        coalesce=True
    ).with_columns(
        # Incoming value wins; '_right' is the join's default suffix
        pl.coalesce('Cleaned_Claim_right', 'Cleaned_Claim').alias('Cleaned_Claim')
    ).select(['Hash_Id', 'Cleaned_Claim'])
)

# Updated rows: keys present in both frames whose value differs
updated_rows = (
    df.join(df_new, on='Hash_Id')
    .filter(pl.col('Cleaned_Claim') != pl.col('Cleaned_Claim_right'))
    .shape[0]
)

# Inserted rows: keys of df_new that are absent from df
inserted_rows = df_new.join(df, on='Hash_Id', how='anti').shape[0]

# Print results
print(f"Number of updated rows: {updated_rows}")
print(f"Number of inserted rows: {inserted_rows}")
print("Upserted DataFrame:")
print(df_upserted)


In [None]:
import polars as pl

# Initialize sample DataFrames
df = pl.DataFrame({
    'Hash_Id': [1, 2, 3],
    'Cleaned_Claim': ['A', 'B', 'C']
})

df_new = pl.DataFrame({
    'Hash_Id': [2, 3, 4],
    'Cleaned_Claim': ['B_updated', 'C_updated', 'D']
})

# Perform upsert operation and return the rows ordered by key.
# coalesce=True merges the two key columns so rows found only in df_new keep
# their 'Hash_Id'; with a null key they would also sort ahead of every real key.
df_upserted = (
    df.join(
        df_new,
        on='Hash_Id',
        how='full',
        coalesce=True
    ).with_columns(
        # Incoming value wins; '_right' is the join's default suffix
        pl.coalesce('Cleaned_Claim_right', 'Cleaned_Claim').alias('Cleaned_Claim')
    ).select(['Hash_Id', 'Cleaned_Claim'])
    .sort('Hash_Id')  # deterministic output order
)

# Updated rows: keys present in both frames whose value differs
updated_rows = (
    df.join(df_new, on='Hash_Id')
    .filter(pl.col('Cleaned_Claim') != pl.col('Cleaned_Claim_right'))
    .shape[0]
)

# Inserted rows: keys of df_new that are absent from df
inserted_rows = df_new.join(df, on='Hash_Id', how='anti').shape[0]

# Print results
print(f"Number of updated rows: {updated_rows}")
print(f"Number of inserted rows: {inserted_rows}")
print("Upserted DataFrame:")
print(df_upserted)


### [Upsert and Merge with Delta Lake Tables in Python Polars](https://stuffbyyuki.com/upsert-and-merge-with-delta-lake-tables-in-python-polars/)

In [None]:
import polars as pl

# Seed data for the Delta Lake table, written row by row
df = pl.DataFrame(
    [
        (1, 'a', 100),
        (2, 'b', 200),
        (3, 'c', 300),
    ],
    schema=['key', 'letter', 'Cleaned_Claim'],
    orient='row',
)
print(df)

In [None]:
# Location of the Delta Lake table, relative to the notebook's working directory
output_table_path = './my_delta_lake_table'

# Replace whatever is stored at that path with the contents of df
df.write_delta(output_table_path, mode='overwrite')

# Read the table back to show what was written
target_table = pl.read_delta(output_table_path)
print('The target Delta Lake table output:\n', target_table)


In [None]:
# Expression that rewrites letter 'c' to 'd' and leaves every other letter as is
letter_updated = (
    pl.when(pl.col('letter') == 'c')
    .then(pl.lit('d'))
    .otherwise(pl.col('letter'))
    .alias('letter')
)

# Simulate an update (the letter change) and a delete (the row with key 1)
df_col_updated_and_row_deleted = df.with_columns(letter_updated).filter(pl.col('key') != 1)

# Simulate an insert by stacking one brand-new row underneath
new_row = pl.DataFrame({'key': 4, 'letter': 'd', 'Cleaned_Claim': 400})
df_with_changes = pl.concat([df_col_updated_and_row_deleted, new_row], how='vertical')
print(df_with_changes)

In [None]:
# Rows are matched on 'key'; the aliases name the two sides used in the predicate
merge_options = {
    'predicate': 'source.key = target.key',
    'source_alias': 'source',
    'target_alias': 'target',
}

# Merge df_with_changes into the Delta table at output_table_path:
# matched rows are updated, rows missing from the target are inserted, and
# target rows with no match in the source are deleted.
(
    df_with_changes
    .write_delta(output_table_path, mode='merge', delta_merge_options=merge_options)
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .when_not_matched_by_source_delete()
    .execute()
)

In [None]:
# Teradata SQL kept as a string so this cell is valid Python and the statement
# can be passed to a database cursor; raw SQL in a code cell is a SyntaxError.
# The statement builds a session-scoped (volatile) table with the total order
# amount per customer for orders dated 2024-01-01 or later.
# MULTISET is a table kind and therefore goes before VOLATILE, not after the
# table name as a table option.
create_result_table_sql = """
CREATE MULTISET VOLATILE TABLE result_table AS (
    SELECT customer_id,
           SUM(order_amount) AS total_spent
    FROM orders
    WHERE order_date >= DATE '2024-01-01'
    GROUP BY customer_id
) WITH DATA
PRIMARY INDEX (customer_id)
ON COMMIT PRESERVE ROWS;
"""


In [None]:
# SELECT TO_CHAR(timestamp_column, 'YYYY-MM-DD HH:MI:SS') FROM table_name;


### Pandas FastLoad

In [None]:
import os

import pandas as pd
import teradatasql

# Create sample DataFrame
df = pd.DataFrame({
    'Hash_Id': ['value1', 'value2', 'value3'],
    'Clean_Claim': ['text1', 'text2', 'text3'],
    'FTC_Label': ['data1', 'data2', 'data3']
})

# Connection parameters. Read from the environment so real credentials never
# end up in the notebook; the placeholders apply only when a variable is unset.
conn_params = {
    'host': os.environ.get('TERADATA_HOST', 'your_host'),
    'user': os.environ.get('TERADATA_USER', 'your_user'),
    'password': os.environ.get('TERADATA_PASSWORD', 'your_password')
}

# Only the target table is created here. The hand-made error tables belonged to
# the standalone FastLoad utility workflow (ERRORFILES), which this driver does
# not use.
create_tables_sql = """
CREATE MULTISET TABLE table_name (
    Hash_Id CHAR(128),
    Clean_Claim VARCHAR(80),
    FTC_Label VARCHAR(20)
) PRIMARY INDEX (Hash_Id);
"""

# The driver selects FastLoad through an escape function prefixed to a
# parameterized INSERT. 'teradata_try_fastload' falls back to a regular insert
# when FastLoad is not possible.
# NOTE(review): 'teradata_require_fastload' should fail instead of falling back - confirm in the driver docs.
# NOTE(review): SESSIONS 8 / ERRLIMIT 50 were FastLoad-utility commands, not SQL; check the driver docs for equivalents.
insert_sql = "{fn teradata_try_fastload}INSERT INTO table_name (?, ?, ?)"

# One list per row, in column order, so the whole frame is sent as one batch
rows = [list(row) for row in df.itertuples(index=False, name=None)]

with teradatasql.connect(**conn_params) as conn:
    with conn.cursor() as cur:
        # Create tables (DDL runs while autocommit is still on)
        for stmt in create_tables_sql.split(';'):
            if stmt.strip():
                cur.execute(stmt)

        # FastLoad requires autocommit to be off and an explicit commit afterwards
        cur.execute("{fn teradata_nativesql}{fn teradata_autocommit_off}")

        # Load the whole batch in a single call instead of one execute per row
        cur.executemany(insert_sql, rows)

        # Surface any warnings/errors the driver collected for this batch
        for diagnostics in ("{fn teradata_get_warnings}", "{fn teradata_get_errors}"):
            cur.execute("{fn teradata_nativesql}" + diagnostics + insert_sql)
            for message in cur.fetchall():
                print(message)

        # End loading
        conn.commit()


### Polars FastLoad

In [None]:
import os

import polars as pl
import teradatasql

# Create sample DataFrame
df = pl.DataFrame({
    'Hash_Id': ['value1', 'value2', 'value3'],
    'Clean_Claim': ['text1', 'text2', 'text3'],
    'FTC_Label': ['data1', 'data2', 'data3']
})

# Connection parameters. Read from the environment so real credentials never
# end up in the notebook; the placeholders apply only when a variable is unset.
conn_params = {
    'host': os.environ.get('TERADATA_HOST', 'your_host'),
    'user': os.environ.get('TERADATA_USER', 'your_user'),
    'password': os.environ.get('TERADATA_PASSWORD', 'your_password')
}

# Only the target table is created here. The hand-made error tables belonged to
# the standalone FastLoad utility workflow (ERRORFILES), which this driver does
# not use.
create_tables_sql = """
CREATE MULTISET TABLE table_name (
    Hash_Id CHAR(128),
    Clean_Claim VARCHAR(80),
    FTC_Label VARCHAR(20)
) PRIMARY INDEX (Hash_Id);
"""

# The driver selects FastLoad through an escape function prefixed to a
# parameterized INSERT. 'teradata_try_fastload' falls back to a regular insert
# when FastLoad is not possible.
# NOTE(review): 'teradata_require_fastload' should fail instead of falling back - confirm in the driver docs.
# NOTE(review): SESSIONS 8 / ERRLIMIT 50 were FastLoad-utility commands, not SQL; check the driver docs for equivalents.
insert_sql = "{fn teradata_try_fastload}INSERT INTO table_name (?, ?, ?)"

# One list per row, in column order, so the whole frame is sent as one batch
rows = [list(row) for row in df.iter_rows()]

with teradatasql.connect(**conn_params) as conn:
    with conn.cursor() as cur:
        # Create tables (DDL runs while autocommit is still on)
        for stmt in create_tables_sql.split(';'):
            if stmt.strip():
                cur.execute(stmt)

        # FastLoad requires autocommit to be off and an explicit commit afterwards
        cur.execute("{fn teradata_nativesql}{fn teradata_autocommit_off}")

        # Load the whole batch in a single call instead of one execute per row
        cur.executemany(insert_sql, rows)

        # Surface any warnings/errors the driver collected for this batch
        for diagnostics in ("{fn teradata_get_warnings}", "{fn teradata_get_errors}"):
            cur.execute("{fn teradata_nativesql}" + diagnostics + insert_sql)
            for message in cur.fetchall():
                print(message)

        # End loading
        conn.commit()
