# **Sinch Challenge**

### The challenge consists of analyzing and extracting information from a database originating from a chatbot and providing information that answers business questions.

### That said, let's evaluate what we can extract from this data

In [61]:
# Installing required packages
!pip install pyspark boto3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [62]:
# Importing just what will be used
import os.path
import time
from datetime import timedelta

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import StringType

import boto3, botocore
from botocore import UNSIGNED
from botocore.config import Config

In [63]:
# Initializing processing time
START_TIME = time.monotonic()

In [64]:
# Defining variables
BUCKET = 'public-sinch-latam-case-data'
PATH = 'data-engineering/data.csv'
FILE = 'data.csv'

In [65]:
# Connecting to S3
s3 = boto3.resource('s3', config=Config(signature_version=UNSIGNED))

In [66]:
# Check if the file already exists and download it if not
if not os.path.isfile(FILE):
    try:
        s3.Bucket(BUCKET).download_file(PATH, FILE)
    except botocore.exceptions.ClientError as e: 
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise
else:
    print("File already exists.")

File already exists.


In [67]:
# Initializng the spark session
spark = SparkSession.builder.appName("sinch_app").getOrCreate()

In [68]:
# Create a DataFrame from provided file
df = spark.read.option("delimiter", ";").option("header", "true").option("encoding", "UTF-8").csv(FILE)

In [69]:
df.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 timestamp      | 1614567604.0                         
 messageId      | 210c2f5f-f7b2-4061-82bb-2b33b7200e4f 
 conversationId | 40b54398-77db-4810-b707-969bda09490b 
 userId         | bf6a4d079b51b73fddc73a8e0fdd1dd3     
 messageText    | começar                              
 channel        | whatsapp                             
 botId          | 1567                                 
 source         | user                                 
only showing top 1 row



In [70]:
df.count()

516642

In [71]:
# Changing the data from unix formate to timestamp
df = df.withColumn('timestamp', f.from_unixtime(f.col("timestamp"),"yyyy-MM-dd HH:mm:ss"))

In [72]:
df.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 timestamp      | 2021-03-01 03:00:04                  
 messageId      | 210c2f5f-f7b2-4061-82bb-2b33b7200e4f 
 conversationId | 40b54398-77db-4810-b707-969bda09490b 
 userId         | bf6a4d079b51b73fddc73a8e0fdd1dd3     
 messageText    | começar                              
 channel        | whatsapp                             
 botId          | 1567                                 
 source         | user                                 
only showing top 1 row



In [73]:
# Creating a view that will be used in the queries
df.createOrReplaceTempView("view")
spark.catalog.cacheTable("view")

In [74]:
# Answering the first question
sql_1 = """select channel, 
           count(distinct conversationId) as total 
           from view 
           group by channel"""
df_sql_1 = spark.sql(sql_1)
df_sql_1.show()

+-----------------+-----+
|          channel|total|
+-----------------+-----+
|        instagram|10038|
|facebook messeger| 9955|
|         telegram|10187|
|         whatsapp|10044|
|              sms| 9776|
+-----------------+-----+



In [75]:
# Answering the second question

# Showing the top 5 dates with higher volume of conversations
sql_2_top_5 = """select cast(timestamp as date) data, 
                 count(distinct conversationId) qtde 
                 from view 
                 group by data 
                 order by qtde desc"""
df_sql_2_top_5 = spark.sql(sql_2_top_5)
df_sql_2_top_5.show(5)

+----------+----+
|      data|qtde|
+----------+----+
|2021-04-26| 874|
|2021-03-08| 869|
|2021-03-27| 868|
|2021-03-03| 867|
|2021-04-15| 864|
+----------+----+
only showing top 5 rows



In [76]:
# And here we have the day with the highest conversation volume
sql_2 = """with t1 as (
           select cast(timestamp as date) data, 
           count(distinct conversationId) as qtde 
           from view group by data
           ) 
           select data from t1 
           where qtde = (select max(qtde) from t1)"""
df_sql_2 = spark.sql(sql_2)
df_sql_2.show()

+----------+
|      data|
+----------+
|2021-04-26|
+----------+



