In [3]:
from core.connection import get_session, save_on_database, get_from_database
from core.config import settings
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Obtain the SparkSession from the project's connection helper; the cells
# below use this `spark` object for reading the CSV and querying the database.
spark: SparkSession = get_session()

24/06/11 08:14:19 WARN Utils: Your hostname, IdeaPad-Gaming-3-15IHU6 resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlp0s20f3)
24/06/11 08:14:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/06/11 08:14:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Load data from csv

In [4]:
# Resolve the sample CSV under the project's base directory, then read it with
# a header row and Spark-inferred column types.
csv_path = str(settings.BASE_DIR / "data_csv" / "base_de_respostas_10k_amostra.csv")
data: DataFrame = spark.read.csv(csv_path, sep=",", header=True, inferSchema=True)

data.show(5)

24/06/11 08:14:27 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+-----+----------+-------------+--------------+------------------+--------------------+--------------------+--------------------+--------------------+-----------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+---------------------+---------------------+---------------------+---------------------+---------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+----------------+------+----------+---------------+--------------+--------------------+-------------------+--------------------+------------------

### Select only data used in Data Warehouse

In [5]:
# Survey columns that feed the Data Warehouse tables; everything else is dropped.
WAREHOUSE_COLUMNS = [
    "Respondent",
    "Hobby",
    "OpenSource",
    "ConvertedSalary",
    "CommunicationTools",
    "LanguageWorkedWith",
    "OperatingSystem",
    "CompanySize",
    "Country",
]
data = data.select(*WAREHOUSE_COLUMNS)

data.show(5)

+----------+-----+----------+---------------+--------------------+--------------------+---------------+--------------------+-------------+
|Respondent|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|OperatingSystem|         CompanySize|      Country|
+----------+-----+----------+---------------+--------------------+--------------------+---------------+--------------------+-------------+
|    101346|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|        Windows|100 to 499 employees|United States|
|     44791|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...|        Windows|1,000 to 4,999 em...|      Germany|
|     32306|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...|        Windows|10,000 or more em...|United States|
|     37142|  Yes|        No|           NULL|        Slack;Trello|C++;Java;JavaScri...|          MacOS|  10 to 19 employees|United States|
|     21745|  Yes|       Ye

### Add id column

In [6]:
# Copy the survey's Respondent value into a column named "id".
data = data.withColumns({"id": F.col("Respondent")})
data.show(n=5)

+----------+-----+----------+---------------+--------------------+--------------------+---------------+--------------------+-------------+------+
|Respondent|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|OperatingSystem|         CompanySize|      Country|    id|
+----------+-----+----------+---------------+--------------------+--------------------+---------------+--------------------+-------------+------+
|    101346|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|        Windows|100 to 499 employees|United States|101346|
|     44791|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...|        Windows|1,000 to 4,999 em...|      Germany| 44791|
|     32306|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...|        Windows|10,000 or more em...|United States| 32306|
|     37142|  Yes|        No|           NULL|        Slack;Trello|C++;Java;JavaScri...|          MacOS|  10 to 19 employees|

### Change respondent name as described in the challenge.


In [7]:
@F.udf(returnType=T.StringType())
def _add_name_prefix(x):
    """Return the text "respondent_" followed by the string form of ``x``."""
    # NOTE(review): a native column expression (e.g. F.concat with F.lit) would
    # avoid the Python UDF round-trip; kept as a UDF to preserve behaviour.
    return "respondent_" + str(x)


# Derive the display name from Respondent, then discard the source column.
respondent_name_col = _add_name_prefix(F.col("Respondent"))
data = data.withColumn("name", respondent_name_col).drop("Respondent")
data.show(n=5)

+-----+----------+---------------+--------------------+--------------------+---------------+--------------------+-------------+------+-----------------+
|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|OperatingSystem|         CompanySize|      Country|    id|             name|
+-----+----------+---------------+--------------------+--------------------+---------------+--------------------+-------------+------+-----------------+
|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|        Windows|100 to 499 employees|United States|101346|respondent_101346|
|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...|        Windows|1,000 to 4,999 em...|      Germany| 44791| respondent_44791|
|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...|        Windows|10,000 or more em...|United States| 32306| respondent_32306|
|  Yes|        No|           NULL|        Slack;Trello|C++;Java;JavaScri...|      

