In [130]:
import abc
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, desc, lit, when, count,concat, sum, asc, regexp_replace, explode, split, row_number

# Spark Inicialization

spark = SparkSession.builder \
.master("local[*]") \
.config('spark.jars', '/home/semantix/alertas/volume/postgresql-42.2.9.jre6.jar')\
.appName("teste")\
.getOrCreate()

In [131]:
import abc

NOT_IMPL_MSG = "This method need to be implemented, please check the documentation"

class DAO:
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def select(self, db_table):
        raise NotImplementedError(NOT_IMPL_MSG)
        
    @abc.abstractmethod
    def insert(db_table, dataframe):
        raise NotImplementedError(NOT_IMPL_MSG)
        
    @staticmethod
    def from_csv(path, header=True):
        return spark.read.format("csv")\
        .option("header", header).load(path)

    @staticmethod
    def instanceOf(subclass):
        try:
            return next(filter(lambda x: x.__name__ == subclass, DAO.__subclasses__()))
        except StopIteration as error:
            raise Exception(f'Please verify if this feature was implemented: {error}')

In [132]:
import socket

class PostgresDAO(DAO):
    def __init__(self):
        """API to access Postgres RDBMS - DAO Implementation"""
        
        self.host = socket.gethostbyname(socket.gethostname())
        self.database = "postgres"
        self.user = "postgres"
        self.password = "postgres"
        self.port = 5432
        
    def select(self, db_table):
        """Query engine to this DAO implementation"""
        try:
            return spark.read\
            .format('jdbc')\
            .option('url', f'jdbc:postgresql://{self.host}:{self.port}/{self.database}')\
            .option('dbtable', f'{db_table}')\
            .option('user', self.user)\
            .option('password', self.password)\
            .option('driver', 'org.postgresql.Driver')\
            .load()     
        except Exception as error: 
            logger.error(error)
            
    def insert(self, db_table, dataframe):
        """Data ingestion engine to this DAO implementation"""
        try:
            dataframe.write\
            .format('jdbc')\
            .option('url', f'jdbc:postgresql://{self.host}:{self.port}/{self.database}')\
            .option('dbtable', f'{db_table}')\
            .option('user', self.user)\
            .option('password', self.password)\
            .option('driver', 'org.postgresql.Driver')\
            .mode('append')\
            .save()
        except Exception as error: 
            logger.error(error)

In [133]:
import abc

NOT_IMPL_MSG = 'This method need to be implemented, please check the documentation'

class Ingestor:
    __metaclass__ = abc.ABCMeta
    
    @abc.abstractmethod
    def start(self):
        raise NotImplementedError(NOT_IMPL_MSG)
    
    def load(self, metadata, data):
        try:
            sink = metadata.get('sink')
            db_table = f'{metadata.get("database")}.{metadata.get("table")}'
            access = DAO.instanceOf(sink)
            access().insert(db_table=db_table, dataframe=data)
        except Exception as error: 
            logger.error(error)
            
    @staticmethod
    def apply(f, metadata):
        return f(metadata)

In [134]:
from pyspark.sql.functions import col, explode, split, when, lit


class Dimension(Ingestor):
    def __init__(self, source, model, field, database='default', table='table', sink='HiveDAO', embbebedList=False):
        self.metadata = dict({
            'source': source,
            'database': database,
            'table': table,
            'sink': sink,
            'model': model,
            'embbebedList': embbebedList,
            'description_field': field})

    def save(self):
        data = Ingestor.apply(Dimension.rule, self.metadata)
        self.load(self.metadata, data)

    def get(self):
        return Ingestor.apply(Dimension.rule, self.metadata)

    @staticmethod
    def checkConstraint(field, dataframe):
        """ Responsible for verifying the field's integrity given dataframe"""
        return dataframe.\
            withColumn(field, when(col(field).isNull(), lit('Not Specified'))\
            .otherwise(col(field)))
    
    @staticmethod
    def create_index(field, source):
        return map(lambda x, y: (x+1, y.asDict()[field]), range(0, len(source)), source)

    @staticmethod
    def rule(metadata):
        """ Responsible to implement business logic to dimensional tables """
        model = metadata.get('model')
        field = metadata.get('description_field')

        if metadata.get('embbebedList') == False:
            data = Dimension.checkConstraint(field=field, dataframe=metadata.get('source'))\
                .select(field)\
                .distinct()\
                .collect()
        else:
            data = Dimension.checkConstraint(field=field, dataframe=metadata.get('source'))\
                .select(field)\
                .withColumn(field, split(col(field), ';'))\
                .withColumn(field, explode(col(field)))\
                .distinct()\
                .collect()

        return spark.createDataFrame(Dimension.create_index(field, data), model().schema)

    @staticmethod
    def build(datasource, metadata):
        return Dimension(source=datasource,
            model=metadata.get('model'),
            field=metadata.get('field'),
            database=metadata.get('database'),
            table=metadata.get('table'),
            embbebedList=metadata.get('embbebedList'),
            sink=metadata.get('sink'))