In [108]:
# Answering the third question

# Starting for query only the question "Qual a sua cidade ?" and your answer
sql_3 = """select distinct userId, pergunta, cidade from (
            select userId,
            messageText as pergunta,
            case
            when (lead(timestamp) over (order by conversationId)) = timestamp
            then (lead(messageText,2) over (order by conversationId))
            else (lead(messageText) over (order by conversationId))
            end as cidade
            from view
            order by conversationId, timestamp  asc
         ) as t
         where pergunta = 'Qual a sua cidade?'
         """
df_sql_3 = spark.sql(sql_3)
df_sql_3.show(truncate=False)

+--------------------------------+------------------+--------------------+
|userId                          |pergunta          |cidade              |
+--------------------------------+------------------+--------------------+
|17dfbcad57ceec09a9fcbd3674842e85|Qual a sua cidade?|macapá              |
|bc96cb0f882cd446acfcad4791c49783|Qual a sua cidade?|teresina            |
|ce3c2420998872a83a56a648368cad72|Qual a sua cidade?|ipatinga            |
|44ae631e5a361d7b0527eee5b227c283|Qual a sua cidade?|Santos              |
|6e17fda734ee734552c24b4aaf049938|Qual a sua cidade?|Maua                |
|d86fac73037b4cd7e4ca4da3a7d1fbc0|Qual a sua cidade?|contagem            |
|a88c9578716e23fa11893cd92a79d5f0|Qual a sua cidade?|teresina            |
|e5a4d5acc5287ba78add44226eff3b51|Qual a sua cidade?|Carapicuiba         |
|d9c7d10110ab80c29fa5c97b64ce4d5b|Qual a sua cidade?|Maceió              |
|9f9107a74ef5ed82a7af117623d8e271|Qual a sua cidade?|Paulista            |
|5fb72a891a19391b76df75df

### Here we have a more complex situation and the something relevant to note. Some user answers have the same timestamp of bot questions and the data is a good way for take the answers. Since, considering a timeline, right after the question we should have a user's answer this is a good point to improve data consistency. Putting a slightly longer delay for the next bot question can be an alternative.

### To work around this situation, a mechanism was placed in the query that checks if the next message has the same timestamp of the actual and jumps to the subsequent message.

In [109]:
# Check the number of cities
df_sql_3.select('cidade').distinct().count()

302

In [110]:
# Lowering the letters to standardize the writing
df_sql_3 = df_sql_3.withColumn("cidade",f.lower(f.col("cidade")))
df_sql_3.show(truncate=False)

+--------------------------------+------------------+--------------------+
|userId                          |pergunta          |cidade              |
+--------------------------------+------------------+--------------------+
|17dfbcad57ceec09a9fcbd3674842e85|Qual a sua cidade?|macapá              |
|bc96cb0f882cd446acfcad4791c49783|Qual a sua cidade?|teresina            |
|ce3c2420998872a83a56a648368cad72|Qual a sua cidade?|ipatinga            |
|44ae631e5a361d7b0527eee5b227c283|Qual a sua cidade?|santos              |
|6e17fda734ee734552c24b4aaf049938|Qual a sua cidade?|maua                |
|d86fac73037b4cd7e4ca4da3a7d1fbc0|Qual a sua cidade?|contagem            |
|a88c9578716e23fa11893cd92a79d5f0|Qual a sua cidade?|teresina            |
|e5a4d5acc5287ba78add44226eff3b51|Qual a sua cidade?|carapicuiba         |
|d9c7d10110ab80c29fa5c97b64ce4d5b|Qual a sua cidade?|maceió              |
|9f9107a74ef5ed82a7af117623d8e271|Qual a sua cidade?|paulista            |
|5fb72a891a19391b76df75df

In [111]:
# Remove additional spaces
df_sql_3 = df_sql_3.withColumn("cidade",f.trim(f.col("cidade")))
df_sql_3.show(truncate=False)

