<a href="https://colab.research.google.com/github/ksachtleben/AFSSH/blob/main/rd_pss/r%26d_product_selection_software_cdu_curve_coefficients.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install apache-beam[gcp,interactive,dataframe]
!pip install pandas
!pip install gcsfs
!pip install decimal

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from decimal import Decimal
import apache_beam.runners.interactive.interactive_beam as ib
import logging
import datetime
import re

import os
import json
from google.cloud import bigquery
import pandas as pd


# Google Auth
# Authenticate the current Colab user; the print below only confirms the call returned.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

# GCP Project
# Exported through the environment; presumably read by the Google Cloud
# client libraries as the default project -- confirm.
os.environ["GOOGLE_CLOUD_PROJECT"]= 'nidec-ga'

Authenticated


In [3]:
def select_columns(row, columns):
    """Return a new dict with only the requested columns that exist in ``row``.

    Keys are inserted in the order given by ``columns``; names that are not
    present in ``row`` are skipped without error.
    """
    selected = {}
    for name in columns:
        if name in row:
            selected[name] = row[name]
    return selected


class DropColumnsTechnical(beam.DoFn):
    """Remove a fixed set of keys from every element.

    The element is modified in place and then emitted; keys that are not
    present are ignored.
    """

    def __init__(self, cols_to_drop):
        # Iterable of keys to delete from each element.
        self.cols_to_drop = cols_to_drop

    def process(self, element):
        for name in self.cols_to_drop:
            if name in element:
                del element[name]
        yield element


class ConvertColumnToLower(beam.DoFn):
    """Lower-case the string values of the given columns.

    ``column_name`` is iterated, so it is expected to be a collection of
    column names. Values that are missing or not ``str`` are left untouched.
    The element is modified in place.
    """

    def __init__(self, column_name):
        # Iterable of column names whose values should be lower-cased.
        self.column_name = column_name

    def process(self, element):
        for name in self.column_name:
            value = element.get(name)
            if isinstance(value, str):
                element[name] = value.lower()
        yield element


class ReplacePatternsStartFn(beam.DoFn):
    """Apply a regex substitution to columns whose value starts with a pattern.

    For each configured column, the value is rewritten with
    ``re.sub(pattern, replacement, value)`` only when ``pattern`` matches at
    the beginning of the value (``re.match``). Note that once the start
    matches, ``re.sub`` replaces every occurrence in the value, not only the
    leading one.

    Values that are not text (for example numbers, ``None`` or NaN) are
    skipped instead of raising ``TypeError`` inside ``re.match``.
    The element is modified in place.

    Args:
        columns: iterable of column names to inspect.
        pattern: regular expression handed to ``re.match`` / ``re.sub``.
        replacement: replacement handed to ``re.sub``.
    """

    def __init__(self, columns, pattern, replacement):
        self.columns = columns
        self.pattern = pattern
        self.replacement = replacement

    def process(self, element):
        # Imported inside the method, like the other DoFns in this file.
        import re

        for column in self.columns:
            value = element.get(column)
            # The regex functions only accept text; anything else is left as is.
            if not isinstance(value, (str, bytes)):
                continue
            if re.match(self.pattern, value):
                element[column] = re.sub(self.pattern, self.replacement, value)
        yield element


class ColumnsToStringConverter(beam.DoFn):
    """Turn ``int``/``float`` values of the given columns into strings.

    Only values that are instances of ``int`` or ``float`` are converted
    (``bool`` is a subclass of ``int`` and is therefore converted too).
    Other values and missing columns are left untouched. The element is
    modified in place.
    """

    def __init__(self, columns_to_string):
        # Iterable of column names to stringify.
        self.columns_to_string = columns_to_string

    def process(self, element):
        for name in self.columns_to_string:
            if name not in element:
                continue
            value = element[name]
            if not isinstance(value, (int, float)):
                continue
            try:
                element[name] = str(value)
            except ValueError:
                # Keep the original value if it cannot be rendered.
                pass
        yield element

class ColumnsToFloatConverter(beam.DoFn):
    """Parse string values of the given columns into ``float``.

    Only ``str`` values are touched; strings that ``float()`` rejects
    (for example the empty string) are kept unchanged. The element is
    modified in place.
    """

    def __init__(self, columns_to_float):
        # Iterable of column names to parse as floats.
        self.columns_to_float = columns_to_float

    def process(self, element):
        for name in self.columns_to_float:
            if name not in element:
                continue
            raw = element[name]
            if not isinstance(raw, str):
                continue
            try:
                element[name] = float(raw)
            except ValueError:
                # Not a number: leave the original text in place.
                pass
        yield element

