# Load Modules

In [1]:
import os
os.chdir("../")

In [2]:
from pyspark.sql import SparkSession
import pandas as pd
import json
from functools import partial

In [3]:
from pyspark.sql.functions import udf, lit, col, current_date, datediff, timestamp_seconds, current_timestamp

In [4]:
from schemas.users import USERS_FIELD_DATA_SCHEMA
from schemas.transactions import TRANSACTIONS_FIELD_DATA_SCHEMA

In [5]:
from pipeline.extract import create_df_columns, read_csv, convert_column_to_json

In [6]:
# spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session = (
    SparkSession.builder.appName("CreditBook ETL")
    .config("spark.executor.cores", 8)
    .config("spark.task.cpus", 8)
    .config("spark.cores.max", 24)
    .config("spark.driver.extraClassPath", "jars/postgresql-42.3.3.jar")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.instance", 4)
    .config("spark.driver.memory", "8g")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)
# spark_session = (
#     SparkSession.builder.appName("Python Spark SQL Hive integration example")
#     .config("spark.executor.cores", 16)
#     .config("spark.task.cpus", 2)
#     .config("spark.cores.max", 24)
#     .config("spark.executor.memory", "8g")
#     .config("spark.executor.instance", 16)
#     .config("spark.driver.memory", "8g")
#     # .config("spark.driver.extraClassPath", "../jars/postgresql-42.3.3.jar")
#     .getOrCreate()
# )
# spark_session = (
#     SparkSession.builder.appName("Python Spark SQL Hive integration example")
#     .config("spark.executor.memory", "8g")
#     .config("spark.executor.instance", 4)
#     .config("spark.driver.memory", "8g")
#     .getOrCreate()
# )


# Extract

In [7]:
users = read_csv(spark_session, "datasets/users.csv", "users")
users = convert_column_to_json(users, "data", USERS_FIELD_DATA_SCHEMA)
users = create_df_columns(users, USERS_FIELD_DATA_SCHEMA, "data")

In [8]:
analytics = read_csv(spark_session, "./datasets/analytics.csv", "analytics")

In [9]:
transactions = read_csv(spark_session, "./datasets/transactions.csv", "transactions")
transactions = convert_column_to_json(transactions, "data", TRANSACTIONS_FIELD_DATA_SCHEMA)
transactions = create_df_columns(transactions, TRANSACTIONS_FIELD_DATA_SCHEMA, "data")

In [10]:
users.show(5, truncate=10)

