### Motivation

Final project for the module, in which we were asked to:

**Develop a well-documented Python notebook that uses the PySpark library to train a machine learning model**

* Clearly contextualize the problem and the available data, as well as the algorithm or technique to be used (linear regression, logistic regression, clustering, etc.)
* Show the processing applied to the dataframe
* Split the data into two different sets: one for training and one for testing
* Train a model and present performance metrics for the resulting model
* Apply the model to the test set and compare its performance with the training set
* Propose suggestions to improve the model's quality and, if possible, test whether these suggestions improve on the first result


### Dataset definition

The Challenge: Predict student dropout
🎯 **You have to predict whether the user (student) will be able to successfully complete an online (university) course.**

Data description:

- events_train.csv - data on actions performed by students on steps
    * step_id - step id
    * user_id - anonymized user id
    * timestamp - time of the event, in Unix time format
    * action - event; possible values:
        * discovered - the user switched to the step
        * viewed - the user viewed the step
        * started_attempt - the beginning of an attempt to solve a step
        * passed - a correct solution to a practical step

- submissions_train.csv - data on the time and status of submissions for practical tasks
    * step_id - step ID
    * timestamp - time the solution was submitted, in Unix time format
    * submission_status - submission status
    * user_id - anonymized user id

Dataset source: https://www.kaggle.com/datasets/kapturovalexander/predict-students-drop-out-of-the-course

### Imports and sessions

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.ml import Pipeline


Creating the Spark session

In [2]:
spark = SparkSession.builder.appName('prediction').getOrCreate()

Reading the datasets

In [3]:
events_df = spark.read.csv('./students/event_data_train.csv', header=True, inferSchema=True)
submissions_df = spark.read.csv('./students/submissions_data_train.csv', header=True, inferSchema=True)

### EDA

#### Inspecting the data

In [4]:
events_df.show(5)

+-------+----------+----------+-------+
|step_id| timestamp|    action|user_id|
+-------+----------+----------+-------+
|  32815|1434340848|    viewed|  17632|
|  32815|1434340848|    passed|  17632|
|  32815|1434340848|discovered|  17632|
|  32811|1434340895|discovered|  17632|
|  32811|1434340895|    viewed|  17632|
+-------+----------+----------+-------+
only showing top 5 rows



In [5]:
submissions_df.show(5)

+-------+----------+-----------------+-------+
|step_id| timestamp|submission_status|user_id|
+-------+----------+-----------------+-------+
|  31971|1434349275|          correct|  15853|
|  31972|1434348300|          correct|  15853|
|  31972|1478852149|            wrong|  15853|
|  31972|1478852164|          correct|  15853|
|  31976|1434348123|            wrong|  15853|
+-------+----------+-----------------+-------+
only showing top 5 rows



#### Checking dtypes

In [6]:
events_df.dtypes

[('step_id', 'int'),
 ('timestamp', 'int'),
 ('action', 'string'),
 ('user_id', 'int')]

In [7]:
submissions_df.dtypes

[('step_id', 'int'),
 ('timestamp', 'int'),
 ('submission_status', 'string'),
 ('user_id', 'int')]

*We notice that the timestamp column in both datasets is an int (Unix seconds), when it should be a date/time value.*

In [8]:
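# from_unixtime renders Unix seconds as a 'yyyy-MM-dd HH:mm:ss' string in the session time zone.
# The column becomes a string (not TimestampType); unix_timestamp() later parses it back to seconds.
events_df = events_df.withColumn("timestamp", F.from_unixtime(F.col("timestamp")))
submissions_df = submissions_df.withColumn("timestamp", F.from_unixtime(F.col("timestamp")))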
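`F.from_unixtime` formats Unix seconds in the Spark session time zone (`spark.sql.session.timeZone`). The stdlib-only sketch below reproduces that formatting with `datetime` to show why the first event, `1434340848`, appears as `2015-06-15 01:00:48`: it is 04:00:48 in UTC, so the output is consistent with a session time zone of UTC-3. The notebook does not show the configured zone, so UTC-3 is an inference, and `unix_to_str` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def unix_to_str(ts: int, utc_offset_hours: int = 0) -> str:
    """Format Unix seconds like from_unixtime's default pattern (yyyy-MM-dd HH:mm:ss)."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(ts, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


first_event = 1434340848  # first timestamp in events_df.show(5)

print(unix_to_str(first_event))      # 2015-06-15 04:00:48 (UTC)
print(unix_to_str(first_event, -3))  # 2015-06-15 01:00:48 (assumed UTC-3 session zone)
```

To get a true timestamp column in PySpark instead of a string, `F.to_timestamp(F.from_unixtime("timestamp"))` or, on Spark 3.1+, `F.timestamp_seconds("timestamp")` could be used.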


*Checking the converted timestamp columns*

In [9]:
events_df.show(5)

+-------+-------------------+----------+-------+
|step_id|          timestamp|    action|user_id|
+-------+-------------------+----------+-------+
|  32815|2015-06-15 01:00:48|    viewed|  17632|
|  32815|2015-06-15 01:00:48|    passed|  17632|
|  32815|2015-06-15 01:00:48|discovered|  17632|
|  32811|2015-06-15 01:01:35|discovered|  17632|
|  32811|2015-06-15 01:01:35|    viewed|  17632|
+-------+-------------------+----------+-------+
only showing top 5 rows



In [10]:
submissions_df.show(5)

+-------+-------------------+-----------------+-------+
|step_id|          timestamp|submission_status|user_id|
+-------+-------------------+-----------------+-------+
|  31971|2015-06-15 03:21:15|          correct|  15853|
|  31972|2015-06-15 03:05:00|          correct|  15853|
|  31972|2016-11-11 06:15:49|            wrong|  15853|
|  31972|2016-11-11 06:16:04|          correct|  15853|
|  31976|2015-06-15 03:02:03|            wrong|  15853|
+-------+-------------------+-----------------+-------+
only showing top 5 rows



*Checking for null values*

*This list comprehension reads as follows (I'll try to be as didactic as possible): for each column `c` in the dataframe, **count** the rows **when** column `c` is null (`isNull`).*

In [11]:
events_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in events_df.columns]).show()

+-------+---------+------+-------+
|step_id|timestamp|action|user_id|
+-------+---------+------+-------+
|      0|        0|     0|      0|
+-------+---------+------+-------+



In [12]:
submissions_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in submissions_df.columns]).show()

+-------+---------+-----------------+-------+
|step_id|timestamp|submission_status|user_id|
+-------+---------+-----------------+-------+
|      0|        0|                0|      0|
+-------+---------+-----------------+-------+
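The `count(when(isNull))` idiom works because `F.when` without `otherwise` yields NULL for rows that do not match, and `F.count` ignores NULLs, so only the null rows are counted. A plain-Python sketch of the same logic, using `None` for SQL NULL and a few hypothetical rows (the real dataframes have no nulls):

```python
def null_counts(rows, columns):
    """Per-column number of None values, mirroring count(when(col(c).isNull(), c))."""
    counts = {}
    for c in columns:
        # when(isNull) emits a value only for null rows; count() then counts those markers
        counts[c] = sum(1 for row in rows if row.get(c) is None)
    return counts


# Hypothetical rows with a few nulls injected
rows = [
    {"step_id": 32815, "timestamp": 1434340848, "action": "viewed", "user_id": 17632},
    {"step_id": None, "timestamp": 1434340895, "action": "discovered", "user_id": 17632},
    {"step_id": 32811, "timestamp": None, "action": None, "user_id": 17632},
]
print(null_counts(rows, ["step_id", "timestamp", "action", "user_id"]))
```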



*Checking summary statistics (describe)*

In [13]:
events_df.describe().show()

+-------+------------------+-------------------+----------+------------------+
|summary|           step_id|          timestamp|    action|           user_id|
+-------+------------------+-------------------+----------+------------------+
|  count|           3480703|            3480703|   3480703|           3480703|
|   mean| 34873.57537572151|               NULL|      NULL|13314.677731193957|
| stddev|14222.427500816972|               NULL|      NULL| 7779.098635540614|
|    min|             30456|2015-06-15 01:00:48|discovered|                 1|
|    max|            158433|2018-05-19 20:33:31|    viewed|             26798|
+-------+------------------+-------------------+----------+------------------+



In [14]:
submissions_df.describe().show()

+-------+-----------------+-------------------+-----------------+------------------+
|summary|          step_id|          timestamp|submission_status|           user_id|
+-------+-----------------+-------------------+-----------------+------------------+
|  count|           509104|             509104|           509104|            509104|
|   mean|33732.59143318458|               NULL|             NULL|13198.874945001415|
| stddev|9379.377465332262|               NULL|             NULL| 7820.240677658117|
|    min|            31971|2015-06-15 01:02:24|          correct|                 2|
|    max|           120745|2018-05-19 20:07:07|            wrong|             26798|
+-------+-----------------+-------------------+-----------------+------------------+



*Checking the distribution of action and submission_status*

In [15]:
events_df.groupBy('action').count().show()
submissions_df.groupBy('submission_status').count().show()

+---------------+-------+
|         action|  count|
+---------------+-------+
|         viewed|1830830|
|         passed| 650331|
|started_attempt| 310047|
|     discovered| 689495|
+---------------+-------+

+-----------------+------+
|submission_status| count|
+-----------------+------+
|            wrong|286399|
|          correct|222705|
+-----------------+------+
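Turning these counts into proportions makes the balance explicit: wrong submissions are slightly more frequent than correct ones, and the four action counts add up to the 3,480,703 rows reported by `describe()`. A small stdlib-only sketch using the numbers printed above (`shares` is a hypothetical helper):

```python
action_counts = {"viewed": 1830830, "passed": 650331, "started_attempt": 310047, "discovered": 689495}
status_counts = {"wrong": 286399, "correct": 222705}


def shares(counts):
    """Fraction of the total represented by each category."""
    total = sum(counts.values())
    return {name: n / total for name, n in counts.items()}


print(sum(action_counts.values()))  # 3480703, the row count shown by describe()
for name, share in shares(status_counts).items():
    print(f"{name}: {share:.1%}")   # wrong: 56.3%, correct: 43.7%
```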



### Preprocessing

#### Enriching the data with time features

In [16]:
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

In [17]:
events_df = events_df.withColumn("time_diff", (F.unix_timestamp("timestamp") - F.lag(F.unix_timestamp("timestamp")).over(window_spec)))

In [18]:
events_df = events_df.withColumn("time_diff", F.when(F.isnull("time_diff"), 0).otherwise(F.col("time_diff")))

In [19]:
events_df = events_df.withColumn("first_event_timestamp", F.min("timestamp").over(Window.partitionBy("user_id")))

In [20]:
events_df = events_df.withColumn("time_since_first_event", (F.unix_timestamp("timestamp") - F.unix_timestamp("first_event_timestamp")))
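For each user, `time_diff` is the gap in seconds to that user's previous event (`F.lag` over the window ordered by time, with the first event's NULL replaced by 0), and `time_since_first_event` is the offset from the user's earliest event. The plain-Python sketch below reproduces both columns on hypothetical `(user_id, unix_seconds)` pairs; events with identical timestamps, as in the first rows of `events_df`, get a gap of 0. `time_features` is a hypothetical helper.

```python
from collections import defaultdict


def time_features(events):
    """events: (user_id, unix_seconds) pairs -> (user_id, ts, time_diff, time_since_first_event)."""
    by_user = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)

    rows = []
    for user_id, stamps in by_user.items():
        stamps.sort()          # Window.partitionBy("user_id").orderBy("timestamp")
        first = stamps[0]      # F.min("timestamp") over the user's partition
        prev = None
        for ts in stamps:
            diff = 0 if prev is None else ts - prev  # F.lag(...), first NULL replaced by 0
            rows.append((user_id, ts, diff, ts - first))
            prev = ts
    return rows


events = [(17632, 1434340848), (17632, 1434340848), (17632, 1434340895), (15853, 1434348123)]
for row in time_features(events):
    print(row)
```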

*Let's group both dataframes by user_id and join them into a single dataset for training.*

In [21]:
events_1 = events_df.groupBy("user_id").agg(
    F.count(F.when(events_df["action"] == "passed", 1)).alias("passed_count"),
    F.sum("time_diff").alias("total_time_diff_events"),
    F.max("time_diff").alias("max_time_diff_events"),
    F.min("time_diff").alias("min_time_diff_events"),
    F.avg("time_diff").alias("avg_time_diff_events"),
    F.sum("time_since_first_event").alias("total_time_since_first_event"),
    F.max("time_since_first_event").alias("max_time_since_first_event"),
    F.min("time_since_first_event").alias("min_time_since_first_event"),
    F.avg("time_since_first_event").alias("avg_time_since_first_event")
)
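Some of these aggregates look redundant by construction. Because `time_diff` holds the gaps between a user's consecutive events, its per-user sum telescopes to (last timestamp minus first timestamp), which is exactly `max_time_since_first_event`. Likewise, the first gap is filled with 0 and later gaps cannot be negative, so `min_time_diff_events` and `min_time_since_first_event` should be 0 for every user. This reasoning follows from the definitions above and was not verified on the real data; the stdlib-only sketch below checks it on random hypothetical timestamps (`user_aggregates` is a hypothetical helper).

```python
import random


def user_aggregates(stamps):
    """Per-user aggregates over Unix timestamps, mirroring four of the events_1 columns."""
    stamps = sorted(stamps)
    # Lagged gaps; the first event has no predecessor, so its gap is filled with 0
    diffs = [0] + [b - a for a, b in zip(stamps, stamps[1:])]
    since_first = [t - stamps[0] for t in stamps]
    return {
        "total_time_diff_events": sum(diffs),
        "min_time_diff_events": min(diffs),
        "max_time_since_first_event": max(since_first),
        "min_time_since_first_event": min(since_first),
    }


# Randomized check on hypothetical users with 1 to 50 events each
rng = random.Random(0)
for _ in range(1000):
    stamps = [rng.randint(0, 10**8) for _ in range(rng.randint(1, 50))]
    agg = user_aggregates(stamps)
    assert agg["total_time_diff_events"] == agg["max_time_since_first_event"]
    assert agg["min_time_diff_events"] == 0
    assert agg["min_time_since_first_event"] == 0
print("identity holds for every sampled user")
```

On the real data, `events_1.filter(F.col("total_time_diff_events") != F.col("max_time_since_first_event")).count()` would be expected to return 0. If it does, dropping one column of that pair and both minimums before the `VectorAssembler` is a candidate improvement, since constant or duplicated columns add nothing (or double-weight one quantity) in the K-Means distance.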

In [22]:
submissions_1 = submissions_df.groupBy('user_id').pivot('submission_status').agg(F.count('submission_status'))


In [23]:
grouped_data = events_1.join(submissions_1, 'user_id', 'inner').fillna(0)
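`pivot('submission_status')` turns each status into a column holding the user's count for it; a status the user never had becomes NULL, which `fillna(0)` then replaces. The `inner` join also keeps only users present in both dataframes, so students with events but no submissions are left out of `grouped_data`. A plain-Python sketch of both steps with hypothetical users (`pivot_counts` and `inner_join_fill` are hypothetical helpers):

```python
from collections import Counter


def pivot_counts(submissions, statuses=("correct", "wrong")):
    """submissions: (user_id, status) pairs -> {user_id: {status: count or None}}."""
    counts = Counter(submissions)
    users = {u for u, _ in submissions}
    # dict.get returns None for missing (user, status) pairs, like the NULLs from pivot
    return {u: {s: counts.get((u, s)) for s in statuses} for u in users}


def inner_join_fill(events_agg, pivoted):
    """Keep users present on both sides (inner join) and replace None with 0 (fillna(0))."""
    joined = {}
    for u in events_agg.keys() & pivoted.keys():
        row = {**events_agg[u], **pivoted[u]}
        joined[u] = {k: (0 if v is None else v) for k, v in row.items()}
    return joined


events_agg = {1: {"passed_count": 10}, 2: {"passed_count": 3}, 3: {"passed_count": 0}}
subs = [(1, "correct"), (1, "wrong"), (1, "wrong"), (2, "correct")]
print(inner_join_fill(events_agg, pivot_counts(subs)))  # user 3 has no submissions and is dropped
```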


**The challenge asks us to learn to identify who will complete the course and who will not. We will assume that completing the course requires passing every step; we will not check whether individual submissions were right or wrong, leaving that for another time.**

*Describing the per-user counts shows that the maximum number of `discovered` and `passed` events is 198, from which we infer that the course has 198 steps in total; to count as having completed the course, a student must have passed these 198 steps.*

In [24]:
passed_df = events_df.filter(F.col('action') == 'passed')
user_df = passed_df.groupBy('user_id').count()
concluded_user = user_df.filter(F.col('count') == 198)
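`concluded_user` only contains the completers; training a supervised model would also need a 0 label for everyone else. The plain-Python sketch below builds a label for every user under the notebook's rule (all steps passed), with a toy `TOTAL_STEPS` of 3 instead of 198 and hypothetical events. It counts distinct passed steps rather than raw `passed` events, an assumed safeguard in case a step is ever logged as passed twice; on Spark the analogous aggregate would be `F.countDistinct("step_id")` on `passed_df`.

```python
from collections import defaultdict

TOTAL_STEPS = 3  # toy value; the notebook uses 198


def completion_labels(events, total_steps=TOTAL_STEPS):
    """events: (user_id, step_id, action) triples -> {user_id: 1 if all steps passed else 0}."""
    passed_steps = defaultdict(set)
    users = set()
    for user_id, step_id, action in events:
        users.add(user_id)
        if action == "passed":
            passed_steps[user_id].add(step_id)  # distinct steps, not raw event count
    # Every user gets a label, including users with no passed events at all
    return {u: int(len(passed_steps[u]) == total_steps) for u in users}


events = [
    (1, 10, "passed"), (1, 11, "passed"), (1, 12, "passed"),
    (2, 10, "passed"), (2, 10, "passed"), (2, 11, "viewed"),
    (3, 10, "viewed"),
]
print(completion_labels(events))
```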


In [25]:
concluded_user.describe().show()

+-------+------------------+-----+
|summary|           user_id|count|
+-------+------------------+-----+
|  count|               394|  394|
|   mean|12652.131979695432|198.0|
| stddev| 7847.656972764596|  0.0|
|    min|                22|  198|
|    max|             26781|  198|
+-------+------------------+-----+



In [26]:
submissions_df.select("user_id").distinct().describe().show()

+-------+------------------+
|summary|           user_id|
+-------+------------------+
|  count|              9940|
|   mean|13312.917002012073|
| stddev| 7795.938271033181|
|    min|                 2|
|    max|             26798|
+-------+------------------+



*From concluded_user we see that only 394 students completed the course, out of the 9,940 who submitted at least one solution: a completion rate of about 4%. This is a small share of positive examples for a train/test split.*
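With so few positives, a plain random split can leave the test set with noticeably more or fewer completers than the training set. A stratified split samples each class with the same fraction; PySpark offers `DataFrame.sampleBy(col, fractions, seed)` for approximate per-class sampling. The stdlib-only sketch below shows the idea on hypothetical labels that reproduce the observed 394 / 9,940 ratio; `stratified_split` is a hypothetical helper.

```python
import random


def stratified_split(labels, train_fraction=0.8, seed=42):
    """labels: {user_id: 0/1}. Splits each class separately so both sets keep the class ratio."""
    rng = random.Random(seed)
    train, test = [], []
    for cls in sorted(set(labels.values())):
        users = sorted(u for u, y in labels.items() if y == cls)
        rng.shuffle(users)
        cut = round(len(users) * train_fraction)
        train += users[:cut]
        test += users[cut:]
    return train, test


print(f"completion rate: {394 / 9940:.1%}")  # 4.0%

# Hypothetical labels: 394 completers out of 9,940 users
labels = {u: int(u < 394) for u in range(9940)}
train, test = stratified_split(labels)
train_rate = sum(labels[u] for u in train) / len(train)
test_rate = sum(labels[u] for u in test) / len(test)
print(f"train: {len(train)} users, {train_rate:.2%} completers")
print(f"test:  {len(test)} users, {test_rate:.2%} completers")
```

On the Spark side, assuming a dataframe with a hypothetical `label` column, `sampleBy("label", {0: 0.8, 1: 0.8}, seed=42)` could give the training set and a `left_anti` join against it the complementary test set.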

### Training

In [27]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
from pyspark.sql.functions import expr

import matplotlib.pyplot as plt

*Let's assemble the feature columns (every column except user_id) into a single vector with VectorAssembler and add it to grouped_data as input_features.*

In [28]:
assembler = VectorAssembler(inputCols=grouped_data.columns[1:], outputCol="input_features")

In [29]:
grouped_data = assembler.transform(grouped_data)
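K-Means assigns points by Euclidean distance, so columns on very different scales (for example `total_time_since_first_event`, a sum of per-event offsets in seconds, next to `passed_count`) can dominate the clustering. Standardizing each feature is a common remedy; PySpark provides `pyspark.ml.feature.StandardScaler` (`withMean`, `withStd`), which could sit between the assembler and `KMeans` in the pipeline. The plain-Python sketch below shows the z-score transform on hypothetical feature rows, using the sample standard deviation; `standardize` is a hypothetical helper.

```python
from statistics import mean, stdev


def standardize(rows):
    """Z-score each column: (x - mean) / sample std; constant columns map to 0.0."""
    columns = list(zip(*rows))
    stats = [(mean(c), stdev(c)) for c in columns]
    return [
        [0.0 if s == 0 else (x - m) / s for x, (m, s) in zip(row, stats)]
        for row in rows
    ]


# Hypothetical rows: [passed_count, total_time_since_first_event]
rows = [[10, 5_000_000], [150, 90_000_000], [198, 40_000_000]]
for row in standardize(rows):
    print([round(v, 3) for v in row])
```

After scaling, both columns have mean 0 and standard deviation 1, so neither dominates the distance purely because of its unit.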

In [30]:
# Candidate numbers of clusters to compare (illustrative range)
k_values = list(range(2, 11))
silhouette_scores = []

# Silhouette evaluator on the assembled vector; KMeans writes its labels to "prediction" by default
evaluator = ClusteringEvaluator(featuresCol="input_features", predictionCol="prediction")

for k in k_values:
    kmeans = KMeans(k=k, seed=1, featuresCol="input_features")
    kmeans_model = kmeans.fit(grouped_data)

    # Make predictions and evaluate them with the silhouette coefficient
    predictions = kmeans_model.transform(grouped_data)
    silhouette_scores.append(evaluator.evaluate(predictions))

# Show the silhouette coefficient for each value of k
for k, score in zip(k_values, silhouette_scores):
    print(f"Silhouette Score for k={k}: {score}")

# Pick the k that maximizes the silhouette coefficient
best_k = k_values[silhouette_scores.index(max(silhouette_scores))]
print(f"The best value of k is: {best_k}")

In [48]:
plt.figure(figsize=(10, 5))
plt.plot(k_values, silhouette_scores, 'bo-')
plt.title('Silhouette Coefficient for Values of K')
plt.xlabel('Number of Clusters (K)')
plt.ylabel('Silhouette Coefficient')
plt.grid(True)
plt.show()

In [42]:
kmeans = KMeans(k=3, seed=1, featuresCol="input_features", predictionCol="cluster")
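`ClusteringEvaluator`, used in the k-search cell, scores a clustering with the silhouette coefficient, by default with squared Euclidean distance (`distanceMeasure="squaredEuclidean"`). For each point, let a be its mean distance to the other members of its own cluster and b the smallest mean distance to the members of any other cluster; its score is (b - a) / max(a, b), and the metric is the average over all points. The brute-force O(n²) stdlib sketch below computes that definition on hypothetical 2-D points (Spark uses a more efficient formulation); it assumes at least two clusters and, following the usual convention, scores points in singleton clusters as 0.

```python
def sq_dist(p, q):
    """Squared Euclidean distance."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette(points, labels, dist=sq_dist):
    """Mean silhouette over all points (requires at least two clusters)."""
    clusters = sorted(set(labels))
    scores = []
    for i, (p, li) in enumerate(zip(points, labels)):
        own = [dist(p, q) for j, (q, lj) in enumerate(zip(points, labels)) if lj == li and j != i]
        if not own:
            scores.append(0.0)  # singleton cluster
            continue
        a = sum(own) / len(own)
        # Mean distance to each other cluster; b is the closest one
        b = min(
            sum(dist(p, q) for q, lj in zip(points, labels) if lj == c)
            / sum(1 for lj in labels if lj == c)
            for c in clusters
            if c != li
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


points = [(0, 0), (0, 1), (1, 0), (10, 10), (10, 11), (11, 10)]
good = [0, 0, 0, 1, 1, 1]  # clusters follow the two groups
bad = [0, 1, 0, 1, 0, 1]   # clusters mix the groups
print(round(silhouette(points, good), 3), round(silhouette(points, bad), 3))
```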


In [43]:
pipeline = Pipeline(stages=[kmeans])

In [44]:
model = pipeline.fit(grouped_data)
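Fitting K-Means alternates two steps until the centers stop moving: assign each point to its nearest center, then move each center to the mean of its assigned points. Spark initializes the centers with k-means|| by default (`initMode="k-means||"`); to stay short, the plain-Python sketch below uses the first k points as initial centers, so it is a simplified Lloyd's iteration rather than Spark's implementation. The points and the `kmeans` helper are hypothetical.

```python
def kmeans(points, k, max_iter=20):
    """Lloyd's algorithm with the first k points as initial centers (simplified init)."""
    centers = [list(p) for p in points[:k]]
    labels = []
    for _ in range(max_iter):
        # Assignment step: nearest center by squared Euclidean distance
        labels = [
            min(range(k), key=lambda c: sum((a - b) ** 2 for a, b in zip(p, centers[c])))
            for p in points
        ]
        # Update step: each center moves to the mean of its assigned points
        new_centers = []
        for c in range(k):
            members = [p for p, lab in zip(points, labels) if lab == c]
            if members:
                new_centers.append([sum(dim) / len(members) for dim in zip(*members)])
            else:
                new_centers.append(centers[c])  # keep an empty cluster's center in place
        if new_centers == centers:
            break  # converged
        centers = new_centers
    return centers, labels


points = [(0, 0), (10, 10), (0, 1), (1, 0), (10, 11), (11, 10)]
centers, labels = kmeans(points, 2)
print(labels)   # [0, 1, 0, 0, 1, 1]
print(centers)
```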

In [56]:
clustered_data = model.transform(grouped_data).toPandas()

In [57]:
clustered_data.to_excel('./predition.xlsx',index=False)
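Since the notebook defines completion as passing all 198 steps, one way to judge the k=3 clusters against the challenge is to cross-tabulate cluster assignments with that completion flag. In Spark this could be done by joining the model output with a label column and calling `crosstab("cluster", "label")`; the stdlib sketch below uses hypothetical `user_id -> cluster` and `user_id -> label` mappings and hypothetical helper names.

```python
from collections import Counter


def crosstab(clusters, labels):
    """Count (cluster, label) pairs for users present in both mappings."""
    return Counter((clusters[u], labels[u]) for u in clusters.keys() & labels.keys())


def completion_rate_by_cluster(table):
    """Share of completers (label 1) inside each cluster."""
    rates = {}
    for c in sorted({c for c, _ in table}):
        total = table[(c, 0)] + table[(c, 1)]  # Counter returns 0 for missing pairs
        rates[c] = table[(c, 1)] / total
    return rates


clusters = {1: 0, 2: 0, 3: 1, 4: 1, 5: 2, 6: 2}  # hypothetical cluster per user
labels = {1: 0, 2: 0, 3: 0, 4: 1, 5: 1, 6: 1}    # hypothetical completion flag per user
table = crosstab(clusters, labels)
print(dict(table))
print(completion_rate_by_cluster(table))  # {0: 0.0, 1: 0.5, 2: 1.0}
```

A cluster whose completion rate clearly differs from the overall 4% would suggest the unsupervised grouping carries signal about dropout.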