In [135]:
from pyspark.sql.types import StructField, StringType, IntegerType, StringType, StructType, DoubleType

class CommunicationToolModel:
    @property
    def schema(self):    
        return StructType([
            StructField('ferramenta_comunic_id', IntegerType(), False),
            StructField('nome', StringType(), False)])

class SOModel(StructType):
    @property
    def schema(self):
        return StructType([
                    StructField('sistema_operacional_id', IntegerType(), False),
                    StructField('nome', StringType(), False)])
    
class LanguageWorkedWithModel:
    @property
    def schema(self):    
        return StructType([
            StructField('linguagem_programacao_id', IntegerType(), False),
            StructField('nome', StringType(), False)])
    
class CompanyModel:
    @property
    def schema(self):    
        return StructType([
            StructField('empresa_id', IntegerType(), False),
            StructField('tamanho', StringType(), False)])

class CountryModel:
    @property
    def schema(self):    
        return StructType([
            StructField('pais_id', IntegerType(), False),
            StructField('nome', StringType(), False)])

class RespLanguageModel:
    @property
    def schema(self):    
        return StructType([
            StructField('resp_usa_linguagem_id', IntegerType(), False),
            StructField('respondente_id', IntegerType(), False),
            StructField('linguagem_programacao_id', IntegerType(), False),
            StructField('momento', IntegerType(), False)])
    
class RespToolsModel:
    @property
    def schema(self):    
        return StructType([
            StructField('resp_usa_ferramenta_id', IntegerType(), False),
            StructField('respondente_id', IntegerType(), False),
            StructField('ferramenta_comunic_id', IntegerType(), False)])
    
class RespondentModel:
    @property
    def schema(self):    
        return StructType([
            StructField("respondente_id", IntegerType(), False),
            StructField("nome", StringType(), False),
            StructField("contrib_open_source", IntegerType(), False),
            StructField("programa_hobby", IntegerType(), False),
            StructField("salario", DoubleType(), False),
            StructField("sistema_operacional_id", IntegerType(), False),
            StructField("pais_id", IntegerType(), False),
            StructField("empresa_id", IntegerType(), False)])

In [181]:
from pyspark.sql.functions import col, explode, split, when, lit
from pyspark.sql.window import Window  

