In [1]:
from glob import glob
from pyspark.sql import Row
from pyspark.sql import DataFrame, column
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import col, datediff, current_date, current_timestamp, month, percentile_approx, count, expr
from pyspark.sql.functions import sum as spark_sum
import pandas as pd
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 2000)
pd.set_option('display.float_format', '{:20,.2f}'.format)
pd.set_option('display.max_colwidth', None)


spark = SparkSession.builder.appName("Creditbook ETL").getOrCreate()

# Getting schema for validation
import json
from pandas.io.json._normalize import nested_to_record


def flatten(dictionary):
    return nested_to_record(dictionary, sep="_")


def dict_parser(instance):
    out_instance = {}
    for k, v in instance.items():
        try:
            v = json.loads(v)
            if isinstance(v, dict):
                out_instance[k] = dict_parser(v)
            out_instance[k] = v
        except (json.JSONDecodeError, TypeError):
            out_instance[k] = v
    return out_instance

def transform(row, transformations=(Row.asDict, dict_parser, flatten)):
    for transformation in transformations:
        try:
            row = transformation(row)
        except:
            print(row)
    return row


def cast_columns(df, column_cast_type_map: dict):
    for col_name, cast_type in column_cast_type_map.items():
        df = DataFrame.withColumn(df, col_name, col(col_name).cast(cast_type))
    return df

22/04/23 03:00:44 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.0.122 instead (on interface enp9s0)
22/04/23 03:00:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/23 03:00:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import os

subpath = "../datasets"

relative_path = lambda filepath: os.path.join(subpath, filepath)

transaction_dataset = relative_path("transactions.csv")
users_dataset = relative_path("users.csv")
analytics_dataset = relative_path("analytics.csv")

def read_csv(filepath):
    return spark.read.csv(filepath, quote='"', escape='"', header=True)

In [3]:
df_transaction = read_csv(transaction_dataset)
df_user = read_csv(users_dataset)
df_analytics = read_csv(analytics_dataset)

In [4]:
df_transaction = df_transaction.rdd.map(transform).toDF()
df_user = df_user.rdd.map(transform).toDF(sampleRatio=0.4)
df_analytics = df_analytics.rdd.map(transform).toDF()

22/04/23 03:00:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 03:00:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv
22/04/23 03:00:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv


In [5]:
df_transaction.limit(5).toPandas().head()

22/04/23 03:01:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv


Unnamed: 0,_c0,data_amount,data_creation_timestamp__nanoseconds,data_creation_timestamp__seconds,data_customer_net_balance_after_transaction,data_note,data_transaction_timestamp__nanoseconds,data_transaction_timestamp__seconds,data_transaction_type,data_type,data_user_id,document_id,document_name,event_id,operation,timestamp
0,0,1000,971000000,1606274802,-9000,17.11,983000000,1606274802,credit,default,ba5e4473-d825-44d3-872b-78ff3061ad8d,QgNGgxEvYNx6kmTdMM9o,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/QgNGgxEvYNx6kmTdMM9o,8f11bd01-d465-493e-b971-1f5117acf482-1,CREATE,2020-11-25 03:26:47.383340+00:00
1,1,60,774000000,1606277763,838,Sabon,795000000,1606277763,credit,default,cc0e78f9-305e-48e0-a4cf-72a79fb8a7d0,HCJ1trNIpJmBGDiC5L3U,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/HCJ1trNIpJmBGDiC5L3U,ad7c8ad8-e48c-4433-b5ee-22a46e61aeab-1,CREATE,2020-11-25 04:15:59.286363+00:00
2,2,700,11000000,1640114677,-700,Jazz Monthly,357000000,1640114668,debit,default,DrcOaVjp8FWTic6okQAYx2quxrU2,a3878e83-ddea-45e5-8ebd-9045c477b37e,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/a3878e83-ddea-45e5-8ebd-9045c477b37e,8a0b3966-b31b-4346-beb5-f0272a616b49-1,CREATE,2021-12-21 19:24:36.953992+00:00
3,3,4050,81000000,1640115477,4050,,298000000,1640115467,credit,default,74bd4451-77ee-4230-a72e-417c09e8133d,29101f24-6a35-42c6-9e2f-ff70a98e0ba4,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/29101f24-6a35-42c6-9e2f-ff70a98e0ba4,d84c9101-0bcf-49a9-99c5-3f544572a2d0-1,CREATE,2021-12-21 19:37:59.879283+00:00
4,4,10000,79000000,1606224593,-89785,Dr sent,97000000,1606224593,debit,default,8c483cd1-7868-4ef9-82c7-2762ac041e30,cCgOCm4wT5FxfpWblRhh,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/cCgOCm4wT5FxfpWblRhh,93a5ae97-85b1-4aa0-8812-55e4c5a2b6c1-1,CREATE,2020-11-24 13:29:53.645005+00:00


In [6]:
df_transaction = cast_columns(df_transaction, {
    "data_amount": types.FloatType(),
    "timestamp": types.TimestampType(),
})

In [81]:
df_transaction.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- data_amount: float (nullable = true)
 |-- data_creation_timestamp__nanoseconds: long (nullable = true)
 |-- data_creation_timestamp__seconds: long (nullable = true)
 |-- data_customer_net_balance_after_transaction: long (nullable = true)
 |-- data_note: string (nullable = true)
 |-- data_transaction_timestamp__nanoseconds: long (nullable = true)
 |-- data_transaction_timestamp__seconds: long (nullable = true)
 |-- data_transaction_type: string (nullable = true)
 |-- data_type: string (nullable = true)
 |-- data_user_id: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- document_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [83]:
df_user.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- data_business_card_alternate_mobile_no: string (nullable = true)
 |-- data_business_card_business_name: string (nullable = true)
 |-- data_business_card_coordinates_lat: double (nullable = true)
 |-- data_business_card_coordinates_lng: double (nullable = true)
 |-- data_business_card_location: string (nullable = true)
 |-- data_business_card_mobile_no: string (nullable = true)
 |-- data_business_card_name: string (nullable = true)
 |-- data_business_name: string (nullable = true)
 |-- data_businesss_type: string (nullable = true)
 |-- data_cashbook_current_balance: long (nullable = true)
 |-- data_contextID: string (nullable = true)
 |-- data_current_location_latitude: double (nullable = true)
 |-- data_current_location_longitude: double (nullable = true)
 |-- data_fcm_token: string (nullable = true)
 |-- data_fromNewAPP: boolean (nullable = true)
 |-- user_id: string (nullable = true)
 |-- data_img_base_64: string (nullable = true)
 |-- data_

In [7]:
df_user.limit(5).toPandas().head()

22/04/23 03:01:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/04/23 03:01:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv


Unnamed: 0,_c0,data_business_card_alternate_mobile_no,data_business_card_business_name,data_business_card_coordinates_lat,data_business_card_coordinates_lng,data_business_card_location,data_business_card_mobile_no,data_business_card_name,data_business_name,data_businesss_type,data_cashbook_current_balance,data_contextID,data_current_location_latitude,data_current_location_longitude,data_fcm_token,data_fromNewAPP,data_id,data_img_base_64,data_img_url,data_is_active,data_location_our_logs,data_location_past_logs,data_nameLowerCase,data_rating_feedback,data_rating_rated_timestamp__nanoseconds,data_rating_rated_timestamp__seconds,data_rating_stars,data_referral_code,data_user_last_activity__nanoseconds,data_user_last_activity__seconds,data_user_signup_date__nanoseconds,data_user_signup_date__seconds,document_id,document_name,event_id,operation,timestamp,data_user_signup_date,data_active_referrals,data_cb_points,data_from_new_app,data_referred_users,data_referrer_code,data_total_referrals,data_rating_rated_timestamp,data_rating_feedack,data_user_last_activity,data_old_mobile_no,data_dateOfUpdate__nanoseconds,data_dateOfUpdate__seconds,data_updatedBy
0,0,3049445761.0,ALI HAIDER / FATHER'S Home Business,31.11,72.81,"Chak # 45 GB, Tehsil Samundri District FSD",3246820975.0,Ishtiaq Mehmood,ALI HAIDER / FATHER'S Home Business,General Store,-80.0,d035dbe3-cee3-4e59-b964-4899b7d63a4d,31.11,72.79,eNgv3-itRlm4FZZ901FtmY:APA91bG_HZObrNXbJJkn7ILDLlkLctTxW4eUHFxOILL42BX9Xb_uWxk5RXdHeSCNg7RXhf3drLAeVDIH4lqfOOwmUQz_8oprvPssvB0KiPXGPw7yDGww6hyOGJpQDyZFWimFc3lVCZNl,True,DrcOaVjp8FWTic6okQAYx2quxrU2,,,True,,,ishtiaq mehmood,,766000000.0,1612024976.0,4.0,DrcOaVjp,907000000,1633051286,0.0,1598745600.0,DrcOaVjp8FWTic6okQAYx2quxrU2,projects/hisaab-7e8b4/databases/(default)/documents/_users/DrcOaVjp8FWTic6okQAYx2quxrU2,cf0f4246-66b7-4fdd-b279-016ae5091545-0,UPDATE,2021-10-01 01:21:26.953627+00:00,,,,,,,,,,,,,,
1,1,3049445761.0,ALI HAIDER / FATHER'S Home Business,31.11,72.81,"Chak # 45 GB, Tehsil Samundri District FSD",3246820975.0,Ishtiaq Mehmood,ALI HAIDER / FATHER'S Home Business,General Store,-80.0,4122e735-8adb-434f-9a8b-c0793bb4d050,31.11,72.79,eNgv3-itRlm4FZZ901FtmY:APA91bG_HZObrNXbJJkn7ILDLlkLctTxW4eUHFxOILL42BX9Xb_uWxk5RXdHeSCNg7RXhf3drLAeVDIH4lqfOOwmUQz_8oprvPssvB0KiPXGPw7yDGww6hyOGJpQDyZFWimFc3lVCZNl,True,DrcOaVjp8FWTic6okQAYx2quxrU2,,,True,,,ishtiaq mehmood,,766000000.0,1612024976.0,4.0,DrcOaVjp,137000000,1628868069,0.0,1598745600.0,DrcOaVjp8FWTic6okQAYx2quxrU2,projects/hisaab-7e8b4/databases/(default)/documents/_users/DrcOaVjp8FWTic6okQAYx2quxrU2,2ccb8df6-fe72-4bc5-b01f-bd7b2c4be985-0,UPDATE,2021-08-13 15:21:09.193310+00:00,,,,,,,,,,,,,,
2,2,3049445761.0,ALI HAIDER / FATHER'S Home Business,31.11,72.81,"Chak # 45 GB, Tehsil Samundri District FSD",3246820975.0,Ishtiaq Mehmood,ALI HAIDER / FATHER'S Home Business,General Store,-80.0,3e7e2c76-0dc2-4821-abda-8761f27f34e1,31.11,72.79,e7dKGPAWTJGeEFDAr1jnSe:APA91bHJfSQv1kpgjurze0NZUCEh12h8-2H4ChWxzmbI5siU4vb53GTlo0BfLeSPYa56odv5D0TyKdLVgdfPtnx8bCHW0Vmqu5il7G5OVfZqyDgmrwWdIMxLHR6oFBvER_Lst3vMNuOR,True,DrcOaVjp8FWTic6okQAYx2quxrU2,,,True,,,ishtiaq mehmood,,766000000.0,1612024976.0,4.0,DrcOaVjp,748000000,1625326718,0.0,1598745600.0,DrcOaVjp8FWTic6okQAYx2quxrU2,projects/hisaab-7e8b4/databases/(default)/documents/_users/DrcOaVjp8FWTic6okQAYx2quxrU2,4c4cdb6e-aee8-46af-84e0-3c80563bb40b-0,UPDATE,2021-07-03 15:38:38.970714+00:00,,,,,,,,,,,,,,
3,3,,,,,,,,ALI HAIDER / FATHER'S Home Business,General Store,,,,,,,DrcOaVjp8FWTic6okQAYx2quxrU2,,,True,,,ishtiaq mehmood,,,,,,685000000,1607841920,,,DrcOaVjp8FWTic6okQAYx2quxrU2,projects/hisaab-7e8b4/databases/(default)/documents/_users/DrcOaVjp8FWTic6okQAYx2quxrU2,79ed83bd-e2f6-4122-8dbd-37ff939bb580-0,UPDATE,2020-12-13 06:45:20.748230+00:00,,,,,,,,,,,,,,
4,4,3049445761.0,ALI HAIDER / FATHER'S Home Business,31.11,72.81,"Chak # 45 GB, Tehsil Samundri District FSD",3246820975.0,Ishtiaq Mehmood,ALI HAIDER / FATHER'S Home Business,General Store,-80.0,3ce19c7d-fb25-476f-80b4-c3d8e258fe7b,31.11,72.79,eNgv3-itRlm4FZZ901FtmY:APA91bG_HZObrNXbJJkn7ILDLlkLctTxW4eUHFxOILL42BX9Xb_uWxk5RXdHeSCNg7RXhf3drLAeVDIH4lqfOOwmUQz_8oprvPssvB0KiPXGPw7yDGww6hyOGJpQDyZFWimFc3lVCZNl,True,DrcOaVjp8FWTic6okQAYx2quxrU2,,,True,,,ishtiaq mehmood,,766000000.0,1612024976.0,4.0,DrcOaVjp,589000000,1628703482,0.0,1598745600.0,DrcOaVjp8FWTic6okQAYx2quxrU2,projects/hisaab-7e8b4/databases/(default)/documents/_users/DrcOaVjp8FWTic6okQAYx2quxrU2,c53137da-9c78-483d-9406-5137c401bb8e-0,UPDATE,2021-08-11 17:38:01.782393+00:00,,,,,,,,,,,,,,