class ColumnsToIntegerConverter(beam.DoFn):
    """Convert ``str``/``float`` values of the given columns into ``int``.

    Rules applied per configured column (other value types and missing
    columns are ignored):

    * an empty string becomes ``0``;
    * anything else goes through ``int(value)``;
    * values ``int()`` cannot convert are left unchanged. This covers
      ``ValueError`` (non-numeric text, NaN) and ``OverflowError``
      (``float('inf')``), which previously escaped and failed the element.

    The element is modified in place.
    """

    def __init__(self, columns_to_integer):
        # Iterable of column names to convert.
        self.columns_to_integer = columns_to_integer

    def process(self, element):
        for column in self.columns_to_integer:
            if column not in element:
                continue
            value = element[column]
            if not isinstance(value, (str, float)):
                continue
            if value == "":
                element[column] = 0
                continue
            try:
                element[column] = int(value)
            except (ValueError, OverflowError):
                # Best effort: keep the original value when it has no
                # integer representation.
                pass
        yield element

class RenameColumns(beam.DoFn):
    """Emit a copy of each element with its keys renamed.

    Keys found in ``column_mapping`` are replaced by the mapped name; all
    other keys are kept as they are. The input element is not modified.
    """

    def __init__(self, column_mapping):
        # Dict of {old_name: new_name}.
        self.column_mapping = column_mapping

    def process(self, element):
        renamed = {}
        for key, value in element.items():
            renamed[self.column_mapping.get(key, key)] = value
        yield renamed


class ReplaceValues(beam.DoFn):
    """Replace exact string values in selected columns.

    ``replacements`` is an iterable of ``(column, current_value,
    replacement)`` triples. A triple applies only when the column exists,
    its value is a ``str`` and it equals ``current_value`` exactly (no
    substring matching). Triples are applied in order, so a later triple
    sees the result of an earlier one. The element is modified in place.
    """

    def __init__(self, replacements):
        self.replacements = replacements

    def process(self, element):
        for column, current_value, replacement in self.replacements:
            if column not in element:
                continue
            value = element[column]
            if isinstance(value, str) and current_value == value:
                element[column] = replacement
        yield element



class HandleNaN(beam.DoFn):
    """
    Replace NaN and None values with an empty string.

    Scalar values that are ``None`` or a float NaN become ``''``. For list
    values, only float NaN items are replaced (``None`` items inside a list
    are kept). The element is modified in place.
    """
    def process(self, element):
        import math

        def _is_nan(candidate):
            # NaN can only be detected on floats; math.isnan rejects other types.
            return isinstance(candidate, float) and math.isnan(candidate)

        for key, value in element.items():
            if isinstance(value, list):
                # Rebuild the list with NaN items blanked out.
                element[key] = ["" if _is_nan(item) else item for item in value]
            elif value is None or _is_nan(value):
                # Single missing value.
                element[key] = ''

        yield element


class TypeAnalyzer(beam.DoFn):
    """
    Print the name and Python type of every column of the first element.

    The "already printed" flag is kept on the DoFn instance. The previous
    implementation relied on a module-level ``first_row_processed`` global
    that is never defined in this file, so the first element raised
    ``NameError``. Each instance prints at most once; a runner may create
    more than one instance, in which case each prints its own first element.

    Every element is emitted unchanged.
    """

    # Class-level default; set to True on the instance after the first print.
    _first_row_processed = False

    def process(self, element):
        if not self._first_row_processed:
            for column, value in element.items():
                # Print the column name and the type of its value.
                print(f'Column: {column}, Type: {type(value).__name__}')
            self._first_row_processed = True
        # Emit the element unchanged.
        yield element


class FilterMissingValuesDoFn(beam.DoFn):
    """Drop elements that have a missing value in a required column.

    A value counts as missing when the key is absent, or when the value is
    ``None``, a float NaN, or the empty string (the placeholder ``HandleNaN``
    writes for NaN/None). The previous implementation only tested that the
    key existed, so rows carrying NaN/None/'' were never filtered.

    Elements that pass the check are emitted unchanged.
    """

    def process(self, element):
        import math

        # Columns to check for missing values
        columns_to_check = [
            'Catalog Availability'
        ]

        for col in columns_to_check:
            value = element.get(col)
            is_nan = isinstance(value, float) and math.isnan(value)
            if value is None or is_nan or value == '':
                # At least one required column is missing: emit nothing.
                return
        yield element


class DeriveColumn(beam.DoFn):
    """Copy the value of one column into another column of the same element.

    Raises ``KeyError`` when ``source_column`` is not present. The element is
    modified in place.
    """

    def __init__(self, source_column, target_column):
        self.source_column = source_column
        self.target_column = target_column

    def process(self, element):
        source_value = element[self.source_column]
        element[self.target_column] = source_value
        yield element