+--------------------------------+------------------+--------------------+
|userId                          |pergunta          |cidade              |
+--------------------------------+------------------+--------------------+
|17dfbcad57ceec09a9fcbd3674842e85|Qual a sua cidade?|macapá              |
|bc96cb0f882cd446acfcad4791c49783|Qual a sua cidade?|teresina            |
|ce3c2420998872a83a56a648368cad72|Qual a sua cidade?|ipatinga            |
|44ae631e5a361d7b0527eee5b227c283|Qual a sua cidade?|santos              |
|6e17fda734ee734552c24b4aaf049938|Qual a sua cidade?|maua                |
|d86fac73037b4cd7e4ca4da3a7d1fbc0|Qual a sua cidade?|contagem            |
|a88c9578716e23fa11893cd92a79d5f0|Qual a sua cidade?|teresina            |
|e5a4d5acc5287ba78add44226eff3b51|Qual a sua cidade?|carapicuiba         |
|d9c7d10110ab80c29fa5c97b64ce4d5b|Qual a sua cidade?|maceió              |
|9f9107a74ef5ed82a7af117623d8e271|Qual a sua cidade?|paulista            |
|5fb72a891a19391b76df75df

In [112]:
# Mapping and changing letters with accents
df_sql_3 = df_sql_3.withColumn('cidade', f.translate('cidade',
                                                   'ãäáöüẞáäčďéěêíĺľňóôõŕšťúůýž ',
                                                   'aaaousaacdeeeillnooorstuuyz_'))
df_sql_3.show(truncate=False)

+--------------------------------+------------------+--------------------+
|userId                          |pergunta          |cidade              |
+--------------------------------+------------------+--------------------+
|17dfbcad57ceec09a9fcbd3674842e85|Qual a sua cidade?|macapa              |
|bc96cb0f882cd446acfcad4791c49783|Qual a sua cidade?|teresina            |
|ce3c2420998872a83a56a648368cad72|Qual a sua cidade?|ipatinga            |
|44ae631e5a361d7b0527eee5b227c283|Qual a sua cidade?|santos              |
|6e17fda734ee734552c24b4aaf049938|Qual a sua cidade?|maua                |
|d86fac73037b4cd7e4ca4da3a7d1fbc0|Qual a sua cidade?|contagem            |
|a88c9578716e23fa11893cd92a79d5f0|Qual a sua cidade?|teresina            |
|e5a4d5acc5287ba78add44226eff3b51|Qual a sua cidade?|carapicuiba         |
|d9c7d10110ab80c29fa5c97b64ce4d5b|Qual a sua cidade?|maceio              |
|9f9107a74ef5ed82a7af117623d8e271|Qual a sua cidade?|paulista            |
|5fb72a891a19391b76df75df

In [113]:
# Check again the number of cities
df_sql_3.select('cidade').distinct().count()

117

In [114]:
# Grouping and counting 
df_sql_3_grouped = df_sql_3.groupBy("cidade").count()

In [84]:
# Top 10 cities with more unique users
df_sql_3_grouped.sort(f.col('count').desc()).show(10)

+------------+-----+
|      cidade|count|
+------------+-----+
|    joinvile|  524|
|  piracicaba|  523|
|ponta_grossa|  518|
|    paulista|  504|
|     barueri|  502|
|      maceio|  501|
|      olinda|  499|
|   cariacica|  497|
|     guaruja|  496|
|        maua|  495|
+------------+-----+
only showing top 10 rows



In [115]:
# And finally just the city with more unique users
df_sql_3_grouped.sort(f.col('count').desc()).show(1)

+--------+-----+
|  cidade|count|
+--------+-----+
|joinvile|  524|
+--------+-----+
only showing top 1 row



In [85]:
# Answering the fourth question
sql_4_pok = """
        select conversationId, userId, pergunta, pokemons--, pokemons2, pokemons3, ...
            from (
            select conversationId,
            userId,
            messageText as pergunta,
            case
            when (lead(timestamp) over (order by conversationId)) = timestamp
            then (lead(messageText,2) over (order by conversationId))
            else (lead(messageText) over (order by conversationId))
            end as pokemons
            --lead(messageText,2) over (order by conversationId) as pokemons2,
            --lead(messageText,3) over (order by conversationId) as pokemons3
            from view
            order by conversationId, userId, timestamp  asc
         ) as t
         where pergunta = 'Quais são seus pokemons favoritos?'
         """