class IntermediaryEntity(Ingestor):
    def __init__(self, source, model, field, options, database='default', table='table', sink='HiveDAO',
                 embbebedList=False):
        self.metadata = dict({
            'source': source,
            'database': database,
            'table': table,
            'sink': sink,
            'model': model,
            'embbebedList': embbebedList,
            'description_field': field,
            'options': options
        })

    def save(self):
        data = Ingestor.apply(IntermediaryEntity.rule, self.metadata)
        self.load(self.metadata, data)

    def get(self):
        return Ingestor.apply(IntermediaryEntity.rule, self.metadata)

    @staticmethod
    def checkConstraint(field, dataframe):
        """ Responsible for verifying the field's integrity given dataframe"""
        return dataframe.\
            withColumn(field, when(col(field).isNull(), lit('Not Specified'))\
            .otherwise(col(field)))
    
    @staticmethod
    def create_index(columns, source):
        return map(lambda x, y: (x, y.asDict()[columns]), range(1, len(source)), source)
    
    @staticmethod
    def createIndex(ordered_column, index, source):
        w = Window.orderBy(asc(ordered_column))
        return source.withColumn(index, row_number().over(w))
    
    @udf()
    def conversor(field):
        try:
            return MAPPER.value[field]
        except KeyError as err:
            return field
    
    @staticmethod
    def rule(metadata):
        """ Responsible to implement business logic to dimensional tables """
        model = metadata.get('model')
        options = metadata.get('options')
        field = metadata.get('description_field')
        table = options.get('relationTable')
        database = options.get("relationDatabase")
        print(options.get("originalColumn"))
        dimension = PostgresDAO().select(f'{database}.{table}')
        
        if metadata.get('embbebedList') == False:
            data = Dimension.checkConstraint(field=field, dataframe=metadata.get('source'))\
                .select(options.get('intermediaryColumns'))\
                .join(dimension, col(options.get('originalColumn')) == getattr(dimension, \
                                                                                   options.get("dimensionalColumn")))
            data = data.withColumnRenamed(options.get('relationTableId'), options.get('relationTableId'))
            return IntermediaryEntity.createIndex(options.get('originalIdColumn'), options.get('relationTbID'), data)
        else:
            data = Dimension.checkConstraint(field=field, dataframe=metadata.get('source'))\
                .select(options.get('intermediaryColumns'))\
                .withColumn(field, split(col(options.get("originalColumn")), ';'))\
                .withColumn(field, explode(col(options.get("originalColumn"))))\
                .join(dimension, col(options.get('originalColumn')) == getattr(dimension, \
                                                                                   options.get("dimensionalColumn")))
            data = data.withColumnRenamed(options.get('originalIdColumn'), options.get('relationTableId'))
            data = data.withColumn(options.get('relationTableId'), col(options.get('relationTableId')).cast(IntegerType()))
            data = IntermediaryEntity.createIndex(options.get('relationTableId'), options.get('relationTbID'), data)
            if options.get("hasAdicionalColumns") == True:
                for i in options.get("adicionalColumns"):
                    setup = options.get("adicionalColumnsMapper")[i]
                    data = data.withColumnRenamed(i, setup["name"])
                    if setup.get("hasValueConversion") == True:
                        global MAPPER
                        MAPPER = spark.sparkContext.broadcast(setup["valuesConversor"])
                        data = data.withColumn(setup['name'], IntermediaryEntity.conversor(setup['name']))
                    data = data.withColumn(setup['name'], col(setup['name']).cast(setup['type']))
            return data.select(model().schema.fieldNames())
        
    @staticmethod
    def build(datasource, metadata):
        return IntermediaryEntity(source=datasource,
            model=metadata.get('model'),
            field=metadata.get('field'),
            database=metadata.get('database'),
            table=metadata.get('table'),
            embbebedList=metadata.get('embbebedList'),
            sink=metadata.get('sink'),
            options=metadata.get("options"))

NameError: name 'datasource' is not defined

In [139]:
import json
from requests import Session
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from requests.models import Response

class ExchangeDAO(DAO):
    
    def __init__(self, version="v4", currency="BRL", result='rates', **kwargs):
        self.response_data = result
        self.sleep = kwargs.get('sleep')
        self.retries = kwargs.get('retries')
        self.timeout = kwargs.get('timeout')
        self.backoff_factor = kwargs.get('backoff_factor')
        
        self.endpoint = f'https://api.exchangerate-api.com/{version}/latest/{currency}'
        self.session = ExchangeDAO.__requestSetup(self.retries, self.sleep)
    
    def __transform(self, data):
        """Impõe valor de R$ 3.81 para o dolar conforme Regra de Negócio"""
        data = {key: 1/value for key, value in json.loads(data.text)[self.response_data].items()}
        data.update({'USD': 3.81})
        return data
    
    @staticmethod
    def __requestSetup(retries, sleep):
        session = Session()
        retry = Retry(total=retries, read=retries, backoff_factor=sleep)
        adapter = HTTPAdapter(max_retries=retry)
        session.headers = {'Content-type': 'application/json'}
        session.mount('http://', adapter)
        return session

    def collect(self):
        """Carrega os valores atuais de cada moeda, aplicando as 
        transformações necessárias para disponibilização dos dados"""
        try:
            request = ExchangeDAO.__requestSetup(retries=self.retries, sleep=self.sleep)
            data = request.get(self.endpoint, timeout=self.timeout)
            return self.__transform(data)
        except Exception as err:
            """TODO"""
            raise Exception