class TransformColumnApplication(beam.DoFn):
    """Normalise two combined codes in the ``Application`` column.

    ``'L/MBP'`` is rewritten to ``'LBP'`` and ``'M/HBP'`` to ``'MBP'``; any
    other value is written back unchanged. Raises ``KeyError`` if the element
    has no ``'Application'`` key. The element is modified in place.
    """

    def process(self, element):
        application = element['Application']

        if application == 'L/MBP':
            application = 'LBP'
        elif application == 'M/HBP':
            application = 'MBP'

        element['Application'] = application
        yield element


class UniqueValueForNewColum(beam.DoFn):
    """Set one column to the same constant value on every element.

    An existing value under ``new_column`` is overwritten. The element is
    modified in place.
    """

    def __init__(self, new_column, value_column):
        # Name of the column to write.
        self.new_column = new_column
        # Constant stored in that column.
        self.value_column = value_column

    def process(self, element):
        element.update({self.new_column: self.value_column})
        yield element


class CustomTransform(beam.DoFn):
    """Scale the coefficients ``CC_A`` .. ``CC_J`` according to ``Units``.

    * ``kcal/h``: every coefficient is multiplied by ``1.163``.
    * ``btu/h``: every coefficient is multiplied by ``0.293071``.
    * ``W`` or any other unit: coefficients are left untouched.

    Changes compared with the previous implementation:

    * The unit is matched case-insensitively and ignoring surrounding
      whitespace (previously only ``W`` accepted both cases).
    * Coefficients that are not numeric are no longer multiplied. Text
      placeholders such as ``''`` or ``'#VALOR!'`` are kept as they are
      instead of raising ``TypeError``; numeric text is parsed with
      ``float()`` before scaling.
    * A missing ``Units`` or ``CC_*`` key is skipped instead of raising
      ``KeyError``.

    The element is modified in place.
    """

    # Multiplier per unit (lower-cased); units not listed need no scaling.
    _FACTORS = {'kcal/h': 1.163, 'btu/h': 0.293071}

    def process(self, element):
        unit = element.get('Units')
        factor = None
        if isinstance(unit, str):
            factor = self._FACTORS.get(unit.strip().lower())

        if factor is not None:
            for col_suffix in 'ABCDEFGHIJ':
                col_name = f'CC_{col_suffix}'
                value = element.get(col_name)
                if isinstance(value, str):
                    try:
                        value = float(value)
                    except ValueError:
                        # Placeholder / error text: nothing to scale.
                        continue
                if not isinstance(value, (int, float)):
                    continue
                element[col_name] = value * factor
        yield element


class ConvertToStringEcodesign(beam.DoFn):
    """Store the ``ECODESIGN`` value as a string, keeping ``None`` as ``None``.

    Raises ``KeyError`` if the element has no ``'ECODESIGN'`` key. The element
    is modified in place.
    """

    def process(self, element):
        ecodesign = element['ECODESIGN']
        if ecodesign is None:
            element['ECODESIGN'] = None
        else:
            # Convert the value (for example 'YES') to its string form.
            element['ECODESIGN'] = str(ecodesign)
        yield element

In [4]:
class LeftJoinFn(beam.DoFn):
    """Left-join the two sides of a ``CoGroupByKey`` result.

    Each input element is ``(key, grouped_values)`` where ``grouped_values``
    holds the rows of both inputs under the tags ``'TABELA1'`` (left) and
    ``'TABELA2'`` (right). Every row is expected to be a pair whose second
    item is the record dict.

    Output per left record:

    * one merged dict per right record (right values win on key clashes),
      or
    * the left record itself when the key has no right records.

    Keys that only have right records produce no output.

    Args:
        columns_to_include: optional container of right-side column names to
            keep in the merged output; ``None`` keeps every right column.

    Both sides are materialised into lists before joining, so the join does
    not depend on the grouped iterables supporting truthiness or repeated
    iteration. The right records are filtered once per key rather than once
    per left/right pair.
    """

    def __init__(self, columns_to_include=None):
        # Store the columns from TABELA2 to include in the output, default to None
        self.columns_to_include = columns_to_include

    def process(self, element):
        _key, grouped_values = element
        left_rows = list(grouped_values['TABELA1'])
        if not left_rows:
            # Nothing on the left side: a left join emits nothing.
            return

        right_records = []
        for right_row in grouped_values['TABELA2']:
            record = right_row[1]  # the record is the second item of the pair
            if self.columns_to_include is not None:
                record = {k: v for k, v in record.items() if k in self.columns_to_include}
            right_records.append(record)

        for left_row in left_rows:
            left_record = left_row[1]  # the record is the second item of the pair
            if not right_records:
                yield left_record
                continue
            for right_record in right_records:
                yield {**left_record, **right_record}