df_sql_4_pok = spark.sql(sql_4_pok)
df_sql_4_pok.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244       
 userId         | 17dfbcad57ceec09a9fcbd3674842e85           
 pergunta       | Quais são seus pokemons favoritos?         
 pokemons       | pupitar, moltres, latios-mega              
-RECORD 1----------------------------------------------------
 conversationId | 0003ad4a-810d-43ec-a73d-0c753283ec2a       
 userId         | bc96cb0f882cd446acfcad4791c49783           
 pergunta       | Quais são seus pokemons favoritos?         
 pokemons       | drednaw purrloin serperior                 
-RECORD 2----------------------------------------------------
 conversationId | 00040e3e-a8c0-4709-a45b-90f82c8533be       
 userId         | ce3c2420998872a83a56a648368cad72           
 pergunta       | Quais são seus pokemons favoritos?         
 pokemons       | steelix, klang                             
-RECORD 3----------------------------------------------------
 convers

### Here we have another critical point. Here the user can answer the same question more than once, but how many more times? Does the bot wait some time before starting the next question? There is a certain randomness in this situation, making it difficult to establish a pattern for capturing this information. Here comes another point of improvement for the bot: setting the user a message limit for this question or a fixed time in which he can answer or even request that he only respond in one message. As a last suggestion, to try to be the as generalist as possible, make use of a translation table to map the pokemons in the messages, assuming that we have a finite number of them.

In [86]:
# Remove spaces and set comma as the separator
df_sql_4_pok = df_sql_4_pok.withColumn("pokemons",f.lower(f.col("pokemons"))). \
                    withColumn("pokemons",f.trim(f.col("pokemons"))). \
                    withColumn('pokemons', f.translate('pokemons',',','')). \
                    withColumn('pokemons', f.translate('pokemons',' ',','))

df_sql_4_pok.show(100, truncate=False)

+------------------------------------+--------------------------------+----------------------------------+-----------------------------------------------+
|conversationId                      |userId                          |pergunta                          |pokemons                                       |
+------------------------------------+--------------------------------+----------------------------------+-----------------------------------------------+
|000109a0-f902-4bd0-bb3e-d064e9f73244|17dfbcad57ceec09a9fcbd3674842e85|Quais são seus pokemons favoritos?|pupitar,moltres,latios-mega                    |
|0003ad4a-810d-43ec-a73d-0c753283ec2a|bc96cb0f882cd446acfcad4791c49783|Quais são seus pokemons favoritos?|drednaw,purrloin,serperior                     |
|00040e3e-a8c0-4709-a45b-90f82c8533be|ce3c2420998872a83a56a648368cad72|Quais são seus pokemons favoritos?|steelix,klang                                  |
|0006f192-ce5e-4b5a-863c-50b976d9d67f|44ae631e5a361d7b0527eee5b227c283

In [87]:
# Get the age of the users
sql_4_age = """
        select conversationId, pergunta, idade
            from (
            select conversationId,
            messageText as pergunta,
            case
            when (lead(timestamp) over (order by conversationId)) = timestamp
            then (lead(messageText,2) over (order by conversationId))
            else (lead(messageText) over (order by conversationId))
            end as idade
            from view
            order by conversationId, timestamp  asc
         ) as t
         where pergunta = 'Qual a sua idade?'
         """
df_sql_4_age = spark.sql(sql_4_age)
df_sql_4_age.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
-RECORD 1----------------------------------------------
 conversationId | 0003ad4a-810d-43ec-a73d-0c753283ec2a 
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
-RECORD 2----------------------------------------------
 conversationId | 00040e3e-a8c0-4709-a45b-90f82c8533be 
 pergunta       | Qual a sua idade?                    
 idade          | 45                                   
-RECORD 3----------------------------------------------
 conversationId | 0006f192-ce5e-4b5a-863c-50b976d9d67f 
 pergunta       | Qual a sua idade?                    
 idade          | 48                                   