In [359]:
class FactRules(object):
    
    @staticmethod
    def removeNullable(metadata, field, in_null_cases='Not Specified'):
        """ Responsible for verifying the field's integrity given dataframe """
        return metadata.get('source').\
            withColumn(field, when(col(field).isNull(), lit(in_null_cases))\
            .otherwise(col(field)))
    
    @staticmethod
    def salary_standardization(metadata):
        return metadata.get('source')\
        .withColumn("ConvertedSalary", col('ConvertedSalary')/12)\
        .na.fill({'ConvertedSalary': 0})\
        .withColumn("ConvertedSalary", exchange_conversion("CurrencySymbol","ConvertedSalary"))\
        .withColumn("ConvertedSalary", col('ConvertedSalary').cast(DoubleType()))\
        
            
    @staticmethod
    def respondent_name_creation(metadata):
        return metadata.get('source').withColumn('nome', concat(lit('respondente_'), col('Respondent')))
    
    @staticmethod
    def columns_renames(metadata):
        return metadata.get('source')\
        .withColumnRenamed("ConvertedSalary", "salario")\
        .withColumnRenamed("OpenSource", "contrib_open_source")\
        .withColumnRenamed("Hobby", "programa_hobby")\
        .withColumnRenamed("Respondent", "respondente_id")\
        .withColumn("respondente_id", col("respondente_id").cast(IntegerType()))
    
    @staticmethod
    def treat_embbebed_list(metadata):
        return metadata.get("source")\
            .withColumn("LanguageWorkedWith", split(col("LanguageWorkedWith"), ';'))\
            .withColumn("LanguageWorkedWith", explode(col("LanguageWorkedWith")))\
            .withColumn("CommunicationTools", split(col("CommunicationTools"), ';'))\
            .withColumn("CommunicationTools", explode(col("CommunicationTools")))

    @staticmethod
    def fact_enrichment(metadata):
        pais = PostgresDAO().select('stackoverflow.pais').withColumnRenamed("nome", "pais_nome")
        sistema_operacional = PostgresDAO().select('stackoverflow.sistema_operacional').withColumnRenamed("nome", "so_nome")
        empresa = PostgresDAO().select('stackoverflow.empresa')

        return metadata.get('source')\
        .join(pais, pais.pais_nome == col("Country"))\
        .join(sistema_operacional, sistema_operacional.so_nome == col("OperatingSystem"))\
        .join(empresa, empresa.tamanho == col("CompanySize"))
    
    @staticmethod
    def field_to_boolean(metadata):
        return metadata.get('source')\
        .withColumn("contrib_open_source", when(col("contrib_open_source") == 'Yes', lit(1)).otherwise(lit(0)))\
        .withColumn("programa_hobby", when(col("programa_hobby") == 'Yes', lit(1)).otherwise(lit(0)))\
        .withColumn("salario", col("salario").cast(DoubleType()))

@udf()
def exchange_conversion(currencySymbol, salary):
    global currencies
    defaul_value = 0.0
    try:
        return currencies.value[currencySymbol] * salary if salary is not None else defaul_value
    except KeyError as err:
        return defaul_value

In [363]:

currencies = spark.sparkContext.broadcast(ExchangeDAO(sleep=0.5, retries=2, timeout=5).collect())

class Fact(Ingestor):    
    def __init__(self, source, model, database='default', table='table', sink='HiveDAO'):
        self.metadata = dict({
            'source': source,
            'database': database,
            'table': table,
            'sink': sink,
            'model': model})

    def get(self):
        self.update_source(Ingestor.apply(FactRules.salary_standardization, self.metadata))
        self.update_source(Ingestor.apply(FactRules.respondent_name_creation, self.metadata))
        self.update_source(Ingestor.apply(FactRules.columns_renames, self.metadata))
        
        self.update_source(FactRules.removeNullable(self.metadata, "CommunicationTools"))
        self.update_source(FactRules.removeNullable(self.metadata, "OperatingSystem"))
        self.update_source(FactRules.removeNullable(self.metadata, "LanguageWorkedWith"))
        self.update_source(FactRules.removeNullable(self.metadata, "CompanySize"))
        self.update_source(FactRules.removeNullable(self.metadata, "Country"))
        self.update_source(Ingestor.apply(FactRules.treat_embbebed_list, self.metadata))
        self.update_source(Ingestor.apply(FactRules.fact_enrichment, self.metadata))
        self.update_source(Ingestor.apply(FactRules.field_to_boolean, self.metadata))
        return self.filter_schema()
    
    def filter_schema(self):
        model = self.metadata.get('model')
        return self.metadata.get('source').select(model().schema.fieldNames()).distinct()
    
    def save(self):
        data = self.get()
        self.load(self.metadata, data)
    
    def update_source(self, data):
        self.metadata.update({'source': data})


<h2> Entrypoint </h2> 

In [312]:
datasource = DAO.from_csv("base_de_respostas_10k_amostra.csv")

Dimension(source=datasource,
            model=CountryModel,
            field="Country",
            database="stackoverflow",
            table="pais",
            embbebedList=False,
            sink='PostgresDAO').save()

Dimension(source=datasource,
            model=CommunicationToolModel,
            field="CommunicationTools",
            database="stackoverflow",
            table="ferramenta_comunic",
            embbebedList=True,
            sink='PostgresDAO').save()