class KeyByComposite(beam.DoFn):
    """Pair each element with a dict of its key columns.

    Emits ``(composite_key, record)`` where ``composite_key`` holds the
    ``key_columns`` that exist in the element (missing ones are skipped) and
    ``record`` is a shallow copy of the element.
    """

    def __init__(self, key_columns):
        # Iterable of column names that make up the key.
        self.key_columns = key_columns

    def process(self, element):
        composite_key = {}
        for name in self.key_columns:
            if name in element:
                composite_key[name] = element[name]
        record = {name: element[name] for name in element}
        yield (composite_key, record)


class CreateKeyDoFn(beam.DoFn):
    """Key an element by a tuple of values taken from its first item.

    The element is expected to be a sequence whose first item is a dict
    containing every name in ``key_columns`` (``KeyError`` otherwise). Emits
    ``(key_tuple, element)`` with the whole original element as the value.
    """

    def __init__(self, key_columns):
        # Ordered column names; their values form the key tuple.
        self.key_columns = key_columns

    def process(self, element):
        key_source = element[0]
        key_parts = [key_source[name] for name in self.key_columns]
        yield (tuple(key_parts), element)

In [5]:
class DeriveRatedCapacity(beam.DoFn):
    """Add ``RATED_CAPACITY`` computed from the ``CC_A`` .. ``CC_J`` coefficients.

    The value is the ten-term polynomial below, evaluated at ``Tevap`` (te)
    and ``Tamb`` (ta); missing keys default to ``0``::

        A + B*te + C*ta + D*te*ta + E*te**2 + F*ta**2
          + G*ta*te**2 + H*te*ta**2 + I*te**3 + J*ta**3

    When any input cannot be converted to ``float`` (for example ``''`` or
    ``None``) or the arithmetic overflows, ``RATED_CAPACITY`` is set to
    ``0.0``. Only those conversion/arithmetic errors are handled; the
    previous blanket ``except Exception`` also hid unrelated failures.

    The element is modified in place.
    """

    def process(self, element):
        try:
            tevap = float(element.get('Tevap', 0))
            tamb = float(element.get('Tamb', 0))

            # Coefficients in column order CC_A .. CC_J.
            coeffs = [float(element.get(f'CC_{letter}', 0)) for letter in 'ABCDEFGHIJ']

            rated_capacity = (coeffs[0] +
                              coeffs[1] * tevap +
                              coeffs[2] * tamb +
                              coeffs[3] * tevap * tamb +
                              coeffs[4] * tevap**2 +
                              coeffs[5] * tamb**2 +
                              coeffs[6] * tamb * tevap**2 +
                              coeffs[7] * tevap * tamb**2 +
                              coeffs[8] * tevap**3 +
                              coeffs[9] * tamb**3)
        except (TypeError, ValueError, OverflowError):
            # Unusable input: fall back to the neutral value.
            rated_capacity = 0.0

        element['RATED_CAPACITY'] = rated_capacity
        yield element


class DeriveRatedPower(beam.DoFn):
    """Add ``RATED_POWER`` computed from the ``CON_A`` .. ``CON_J`` coefficients.

    The value is the ten-term polynomial below, evaluated at ``Tevap`` (te)
    and ``Tamb`` (ta); missing keys default to ``0``::

        A + B*te + C*ta + D*te*ta + E*te**2 + F*ta**2
          + G*ta*te**2 + H*te*ta**2 + I*te**3 + J*ta**3

    When any input cannot be converted to ``float`` (for example ``''`` or
    ``None``) or the arithmetic overflows, ``RATED_POWER`` is set to ``0.0``.
    Only those conversion/arithmetic errors are handled; the previous blanket
    ``except Exception`` also hid unrelated failures.

    The element is modified in place.
    """

    def process(self, element):
        try:
            tevap = float(element.get('Tevap', 0))
            tamb = float(element.get('Tamb', 0))

            # Coefficients in column order CON_A .. CON_J.
            coeffs = [float(element.get(f'CON_{letter}', 0)) for letter in 'ABCDEFGHIJ']

            rated_power = (coeffs[0] +
                           coeffs[1] * tevap +
                           coeffs[2] * tamb +
                           coeffs[3] * tevap * tamb +
                           coeffs[4] * tevap**2 +
                           coeffs[5] * tamb**2 +
                           coeffs[6] * tamb * tevap**2 +
                           coeffs[7] * tevap * tamb**2 +
                           coeffs[8] * tevap**3 +
                           coeffs[9] * tamb**3)
        except (TypeError, ValueError, OverflowError):
            # Unusable input: fall back to the neutral value.
            rated_power = 0.0

        element['RATED_POWER'] = rated_power
        yield element