-RECORD 4----------------------------------------------
 conversationId | 0007ed28-c2a7-4487-8bb7-133092

In [88]:
# Joining dfs
df_sql_4 = df_sql_4_pok.join(df_sql_4_age,["conversationId"])
df_sql_4.show(5, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244     
 userId         | 17dfbcad57ceec09a9fcbd3674842e85         
 pergunta       | Quais são seus pokemons favoritos?       
 pokemons       | pupitar,moltres,latios-mega              
 pergunta       | Qual a sua idade?                        
 idade          | 43                                       
-RECORD 1--------------------------------------------------
 conversationId | 0003ad4a-810d-43ec-a73d-0c753283ec2a     
 userId         | bc96cb0f882cd446acfcad4791c49783         
 pergunta       | Quais são seus pokemons favoritos?       
 pokemons       | drednaw,purrloin,serperior               
 pergunta       | Qual a sua idade?                        
 idade          | 43                                       
-RECORD 2--------------------------------------------------
 conversationId | 00040e3e-a8c0-4709-a45b-90f82c8533be     
 userId         | ce3c2420998872a83a56a6

In [89]:
# Exploding pokemons in single rows
df_sql_4 = df_sql_4.withColumn('pokemons',f.explode(f.split('pokemons', ',')))
df_sql_4.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | pupitar                              
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
-RECORD 1----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | moltres                              
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
-RECORD 2----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favori

In [90]:
# Changing data type of age
df_sql_4 = df_sql_4.withColumn("idade",df_sql_4.idade.cast('int'))

In [91]:
# Creating bins for age
bucketizer = Bucketizer(splits=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100], inputCol="idade", outputCol="bin")
df_sql_4 = bucketizer.setHandleInvalid("keep").transform(df_sql_4)

df_sql_4.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | pupitar                              
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
 bin            | 4.0                                  
-RECORD 1----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | moltres                              
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
 bin            | 4.0                                  
-RECORD 2----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9

In [92]:
# Setting the result in a new dataframe (This df will be reused in the next questions)
df_sql_4_result = df_sql_4.select('pokemons','bin').where(f.col('bin')==2.0).groupBy("pokemons").count()

In [93]:
# The top 5 pokemons between 20 and 30 years old
df_sql_4_result.sort(f.col('count').desc()).show(5)

+-----------------+-----+
|         pokemons|count|
+-----------------+-----+
|          dewgong|   35|
|    venusaur-mega|   35|
|wishiwashi-school|   34|
|           grotle|   34|
|     yamask-galar|   34|
|    slowbro-galar|   33|
|          tympole|   33|
|        porygon-z|   33|
|       sandaconda|   33|
|          florges|   33|
+-----------------+-----+
only showing top 10 rows