### Insert data in operation_system

In [None]:
# One row per distinct, non-null OperatingSystem value, exposed as column "name".
os_names = data.select(F.col("OperatingSystem").alias("name"))
operating_system = os_names.dropna().distinct()

save_on_database(operating_system, "operation_system")

### Insert data in country

In [None]:
# One row per distinct, non-null Country value, exposed as column "name".
country_names = data.select(F.col("Country").alias("name"))
country = country_names.dropna().distinct()

save_on_database(country, "country")

### Insert data in company

In [None]:
# One row per distinct, non-null CompanySize value, exposed as column "size".
company_sizes = data.select(F.col("CompanySize").alias("size"))
company = company_sizes.dropna().distinct()
save_on_database(company, "company")

## Insert data in respondent

#### Create function to reference id from database & replace value

In [9]:
def get_reference_from_database(
    spark_session: SparkSession, table_name: str
) -> dict[str, int]:
    """Build a reverse lookup (second column -> first column) for a table.

    The table is read through ``get_from_database`` and collected to the
    driver. Each row must have exactly two fields; the rows are turned into a
    dict keyed by the first field and that dict is then inverted.
    Presumably the first field is the primary key and the second the natural
    value (name / size) -- confirm against the table schemas.

    Args:
        spark_session: Session forwarded to ``get_from_database``.
        table_name: Name of the table to read.

    Returns:
        Mapping from each row's second field to its first field.

    Raises:
        Exception: Any error raised by ``get_from_database`` is re-raised
            after "Connection Error" is printed.
    """
    try:
        df: DataFrame = get_from_database(spark_session, table_name)
    except Exception:
        print("Connection Error")
        # Previously execution fell through and failed below with an
        # UnboundLocalError on `df`, hiding the real cause. Propagate it.
        raise

    return {value: key for key, value in dict(df.collect()).items()}


def replace_value(x, to_replace: dict):
    """Look ``x`` up in ``to_replace`` and return the mapped value.

    Args:
        x: Key to look up (may be ``None``).
        to_replace: Mapping of original values to their replacements.

    Returns:
        The mapped value, or ``None`` when ``x`` is not a key. ``None`` is the
        Python representation of SQL NULL for a UDF result; the previous
        default, a ``T.NullType()`` instance, is a schema type object and not
        a valid value for an integer column.
    """
    return to_replace.get(x)

#### Create column company_id

In [10]:
# Reverse lookup built from the "company" table (see get_reference_from_database).
company_db = get_reference_from_database(spark, "company")


@F.udf(returnType=T.IntegerType())
def company_replace_udf(x):
    """Translate a CompanySize value into its entry in ``company_db``."""
    return replace_value(x, company_db)


# Swap the CompanySize text for the looked-up id, then drop the text column.
company_id_col = company_replace_udf(F.col("CompanySize"))
data = data.withColumn("company_id", company_id_col).drop("CompanySize")
data.show(n=5)

+-----+----------+---------------+--------------------+--------------------+---------------+-------------+------+-----------------+----------+
|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|OperatingSystem|      Country|    id|             name|company_id|
+-----+----------+---------------+--------------------+--------------------+---------------+-------------+------+-----------------+----------+
|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|        Windows|United States|101346|respondent_101346|         2|
|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...|        Windows|      Germany| 44791| respondent_44791|         4|
|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...|        Windows|United States| 32306| respondent_32306|         7|
|  Yes|        No|           NULL|        Slack;Trello|C++;Java;JavaScri...|          MacOS|United States| 37142| respondent_37142|         8|

#### create column operation_system_id

In [11]:
# Reverse lookup built from the "operation_system" table.
os_db = get_reference_from_database(spark, "operation_system")


@F.udf(returnType=T.IntegerType())
def os_replace_udf(x):
    """Translate an OperatingSystem value into its entry in ``os_db``."""
    return replace_value(x, os_db)


