In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import os 
import sys 
import json 

# Make PySpark use this kernel's own interpreter (sys.executable) for the
# driver and the Python workers. These must be set before the SparkSession
# is created in the next cell.
os.environ.update(
    PYSPARK_DRIVER_PYTHON_OPTS="notebook",
    PYSPARK_DRIVER_PYTHON=sys.executable,
    PYSPARK_PYTHON=sys.executable,
)

In [2]:
# Create the Spark session for this notebook, or reuse it if one is already running.
spark = SparkSession.builder.appName("poll_to_idxs").getOrCreate()

# last expression: let Jupyter render the session summary
spark

In [3]:
# Path to the raw poll export (a single CSV file, despite the variable name).
poll_dir = "../data/poll_data.csv"

# Columns that carry no score and are dropped before scoring. The names must
# match the CSV header exactly (note the trailing space on the last one).
UNSCORED_COLUMNS = [
    "Informazioni cronologiche",
    "Ho letto e accettato l'informativa e confermo inoltre di avere più di 18 anni",
    "Quanti anni hai?",
    "Genere",
    "Da quante persone è composto il tuo nucleo familiare?",
    "Occupazione",
    "Quanto è grande la tua azienda?",
    "Da che regione provieni?",
    "Provincia di provenienza",
    "In che regione lavori/studi?",
    "Provincia del luogo di lavoro/studio",
    "Invalidità",
    "Tipo di residenza",
    "Numero di persone con cui convivi",
    "Entrate Familiari Mensili Nette",
    "Entrate Personali Mensili Nette ",
]

# Load the CSV poll (Spark's CSV reader decodes UTF-8 by default) and drop
# the unscored columns in a single projection.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(poll_dir)
    .drop(*UNSCORED_COLUMNS)
)

# load the questions json: {question_idx: {"question_text": ..., ...}}
with open("../data/questions.json", "r", encoding="utf-8") as questions_file:
    questions = json.load(questions_file)

# question_text -> question_idx map, used to alias the columns
questions_idxs = {q["question_text"]: idx for idx, q in questions.items()}

# Alias the columns (long column names cause bugs and are hard to use).
# withColumnRenamed is a no-op for question texts that are not a column of df.
for question_text, question_idx in questions_idxs.items():
    df = df.withColumnRenamed(question_text, question_idx)

df.show(5)

+---+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+----+--------------------+--------------------+--------------------+----+-------------------+--------------------+-------+--------------------+--------------------+---+---+------------------+--------------+---------------+----+--------------+------------+------------+--------------------+------------+----+--------------+-------------+--------------------+-------+----------------+--------------------+---------------+--------------------+-------+--------------------+----------------+--------------------+---------------+--------------------+--------------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+-----------+-------+-------+-------+-----------+-----------+-------+-----------+------+-----+-----------------+
|S_1|S_2|                 S_3|    

In [4]:
# Build {question_idx: [score of each respondent]} for every scored question.
# Collect the poll once: that is a single Spark job instead of one per column,
# and it keeps every column's scores aligned with the same respondents.
# Separate collect() calls give no row-order guarantee between them.
rows = df.collect()
n_rows = len(rows)

scores = {}
for col_idx, col in enumerate(df.columns):
    # shortcuts for some useful values
    question_type = questions[col]["question_type"]
    question_score = questions[col]["question_score"]
    question_answers = questions[col]["answers"]

    # skip unscored questions
    if len(question_answers) == 0:
        continue

    # every respondent's answer to this question; question_answers is keyed
    # by the str() of the raw cell value
    answers = [str(row[col_idx]) for row in rows]

    # handle score computation differently based on question_type
    if question_type in ["basic", "multivalue"]:
        scores[col] = [
            question_answers[a]["answer_score"] * question_score for a in answers
        ]

    elif question_type == "comma_separated":
        # TODO: implement comma_separated questions handler. Until then the
        # question scores 0 for every respondent. The list is sized from the
        # data (not a hard-coded row count) so pd.DataFrame(scores) gets
        # equal-length columns.
        scores[col] = [0] * n_rows

    # any other question_type gets no entry in scores

In [8]:
# Turn the {question_idx: [per-respondent scores]} dict into a Spark DataFrame
# (one column per scored question), going through pandas.
df_scores = spark.createDataFrame(pd.DataFrame.from_dict(scores))
df_scores.show(n=5)

+----+----+-----+----+----+-----+----+----+-----+----+-----+-----+----+-----+---+---+-----+-----+-----+---+---+-----+-----+----+----+----+----+----+----+-----+----+----+-----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+-------+-------+-------+-------+-------+-------+-------+-----+-----+----+
| S_1| S_2|  S_3| S_4| S_5|  S_6| S_7| S_8|  S_9|V_10| V_11| V_12|C_13| C_14|m_1|m_2|  m_3|  m_4|  m_5|m_6|m_7|  m_8|  m_9|m_10|m_11|m_12|m_13|m_14|m_15| m_16|m_17|m_18| m_19|m_20|eh1|eh4|eh5|ew1|ew4|ww1|ww2|ww3|ww4|wh1|wh2|wh3|wh4|wastew1|wastew2|wastew3|wastew4|wasteh1|wasteh2|wasteh3|wasteh4| ER_1| ER_2|ER_3|
+----+----+-----+----+----+-----+----+----+-----+----+-----+-----+----+-----+---+---+-----+-----+-----+---+---+-----+-----+----+----+----+----+----+----+-----+----+----+-----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+-------+-------+-------+-------+-------+-------+-------+-----+-----+----+
|37.5|37.5|  0.0|49.5|99.0|150.0|99.0|99.0| 50.0|37.5|150.

In [6]:
# Maps each index name (e.g. 'i_S') to its list of per-respondent values;
# used to compute the indexes dataframe.
indexes_cols = dict()

## Purchases indexes

In [7]:
# Normalisation constant for the purchases index.
# NOTE(review): presumably the maximum attainable sum of S_1..S_9 — confirm
# against the answer/question scores in questions.json.
S_MAX_SCORE = 1450

# purchases scores S_1..S_9 of every respondent (a list of Rows)
df_s = df_scores.select([f"S_{d}" for d in range(1, 10)]).collect()

# one normalised purchases index per respondent: sum of the row's scores / max
i_S_col = [sum(row) / S_MAX_SCORE for row in df_s]

indexes_cols['i_S'] = i_S_col

## Mobility indexes

## Energy indexes