In [4]:
from datetime import datetime
from init_azure_db import create_tables
from sqlalchemy.engine.base import Engine
import pandas as pd
from connect_mssql import connect_mssql, get_mssql_engine
from load_mssql import empty_table, load_table, migrate_table, load_json_data

In [5]:
pd.set_option("display.max_columns", 150, "display.width", 2000)

In [6]:
engine = get_mssql_engine()

In [7]:
df = load_json_data(company="recom")

# create tables

In [8]:
create_tables(schema="recom")

# create product series

In [9]:
def create_product_series_data(input_data: pd.DataFrame):
    result_data = pd.DataFrame(data={"name": input_data["product_series"].unique()})

    return result_data


In [49]:
def upsert_table(
    data: pd.DataFrame, 
    table_name: str,
    column_identifier: str | None = None,
    schema: str = "recom", 
    db_engine: Engine = engine
):

    # get existing products series
    df_series = pd.read_sql_table(table_name=table_name, schema=schema, con=db_engine)

    # init mask to filter values to be inserted
    insert_mask = pd.Series([True] * data.shape[0])

    if column_identifier:
        # Drop NaN values from the database dataframe
        df_series = df_series.dropna(subset=[column_identifier])
        
        # Create mask to identify non-NaN values in input data
        non_na_mask = ~(data[column_identifier].isna())
        
        # For non-NaN values, check if they exist in df_series
        exists_mask = data.loc[non_na_mask, column_identifier].isin(df_series[column_identifier])
        
        # Final mask: True for rows that should be inserted
        # - Either they have NaN in column_identifier (handled by second branch below)
        # - Or they have non-NaN value not present in df_series
        insert_mask = non_na_mask & ~exists_mask
        
        # Handle NaN case separately if needed - do you want to insert NaN values?
        # Uncomment this if you want to insert rows with NaN identifiers
        # na_mask = data[column_identifier].isna()
        # insert_mask = insert_mask | na_mask
        
        data_to_insert = data.loc[insert_mask].copy()
    else:
        data_diff = data.merge(df_series, how='left', indicator=True)
        data_to_insert = data_diff[data_diff['_merge'] == "left_only"].drop('_merge', axis=1)
        
    load_table(data=data_to_insert, table_name=table_name, db_engine=db_engine, schema_name=schema)

    return data_to_insert.shape

In [52]:
upsert_table(data=create_product_series_data(df), table_name="product_series", column_identifier="name")

(0, 1)

# create certifications

In [53]:
def create_certifications_data(input_data: pd.DataFrame) -> pd.DataFrame:

    result = pd.DataFrame(data={"name": input_data["certifications"].explode().unique()})
    result["name"] = result["name"].str.strip()
    result["name"] = result["name"].str.lower()
    
    return result.drop_duplicates(subset="name")

In [54]:
t = create_certifications_data(df)

In [55]:
t = t.dropna(subset='name')

In [56]:
t.isna().sum()

name    0
dtype: int64

In [57]:
upsert_table(data=t, table_name="certifications", column_identifier="name")

(0, 1)

# create protections

In [58]:
def create_protections_data(input_data: pd.DataFrame) -> pd.DataFrame:
    result_data = pd.DataFrame(data={"name": input_data["protections"].explode().unique()})
    result_data["name"] = result_data["name"].str.strip()
    result_data["name"] = result_data["name"].str.lower()
    result_data = result_data.drop_duplicates(subset="name")

    return result_data.loc[~result_data["name"].isna()]

In [59]:
upsert_table(data=create_protections_data(df), table_name="protections", column_identifier="name")

(0, 1)

In [60]:
df.columns

Index(['product_series', 'part_number', 'converter_type', 'ac_voltage_input_min', 'ac_voltage_input_max', 'dc_voltage_input_min', 'dc_voltage_input_max', 'input_voltage_tolerance', 'power', 'is_regulated', 'regulation_voltage_range', 'efficiency', 'isolation_test_voltage', 'voltage_output_1', 'voltage_output_2', 'voltage_output_3', 'i_out1', 'i_out2', 'i_out3', 'output_type', 'pins', 'package', 'packaging_type', 'dimensions', 'certifications', 'protections', 'operating_temperature', 'power_derating', 'company'], dtype='object')

# create converters

In [61]:
def create_converters_data(input_data: pd.DataFrame, company: str, schema: str="recom", db_engine: Engine=engine) -> pd.DataFrame:
    result = df.copy()

    product_series_df = pd.read_sql_table(table_name="product_series", schema=schema, con=db_engine)
    product_series_df = product_series_df.rename(columns={"id": "product_series_id"})

    # join the product series table to get DB id from schema
    result = result.merge(product_series_df, left_on="product_series", right_on="name", how="left")
    # drop redundant name col
    result = result.drop(columns=["product_series", "name"])
    
    result["pin_count"] = result["pins"].map(len)

    for k in ["mounting_type", "connection_type"]:
        result[k] = result["package"].map(lambda x: x.get(k) if x is not None else None)
    
    for k in ["unit", "length", "width", "height"]:
        result[f"dimensions_{k}"] = result["dimensions"].map(lambda x: x.get(k) if x is not None else None)    
    
    for k in ["min", "max"]:
        result[f"operating_temp_{k}"] = result["operating_temperature"].map(lambda x: x.get(k) if x is not None else None)
    
    result['company'] = company

    result["created_at"] = datetime.now()
    result["updated_at"] = datetime.now()

    res_columns = [
        "company", 
        "product_series_id",
        "part_number",
        "converter_type",
        "ac_voltage_input_min",
        "ac_voltage_input_max",
        "dc_voltage_input_min",
        "dc_voltage_input_max",
        "input_voltage_tolerance",
        "power",
        "is_regulated",
        "regulation_voltage_range",
        "efficiency",
        "voltage_output_1",
        "voltage_output_2",
        "voltage_output_3",
        "i_out1",
        "i_out2",
        "i_out3",
        "output_type",
        "pin_count",
        "mounting_type",
        "connection_type",
        "dimensions_unit",
        "dimensions_length",
        "dimensions_width",
        "dimensions_height",
        "operating_temp_min",
        "operating_temp_max",
        "created_at",
        "updated_at"
    ]
        

    return result[res_columns].copy().drop_duplicates(subset="part_number")

    