Dimension(source=datasource,
            model=LanguageWorkedWithModel,
            field="LanguageWorkedWith",
            database="stackoverflow",
            table="linguagem_programacao",
            embbebedList=True,
            sink='PostgresDAO').save()

Dimension(source=datasource,
            model=SOModel,
            field="OperatingSystem",
            database="stackoverflow",
            table="sistema_operacional",
            embbebedList=False,
            sink='PostgresDAO').save()

Dimension(source=datasource,
            model=CompanyModel,
            field="CompanySize",
            database="stackoverflow",
            table="empresa",
            embbebedList=False,
            sink='PostgresDAO').save()



An error occurred while calling o12545.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 507.0 failed 1 times, most recent failure: Lost task 0.0 in stage 507.0 (TID 26008, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow.pais ("pais_id","nome") VALUES (1,'Paraguay') was aborted: ERROR: duplicate key value violates unique constraint "pais_pkey"
  Detail: Key (pais_id)=(1) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:155)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2242)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:508)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:850)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch

An error occurred while calling o12643.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 510.0 failed 1 times, most recent failure: Lost task 7.0 in stage 510.0 (TID 26229, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow.ferramenta_comunic ("ferramenta_comunic_id","nome") VALUES (11,'Facebook') was aborted: ERROR: duplicate key value violates unique constraint "ferramenta_comunic_pkey"
  Detail: Key (ferramenta_comunic_id)=(11) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:155)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2242)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:508)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:850)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873

An error occurred while calling o12741.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 513.0 failed 1 times, most recent failure: Lost task 1.0 in stage 513.0 (TID 26437, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow.linguagem_programacao ("linguagem_programacao_id","nome") VALUES (5,'Bash/Shell') was aborted: ERROR: duplicate key value violates unique constraint "linguagem_programacao_pkey"
  Detail: Key (linguagem_programacao_id)=(5) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:155)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2242)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:508)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:850)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatem

An error occurred while calling o12833.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 516.0 failed 1 times, most recent failure: Lost task 7.0 in stage 516.0 (TID 26657, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow.sistema_operacional ("sistema_operacional_id","nome") VALUES (5,'Windows') was aborted: ERROR: duplicate key value violates unique constraint "sistema_operacional_pkey"
  Detail: Key (sistema_operacional_id)=(5) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:155)
	at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2242)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:508)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(P

An error occurred while calling o12925.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 519.0 failed 1 times, most recent failure: Lost task 5.0 in stage 519.0 (TID 26869, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow.empresa ("empresa_id","tamanho") VALUES (6,'500 to 999 employees') was aborted: ERROR: duplicate key value violates unique constraint "empresa_pkey"
  Detail: Key (empresa_id)=(6) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:155)
	at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2242)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:508)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:850)
	at org.pos

In [368]:

df = Fact(source=datasource,
            model=RespondentModel,
            database="stackoverflow",
            table="respondente",
            sink='PostgresDAO').save()

options = dict({"originalIdColumn": "Respondent", "relationTableId":"respondente_id",
                "intermediaryColumns": ['Respondent', 'LanguageWorkedWith', 'Hobby'],
                "hasAdicionalColumns": True,
                "adicionalColumns": ["Hobby"],
                "adicionalColumnsMapper": {"Hobby":{"name": "momento", "hasValueConversion": True,
                                                    "valuesConversor": {"No": 0, "Yes": 1},
                                                        "type": IntegerType()}},
                'originalColumn': 'LanguageWorkedWith',
                'dimensionalColumn': 'nome', "relationTbID":"resp_usa_linguagem_id",
               'relationTable': 'linguagem_programacao', 'relationDatabase': 'stackoverflow'})

df = IntermediaryEntity(source=datasource,
            model=RespLanguageModel,
            options=options,
            field="LanguageWorkedWith",
            database="stackoverflow",
            table="resp_usa_linguagem",
            embbebedList=True,
            sink='PostgresDAO').save()

options = dict({"originalIdColumn": "Respondent", "relationTableId":"respondente_id",
                "intermediaryColumns": ['Respondent', 'CommunicationTools'], 
                'originalColumn': 'CommunicationTools',
                'dimensionalColumn': 'nome', "relationTbID":"resp_usa_ferramenta_id",
                'relationTable': 'ferramenta_comunic', 'relationDatabase': 'stackoverflow'})

df = IntermediaryEntity(source=datasource,
            model=RespToolsModel,
            options=options,
            field="CommunicationTools",
            database="stackoverflow",
            table="resp_usa_ferramenta",
            embbebedList=True,
            sink='PostgresDAO').save()

LanguageWorkedWith
CommunicationTools
