En esta notebook se calcula el LTV a partir del churn rate y ARPU

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import when, col

spark = SparkSession \
    .builder \
    .appName("lemon data challenge") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Import data

In [2]:
rates = spark.read.csv("rates.csv", header=True)
rates.show(1)

+-------------+--------------+----------+
|base_currency|quote_currency|     price|
+-------------+--------------+----------+
|          UST|          USDT|1.00000000|
+-------------+--------------+----------+
only showing top 1 row



In [21]:
transactions = spark.read.csv("transactions.csv",  header=True)
transactions.show(1)

+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+
|                  id|user_id|      amount|state|operation_type|transaction_type|currency|          createdat|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+
|00024dce-8d40-4ae...|3222205|100.02000000| DONE|        CREDIT|     CRYPTO_SALE|   MONEY|2022-03-03T14:36:05|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+
only showing top 1 row



In [4]:
users = spark.read.csv("users.csv",  header=True)
users.show(1)

+--------+------+----------+-------------------+
| user_id|gender| birthdate|          createdat|
+--------+------+----------+-------------------+
|12589454|     m|1994-12-28|2021-12-17T19:19:32|
+--------+------+----------+-------------------+
only showing top 1 row



### ARPU

In [5]:
#Calculo cantidad de currencies
n_curr = rates.select(countDistinct('base_currency'))
n_curr.show()

+-----------------------------+
|count(DISTINCT base_currency)|
+-----------------------------+
|                           15|
+-----------------------------+



In [6]:
rates.createOrReplaceTempView("rates")

In [7]:
#Filtro los rates para quedarme con los cambios a USD 
rates_DF = spark.sql("SELECT base_currency, price FROM rates where quote_currency = 'USDT'")
rates_DF.show()

+-------------+--------------+
|base_currency|         price|
+-------------+--------------+
|          UST|    1.00000000|
|         USDC|    0.99980000|
|          SOL|  113.31000000|
|          SLP|    0.01980000|
|         SAND|    2.96240000|
|          DOT|   19.83000000|
|         MANA|    2.31350000|
|          BTC|42670.00000000|
|          ETH| 3236.38000000|
|          ADA|    1.04600000|
|          UNI|    9.83000000|
|          AXS|   51.71000000|
|         ALGO|    0.77670000|
|         LUNA|   95.92000000|
+-------------+--------------+



In [8]:
rates_DF.select(countDistinct('base_currency')).show()

#Todas tienen cambio a USDT, no necesito hacer pases intermedios

+-----------------------------+
|count(DISTINCT base_currency)|
+-----------------------------+
|                           14|
+-----------------------------+



In [9]:
#Joineo para poder pasar a USD
transactions = transactions.join(rates_DF,transactions.currency ==  rates_DF.base_currency,"left")
transactions.show(1)

+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+
|                  id|user_id|      amount|state|operation_type|transaction_type|currency|          createdat|base_currency|price|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+
|00024dce-8d40-4ae...|3222205|100.02000000| DONE|        CREDIT|     CRYPTO_SALE|   MONEY|2022-03-03T14:36:05|         null| null|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+
only showing top 1 row



In [10]:
#Chequeo que currencies no tienen rate
transactions.createOrReplaceTempView("transac_check")
spark.sql("SELECT distinct(currency) FROM transac_check where isNull(price)").show()

+--------+
|currency|
+--------+
|   MONEY|
|     DAI|
|    USDT|
+--------+



In [11]:
#Asumo que money ya es USD
transactions = transactions.withColumn('amount_USD', when((col('currency') == 'MONEY')|(col('currency') == 'DAI')|(col('currency') == 'USDT'), col('amount')).otherwise(transactions.amount * transactions.price))
transactions.show(1)

+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+
|                  id|user_id|      amount|state|operation_type|transaction_type|currency|          createdat|base_currency|price|  amount_USD|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+
|00024dce-8d40-4ae...|3222205|100.02000000| DONE|        CREDIT|     CRYPTO_SALE|   MONEY|2022-03-03T14:36:05|         null| null|100.02000000|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+
only showing top 1 row



In [12]:
transactions.createOrReplaceTempView("transac_check2")
spark.sql("SELECT distinct(currency) FROM transac_check2 where isNull(amount_USD)").show()

+--------+
|currency|
+--------+
+--------+



In [13]:
transactions.select('transaction_type').distinct().collect()

[Row(transaction_type='LEMON_CARD_PAYMENT'),
 Row(transaction_type='VIRTUAL_WITHDRAWAL'),
 Row(transaction_type='VIRTUAL_DEPOSIT'),
 Row(transaction_type='CASH_IN_CRYPTO'),
 Row(transaction_type='CRYPTO_SALE'),
 Row(transaction_type='CRYPTO_SWAP'),
 Row(transaction_type='CRYPTO_PURCHASE')]

