In [120]:
# Importing libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, TimestampType, FloatType

In [59]:
# Initializing Spark Session
spark = SparkSession.builder.appName('case_2').getOrCreate()

In [92]:
# Loading the data from CSV files
dim_users = spark.read.option('header','true').csv('dim_users.csv')
fact_operations = spark.read.option('header','true').csv('fact_operations.csv')

In [93]:
# Previewing the datasets
dim_users.show(5)

+-----------+-------------------+--------+-------------------+---------------------+-------------------+
|id_dim_user|         created_at| country| first_operation_at|first_operation_value|         updated_at|
+-----------+-------------------+--------+-------------------+---------------------+-------------------+
|      19101|2020-01-12 17:34:12|Portugal|2020-01-13 11:11:12|             10250.50|2020-01-03 11:11:12|
|      22111|2020-02-09 22:11:11|  Brazil|2020-03-12 10:01:04|              5000.23|2020-04-12 12:12:04|
|      33345|2021-01-12 08:01:11|  Brazil|2021-01-29 12:12:10|              7000.54|2021-01-29 12:12:10|
|       1324|2021-03-12 12:33:33|  Brazil|2021-04-29 10:03:12|              4998.10|2021-04-29 10:03:12|
+-----------+-------------------+--------+-------------------+---------------------+-------------------+



In [94]:
fact_operations.show(5)

+-------+-----------+-------------------+-------+--------------------+--------------+---------------+
|id_fact|id_dim_user|       processed_at|  value|              nature|country_region|country_destiny|
+-------+-----------+-------------------+-------+--------------------+--------------+---------------+
| 122331|      19101|2020-01-13 11:11:12|10250.5|recebimento de sa...|      Portugal|         Brazil|
| 125841|      22111|2020-03-12 10:01:04|5000.23| envio para familiar|        Brazil|        England|
| 213452|      33345|2021-01-29 12:12:10|7000.54|recebimento de sa...|        France|         Brazil|
| 100037|       1324|2021-04-29 10:03:12|4998.10|  compra de produtos|        Brazil|          China|
| 412444|      19101|2019-02-01 15:29:05|3951.78| pagamento de boleto|         China|         Brazil|
+-------+-----------+-------------------+-------+--------------------+--------------+---------------+
only showing top 5 rows



## Data Wrangling

### Casting

In [95]:
# Casting Timestamp and Float columns
dim_users_casted = dim_users \
    .withColumn('created_at', F.col('created_at').cast(TimestampType())) \
    .withColumn('first_operation_at', F.col('first_operation_at').cast(TimestampType())) \
    .withColumn('first_operation_value', F.col('first_operation_value').cast(FloatType())) \
    .withColumn('updated_at', F.col('updated_at').cast(TimestampType())) 

In [96]:
fact_operations_casted = fact_operations \
    .withColumn('processed_at', F.col('processed_at').cast(TimestampType())) \
    .withColumn('value', F.col('value').cast(FloatType()))

## Primeira Tarefa

### 1 - Primeira e última operação de cada usuário

In [99]:
fact_operations \
    .groupBy('id_dim_user') \
    .agg(
        # Calculating the user's FIRST operation
        F.min('processed_at').alias('first_operation'),
        # Calculating the user's LAST operation
        F.max('processed_at').alias('last_operation')) \
    # Selecting only users whose first operation was made before January 01, 2020
    .filter(F.col('first_operation') >= '2020-01-01') \
    .show()

+-----------+-------------------+-------------------+
|id_dim_user|    first_operation|     last_operation|
+-----------+-------------------+-------------------+
|      33345|2020-01-03 23:13:10|2021-01-29 12:12:10|
|       1324|2021-01-04 12:24:02|2021-04-29 10:03:12|
+-----------+-------------------+-------------------+



### 2 - Quantidade de operações de cada usuário no período total

In [117]:
fact_operations \
    .groupBy('id_dim_user') \
    .agg(
        # Counting distinct operations by user 
        F.countDistinct('id_fact')) \
    .show()

+-----------+--------------+
|id_dim_user|count(id_fact)|
+-----------+--------------+
|      33345|             3|
|      22111|             4|
|      19101|             4|
|       1324|             2|
+-----------+--------------+



### 3 - Valores mínimos, médios e máximos operados para cada usuário

In [119]:
fact_operations \
    .groupBy('id_dim_user') \
    .agg(
        # Calculating the minimum value
        F.min('value').alias('minimum_value'),
        # Calculating the maximum value
        F.max('value').alias('maximum_value'),
        # Calculating the mean value and round the decimals to two places
        F.round(F.mean('value'),2).alias('mean_value')) \
    .show()

+-----------+-------------+-------------+----------+
|id_dim_user|minimum_value|maximum_value|mean_value|
+-----------+-------------+-------------+----------+
|      33345|       422.45|      7000.54|   4807.84|
|      22111|      4421.09|      5839.19|   5170.17|
|      19101|      10250.5|      3951.78|   6725.82|
|       1324|       458.10|      4998.10|    2728.1|
+-----------+-------------+-------------+----------+



## Segunda Tarefa

In [150]:
# Creating the Window
month_window = Window \
    .orderBy(F.col('processed_month')) \
    .rowsBetween(-3,-1)

In [157]:
# Creating the new table required
montlhy_operations = fact_operations \
    .groupBy( 
        # Truncating 'processed_at' to month
        F.date_trunc(
            'month',
            F.col('processed_at'))
        .alias('processed_month')) \
    .agg(
        # Counting distinct operations by month
        F.countDistinct('id_fact')
        .alias('operations')) \
    .withColumn(
        # Creating moving average column and making some adjustments
        'avg_operations_last_3m',
        F.round(
            F.coalesce(
                F.mean(F.col('operations')).over(month_window),
                F.lit(0.0)),
            2))

In [158]:
montlhy_operations.show()

+-------------------+----------+----------------------+
|    processed_month|operations|avg_operations_last_3m|
+-------------------+----------+----------------------+
|2019-02-01 00:00:00|         2|                   0.0|
|2019-10-01 00:00:00|         1|                   2.0|
|2020-01-01 00:00:00|         2|                   1.5|
|2020-02-01 00:00:00|         1|                  1.67|
|2020-03-01 00:00:00|         1|                  1.33|
|2020-05-01 00:00:00|         1|                  1.33|
|2021-01-01 00:00:00|         3|                   1.0|
|2021-04-01 00:00:00|         2|                  1.67|
+-------------------+----------+----------------------+