In [8]:
df_user.head()

22/04/23 03:01:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv


Row(_c0=0, data_business_card_alternate_mobile_no='03049445761', data_business_card_business_name="ALI HAIDER / FATHER'S Home Business ", data_business_card_coordinates_lat=31.1108119, data_business_card_coordinates_lng=72.80649559999999, data_business_card_location='Chak # 45 GB,  Tehsil Samundri District FSD', data_business_card_mobile_no='03246820975', data_business_card_name='Ishtiaq Mehmood ', data_business_name="ALI HAIDER / FATHER'S Home Business ", data_businesss_type='General Store', data_cashbook_current_balance=-80, data_contextID='d035dbe3-cee3-4e59-b964-4899b7d63a4d', data_current_location_latitude=31.1062476, data_current_location_longitude=72.7949253, data_fcm_token='eNgv3-itRlm4FZZ901FtmY:APA91bG_HZObrNXbJJkn7ILDLlkLctTxW4eUHFxOILL42BX9Xb_uWxk5RXdHeSCNg7RXhf3drLAeVDIH4lqfOOwmUQz_8oprvPssvB0KiPXGPw7yDGww6hyOGJpQDyZFWimFc3lVCZNl', data_fromNewAPP=True, data_id='DrcOaVjp8FWTic6okQAYx2quxrU2', data_img_base_64='', data_img_url='', data_is_active=True, data_location_our_logs

In [9]:
df_user.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- data_business_card_alternate_mobile_no: string (nullable = true)
 |-- data_business_card_business_name: string (nullable = true)
 |-- data_business_card_coordinates_lat: double (nullable = true)
 |-- data_business_card_coordinates_lng: double (nullable = true)
 |-- data_business_card_location: string (nullable = true)
 |-- data_business_card_mobile_no: string (nullable = true)
 |-- data_business_card_name: string (nullable = true)
 |-- data_business_name: string (nullable = true)
 |-- data_businesss_type: string (nullable = true)
 |-- data_cashbook_current_balance: long (nullable = true)
 |-- data_contextID: string (nullable = true)
 |-- data_current_location_latitude: double (nullable = true)
 |-- data_current_location_longitude: double (nullable = true)
 |-- data_fcm_token: string (nullable = true)
 |-- data_fromNewAPP: boolean (nullable = true)
 |-- data_id: string (nullable = true)
 |-- data_img_base_64: string (nullable = true)
 |-- data_

In [10]:
df_user = cast_columns(df_user, {
    "timestamp": types.TimestampType(),
})
df_user = df_user.withColumnRenamed("data_id", "user_id")

In [11]:
df_user.head()

22/04/23 03:01:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv


Row(_c0=0, data_business_card_alternate_mobile_no='03049445761', data_business_card_business_name="ALI HAIDER / FATHER'S Home Business ", data_business_card_coordinates_lat=31.1108119, data_business_card_coordinates_lng=72.80649559999999, data_business_card_location='Chak # 45 GB,  Tehsil Samundri District FSD', data_business_card_mobile_no='03246820975', data_business_card_name='Ishtiaq Mehmood ', data_business_name="ALI HAIDER / FATHER'S Home Business ", data_businesss_type='General Store', data_cashbook_current_balance=-80, data_contextID='d035dbe3-cee3-4e59-b964-4899b7d63a4d', data_current_location_latitude=31.1062476, data_current_location_longitude=72.7949253, data_fcm_token='eNgv3-itRlm4FZZ901FtmY:APA91bG_HZObrNXbJJkn7ILDLlkLctTxW4eUHFxOILL42BX9Xb_uWxk5RXdHeSCNg7RXhf3drLAeVDIH4lqfOOwmUQz_8oprvPssvB0KiPXGPw7yDGww6hyOGJpQDyZFWimFc3lVCZNl', data_fromNewAPP=True, user_id='DrcOaVjp8FWTic6okQAYx2quxrU2', data_img_base_64='', data_img_url='', data_is_active=True, data_location_our_logs

In [12]:
df_analytics.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- android_os: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- city_geoIp: string (nullable = true)
 |-- device_language: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- event_date: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_pseudo_id: string (nullable = true)



In [13]:
df_analytics.limit(5).toPandas().head()

22/04/23 03:01:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , event_date, event_timestamp, event_name, user_id, user_pseudo_id, device_model, android_os, device_language, city_geoIp, app_version
 Schema: _c0, event_date, event_timestamp, event_name, user_id, user_pseudo_id, device_model, android_os, device_language, city_geoIp, app_version
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/analytics.csv


Unnamed: 0,_c0,android_os,app_version,city_geoIp,device_language,device_model,event_date,event_name,event_timestamp,user_id,user_pseudo_id
0,0,Android 9,2.26.0,Karachi,en-us,mobile S1 Pro,2021-09-01,session_start,2021-09-01 04:37:12.075000+00:00,229cb688-c853-4617-937a-66606881d39f,2f74ce381d6e3fa4ed0251974aa5c9b2
1,1,Android 11,2.26.0,Karachi,en-au,mobile Reno5 Pro 5G,2021-09-01,Click_GivePayment,2021-09-01 12:52:52.714002+00:00,3bddd824-6deb-42a9-b77c-8e09831f8b21,dfa278c52ad12e6907f6546d53838d74
2,2,Android 11,2.26.0,Karachi,en-au,mobile Reno5 Pro 5G,2021-09-01,Click_Recievepayment,2021-08-31 19:29:59.728003+00:00,3bddd824-6deb-42a9-b77c-8e09831f8b21,dfa278c52ad12e6907f6546d53838d74
3,3,Android 9,2.26.0,Karachi,en-us,mobile S1 Pro,2021-09-01,Click_Clickdicttag_Transaction_Screen,2021-09-01 03:19:10.372003+00:00,229cb688-c853-4617-937a-66606881d39f,2f74ce381d6e3fa4ed0251974aa5c9b2
4,4,Android 9,2.26.0,Karachi,en-us,mobile S1 Pro,2021-09-01,ViewDataBackup,2021-09-01 03:03:20.979003+00:00,229cb688-c853-4617-937a-66606881d39f,2f74ce381d6e3fa4ed0251974aa5c9b2


In [14]:
rating = df_user.select("user_id", "data_rating_stars").groupby("user_id").avg("data_rating_stars").withColumnRenamed("avg(data_rating_stars)", "rating")

In [15]:
user_info = df_analytics.groupby("user_id").agg(
    {
        "device_language": "first",
        "city_geoIp": "first",
        "app_version": "first",
        "device_model": "first",
    }
).withColumnRenamed(
    "first(app_version)", "app_version"
).withColumnRenamed(
    "first(device_model)", "phone_model"
).withColumnRenamed(
    "first(city_geoIp)", "city"
)

In [16]:
user_info.show()

22/04/23 03:01:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , event_date, event_timestamp, event_name, user_id, user_pseudo_id, device_model, android_os, device_language, city_geoIp, app_version
 Schema: _c0, event_date, event_timestamp, event_name, user_id, user_pseudo_id, device_model, android_os, device_language, city_geoIp, app_version
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/analytics.csv

+--------------------+-----------+--------------------+----------------------+----------+
|             user_id|app_version|         phone_model|first(device_language)|      city|
+--------------------+-----------+--------------------+----------------------+----------+
|031e8f64-0e97-4f4...|     2.26.0|     mobile Y20 2021|                 en-pk|    Lahore|
|03cfd11f-8d63-407...|     2.25.0|     mobile Y9 Prime|                 en-us|   Karachi|
|0e610016-5bd3-4d0...|     2.26.0|   mobile Galaxy A50|                 en-gb|   Karachi|
|14f817d0-ef4f-484...|     2.26.0|   mobile Spark 6 Go|                 en-us|    Multan|
|1514610d-50a1-4ab...|     2.26.0|         mobile Y51A|                 en-pk|   Karachi|
|17785423-24bf-465...|     2.26.0|          mobile F15|                 en-us|    Multan|
|229cb688-c853-461...|     2.26.0|       mobile S1 Pro|                 en-us|   Karachi|
|2d1fb3f7-77f3-47c...|     2.26.0|       mobile S1 Pro|                 en-us|    Lahore|
|2f00ade9-

                                                                                

# Transform Results

* user_id
* amount_of_credits
* amount_of_debits
* amount_of_total_transactions
* no_of_credits
* no_of_debits
* no_of_transactions
* rating
* user_last_activity
* days_since_last_activity
* created_at
* days_since_signup
* language
* city
* phone_model
* app_version
* calculated_fields.median_gmv_per_month
* calculated_fields.median_trans_per_month
* calculated_fields.months_transacting

In [17]:
amount_of_credits = df_transaction.select("data_user_id", "data_amount", "data_transaction_type").filter(df_transaction.data_transaction_type == "credit").groupby("data_user_id").sum("data_amount").withColumnRenamed("sum(data_amount)", "amount_of_credits").withColumnRenamed("data_user_id", "user_id")

In [18]:
amount_of_debits = df_transaction.select("data_user_id", "data_amount", "data_transaction_type").filter(df_transaction.data_transaction_type == "debit").groupby("data_user_id").sum("data_amount").withColumnRenamed("sum(data_amount)", "amount_of_debits").withColumnRenamed("data_user_id", "user_id")

In [19]:
no_of_credits = df_transaction.select("data_user_id", "data_amount", "data_transaction_type").filter(df_transaction.data_transaction_type == "credit").groupby("data_user_id").count().withColumnRenamed("count", "no_of_credits").withColumnRenamed("data_user_id", "user_id")

In [20]:
no_of_debits = df_transaction.select("data_user_id", "data_amount", "data_transaction_type").filter(df_transaction.data_transaction_type == "debit").groupby("data_user_id").count().withColumnRenamed("count", "no_of_debits").withColumnRenamed("data_user_id", "user_id")

In [21]:
amount_of_total_transactions = df_transaction.select("data_user_id", "data_amount", "data_transaction_type").groupby("data_user_id").sum("data_amount").withColumnRenamed("sum(data_amount)", "amount_of_total_transactions").withColumnRenamed("data_user_id", "user_id")

In [22]:
no_of_transactions = df_transaction.select("data_user_id", "data_amount", "data_transaction_type").groupby("data_user_id").count().withColumnRenamed("count", "no_of_transactions").withColumnRenamed("data_user_id", "user_id")

In [23]:
rating = df_user.select("user_id", "data_rating_stars").groupby("user_id").avg("data_rating_stars").withColumnRenamed("avg(data_rating_stars)", "rating")

In [24]:
user_last_activity = df_user.select("user_id", "data_user_last_activity__seconds").groupby("user_id").agg({"data_user_last_activity__seconds": "max"}).withColumnRenamed("max(data_user_last_activity__seconds)", "user_last_activity")

In [25]:
user_last_activity.show()

22/04/23 03:01:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv

+--------------------+------------------+
|             user_id|user_last_activity|
+--------------------+------------------+
|031e8f64-0e97-4f4...|        1640098460|
|14f817d0-ef4f-484...|        1640099852|
|1514610d-50a1-4ab...|        1639772497|
|DrcOaVjp8FWTic6ok...|        1640053834|
|hrV88QmZteSZwb7ZH...|        1640018755|
|03cfd11f-8d63-407...|        1640090348|
|0e610016-5bd3-4d0...|        1640082559|
|RUnjPhN22ifP6Lrzk...|        1640103605|
|17785423-24bf-465...|        1640081404|
|2f00ade9-c5a9-48c...|        1640110952|
|2d1fb3f7-77f3-47c...|        1640057067|
|229cb688-c853-461...|        1639301563|
|441c8878-5f4f-424...|        1640094728|
|41a4220d-f80a-480...|        1640108811|
|3682275d-5450-44f...|        1640100716|
|3bddd824-6deb-42a...|        1640105778|
|6509928c-77f9-44b...|        1640084239|
|5015223e-1bbb-456...|        1640111787|
|6521bdc4-26a0-4be...|        1639732453|
|671d933f-f411-49f...|        1640107367|
+--------------------+------------

                                                                                

In [26]:
user_last_activity = user_last_activity.withColumn("user_last_activity", col("user_last_activity").cast("timestamp"))

In [27]:
user_last_activity.show()

22/04/23 03:01:17 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv

+--------------------+-------------------+
|             user_id| user_last_activity|
+--------------------+-------------------+
|031e8f64-0e97-4f4...|2021-12-21 20:54:20|
|14f817d0-ef4f-484...|2021-12-21 21:17:32|
|1514610d-50a1-4ab...|2021-12-18 02:21:37|
|DrcOaVjp8FWTic6ok...|2021-12-21 08:30:34|
|hrV88QmZteSZwb7ZH...|2021-12-20 22:45:55|
|03cfd11f-8d63-407...|2021-12-21 18:39:08|
|0e610016-5bd3-4d0...|2021-12-21 16:29:19|
|RUnjPhN22ifP6Lrzk...|2021-12-21 22:20:05|
|17785423-24bf-465...|2021-12-21 16:10:04|
|2f00ade9-c5a9-48c...|2021-12-22 00:22:32|
|2d1fb3f7-77f3-47c...|2021-12-21 09:24:27|
|229cb688-c853-461...|2021-12-12 15:32:43|
|441c8878-5f4f-424...|2021-12-21 19:52:08|
|41a4220d-f80a-480...|2021-12-21 23:46:51|
|3682275d-5450-44f...|2021-12-21 21:31:56|
|3bddd824-6deb-42a...|2021-12-21 22:56:18|
|6509928c-77f9-44b...|2021-12-21 16:57:19|
|5015223e-1bbb-456...|2021-12-22 00:36:27|
|6521bdc4-26a0-4be...|2021-12-17 15:14:13|
|671d933f-f411-49f...|2021-12-21 23:22:47|
+----------

                                                                                

In [28]:
days_since_last_activity = user_last_activity.withColumn("user_last_activity", datediff(current_date(), col("user_last_activity"))).withColumnRenamed("user_last_activity", "days_since_last_activity")

In [75]:
created_at = df_user.select("user_id").dropDuplicates(["user_id"]).withColumn("created_at", current_timestamp())

In [30]:
days_since_signup = df_user.select("user_id", "data_user_signup_date__seconds").groupBy("user_id").agg({"data_user_signup_date__seconds": "min"}).withColumnRenamed("min(data_user_signup_date__seconds)", "signup_timestamp").withColumn("signup_timestamp", col("signup_timestamp").cast("timestamp").cast("date")).withColumn("days_since_signup", datediff(current_date(), col("signup_timestamp"))).select("user_id", "days_since_signup")

In [31]:
days_since_signup.show()

22/04/23 03:01:26 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv

+--------------------+-----------------+
|             user_id|days_since_signup|
+--------------------+-----------------+
|031e8f64-0e97-4f4...|              300|
|14f817d0-ef4f-484...|              419|
|1514610d-50a1-4ab...|              409|
|DrcOaVjp8FWTic6ok...|              601|
|hrV88QmZteSZwb7ZH...|             null|
|03cfd11f-8d63-407...|              386|
|0e610016-5bd3-4d0...|              331|
|RUnjPhN22ifP6Lrzk...|              545|
|17785423-24bf-465...|              483|
|2f00ade9-c5a9-48c...|              621|
|2d1fb3f7-77f3-47c...|              449|
|229cb688-c853-461...|              405|
|441c8878-5f4f-424...|              431|
|41a4220d-f80a-480...|              417|
|3682275d-5450-44f...|              326|
|3bddd824-6deb-42a...|              543|
|6509928c-77f9-44b...|              328|
|5015223e-1bbb-456...|              500|
|6521bdc4-26a0-4be...|              338|
|671d933f-f411-49f...|              354|
+--------------------+-----------------+
only showing top

                                                                                

In [32]:
user_info = df_analytics.groupby("user_id").agg(
    {
        "device_language": "first",
        "city_geoIp": "first",
        "app_version": "first",
        "device_model": "first",
    }
).withColumnRenamed(
    "first(app_version)", "app_version"
).withColumnRenamed(
    "first(device_model)", "phone_model"
).withColumnRenamed(
    "first(city_geoIp)", "city"
).withColumnRenamed(
    "first(device_language)", "language"
)

In [33]:
user_info.show()

22/04/23 03:01:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , event_date, event_timestamp, event_name, user_id, user_pseudo_id, device_model, android_os, device_language, city_geoIp, app_version
 Schema: _c0, event_date, event_timestamp, event_name, user_id, user_pseudo_id, device_model, android_os, device_language, city_geoIp, app_version
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/analytics.csv

+--------------------+-----------+--------------------+--------+----------+
|             user_id|app_version|         phone_model|language|      city|
+--------------------+-----------+--------------------+--------+----------+
|031e8f64-0e97-4f4...|     2.26.0|     mobile Y20 2021|   en-pk|    Lahore|
|03cfd11f-8d63-407...|     2.25.0|     mobile Y9 Prime|   en-us|   Karachi|
|0e610016-5bd3-4d0...|     2.26.0|   mobile Galaxy A50|   en-gb|   Karachi|
|14f817d0-ef4f-484...|     2.26.0|   mobile Spark 6 Go|   en-us|    Multan|
|1514610d-50a1-4ab...|     2.26.0|         mobile Y51A|   en-pk|   Karachi|
|17785423-24bf-465...|     2.26.0|          mobile F15|   en-us|    Multan|
|229cb688-c853-461...|     2.26.0|       mobile S1 Pro|   en-us|   Karachi|
|2d1fb3f7-77f3-47c...|     2.26.0|       mobile S1 Pro|   en-us|    Lahore|
|2f00ade9-c5a9-48c...|     2.26.0|mobile Galaxy J5 ...|   en-us|   Karachi|
|3682275d-5450-44f...|     2.26.0|          mobile Y15|   en-us|    Multan|
|3bddd824-6d

                                                                                

In [45]:
calculated_fields_median_gmv_per_month = df_transaction.groupby("data_user_id", month("timestamp")).agg(
    spark_sum("data_amount").alias("amount")
).groupby("data_user_id").agg(
        percentile_approx("amount", 0.5).alias(
            "calculated_fields.median_gmv_per_month"
        )
    ).withColumnRenamed("data_user_id", "user_id")

In [35]:
calculated_fields_median_gmv_per_month.show()

22/04/23 03:01:41 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv

+--------------------+--------------------------------------+
|        data_user_id|calculated_fields.median_gmv_per_month|
+--------------------+--------------------------------------+
|dcaf1888-4514-4c1...|                             4660400.0|
|8fc029c3-2ec0-441...|                              136074.0|
|6521bdc4-26a0-4be...|                           3.1197393E7|
|81c995b0-7a19-47b...|                             2130407.0|
|031e8f64-0e97-4f4...|                             1914660.0|
|ee1c2c31-b463-46b...|                              634152.0|
|a50fc2e9-1454-475...|                              706294.0|
|14f817d0-ef4f-484...|                              128498.0|
|fe9c3105-f134-4cf...|                              388818.0|
|f6fdcdd2-98f7-400...|                              686632.0|
|ba5e4473-d825-44d...|                           1.0337515E7|
|cb98d081-a469-49a...|                             1094172.0|
|6ab12451-393e-401...|                    1268764.6120004654|
|e30ee11

                                                                                

In [36]:
df_transaction.limit(5).toPandas().head()

22/04/23 03:01:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv


Unnamed: 0,_c0,data_amount,data_creation_timestamp__nanoseconds,data_creation_timestamp__seconds,data_customer_net_balance_after_transaction,data_note,data_transaction_timestamp__nanoseconds,data_transaction_timestamp__seconds,data_transaction_type,data_type,data_user_id,document_id,document_name,event_id,operation,timestamp
0,0,1000.0,971000000,1606274802,-9000,17.11,983000000,1606274802,credit,default,ba5e4473-d825-44d3-872b-78ff3061ad8d,QgNGgxEvYNx6kmTdMM9o,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/QgNGgxEvYNx6kmTdMM9o,8f11bd01-d465-493e-b971-1f5117acf482-1,CREATE,2020-11-25 09:26:47.383340
1,1,60.0,774000000,1606277763,838,Sabon,795000000,1606277763,credit,default,cc0e78f9-305e-48e0-a4cf-72a79fb8a7d0,HCJ1trNIpJmBGDiC5L3U,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/HCJ1trNIpJmBGDiC5L3U,ad7c8ad8-e48c-4433-b5ee-22a46e61aeab-1,CREATE,2020-11-25 10:15:59.286363
2,2,700.0,11000000,1640114677,-700,Jazz Monthly,357000000,1640114668,debit,default,DrcOaVjp8FWTic6okQAYx2quxrU2,a3878e83-ddea-45e5-8ebd-9045c477b37e,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/a3878e83-ddea-45e5-8ebd-9045c477b37e,8a0b3966-b31b-4346-beb5-f0272a616b49-1,CREATE,2021-12-22 01:24:36.953992
3,3,4050.0,81000000,1640115477,4050,,298000000,1640115467,credit,default,74bd4451-77ee-4230-a72e-417c09e8133d,29101f24-6a35-42c6-9e2f-ff70a98e0ba4,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/29101f24-6a35-42c6-9e2f-ff70a98e0ba4,d84c9101-0bcf-49a9-99c5-3f544572a2d0-1,CREATE,2021-12-22 01:37:59.879283
4,4,10000.0,79000000,1606224593,-89785,Dr sent,97000000,1606224593,debit,default,8c483cd1-7868-4ef9-82c7-2762ac041e30,cCgOCm4wT5FxfpWblRhh,projects/hisaab-7e8b4/databases/(default)/documents/_transactions/cCgOCm4wT5FxfpWblRhh,93a5ae97-85b1-4aa0-8812-55e4c5a2b6c1-1,CREATE,2020-11-24 19:29:53.645005


In [51]:
calculated_field_median_trans_per_month = (
    df_transaction.groupby("data_user_id", month("timestamp"))
    .agg(count("data_amount").alias("amount"))
    .groupby("data_user_id")
    .agg(
        percentile_approx("amount", 0.5).alias(
            "calculated_fields.median_trans_per_month"
        )
    ).withColumnRenamed("data_user_id", "user_id")
)

In [38]:
median_trans_per_month.show()

22/04/23 03:01:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv

+--------------------+----------------------------------------+
|             user_id|calculated_fields.median_trans_per_month|
+--------------------+----------------------------------------+
|dcaf1888-4514-4c1...|                                    1436|
|8fc029c3-2ec0-441...|                                     642|
|6521bdc4-26a0-4be...|                                    3261|
|81c995b0-7a19-47b...|                                    1153|
|031e8f64-0e97-4f4...|                                    2257|
|ee1c2c31-b463-46b...|                                    1336|
|a50fc2e9-1454-475...|                                    1129|
|14f817d0-ef4f-484...|                                    1059|
|fe9c3105-f134-4cf...|                                    3350|
|f6fdcdd2-98f7-400...|                                    1038|
|ba5e4473-d825-44d...|                                     854|
|cb98d081-a469-49a...|                                    2227|
|6ab12451-393e-401...|                  

                                                                                

In [52]:
calculated_field_months_transacting = (
    df_transaction.select("data_user_id", month("timestamp").alias("month_number"))
    .groupby("data_user_id")
    .agg(
        expr("count(month_number)").alias(
            "calculated_fields.months_transacting"
        )
    ).withColumnRenamed("data_user_id", "user_id")
)

In [44]:
months_transacting.show()

22/04/23 03:11:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv

+--------------------+------------------------------------+
|             user_id|calculated_fields.months_transacting|
+--------------------+------------------------------------+
|dcaf1888-4514-4c1...|                               13864|
|8fc029c3-2ec0-441...|                                8812|
|6521bdc4-26a0-4be...|                               23485|
|031e8f64-0e97-4f4...|                               14037|
|81c995b0-7a19-47b...|                                9303|
|ee1c2c31-b463-46b...|                               17214|
|a50fc2e9-1454-475...|                               12840|
|14f817d0-ef4f-484...|                               11645|
|fe9c3105-f134-4cf...|                               27139|
|f6fdcdd2-98f7-400...|                               10208|
|ba5e4473-d825-44d...|                               17907|
|cb98d081-a469-49a...|                               15801|
|6ab12451-393e-401...|                               17463|
|e30ee11c-9d67-4de...|                  



In [69]:
created_at.limit(100).toPandas().head(100)

22/04/23 03:56:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/users.csv


Unnamed: 0,user_id,created_at
0,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
1,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
2,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
3,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
4,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
5,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
6,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
7,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
8,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071
9,DrcOaVjp8FWTic6okQAYx2quxrU2,2022-04-23 03:56:58.187071


In [53]:
aggregated_result =  amount_of_credits.join(no_of_credits, on="user_id", how="left").join(
    amount_of_debits, on="user_id", how="left").join(
    no_of_debits, on="user_id", how="left").join(
    amount_of_total_transactions, on="user_id", how="left").join(
    no_of_transactions, on="user_id", how="left").join(
    rating, on="user_id", how="left").join(
    user_last_activity, on="user_id", how="left").join(
    created_at, on="user_id", how="left").join(
    days_since_signup, on="user_id", how="left").join(
    user_info, on="user_id", how="left").join(
    calculated_fields_median_gmv_per_month, on="user_id", how="left").join(
    calculated_field_median_trans_per_month, on="user_id", how="left").join(
    calculated_field_months_transacting, on="user_id", how="left")

In [55]:
aggregated_result.limit(5).toPandas().head()

22/04/23 03:17:19 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 03:17:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 03:17:28 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/tra

Unnamed: 0,user_id,amount_of_credits,no_of_credits,amount_of_debits,no_of_debits,amount_of_total_transactions,no_of_transactions,rating,user_last_activity,created_at,days_since_signup,app_version,phone_model,language,city,calculated_fields.median_gmv_per_month,calculated_fields.median_trans_per_month,calculated_fields.months_transacting
0,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,11853,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:17:19.593875,300,2.26.0,mobile Y20 2021,en-pk,Lahore,1914660.0,2257,14037
1,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,11853,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:17:19.593875,300,2.26.0,mobile Y20 2021,en-pk,Lahore,1914660.0,2257,14037
2,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,11853,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:17:19.593875,300,2.26.0,mobile Y20 2021,en-pk,Lahore,1914660.0,2257,14037
3,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,11853,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:17:19.593875,300,2.26.0,mobile Y20 2021,en-pk,Lahore,1914660.0,2257,14037
4,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,11853,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:17:19.593875,300,2.26.0,mobile Y20 2021,en-pk,Lahore,1914660.0,2257,14037


In [66]:
amount_of_credits.dropDuplicates(["user_id"]).join(
    no_of_credits, on="user_id", how="inner").join(
    amount_of_debits, on="user_id", how="inner").join(
    amount_of_total_transactions, on="user_id", how="inner").join(
    no_of_transactions, on="user_id", how="inner").join(
    rating, on="user_id", how="inner").join(
    user_last_activity, on="user_id", how="inner").join(
    created_at, on="user_id", how="inner").limit(20).toPandas().head(20)

22/04/23 03:41:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 03:41:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 03:41:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/tra

Unnamed: 0,user_id,amount_of_credits,no_of_credits,amount_of_debits,amount_of_total_transactions,no_of_transactions,rating,user_last_activity,created_at
0,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
1,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
2,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
3,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
4,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
5,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
6,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
7,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
8,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676
9,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 03:41:49.335676


In [77]:
res = amount_of_credits.join(no_of_credits, on="user_id", how="inner").join(
    amount_of_debits, on="user_id", how="inner").join(
    no_of_debits, on="user_id", how="inner").join(
    amount_of_total_transactions, on="user_id", how="inner").join(
    no_of_transactions, on="user_id", how="inner").join(
    rating, on="user_id", how="inner").join(
    user_last_activity, on="user_id", how="inner").join(
    created_at, on="user_id", how="inner").join(
    days_since_signup, on="user_id", how="inner").join(
    user_info, on="user_id", how="inner").join(
    calculated_fields_median_gmv_per_month, on="user_id", how="inner").join(
    calculated_field_median_trans_per_month, on="user_id", how="inner").join(
    calculated_field_months_transacting, on="user_id", how="inner")

In [78]:
res.limit(5).toPandas().head()

22/04/23 04:05:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 04:05:11 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/transactions.csv
22/04/23 04:05:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , timestamp, event_id, document_name, operation, document_id, data
 Schema: _c0, timestamp, event_id, document_name, operation, document_id, data
Expected: _c0 but found: 
CSV file: file:///home/manash/work/cb-etl-task/datasets/tra

Unnamed: 0,user_id,amount_of_credits,no_of_credits,amount_of_debits,no_of_debits,amount_of_total_transactions,no_of_transactions,rating,user_last_activity,created_at,days_since_signup,app_version,phone_model,language,city,calculated_fields.median_gmv_per_month,calculated_fields.median_trans_per_month,calculated_fields.months_transacting
0,dcaf1888-4514-4c15-92e1-a3691b51949a,23433795.0,6062,23339095.0,7683,46772890.0,13864,5.0,2021-12-21 15:48:58,2022-04-23 04:05:07.538151,403,2.14.0,mobile Hot 9,en-us,Rawalpindi,4660400.0,1436,13864
1,8fc029c3-2ec0-441a-98df-97622d794328,893673.0,1602,1068601.0,6844,1962274.0,8812,4.0,2021-12-21 17:24:23,2022-04-23 04:05:07.538151,460,2.26.0,mobile Spark 5,en-us,Multan,136074.0,642,8812
2,6521bdc4-26a0-4be9-b4b8-e103f7f2957a,1173685422.0,8380,145374489.2,14879,1319059911.2,23485,4.0,2021-12-17 15:14:13,2022-04-23 04:05:07.538151,338,2.26.0,mobile nova 3i,en-pk,Rawalpindi,31197393.0,3261,23485
3,031e8f64-0e97-4f42-ab91-45569d114f0d,5556595.0,1775,11622256.0,11853,17178851.0,14037,5.0,2021-12-21 20:54:20,2022-04-23 04:05:07.538151,300,2.26.0,mobile Y20 2021,en-pk,Lahore,1914660.0,2257,14037
4,81c995b0-7a19-47ba-afbc-d142897ab793,9312905.17,3128,10962711.0,5942,20275616.17,9303,5.0,2021-12-22 00:26:42,2022-04-23 04:05:07.538151,357,2.26.0,mobile Y20,en-us,Lahore,2130407.0,1153,9303