In [14]:
transactions = transactions.withColumn('percent', when(col('transaction_type') == 'CRYPTO_SWAP', 0.01).when((col('transaction_type') == 'CRYPTO_SALE')|(col('transaction_type') == 'CRYPTO_PURCHASE'), 0.02).when(col('transaction_type') == 'LEMON_CARD_PAYMENT', 0.05).otherwise(0))
transactions.show(1)

+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+-------+
|                  id|user_id|      amount|state|operation_type|transaction_type|currency|          createdat|base_currency|price|  amount_USD|percent|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+-------+
|00024dce-8d40-4ae...|3222205|100.02000000| DONE|        CREDIT|     CRYPTO_SALE|   MONEY|2022-03-03T14:36:05|         null| null|100.02000000|   0.02|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+-------+
only showing top 1 row



In [15]:
transactions = transactions.withColumn('revenue',transactions.amount_USD * transactions.percent)
transactions.show(1)

+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+-------+-------+
|                  id|user_id|      amount|state|operation_type|transaction_type|currency|          createdat|base_currency|price|  amount_USD|percent|revenue|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+-------+-------+
|00024dce-8d40-4ae...|3222205|100.02000000| DONE|        CREDIT|     CRYPTO_SALE|   MONEY|2022-03-03T14:36:05|         null| null|100.02000000|   0.02| 2.0004|
+--------------------+-------+------------+-----+--------------+----------------+--------+-------------------+-------------+-----+------------+-------+-------+
only showing top 1 row



In [16]:
transactions.createOrReplaceTempView("rev")
rev = spark.sql("select count(distinct(user_id)) as total_usuarios, sum(abs(revenue)) as revenue_total from rev")
rev.show(1)

+--------------+-----------------+
|total_usuarios|    revenue_total|
+--------------+-----------------+
|           380|461248.0122836356|
+--------------+-----------------+



In [17]:
rev = rev.withColumn('arpu', rev.revenue_total/rev.total_usuarios)
rev.show(1)

+--------------+-----------------+------------------+
|total_usuarios|    revenue_total|              arpu|
+--------------+-----------------+------------------+
|           380|461248.0122836356|1213.8105586411464|
+--------------+-----------------+------------------+



In [18]:
arpu = rev.select(['arpu']).first()[0]
arpu

1213.8105586411464

### Churn Rate

In [24]:
#Solo me quedo con las transacciones del ultimo mes cerrado (marzo) y las agrupo por usuario
transactions.createOrReplaceTempView("grouped_transactions_march")
grouped_transactions_march = spark.sql("SELECT count(id) as cantidad_transac, user_id FROM grouped_transactions_march where createdat >= '2022-03-01' and createdat < '2022-04-01' group by user_id ")
grouped_transactions_march.show(1)

+----------------+-------+
|cantidad_transac|user_id|
+----------------+-------+
|              41|2695455|
+----------------+-------+
only showing top 1 row



In [25]:
#Cantidad de usuarios de febrero
transactions.createOrReplaceTempView("usuarios_activos_feb")
usuarios_activos_feb = spark.sql("SELECT count(distinct(user_id)) FROM usuarios_activos_feb where date(createdat) >= '2022-02-01' and date(createdat) < '2022-03-01'")
usuarios_activos_feb.show(1)

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                    167|
+-----------------------+



In [26]:
#Base de usuarios de febrero
transactions.createOrReplaceTempView("usuarios_activos_feb")
usuarios_activos_feb = spark.sql("SELECT distinct(user_id) FROM usuarios_activos_feb where date(createdat) >= '2022-02-01' and date(createdat) < '2022-03-01'")
usuarios_activos_feb.show(1)

+-------+
|user_id|
+-------+
|2695455|
+-------+
only showing top 1 row



In [28]:
#Cantidad de usuarios de marzo
transactions.createOrReplaceTempView("usuarios_activos_mar")
usuarios_activos_mar = spark.sql("SELECT count(distinct(user_id)) FROM usuarios_activos_mar where date(createdat) >= '2022-03-01' and date(createdat) < '2022-04-01'")
usuarios_activos_mar.show(1)

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                    256|
+-----------------------+



In [29]:
#Base de usuarios de marzo
transactions.createOrReplaceTempView("usuarios_activos_mar")
usuarios_activos_mar = spark.sql("SELECT distinct(user_id) FROM usuarios_activos_mar where date(createdat) >= '2022-03-01' and date(createdat) < '2022-04-01'")
usuarios_activos_mar.show(1)

+-------+
|user_id|
+-------+
|2695455|
+-------+
only showing top 1 row



In [31]:
#Cuento los usuarios de febrero que no tuvieron transacciones en marzo
users_churn=usuarios_activos_feb.join(usuarios_activos_mar, ["user_id"], "leftanti").count()
users_churn

48

In [32]:
usuarios_act_feb = usuarios_activos_feb.select(countDistinct('user_id')).collect()[0][0]
usuarios_act_feb

167

In [34]:
churn = users_churn/usuarios_act_feb
churn

0.2874251497005988

### LTV

In [35]:
LTV = arpu/churn
LTV

4223.049235272322