+---+----------+----------+-------------+---------+-----------+----------+----------+------------------+----------------+------+
|_c0| timestamp|  event_id|document_name|operation|document_id|      data|        id|user_last_activity|user_signup_date|rating|
+---+----------+----------+-------------+---------+-----------+----------+----------+------------------+----------------+------+
|  0|2021-10...|cf0f424...|   project...|   UPDATE| DrcOaVj...|{rating...|DrcOaVj...|        1633051286|      1598745600|   4.0|
|  1|2021-08...|2ccb8df...|   project...|   UPDATE| DrcOaVj...|{rating...|DrcOaVj...|        1628868069|      1598745600|   4.0|
|  2|2021-07...|4c4cdb6...|   project...|   UPDATE| DrcOaVj...|{rating...|DrcOaVj...|        1625326718|      1598745600|   4.0|
|  3|2020-12...|79ed83b...|   project...|   UPDATE| DrcOaVj...|{rating...|DrcOaVj...|        1607841920|            null|  null|
|  4|2021-08...|c53137d...|   project...|   UPDATE| DrcOaVj...|{rating...|DrcOaVj...|        1628

In [11]:
transactions.show(5, truncate=10)

+---+----------+----------+-------------+---------+-----------+----------+-------+------------------+--------------------------------------+----------+---------------------+----------------+-------+----------+
|_c0| timestamp|  event_id|document_name|operation|document_id|      data| amount|creation_timestamp|customer_net_balance_after_transaction|      note|transaction_timestamp|transaction_type|   type|   user_id|
+---+----------+----------+-------------+---------+-----------+----------+-------+------------------+--------------------------------------+----------+---------------------+----------------+-------+----------+
|  0|2020-11...|8f11bd0...|   project...|   CREATE| QgNGgxE...|{note=1...| 1000.0|              null|                               -9000.0|     17.11|                 null|          credit|default|ba5e447...|
|  1|2020-11...|ad7c8ad...|   project...|   CREATE| HCJ1trN...|{note=S...|   60.0|              null|                                 838.0|     Sabon|         

In [12]:
analytics.show(5, truncate=5)

+---+----------+---------------+----------+-------+--------------+------------+----------+---------------+----------+-----------+
|_c0|event_date|event_timestamp|event_name|user_id|user_pseudo_id|device_model|android_os|device_language|city_geoIp|app_version|
+---+----------+---------------+----------+-------+--------------+------------+----------+---------------+----------+-----------+
|  0|     20...|          20...|     se...|  22...|         2f...|       mo...|     An...|          en-us|     Ka...|      2....|
|  1|     20...|          20...|     Cl...|  3b...|         df...|       mo...|     An...|          en-au|     Ka...|      2....|
|  2|     20...|          20...|     Cl...|  3b...|         df...|       mo...|     An...|          en-au|     Ka...|      2....|
|  3|     20...|          20...|     Cl...|  22...|         2f...|       mo...|     An...|          en-us|     Ka...|      2....|
|  4|     20...|          20...|     Vi...|  22...|         2f...|       mo...|     An...|

In [13]:
users.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- document_name: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- data: string (nullable = true)
 |-- id: string (nullable = true)
 |-- user_last_activity: integer (nullable = true)
 |-- user_signup_date: integer (nullable = true)
 |-- rating: float (nullable = true)



In [14]:
transactions.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- document_name: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- data: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- creation_timestamp: timestamp (nullable = true)
 |-- customer_net_balance_after_transaction: float (nullable = true)
 |-- note: string (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- type: string (nullable = true)
 |-- user_id: string (nullable = true)



# Transform

In [15]:
transactions.select("transaction_type").distinct().collect()

[Row(transaction_type=None),
 Row(transaction_type='credit'),
 Row(transaction_type='debit')]

In [16]:
amount_of_debits = transactions.filter(transactions.transaction_type=="debit").groupby("user_id").sum("amount").withColumnRenamed("sum(amount)", "amount_of_debits").cache()

In [17]:
no_of_debits = transactions.filter(transactions.transaction_type=="debit").groupby("user_id").count().withColumnRenamed("count", "no_of_debits").cache()

In [18]:
amount_of_credits = transactions.filter(transactions.transaction_type=="credit").groupby("user_id").sum("amount").withColumnRenamed("sum(amount)", "amount_of_credits").cache()

In [19]:
no_of_credits = transactions.filter(transactions.transaction_type=="debit").groupby("user_id").count().withColumnRenamed("count", "no_of_credits").cache()

In [20]:
amount_of_total_transactions = transactions.groupby("user_id").sum("amount").withColumnRenamed("sum(amount)", "amount_of_total_transactions").cache()

In [21]:
no_of_transactions = transactions.filter(transactions.transaction_type.isin(["credit", "debit"])).groupby("user_id").count().withColumnRenamed("count", "no_of_transactions").cache()

In [22]:
ratings = users.select(col("id").alias("user_id"), "rating").groupby("user_id").avg("rating").withColumnRenamed("avg(rating)", "rating").cache()

In [23]:
user_last_activity = users.select(col("id").alias("user_id"), "user_last_activity").groupby("user_id").agg({"user_last_activity": "max"}).withColumnRenamed("max(user_last_activity)", "user_last_activity").cache()

In [24]:
user_last_activity = user_last_activity.withColumn("days_since_last_activity", datediff(current_date(), col("user_last_activity").cast("timestamp")))

In [25]:
user_last_activity = user_last_activity.withColumn("created_at", current_timestamp()).cache()

In [26]:
user_last_activity = user_last_activity.withColumn("user_last_activity", col("user_last_activity").cast("timestamp")).cache()

In [27]:
days_since_signup = users.groupby("id").agg({"user_signup_date": "first"}).cache()
days_since_signup = days_since_signup.withColumnRenamed("id", "user_id").withColumnRenamed("first(user_signup_date)", "days_since_signup")
days_since_signup = days_since_signup.select("user_id", (datediff(current_date(), col("days_since_signup").cast("timestamp"))).alias("days_since_signup")).cache()

In [28]:
user_info = analytics.groupby("user_id").agg({"device_language": "first", "city_geoIp": "first", "app_version": "first", "device_model": "first"})
user_info = user_info.withColumnRenamed("first(app_version)", "app_version").withColumnRenamed("first(device_model)", "phone_model")
user_info = user_info.withColumnRenamed("first(device_language)", "language").withColumnRenamed("first(city_geoIp)", "city")

In [29]:
user_info.show()

+--------------------+-----------+--------------------+--------+----------+
|             user_id|app_version|         phone_model|language|      city|
+--------------------+-----------+--------------------+--------+----------+
|031e8f64-0e97-4f4...|     2.26.0|     mobile Y20 2021|   en-pk|    Lahore|
|03cfd11f-8d63-407...|     2.25.0|     mobile Y9 Prime|   en-us|   Karachi|
|0e610016-5bd3-4d0...|     2.26.0|   mobile Galaxy A50|   en-gb|   Karachi|
|14f817d0-ef4f-484...|     2.26.0|   mobile Spark 6 Go|   en-us|    Multan|
|1514610d-50a1-4ab...|     2.26.0|         mobile Y51A|   en-pk|   Karachi|
|17785423-24bf-465...|     2.26.0|          mobile F15|   en-us|    Multan|
|229cb688-c853-461...|     2.26.0|       mobile S1 Pro|   en-us|   Karachi|
|2d1fb3f7-77f3-47c...|     2.26.0|       mobile S1 Pro|   en-us|    Lahore|
|2f00ade9-c5a9-48c...|     2.26.0|mobile Galaxy J5 ...|   en-us|   Karachi|
|3682275d-5450-44f...|     2.26.0|          mobile Y15|   en-us|    Multan|
|3bddd824-6d

In [30]:
final = amount_of_credits.join(no_of_credits, on="user_id", how="left").join(
    amount_of_debits, on="user_id", how="left"
).join(
    no_of_debits, on="user_id", how="left"
).join(
    amount_of_total_transactions, on="user_id", how="left"
).join(
    no_of_transactions, on="user_id", how="left"
).join(
    ratings, on="user_id", how="left"
).join(
    user_last_activity, on="user_id", how="left"
).join(
    days_since_signup, on="user_id", how="left"
).join(
    user_info, on="user_id", how="left"
)

In [31]:
final.show()

+--------------------+--------------------+-------------+------------------+------------+----------------------------+------------------+-----------------+-------------------+------------------------+--------------------+-----------------+-----------+-----------------+--------+----------+
|             user_id|   amount_of_credits|no_of_credits|  amount_of_debits|no_of_debits|amount_of_total_transactions|no_of_transactions|           rating| user_last_activity|days_since_last_activity|          created_at|days_since_signup|app_version|      phone_model|language|      city|
+--------------------+--------------------+-------------+------------------+------------+----------------------------+------------------+-----------------+-------------------+------------------------+--------------------+-----------------+-----------+-----------------+--------+----------+
|dcaf1888-4514-4c1...|         2.3433795E7|         7683|       2.3339095E7|        7683|                  4.677289E7|            

# Load

In [32]:
def save_to_postgres(df):
    df.write.format("jdbc").option(
        "url", "jdbc:postgresql://localhost:5432/test"
    ).option("dbtable", "users_dw").option("user", "mamun").option(
        "password", "mamun1234"
    ).mode(
        "overwrite"
    ).save()


In [33]:
save_to_postgres(final)

# Test RAW