# Swap the OperatingSystem text for the looked-up id, then drop the text column.
operation_system_id_col = os_replace_udf(F.col("OperatingSystem"))
data = data.withColumn("operation_system_id", operation_system_id_col).drop(
    "OperatingSystem"
)
data.show(n=5)

+-----+----------+---------------+--------------------+--------------------+-------------+------+-----------------+----------+-------------------+
|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|      Country|    id|             name|company_id|operation_system_id|
+-----+----------+---------------+--------------------+--------------------+-------------+------+-----------------+----------+-------------------+
|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|United States|101346|respondent_101346|         2|                  4|
|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...|      Germany| 44791| respondent_44791|         4|                  4|
|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...|United States| 32306| respondent_32306|         7|                  4|
|  Yes|        No|           NULL|        Slack;Trello|C++;Java;JavaScri...|United States| 37142| respondent_37142|   

#### create column country_id

In [12]:
# Reverse lookup built from the "country" table.
country_db = get_reference_from_database(spark, "country")


@F.udf(returnType=T.IntegerType())
def country_replace_udf(x):
    """Translate a Country value into its entry in ``country_db``."""
    return replace_value(x, country_db)


# Swap the Country text for the looked-up id, then drop the text column.
country_id_col = country_replace_udf(F.col("Country"))
data = data.withColumn("country_id", country_id_col).drop("Country")

data.show(n=5)

+-----+----------+---------------+--------------------+--------------------+------+-----------------+----------+-------------------+----------+
|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|    id|             name|company_id|operation_system_id|country_id|
+-----+----------+---------------+--------------------+--------------------+------+-----------------+----------+-------------------+----------+
|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|101346|respondent_101346|         2|                  4|        27|
|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...| 44791| respondent_44791|         4|                  4|         8|
|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...| 32306| respondent_32306|         7|                  4|        27|
|  Yes|        No|           NULL|        Slack;Trello|C++;Java;JavaScri...| 37142| respondent_37142|         8|                  3|    

#### Replace null salaries with 0.0

In [13]:
# Null ConvertedSalary values become 0.0; no other column is touched.
data = data.na.fill(0.0, subset=["ConvertedSalary"])
data.show(n=5)

+-----+----------+---------------+--------------------+--------------------+------+-----------------+----------+-------------------+----------+
|Hobby|OpenSource|ConvertedSalary|  CommunicationTools|  LanguageWorkedWith|    id|             name|company_id|operation_system_id|country_id|
+-----+----------+---------------+--------------------+--------------------+------+-----------------+----------+-------------------+----------+
|   No|        No|        50000.0|               Slack|C#;JavaScript;SQL...|101346|respondent_101346|         2|                  4|        27|
|  Yes|        No|        79552.0|Jira;Other wiki t...|C;C++;Java;JavaSc...| 44791| respondent_44791|         4|                  4|         8|
|  Yes|       Yes|       125000.0|Office / producti...|Assembly;C;C++;C#...| 32306| respondent_32306|         7|                  4|        27|
|  Yes|        No|            0.0|        Slack;Trello|C++;Java;JavaScri...| 37142| respondent_37142|         8|                  3|    

#### Transform salary (divide by `YEAR_MONTHS`, multiply by `DOLLAR_EXCHANGE`, round to 2 decimals)

In [14]:
@F.udf(returnType=T.DoubleType())
def convert_salary(x):
    """Scale a salary by the configured month count and exchange rate.

    Computes ``x / settings.YEAR_MONTHS * settings.DOLLAR_EXCHANGE`` rounded to
    two decimals. Presumably this turns a yearly USD amount into a monthly
    amount in the target currency -- confirm against the settings module.

    Returns ``None`` (SQL NULL) for a null input instead of raising
    ``TypeError``, so the UDF no longer depends on nulls having been filled
    by an earlier cell.
    """
    if x is None:
        return None
    return round((x / settings.YEAR_MONTHS) * settings.DOLLAR_EXCHANGE, 2)