class DeriveRatedCurrent(beam.DoFn):
    """Add ``RATED_CURRENT`` computed from the ``CURR_A`` .. ``CURR_J`` coefficients.

    The value is the ten-term polynomial below, evaluated at ``Tevap`` (te)
    and ``Tamb`` (ta); missing keys default to ``0``::

        A + B*te + C*ta + D*te*ta + E*te**2 + F*ta**2
          + G*ta*te**2 + H*te*ta**2 + I*te**3 + J*ta**3

    When any input cannot be converted to ``float`` (for example ``''`` or
    ``None``) or the arithmetic overflows, ``RATED_CURRENT`` is set to
    ``0.0``. Only those conversion/arithmetic errors are handled; the
    previous blanket ``except Exception`` also hid unrelated failures.

    The element is modified in place.
    """

    def process(self, element):
        try:
            tevap = float(element.get('Tevap', 0))
            tamb = float(element.get('Tamb', 0))

            # Coefficients in column order CURR_A .. CURR_J.
            coeffs = [float(element.get(f'CURR_{letter}', 0)) for letter in 'ABCDEFGHIJ']

            rated_current = (coeffs[0] +
                             coeffs[1] * tevap +
                             coeffs[2] * tamb +
                             coeffs[3] * tevap * tamb +
                             coeffs[4] * tevap**2 +
                             coeffs[5] * tamb**2 +
                             coeffs[6] * tamb * tevap**2 +
                             coeffs[7] * tevap * tamb**2 +
                             coeffs[8] * tevap**3 +
                             coeffs[9] * tamb**3)
        except (TypeError, ValueError, OverflowError):
            # Unusable input: fall back to the neutral value.
            rated_current = 0.0

        element['RATED_CURRENT'] = rated_current
        yield element

class DeriveRatedCOP(beam.DoFn):
    """Add ``RATED_COP`` as ``RATED_CAPACITY / RATED_POWER``.

    ``RATED_COP`` is ``None`` when either input is ``None`` or the power is
    zero. Both input keys must exist (``KeyError`` otherwise). The element is
    modified in place and returned as a one-item list.
    """

    def process(self, element):
        capacity = element['RATED_CAPACITY']
        power = element['RATED_POWER']

        can_divide = capacity is not None and power is not None and power != 0
        element['RATED_COP'] = capacity / power if can_divide else None

        return [element]


class ColumnCompressorFreqSpeed(beam.DoFn):
    """Add ``COMPRESSOR_FREQ_SPEED`` from the compressor RPM or supply frequency.

    When ``'VCC Compressor RPM'`` is the empty string, the value of
    ``' Power supply Freq'`` (the key has a leading space) is used; in every
    other case, including a missing RPM key, the RPM value itself is stored.
    The element is modified in place and returned as a one-item list.
    """

    def process(self, element):
        rpm = element.get('VCC Compressor RPM')
        supply_freq = element.get(' Power supply Freq')

        # An empty RPM means "not available": fall back to the supply frequency.
        element['COMPRESSOR_FREQ_SPEED'] = supply_freq if rpm == '' else rpm

        return [element]


class SetTypeToString(beam.DoFn):
    """Store ``CURR_A`` as a string.

    A missing ``CURR_A`` key results in the empty string. The element is
    modified in place and returned as a one-item list.
    """

    def process(self, element):
        current_a = element.get('CURR_A', '')
        element['CURR_A'] = str(current_a)
        return [element]


def handle_nan(element):
    """Replace missing values in ``element`` with the empty string.

    A scalar value is missing when ``pandas.isna`` reports it as such
    (``None``, NaN, ``pd.NA``, ``pd.NaT``). For list values, each missing
    item is replaced individually, in line with what the ``HandleNaN`` DoFn
    does for lists. Other non-scalar values are left untouched.

    The previous version called ``pd.isna(value) or pd.isnull(value)``: the
    second call is an alias of the first, and for a list with more than one
    item the array result made the ``or`` raise
    "truth value of an array is ambiguous".

    Args:
        element: dict to clean; modified in place.

    Returns:
        The same dict object, for use with ``beam.Map``.
    """
    import pandas as pd

    def _is_missing(value):
        # pd.isna returns an array for array-likes, so restrict it to scalars.
        return pd.api.types.is_scalar(value) and bool(pd.isna(value))

    for key, value in element.items():
        if isinstance(value, list):
            element[key] = ['' if _is_missing(item) else item for item in value]
        elif _is_missing(value):
            element[key] = ''
    return element


class RemoveDecimalPoint(beam.DoFn):
    """Render whole-number floats of the given columns as integer strings.

    For each configured column, a ``float`` with no fractional part (for
    example ``50.0``) is replaced by the string of its integer value
    (``'50'``). Other values and missing columns are left untouched. The
    element is modified in place.
    """

    def __init__(self, columns_to_remove):
        # Iterable of column names to inspect.
        self.columns_to_remove = columns_to_remove

    def process(self, element):
        for key in self.columns_to_remove:
            value = element.get(key)
            if isinstance(value, float) and value.is_integer():
                element[key] = str(int(value))
        yield element