In [94]:
# Answering the fifth question
# Reusing df from last question (to get pokemons answers)
df_sql_5_pok = df_sql_4_pok
df_sql_5_pok.show(5, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244     
 userId         | 17dfbcad57ceec09a9fcbd3674842e85         
 pergunta       | Quais são seus pokemons favoritos?       
 pokemons       | pupitar,moltres,latios-mega              
-RECORD 1--------------------------------------------------
 conversationId | 0003ad4a-810d-43ec-a73d-0c753283ec2a     
 userId         | bc96cb0f882cd446acfcad4791c49783         
 pergunta       | Quais são seus pokemons favoritos?       
 pokemons       | drednaw,purrloin,serperior               
-RECORD 2--------------------------------------------------
 conversationId | 00040e3e-a8c0-4709-a45b-90f82c8533be     
 userId         | ce3c2420998872a83a56a648368cad72         
 pergunta       | Quais são seus pokemons favoritos?       
 pokemons       | steelix,klang                            
-RECORD 3--------------------------------------------------
 conversationId | 0006f192-ce5e-4b5a-863

In [95]:
# Reusing df from third question to get the city of joinvile (the city with highest unique users)
df_sql_5_city = df_sql_3.where("cidade='joinvile'")
df_sql_5_city.show(5, truncate=False)

+--------------------------------+------------------+--------+
|userId                          |pergunta          |cidade  |
+--------------------------------+------------------+--------+
|7fa292156b2d6f8edd5510a28434c27a|Qual a sua cidade?|joinvile|
|0293488653d7f84b33c2feb289a4d9c5|Qual a sua cidade?|joinvile|
|8ff32a70afe3a1548bd78fe7f77ab1cc|Qual a sua cidade?|joinvile|
|90c073629ef6adc9e71d53455dec6198|Qual a sua cidade?|joinvile|
|32cfeccc3a8c69f09e66a624cba63f58|Qual a sua cidade?|joinvile|
+--------------------------------+------------------+--------+
only showing top 5 rows



In [96]:
# Joining dataframes
df_sql_5 = df_sql_5_pok.join(df_sql_5_city,["userId"])
df_sql_5.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 userId         | 006d83000e0c9ebe57ba7f9fe91c55ac     
 conversationId | 76d54659-50ce-4f38-b4f2-0e9154bada8b 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | togekiss                             
 pergunta       | Qual a sua cidade?                   
 cidade         | joinvile                             
-RECORD 1----------------------------------------------
 userId         | 00b61716991305190cd3b23f49398957     
 conversationId | 8cd36caa-5426-4d0f-af77-91599de6b331 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | uxie                                 
 pergunta       | Qual a sua cidade?                   
 cidade         | joinvile                             
-RECORD 2----------------------------------------------
 userId         | 01269853445c4fcfe06c7748c02e7b62     
 conversationId | 5961caff-6734-4872-bf9b-825eb47e5c78 
 pergunta       | Quais são seus pokemons favori

In [97]:
# Exploding pokemons in single rows
df_sql_5 = df_sql_5.withColumn('pokemons',f.explode(f.split('pokemons', ',')))
df_sql_5.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 userId         | 006d83000e0c9ebe57ba7f9fe91c55ac     
 conversationId | 76d54659-50ce-4f38-b4f2-0e9154bada8b 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | togekiss                             
 pergunta       | Qual a sua cidade?                   
 cidade         | joinvile                             
-RECORD 1----------------------------------------------
 userId         | 00b61716991305190cd3b23f49398957     
 conversationId | 8cd36caa-5426-4d0f-af77-91599de6b331 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | uxie                                 
 pergunta       | Qual a sua cidade?                   
 cidade         | joinvile                             
-RECORD 2----------------------------------------------
 userId         | 01269853445c4fcfe06c7748c02e7b62     
 conversationId | 5961caff-6734-4872-bf9b-825eb47e5c78 
 pergunta       | Quais são seus pokemons favori

In [98]:
# Counting pokemons
df_sql_5 = df_sql_5.select('pokemons').groupBy("pokemons").count()

In [99]:
# Showing the top 10 pokemons in the city with highest unique users (joinvile)
df_sql_5.sort(f.col('count').desc()).show(10)

+--------------------+-----+
|            pokemons|count|
+--------------------+-----+
|             blipbug|    5|
|               entei|    5|
|           rillaboom|    4|
|           cryogonal|    4|
|           volcarona|    4|
|              gligar|    4|
|           obstagoon|    4|
|             ninjask|    4|
|raticate-totem-alola|    4|
|          galvantula|    4|
+--------------------+-----+
only showing top 10 rows



In [100]:
# Answering the sixth question
# Reusing df from fourth question (to get pokemons and age answers)
df_sql_6_pok = df_sql_4
df_sql_6_pok.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | pupitar                              
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
 bin            | 4.0                                  
-RECORD 1----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9f73244 
 userId         | 17dfbcad57ceec09a9fcbd3674842e85     
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | moltres                              
 pergunta       | Qual a sua idade?                    
 idade          | 43                                   
 bin            | 4.0                                  
-RECORD 2----------------------------------------------
 conversationId | 000109a0-f902-4bd0-bb3e-d064e9

In [101]:
# Reusing df from third question to get users from sao_paulo
df_sql_6_city = df_sql_3.where("cidade='sao_paulo'")
df_sql_6_city.show(5, truncate=False)

+--------------------------------+------------------+---------+
|userId                          |pergunta          |cidade   |
+--------------------------------+------------------+---------+
|314f70a2c72ea0c200ace6b0bcf226ec|Qual a sua cidade?|sao_paulo|
|7ffd81fb498d265b874ad3734d6de4ee|Qual a sua cidade?|sao_paulo|
|8430083012ba11acdc56660b85ffcbb6|Qual a sua cidade?|sao_paulo|
|2616fc0048a0e600120f55fe57d35152|Qual a sua cidade?|sao_paulo|
|2b3af2ca55c80f75412392d98feee433|Qual a sua cidade?|sao_paulo|
+--------------------------------+------------------+---------+
only showing top 5 rows



In [102]:
# Joining dataframes
df_sql_6 = df_sql_6_pok.join(df_sql_6_city,["userId"])
df_sql_6.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 userId         | 61dcd68f180b727297798823fb6667d9     
 conversationId | 2fbc1811-c0a0-4157-ade9-127fb73499ef 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | minior-violet                        
 pergunta       | Qual a sua idade?                    
 idade          | 32                                   
 bin            | 3.0                                  
 pergunta       | Qual a sua cidade?                   
 cidade         | sao_paulo                            
-RECORD 1----------------------------------------------
 userId         | 61dcd68f180b727297798823fb6667d9     
 conversationId | 2fbc1811-c0a0-4157-ade9-127fb73499ef 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | slowking                             
 pergunta       | Qual a sua idade?                    
 idade          | 32                                   
 bin            | 3.0                           

In [103]:
# Creating a dictionary to map age ranges
dictionary = {0.0:"0-9 anos", 1.0: "10-19 anos", 2.0:"20-29 anos", 3.0: "30-39 anos",
              4.0:"40-49 anos", 5.0: "50-59 anos", 6.0:"60-69 anos", 7.0: "70-79 anos",
              8.0:"80-89 anos", 9.0: "90-99 anos"}
udf_dict = f.udf(lambda x: dictionary[x], StringType())

In [104]:
# Defining a result dataframe with age ranges
df_sql_6_result = df_sql_6.withColumn("faixa", udf_dict("bin"))
df_sql_6_result.show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 userId         | 61dcd68f180b727297798823fb6667d9     
 conversationId | 2fbc1811-c0a0-4157-ade9-127fb73499ef 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | minior-violet                        
 pergunta       | Qual a sua idade?                    
 idade          | 32                                   
 bin            | 3.0                                  
 pergunta       | Qual a sua cidade?                   
 cidade         | sao_paulo                            
 faixa          | 30-39 anos                           
-RECORD 1----------------------------------------------
 userId         | 61dcd68f180b727297798823fb6667d9     
 conversationId | 2fbc1811-c0a0-4157-ade9-127fb73499ef 
 pergunta       | Quais são seus pokemons favoritos?   
 pokemons       | slowking                             
 pergunta       | Qual a sua idade?                    
 idade          | 32                            

In [105]:
# Grouping and counting pokemons by age group in sao_paulo city
df_sql_6_result = df_sql_6_result.select('pokemons','faixa').groupBy("pokemons", "faixa").count()

In [106]:
# Showing the top 10
df_sql_6_result.sort(f.col('count').desc()).show(10)

+-------------+----------+-----+
|     pokemons|     faixa|count|
+-------------+----------+-----+
|    bounsweet|30-39 anos|    3|
|       mothim|40-49 anos|    3|
|   incineroar|20-29 anos|    2|
|      linoone|40-49 anos|    2|
|     deerling|20-29 anos|    2|
|deoxys-attack|20-29 anos|    2|
| kyurem-white|20-29 anos|    2|
|   aromatisse|30-39 anos|    2|
|      metapod|50-59 anos|    2|
|    exeggcute|30-39 anos|    2|
+-------------+----------+-----+
only showing top 10 rows



In [107]:
END_TIME = time.monotonic()
print(f"Tempo de processsamento: {timedelta(seconds=END_TIME - START_TIME)}")

Tempo de processsamento: 0:02:06.754616