# Replace the raw ConvertedSalary column with the converted "salary" column.
data = data.withColumn("salary", convert_salary(F.col("ConvertedSalary"))).drop(
    "ConvertedSalary"
)
data.show(5)

+-----+----------+--------------------+--------------------+------+-----------------+----------+-------------------+----------+--------+
|Hobby|OpenSource|  CommunicationTools|  LanguageWorkedWith|    id|             name|company_id|operation_system_id|country_id|  salary|
+-----+----------+--------------------+--------------------+------+-----------------+----------+-------------------+----------+--------+
|   No|        No|               Slack|C#;JavaScript;SQL...|101346|respondent_101346|         2|                  4|        27| 15875.0|
|  Yes|        No|Jira;Other wiki t...|C;C++;Java;JavaSc...| 44791| respondent_44791|         4|                  4|         8|25257.76|
|  Yes|       Yes|Office / producti...|Assembly;C;C++;C#...| 32306| respondent_32306|         7|                  4|        27| 39687.5|
|  Yes|        No|        Slack;Trello|C++;Java;JavaScri...| 37142| respondent_37142|         8|                  3|        27|     0.0|
|  Yes|       Yes|        Slack;Trello|C;

#### create respondent table

In [15]:
# Project the final `respondent` schema in one explicit select.
# The previous version added "hobby" via withColumns and relied on Spark's
# default case-insensitive column resolution to overwrite "Hobby" (only
# "OpenSource" was dropped explicitly). Casting and aliasing inside the select
# produces the same columns, in the same order, without that dependency.
respondent = data.select(
    "id",
    "name",
    F.col("Hobby").cast(T.BooleanType()).alias("hobby"),
    "salary",
    "operation_system_id",
    "country_id",
    "company_id",
    F.col("OpenSource").cast(T.BooleanType()).alias("open_source"),
)
respondent.show(5)

+------+-----------------+-----+--------+-------------------+----------+----------+-----------+
|    id|             name|hobby|  salary|operation_system_id|country_id|company_id|open_source|
+------+-----------------+-----+--------+-------------------+----------+----------+-----------+
|101346|respondent_101346|false| 15875.0|                  4|        27|         2|      false|
| 44791| respondent_44791| true|25257.76|                  4|         8|         4|      false|
| 32306| respondent_32306| true| 39687.5|                  4|        27|         7|       true|
| 37142| respondent_37142| true|     0.0|                  3|        27|         8|      false|
| 21745| respondent_21745| true|     0.0|                  4|        85|         5|       true|
+------+-----------------+-----+--------+-------------------+----------+----------+-----------+
only showing top 5 rows



#### Insert data in respondent

In [None]:
# Hand the `respondent` frame to the project's save helper for the "respondent" table.
# NOTE(review): presumably this appends rows, so re-running the cell may
# insert duplicates -- confirm the write mode in core.connection.
save_on_database(respondent, "respondent")

                                                                                

### Create function to separate values in CommunicationTools and LanguageWorkedWith

In [None]:
def sep_col_values(df: DataFrame, col: str, sep: str = ";") -> DataFrame:
    """Return the distinct individual values of a delimiter-separated column.

    Args:
        df: Frame containing the column to split. (The previous version
            ignored this argument and always read the global ``data``.)
        col: Name of the string column holding ``sep``-joined values.
        sep: Delimiter between values. It is passed to ``F.split`` as its
            pattern, which Spark interprets as a regular expression, so
            regex metacharacters must be escaped by the caller.

    Returns:
        A single-column frame named ``name`` with one row per distinct
        value. Rows whose ``col`` is null contribute nothing.
    """
    # explode() emits one row per array element and skips null arrays, so the
    # split/flatten/dedupe all happens in Spark rather than on the driver.
    values = df.select(F.explode(F.split(F.col(col), sep)).alias("name"))
    return values.distinct()

### Insert data in programming_language

In [33]:
# Distinct language names from LanguageWorkedWith, passed to the save helper
# for the "programming_language" table and then previewed.
programming_language = sep_col_values(df=data, col="LanguageWorkedWith")
save_on_database(programming_language, "programming_language")
programming_language.show(n=5)