## Runner

In [6]:
# Uncomment to get verbose logging while debugging the pipeline.
#logging.getLogger().setLevel(logging.DEBUG)

# Alternative minimal option set (project only); kept commented out.
#pipeline_options = {
#    'project':'nidec-ga'
#}

# Active option set: Dataflow runner in us-central1 with staging, temp and
# template locations under the nidec-ga-temp bucket.
# NOTE(review): because 'template_location' is set, running this presumably
# stages a Dataflow template instead of executing the job directly -- confirm.
#"""
pipeline_options = {
    'project':'nidec-ga',
    'runner':'DataflowRunner',
    'region':'us-central1',
    'staging_location':'gs://nidec-ga-temp/data-flow-pipelines/r&d-product-selection-software/staging',
    'temp_location':'gs://nidec-ga-temp/data-flow-pipelines/r&d-product-selection-software/temp',
    'template_location':'gs://nidec-ga-temp/data-flow-pipelines/r&d-product-selection-software/template/r&d-product-selection-software-curvecoefficient',
    'requirements_file':'requirements.txt'
}
#"""

# The name `pipeline_options` is rebound from the plain dict to a PipelineOptions object.
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
# Module-level pipeline object; run_pipeline() below attaches its transforms to it and runs it.
pipeline = beam.Pipeline(options=pipeline_options)