In [62]:
t = create_converters_data(df, company='recom')

In [63]:
upsert_table(data=t, table_name="converters", column_identifier="part_number", schema="recom")

(0, 31)

# helper functions f. normalization

In [64]:
def expand_list_of_dicts(df, column_name):

    # Explode column with lists of dicts
    exploded_df = df.explode(column_name)
    
    # Normalize the dictionaries into columns and drop
    normalized = pd.json_normalize(exploded_df[column_name])
    exploded_df = exploded_df.drop(column_name, axis=1)
    
    # Concatenate original df with the normalized dictionaries
    result = pd.concat([exploded_df.reset_index(drop=True), normalized], axis=1)
    
    return result

In [65]:
def map_converter_id(
    input_data: pd.DataFrame, 
    db_engine: Engine = engine,
    mapping_column_name: str = "part_number",
    schema_name: str = "recom"
) -> pd.DataFrame:

    mapping_df = pd.read_sql_table(table_name='converters', con=engine, schema=schema_name)

    result = input_data.merge(mapping_df[["part_number", "id"]], left_on=mapping_column_name, right_on="part_number", how="left")
    result = result.copy()
    # result = result.rename(columns={"id", "converter_id"})

    return result.rename(columns={"id": "converter_id"}).copy()

# isolation test

In [66]:
t = expand_list_of_dicts(df[["isolation_test_voltage", "part_number"]], "isolation_test_voltage")

In [67]:
t = map_converter_id(t)

In [68]:
t.rename(columns={"id": "converter_id"}).copy()

Unnamed: 0,part_number,duration_sec,unit,voltage,converter_id
0,REM1-0505S,60.0,VDC,5200,1
1,REM1-0505S,60.0,VAC,4000,1
2,REM1-0512S,60.0,VDC,5200,2
3,REM1-053.3S,60.0,VDC,5200,3
4,REM1-053.3S,60.0,VAC,4000,3
...,...,...,...,...,...
500,RAC05-05SK/277/W,60.0,VAC,4200,418
501,RAC05-12SK/277/W,60.0,VAC,4200,419
502,RAC05-15SK/277/W,60.0,VAC,4200,420
503,RAC05-24SK/277/W,60.0,VAC,4200,421


In [69]:
def create_isolation_tests_data(input_data: pd.DataFrame) -> pd.DataFrame:

    result = expand_list_of_dicts(input_data[["isolation_test_voltage", "part_number"]], "isolation_test_voltage")
    result = map_converter_id(result)

    return result[["converter_id", "duration_sec", "unit", "voltage"]].copy()

In [71]:
upsert_table(data=create_isolation_tests_data(df), table_name="isolation_tests")

(0, 5)

# pins

In [73]:
def create_pins_data(input_data: pd.DataFrame) -> pd.DataFrame:

    result = expand_list_of_dicts(input_data[["part_number", "pins"]], "pins")
    result = map_converter_id(result)

    result = result.rename(columns={"type": "pin_type"})
    result = result.drop(columns=["part_number"])
    result = result.copy()
    
    return result[["converter_id", "pin_id", "pin_type"]]

In [78]:
empty_table(table_name='pins', schema_name="recom")

In [79]:
upsert_table(data=create_pins_data(df), table_name="pins")

(2597, 4)

# power derating

In [80]:
df.columns

Index(['product_series', 'part_number', 'converter_type', 'ac_voltage_input_min', 'ac_voltage_input_max', 'dc_voltage_input_min', 'dc_voltage_input_max', 'input_voltage_tolerance', 'power', 'is_regulated', 'regulation_voltage_range', 'efficiency', 'isolation_test_voltage', 'voltage_output_1', 'voltage_output_2', 'voltage_output_3', 'i_out1', 'i_out2', 'i_out3', 'output_type', 'pins', 'package', 'packaging_type', 'dimensions', 'certifications', 'protections', 'operating_temperature', 'power_derating', 'company'], dtype='object')

In [83]:
df['power_derating'].values[0]

[{'threshold': {'temperature': 85, 'unit': 'C'}, 'unit': '%', 'rate': 0.0}]

In [96]:
def create_derating_data(input_data: pd.DataFrame) -> pd.DataFrame:

    result = expand_list_of_dicts(input_data[['power_derating', 'part_number']], 'power_derating')
    result = map_converter_id(result)
    result = result.drop(columns=["part_number"])

    result = result.rename(columns={"threshold.temperature": "threshold_temperature", "threshold.unit": "threshold_unit"})

    result = result[["converter_id", "threshold_temperature", "threshold_unit", "unit", "rate"]].copy()

    return result

In [98]:
upsert_table(data=create_derating_data(df), table_name="power_derating")

(0, 6)