+----------+
|      name|
+----------+
|        F#|
|         C|
|   Haskell|
|TypeScript|
|      Perl|
+----------+
only showing top 5 rows



### Insert data in communications_tools

In [37]:
# Distinct tool names from CommunicationTools, passed to the save helper
# for the "communications_tools" table and then previewed.
communications_tools = sep_col_values(df=data, col="CommunicationTools")
save_on_database(communications_tools, "communications_tools")
communications_tools.show(n=5)

+--------------------+
|                name|
+--------------------+
|Google Hangouts/Chat|
|                Jira|
|Other wiki tool (...|
|Office / producti...|
|             HipChat|
+--------------------+
only showing top 5 rows



### Create function to split a delimited column into one (respondent_id, name) row per value

In [94]:
def col_list_to_df(df: DataFrame, col: str, sep: str = ";") -> DataFrame:
    """Expand a delimiter-separated column into one row per (id, value) pair.

    Args:
        df: Frame with an ``id`` column and the column named by ``col``.
        col: Name of the string column holding ``sep``-joined values.
        sep: Delimiter between values. (The previous version ignored this
            argument and always split on ";".) It is passed to ``F.split`` as
            its pattern, which Spark interprets as a regular expression.

    Returns:
        A frame with columns ``respondent_id`` (long) and ``name`` (string),
        one row per value found in ``col``. Rows where ``id`` or ``col`` is
        null are dropped.
    """
    pairs = df.select(
        # Cast keeps the LongType the original schema declared for respondent_id.
        F.col("id").cast(T.LongType()).alias("respondent_id"),
        F.split(F.col(col), sep).alias("values"),
    ).dropna()

    # explode() yields one output row per array element, entirely inside
    # Spark -- no driver-side collect and no dependency on a global session.
    return pairs.select("respondent_id", F.explode("values").alias("name"))

#### Create dataframe resp_programming_language

In [96]:
# Reverse lookup built from the "programming_language" table.
programming_language_ref = get_reference_from_database(spark, "programming_language")

# One (respondent_id, name) row per language listed by each respondent.
resp_programming_language = col_list_to_df(df=data, col="LanguageWorkedWith")


@F.udf(returnType=T.IntegerType())
def programming_replace_udf(x):
    """Translate a language name into its entry in ``programming_language_ref``."""
    return replace_value(x, programming_language_ref)


# Swap the language name for the looked-up id, then drop the name column.
programming_language_id_col = programming_replace_udf(F.col("name"))
resp_programming_language = resp_programming_language.withColumn(
    "programming_language_id", programming_language_id_col
).drop("name")

resp_programming_language.show(n=5)

+-------------+-----------------------+
|respondent_id|programming_language_id|
+-------------+-----------------------+
|       101346|                      9|
|       101346|                     15|
|       101346|                     22|
|       101346|                     11|
|       101346|                      1|
+-------------+-----------------------+
only showing top 5 rows



#### Create dataframe resp_tools

In [106]:
# Reverse lookup built from the "communications_tools" table.
communications_tools_ref = get_reference_from_database(spark, "communications_tools")

# One (respondent_id, name) row per tool listed by each respondent.
resp_tools = col_list_to_df(df=data, col="CommunicationTools")


@F.udf(returnType=T.IntegerType())
def communication_replace_udf(x):
    """Translate a tool name into its entry in ``communications_tools_ref``."""
    return replace_value(x, communications_tools_ref)


# Swap the tool name for the looked-up id, then drop the name column.
communications_tools_id_col = communication_replace_udf(F.col("name"))
resp_tools = resp_tools.withColumn(
    "communications_tools_id", communications_tools_id_col
).drop("name")

resp_tools.show(n=5)

+-------------+-----------------------+
|respondent_id|communications_tools_id|
+-------------+-----------------------+
|       101346|                      2|
|        44791|                      3|
|        44791|                     10|
|        32306|                     11|
|        32306|                      2|
+-------------+-----------------------+
only showing top 5 rows