def run_pipeline(temp_location, input_file_CDU_Curve_Coefficients, input_file_Envelope_Specification, input_file_CDU_Standard_Temperatures,  output_table):
    """Build the CDU curve-coefficients pipeline on the module-level ``pipeline`` and run it.

    Steps:

    1. Read the curve-coefficients CSV, blank out NaN/None values, add
       ``COEFFICIENTS_TEMPERATURE_UNIT = 'C'``, normalise ``Application`` and
       rename it to ``Test_application``.
    2. Left-join with the envelope-specification CSV on ``Envelope`` (left)
       / ``envelope`` (right).
    3. Add ``COEFFICIENTS_OUTPUT_UNIT = 'W'``, scale ``CC_A`` .. ``CC_J`` by
       unit, rename/drop columns and blank out spreadsheet error markers in
       the coefficient columns.
    4. Left-join with the standard-temperatures CSV on
       (``STANDARD``, ``Test_application``, ``COEFFICIENTS_TEMPERATURE_UNIT``)
       versus (``Standard``, ``Application``, ``T_coef_input``).
    5. Derive the rated capacity, power, current and COP, fix up column names
       and types, and write the rows to BigQuery, truncating the table.

    Args:
        temp_location: GCS location passed to ``WriteToBigQuery`` as
            ``custom_gcs_temp_location``.
        input_file_CDU_Curve_Coefficients: path of the curve-coefficients CSV.
        input_file_Envelope_Specification: path of the envelope-specification CSV.
        input_file_CDU_Standard_Temperatures: path of the standard-temperatures CSV.
        output_table: BigQuery table to write to.

    The function blocks until the pipeline run finishes. Step labels are
    unchanged from the previous version.
    """
    import pandas as pd

    def read_csv_records(file):
        """Load one CSV with pandas and return its rows as a list of dicts."""
        return pd.read_csv(file, delimiter=',', engine='python').to_dict('records')

    column_mapping_application = {
        'Application': 'Test_application',
    }

    column_mapping_application_envelope = {
        'Refrigerant': 'TEST_REFRIGERANT',
        'ECO DESIGN DATASHEET': 'ECODESIGN',
        'points': 'POINTS_OPERATION',
        'Standard': 'STANDARD',
        'out_ambient_min': 'OUT_AMBIENT_MIN',
        'out_ambient_max': 'OUT_AMBIENT_MAX',
        'out_evaporating_min': 'OUT_EVAPORATING_MIN',
        'out_evaporating_max': 'OUT_EVAPORATING_MAX',
        'PD_Config': 'PD_CONFIG',
        'performance_curve_points': 'PERFORMANCE_CURVE_POINTS',
    }

    # Spreadsheet error markers to blank out in every coefficient column
    # (CC_A..CC_J, CON_A..CON_J, CURR_A..CURR_J). Generated in the same order
    # as the former hand-written list: column by column, marker by marker.
    error_markers = ('#VALOR!', '#REF!', '#VALUE!', '#HODNOTA!')
    performance_replacements = [
        (f'{prefix}_{letter}', marker, '')
        for prefix in ('CC', 'CON', 'CURR')
        for letter in 'ABCDEFGHIJ'
        for marker in error_markers
    ]

    input_CDU_curve_coefficients_data = (
        pipeline
        | 'Performance Coefficients -- Create File Path Excel' >> beam.Create([input_file_CDU_Curve_Coefficients])
        | 'Performance Coefficients -- Read Excel -> Origin' >> beam.FlatMap(read_csv_records)
    )

    grouped_clean_CDU_curve_coefficients_data = (
        input_CDU_curve_coefficients_data
        | "handle NaN with ' '" >> beam.ParDo(HandleNaN())
        | "new column derive Static" >> beam.ParDo(UniqueValueForNewColum(new_column='COEFFICIENTS_TEMPERATURE_UNIT', value_column='C'))
        | "apply transformation in column Application" >> beam.ParDo(TransformColumnApplication())
        | "rename column Application" >> beam.ParDo(RenameColumns(column_mapping = column_mapping_application))
    )

    input_CDU_envelope_specification_data = (
        pipeline
        | 'Performance Envelope -- Create File Path Excel' >> beam.Create([input_file_Envelope_Specification])
        | 'Performance Envelope -- Read Excel -> Origin' >> beam.FlatMap(read_csv_records)
    )

    # First join: curve coefficients (left) with envelope specification (right).
    grouped_CDU_curve_coefficients = (grouped_clean_CDU_curve_coefficients_data
        | "Grouped Business CurveCoefficients -- Standard Temperature -- Create Composite Key" >> beam.ParDo(KeyByComposite(['Envelope']))
        | 'Grouped Business CurveCoefficients -- Key StandTemp' >> beam.ParDo(CreateKeyDoFn(['Envelope']))
    )

    grouped_CDU_envelope_specification = (input_CDU_envelope_specification_data
        | "Grouped Business EnvelopeEspecification -- Standard Temperature -- Create Composite Key" >> beam.ParDo(KeyByComposite(['envelope']))
        | 'Grouped Business EnvelopeEspecification -- Key StandTemp' >> beam.ParDo(CreateKeyDoFn(['envelope']))
    )

    joined_data = ({'TABELA1':grouped_CDU_curve_coefficients, 'TABELA2':grouped_CDU_envelope_specification}
        | "CoGroupByKey" >> beam.CoGroupByKey()
        | 'Left Join' >> beam.ParDo(LeftJoinFn())
    )

    clean_CDU_envelope_coefficients_data = (
        joined_data
        | "new column derive Coefficients" >> beam.ParDo(UniqueValueForNewColum(new_column='COEFFICIENTS_OUTPUT_UNIT', value_column='W'))
        | "transform columns CCA~CCJ "  >> beam.ParDo(CustomTransform())
        | "rename columns" >> beam.ParDo(RenameColumns(column_mapping = column_mapping_application_envelope))
        | "drop selected columns" >> beam.ParDo(DropColumnsTechnical(cols_to_drop = ['Envelope', 'PD_Config1', 'Exists in products']))
        | "convert in string ecodesign" >> beam.ParDo(ColumnsToStringConverter(columns_to_string = ['ECODESIGN']))
        | "replacespaces CC_A~CURR_J" >> beam.ParDo(ReplaceValues(replacements = performance_replacements))
    )

    # Second join: joined coefficients (left) with standard temperatures (right).
    grouped_CDU_envelope_coefficients = (clean_CDU_envelope_coefficients_data
        | "Grouped Business CurveEnvelopeCoefficients -- Standard Temperature -- Create Composite Key" >> beam.ParDo(KeyByComposite(['STANDARD', 'Test_application', 'COEFFICIENTS_TEMPERATURE_UNIT']))
        | 'Grouped Business CurveEnvelopeCoefficients -- Key StandTemp' >> beam.ParDo(CreateKeyDoFn(['STANDARD', 'Test_application', 'COEFFICIENTS_TEMPERATURE_UNIT']))
    )

    input_CDU_standard_temperatures_data = (
        pipeline
        | 'Standard Temperatures -- Create File Path Excel' >> beam.Create([input_file_CDU_Standard_Temperatures])
        | 'Standard Temperatures -- Read Excel -> Origin' >> beam.FlatMap(read_csv_records)
    )

    grouped_CDU_standard_temperatures = (input_CDU_standard_temperatures_data
        | "Grouped Business Standard Temperatures -- Standard Temperature -- Create Composite Key" >> beam.ParDo(KeyByComposite(['Standard', 'Application', 'T_coef_input']))
        | 'Grouped Business Standard Temperatures -- Key StandTemp' >> beam.ParDo(CreateKeyDoFn(['Standard', 'Application', 'T_coef_input']))
    )

    joined_structured_data = ({'TABELA1':grouped_CDU_envelope_coefficients, 'TABELA2':grouped_CDU_standard_temperatures}
        | "CoGroupByKeyStructured" >> beam.CoGroupByKey()
        | 'Left JoinStructured' >> beam.ParDo(LeftJoinFn())
    )

    joined_business_data = (
        joined_structured_data
        | "convert in float" >> beam.ParDo(ColumnsToFloatConverter(columns_to_float = ['CC_A', 'CON_A','Tevap','Treturn']))
        | "Derive Rated Capacity" >> beam.ParDo(DeriveRatedCapacity())
        | "Derive Rated Power" >> beam.ParDo(DeriveRatedPower())
        | "Derive Rated Current" >> beam.ParDo(DeriveRatedCurrent())
        | "Derive Rated Cop" >> beam.ParDo(DeriveRatedCOP())
        | "Column Compressor Freq Speed" >> beam.ParDo(ColumnCompressorFreqSpeed())
        | "CURR_A to string" >> beam.ParDo(SetTypeToString())
        | "drop selected columns POWER/VCC" >> beam.ParDo(DropColumnsTechnical(cols_to_drop = [' Power supply Freq', 'VCC Compressor RPM']))
        | "rename columns joined data" >> beam.ParDo(RenameColumns(column_mapping = {'Tevap': 'RATED_T_EVAP' , 'Tamb': 'RATED_T_AMB', 'Treturn': 'RATED_T_RETURN', 'SUB': 'RATED_T_SUB', 'SUP': 'RATED_T_SUP',
          'Test_application': 'TEST_APPLICATION','Units': 'UNITS', 'Model': 'MODEL', 'Voltage1': 'VOLTAGE1', 'Freq1': 'FREQUENCY1',  'Voltage2': 'VOLTAGE2', 'Freq2': 'FREQUENCY2', 'Motor_type': 'MOTOR_TYPE'
                                                       }))
        | "convert in integer" >> beam.ParDo(ColumnsToIntegerConverter(columns_to_integer = ['FREQUENCY1', 'FREQUENCY2']))
        | "replacespatterns" >> beam.ParDo(ReplaceValues(replacements = [('VOLTAGE1', ' ', ''),('VOLTAGE2', ' ', '')]))
        | "convert in integer Voltage2" >> beam.ParDo(ColumnsToIntegerConverter(columns_to_integer = ['VOLTAGE2']))
        | "drop selected for match with schema" >> beam.ParDo(DropColumnsTechnical(cols_to_drop = [ 'envelope', 'Standard', 'Application', 'T_coef_input', 'Test Application ']))
        | "handle NAN" >> beam.Map(handle_nan)
        | "convert in float for match with schema" >> beam.ParDo(ColumnsToFloatConverter(columns_to_float = ['CON_J', 'CON_I', 'CON_H', 'CON_G', 'CON_F', 'CON_E', 'CON_D', 'CON_C', 'CON_B', 'CURR_J', 'CURR_I', 'CURR_H',
            'CURR_G', 'CURR_F', 'CURR_E', 'CURR_D', 'CURR_C', 'CURR_B', 'CURR_A']))
        | "convert in integer typing" >> beam.ParDo(RemoveDecimalPoint(columns_to_remove = ['COMPRESSOR_FREQ_SPEED', 'RATED_T_SUP']))
    )

    # WRITE_TRUNCATE replaces the table contents on every run.
    joined_business_data | 'Write To BigQuery -> PSS DAD Trusted' >> beam.io.WriteToBigQuery(
        table=output_table,
        schema='SCHEMA_AUTODETECT',
        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        custom_gcs_temp_location = temp_location
    )

    pipeline.run().wait_until_finish()

# Script entry point: fixed GCS inputs and BigQuery output for the CDU curve-coefficients run.
if __name__ == '__main__':
    # Same GCS path as 'temp_location' in the pipeline options above; here it is
    # handed to the BigQuery sink as its temporary location.
    temp_location = 'gs://nidec-ga-temp/data-flow-pipelines/r&d-product-selection-software/temp'

    # Curve-coefficients CSV (left side of the first join).
    input_file_CDU_Curve_Coefficients = 'gs://nidec-ga-transient/r&d-product-selection-software/cdu/CDU PSS Database - Curve Coefficients.csv'

    # Envelope-specification CSV (right side of the first join).
    input_file_Envelope_Specification = 'gs://nidec-ga-transient/r&d-product-selection-software/cdu/CDU PSS Database - Envelope Specification.csv'

    # Standard-temperatures CSV (right side of the second join).
    input_file_CDU_Standard_Temperatures = 'gs://nidec-ga-transient/r&d-product-selection-software/cdu/CDU PSS Database - StandardTemp.csv'

    # Destination table, given as project:dataset.table.
    output_table = 'nidec-ga:bq_trusted.PSS_CURVE_COEFFICIENTS_CDU'

    run_pipeline(temp_location, input_file_CDU_Curve_Coefficients, input_file_Envelope_Specification, input_file_CDU_Standard_Temperatures,  output_table)